001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.process; 022 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026 027import cascading.flow.FlowElement; 028import cascading.flow.FlowStep; 029import cascading.flow.planner.BaseFlowStep; 030import cascading.flow.planner.FlowPlanner; 031import cascading.flow.planner.graph.AnnotatedDecoratedElementGraph; 032import cascading.flow.planner.graph.ElementGraph; 033import cascading.flow.planner.graph.FlowElementGraph; 034import cascading.util.EnumMultiMap; 035 036public class FlowStepGraph extends BaseProcessGraph<FlowStep> 037 { 038 public FlowStepGraph() 039 { 040 } 041 042 public FlowStepGraph( FlowStepFactory flowStepFactory, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> nodeSubGraphsMap ) 043 { 044 this( flowStepFactory, flowElementGraph, nodeSubGraphsMap, null ); 045 } 046 047 public FlowStepGraph( FlowStepFactory flowStepFactory, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> nodeSubGraphsMap, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap ) 048 { 049 buildGraph( flowStepFactory, flowElementGraph, nodeSubGraphsMap, pipelineSubGraphsMap ); 050 051 Iterator<FlowStep> iterator = getTopologicalIterator(); 052 053 int ordinal = 0; 054 int size = vertexSet().size(); 055 056 while( iterator.hasNext() ) 057 { 058 BaseFlowStep flowStep = (BaseFlowStep) iterator.next(); 059 060 flowStep.setOrdinal( ordinal++ ); 061 flowStep.setName( flowStepFactory.makeFlowStepName( flowStep, size, flowStep.getOrdinal() ) ); 062 } 063 } 064 065 protected void buildGraph( FlowStepFactory flowStepFactory, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> nodeSubGraphsMap, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap ) 066 { 067 for( ElementGraph stepSubGraph : nodeSubGraphsMap.keySet() ) 068 { 069 List<? extends ElementGraph> nodeSubGraphs = nodeSubGraphsMap.get( stepSubGraph ); 070 FlowNodeGraph flowNodeGraph = createFlowNodeGraph( flowStepFactory, flowElementGraph, pipelineSubGraphsMap, nodeSubGraphs ); 071 072 EnumMultiMap<FlowElement> annotations = flowNodeGraph.getAnnotations(); 073 074 // pull up annotations 075 if( !annotations.isEmpty() ) 076 stepSubGraph = new AnnotatedDecoratedElementGraph( stepSubGraph, annotations ); 077 078 FlowStep flowStep = flowStepFactory.createFlowStep( stepSubGraph, flowNodeGraph ); 079 080 addVertex( flowStep ); 081 } 082 083 bindEdges(); 084 } 085 086 protected FlowNodeGraph createFlowNodeGraph( FlowStepFactory flowStepFactory, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap, List<? extends ElementGraph> nodeSubGraphs ) 087 { 088 return new FlowNodeGraph( flowStepFactory.getFlowNodeFactory(), flowElementGraph, nodeSubGraphs, pipelineSubGraphsMap ); 089 } 090 }