001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.process;
022
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import cascading.flow.FlowElement;
031import cascading.flow.FlowNode;
032import cascading.flow.planner.BaseFlowNode;
033import cascading.flow.planner.FlowPlanner;
034import cascading.flow.planner.graph.ElementGraph;
035import cascading.flow.planner.graph.FlowElementGraph;
036
037import static cascading.util.Util.createIdentitySet;
038
039/**
040 *
041 */
042public class FlowNodeGraph extends BaseProcessGraph<FlowNode>
043  {
044  public static final FlowNodeComparator FLOW_NODE_COMPARATOR = new FlowNodeComparator();
045
046  /**
047   * Class FlowNodeComparator provides a consistent tie breaker when ordering nodes topologically.
048   * <p/>
049   * This should have no effect on submission and execution priority as all FlowNodes are submitted simultaneously.
050   */
051  public static class FlowNodeComparator implements Comparator<FlowNode>
052    {
053    @Override
054    public int compare( FlowNode lhs, FlowNode rhs )
055      {
056      // larger graph first
057      int lhsSize = lhs.getElementGraph().vertexSet().size();
058      int rhsSize = rhs.getElementGraph().vertexSet().size();
059      int result = ( lhsSize < rhsSize ) ? -1 : ( ( lhsSize == rhsSize ) ? 0 : 1 );
060
061      if( result != 0 )
062        return result;
063
064      // more inputs second
065      lhsSize = lhs.getSourceElements().size();
066      rhsSize = rhs.getSourceElements().size();
067
068      return ( lhsSize < rhsSize ) ? -1 : ( ( lhsSize == rhsSize ) ? 0 : 1 );
069      }
070    }
071
072  public FlowNodeGraph()
073    {
074    }
075
076  public FlowNodeGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs )
077    {
078    this( flowPlanner, flowElementGraph, nodeSubGraphs, Collections.<ElementGraph, List<? extends ElementGraph>>emptyMap() );
079    }
080
081  public FlowNodeGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
082    {
083    buildGraph( flowPlanner, flowElementGraph, nodeSubGraphs, pipelineSubGraphsMap );
084
085    // consistently sets ordinal of node based on topological dependencies and tie breaking by the given Comparator
086    Iterator<FlowNode> iterator = getOrderedTopologicalIterator();
087
088    int ordinal = 0;
089    int size = vertexSet().size();
090
091    while( iterator.hasNext() )
092      {
093      BaseFlowNode next = (BaseFlowNode) iterator.next();
094
095      next.setOrdinal( ordinal );
096      next.setName( flowPlanner.makeFlowNodeName( next, size, ordinal ) );
097
098      ordinal++;
099      }
100    }
101
102  protected void buildGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
103    {
104    for( ElementGraph nodeSubGraph : nodeSubGraphs )
105      {
106      List<? extends ElementGraph> pipelineGraphs = pipelineSubGraphsMap.get( nodeSubGraph );
107
108      FlowNode flowNode = flowPlanner.createFlowNode( flowElementGraph, nodeSubGraph, pipelineGraphs );
109
110      addVertex( flowNode );
111      }
112
113    bindEdges();
114    }
115
116  public Set<FlowElement> getFlowElementsFor( Enum annotation )
117    {
118    Set<FlowElement> results = createIdentitySet();
119
120    for( FlowNode flowNode : vertexSet() )
121      results.addAll( flowNode.getFlowElementsFor( annotation ) );
122
123    return results;
124    }
125
126  public Iterator<FlowNode> getOrderedTopologicalIterator()
127    {
128    return super.getOrderedTopologicalIterator( FLOW_NODE_COMPARATOR );
129    }
130  }