001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.process;
022
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026
027import cascading.flow.FlowElement;
028import cascading.flow.FlowStep;
029import cascading.flow.planner.BaseFlowStep;
030import cascading.flow.planner.FlowPlanner;
031import cascading.flow.planner.graph.AnnotatedDecoratedElementGraph;
032import cascading.flow.planner.graph.ElementGraph;
033import cascading.flow.planner.graph.FlowElementGraph;
034import cascading.util.EnumMultiMap;
035
036public class FlowStepGraph extends BaseProcessGraph<FlowStep>
037  {
038  public FlowStepGraph()
039    {
040    }
041
042  public FlowStepGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> nodeSubGraphsMap, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
043    {
044    buildGraph( flowPlanner, flowElementGraph, nodeSubGraphsMap, pipelineSubGraphsMap );
045
046    Iterator<FlowStep> iterator = getTopologicalIterator();
047
048    int ordinal = 0;
049    int size = vertexSet().size();
050
051    while( iterator.hasNext() )
052      {
053      BaseFlowStep flowStep = (BaseFlowStep) iterator.next();
054
055      flowStep.setOrdinal( ordinal++ );
056      flowStep.setName( flowPlanner.makeFlowStepName( flowStep, size, flowStep.getOrdinal() ) );
057      }
058    }
059
060  protected void buildGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> nodeSubGraphsMap, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
061    {
062    for( ElementGraph stepSubGraph : nodeSubGraphsMap.keySet() )
063      {
064      List<? extends ElementGraph> nodeSubGraphs = nodeSubGraphsMap.get( stepSubGraph );
065      FlowNodeGraph flowNodeGraph = createFlowNodeGraph( flowPlanner, flowElementGraph, pipelineSubGraphsMap, nodeSubGraphs );
066
067      EnumMultiMap<FlowElement> annotations = flowNodeGraph.getAnnotations();
068
069      // pull up annotations
070      if( !annotations.isEmpty() )
071        stepSubGraph = new AnnotatedDecoratedElementGraph( stepSubGraph, annotations );
072
073      FlowStep flowStep = flowPlanner.createFlowStep( stepSubGraph, flowNodeGraph );
074
075      addVertex( flowStep );
076      }
077
078    bindEdges();
079    }
080
081  protected FlowNodeGraph createFlowNodeGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap, List<? extends ElementGraph> nodeSubGraphs )
082    {
083    return new FlowNodeGraph( flowPlanner, flowElementGraph, nodeSubGraphs, pipelineSubGraphsMap );
084    }
085  }