/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.process;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import cascading.flow.FlowElement;
import cascading.flow.FlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowPlanner;
import cascading.flow.planner.graph.AnnotatedDecoratedElementGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.FlowElementGraph;
import cascading.util.EnumMultiMap;

/**
 * A process graph whose vertices are {@link FlowStep} instances.
 * <p>
 * One step is created for every key of the node sub-graph map handed to the constructor; once all
 * steps are added, {@code bindEdges()} is invoked, and each step is given an ordinal and a name.
 */
public class FlowStepGraph extends BaseProcessGraph<FlowStep>
  {
  /** Creates a graph without adding any steps. */
  public FlowStepGraph()
    {
    }

  /**
   * Builds the graph via {@link #buildGraph}, then walks the steps in the order produced by
   * {@code getTopologicalIterator()}, assigning each one a zero-based ordinal and a name obtained from
   * {@code flowPlanner.makeFlowStepName(...)}.
   *
   * @param flowPlanner          planner used to create and to name the steps
   * @param flowElementGraph     element graph forwarded to {@link #createFlowNodeGraph}
   * @param nodeSubGraphsMap     step sub-graphs (keys) mapped to their node sub-graphs (values)
   * @param pipelineSubGraphsMap pipeline sub-graphs forwarded to {@link #createFlowNodeGraph}
   */
  public FlowStepGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> nodeSubGraphsMap, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
    {
    buildGraph( flowPlanner, flowElementGraph, nodeSubGraphsMap, pipelineSubGraphsMap );

    Iterator<FlowStep> steps = getTopologicalIterator();

    // total number of steps, handed to the planner when it builds each name
    int stepCount = vertexSet().size();

    for( int ordinal = 0; steps.hasNext(); ordinal++ )
      {
      BaseFlowStep step = (BaseFlowStep) steps.next();

      step.setOrdinal( ordinal );
      step.setName( flowPlanner.makeFlowStepName( step, stepCount, step.getOrdinal() ) );
      }
    }

  /**
   * Adds one {@link FlowStep} vertex per key in {@code nodeSubGraphsMap}, then calls {@code bindEdges()}.
   * <p>
   * When the node graph created for a step carries annotations, the step sub-graph is wrapped in an
   * {@link AnnotatedDecoratedElementGraph} holding them before the step is created.
   *
   * @param flowPlanner          planner used to create each step
   * @param flowElementGraph     element graph forwarded to {@link #createFlowNodeGraph}
   * @param nodeSubGraphsMap     step sub-graphs (keys) mapped to their node sub-graphs (values)
   * @param pipelineSubGraphsMap pipeline sub-graphs forwarded to {@link #createFlowNodeGraph}
   */
  protected void buildGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> nodeSubGraphsMap, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
    {
    for( ElementGraph subGraph : nodeSubGraphsMap.keySet() )
      {
      List<? extends ElementGraph> nodeGraphs = nodeSubGraphsMap.get( subGraph );
      FlowNodeGraph nodeGraph = createFlowNodeGraph( flowPlanner, flowElementGraph, pipelineSubGraphsMap, nodeGraphs );

      EnumMultiMap<FlowElement> nodeAnnotations = nodeGraph.getAnnotations();

      // pull the node graph annotations up onto the step sub-graph, if there are any
      ElementGraph stepGraph = nodeAnnotations.isEmpty()
        ? subGraph
        : new AnnotatedDecoratedElementGraph( subGraph, nodeAnnotations );

      FlowStep step = flowPlanner.createFlowStep( stepGraph, nodeGraph );

      addVertex( step );
      }

    bindEdges();
    }

  /**
   * Creates the {@link FlowNodeGraph} for a single step; declared as a separate protected method so
   * subclasses can supply a different instance.
   *
   * @param flowPlanner          planner passed to the node graph constructor
   * @param flowElementGraph     element graph passed to the node graph constructor
   * @param pipelineSubGraphsMap pipeline sub-graphs passed to the node graph constructor
   * @param nodeSubGraphs        node sub-graphs passed to the node graph constructor
   * @return a new {@link FlowNodeGraph} built from the given arguments
   */
  protected FlowNodeGraph createFlowNodeGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap, List<? extends ElementGraph> nodeSubGraphs )
    {
    FlowNodeGraph nodeGraph = new FlowNodeGraph( flowPlanner, flowElementGraph, nodeSubGraphs, pipelineSubGraphsMap );

    return nodeGraph;
    }
  }