001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.process; 022 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029 030import cascading.flow.FlowElement; 031import cascading.flow.FlowNode; 032import cascading.flow.planner.BaseFlowNode; 033import cascading.flow.planner.BaseFlowNodeFactory; 034import cascading.flow.planner.graph.ElementGraph; 035import cascading.flow.planner.graph.FlowElementGraph; 036 037import static cascading.util.Util.createIdentitySet; 038 039/** 040 * 041 */ 042public class FlowNodeGraph extends BaseProcessGraph<FlowNode> 043 { 044 public static final FlowNodeComparator FLOW_NODE_COMPARATOR = new FlowNodeComparator(); 045 046 /** 047 * Class FlowNodeComparator provides a consistent tie breaker when ordering nodes topologically. 048 * <p/> 049 * This should have no effect on submission and execution priority as all FlowNodes are submitted simultaneously. 050 */ 051 public static class FlowNodeComparator implements Comparator<FlowNode> 052 { 053 @Override 054 public int compare( FlowNode lhs, FlowNode rhs ) 055 { 056 // larger graph first 057 int lhsSize = lhs.getElementGraph().vertexSet().size(); 058 int rhsSize = rhs.getElementGraph().vertexSet().size(); 059 int result = ( lhsSize < rhsSize ) ? -1 : ( ( lhsSize == rhsSize ) ? 0 : 1 ); 060 061 if( result != 0 ) 062 return result; 063 064 // more inputs second 065 lhsSize = lhs.getSourceElements().size(); 066 rhsSize = rhs.getSourceElements().size(); 067 068 return ( lhsSize < rhsSize ) ? -1 : ( ( lhsSize == rhsSize ) ? 0 : 1 ); 069 } 070 } 071 072 public FlowNodeGraph() 073 { 074 } 075 076 public FlowNodeGraph( FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs ) 077 { 078 this( new BaseFlowNodeFactory(), flowElementGraph, nodeSubGraphs ); 079 } 080 081 public FlowNodeGraph( FlowNodeFactory flowNodeFactory, List<? extends ElementGraph> nodeSubGraphs ) 082 { 083 this( flowNodeFactory, null, nodeSubGraphs ); 084 } 085 086 public FlowNodeGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs ) 087 { 088 this( flowNodeFactory, flowElementGraph, nodeSubGraphs, Collections.<ElementGraph, List<? extends ElementGraph>>emptyMap() ); 089 } 090 091 public FlowNodeGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap ) 092 { 093 buildGraph( flowNodeFactory, flowElementGraph, nodeSubGraphs, pipelineSubGraphsMap ); 094 095 // consistently sets ordinal of node based on topological dependencies and tie breaking by the given Comparator 096 Iterator<FlowNode> iterator = getOrderedTopologicalIterator(); 097 098 int ordinal = 0; 099 int size = vertexSet().size(); 100 101 while( iterator.hasNext() ) 102 { 103 BaseFlowNode next = (BaseFlowNode) iterator.next(); 104 105 next.setOrdinal( ordinal ); 106 next.setName( flowNodeFactory.makeFlowNodeName( next, size, ordinal ) ); 107 108 ordinal++; 109 } 110 } 111 112 protected void buildGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap ) 113 { 114 if( pipelineSubGraphsMap == null ) 115 pipelineSubGraphsMap = Collections.emptyMap(); 116 117 for( ElementGraph nodeSubGraph : nodeSubGraphs ) 118 { 119 List<? extends ElementGraph> pipelineGraphs = pipelineSubGraphsMap.get( nodeSubGraph ); 120 121 FlowNode flowNode = flowNodeFactory.createFlowNode( flowElementGraph, nodeSubGraph, pipelineGraphs ); 122 123 addVertex( flowNode ); 124 } 125 126 bindEdges(); 127 } 128 129 public Set<FlowElement> getFlowElementsFor( Enum annotation ) 130 { 131 Set<FlowElement> results = createIdentitySet(); 132 133 for( FlowNode flowNode : vertexSet() ) 134 results.addAll( flowNode.getFlowElementsFor( annotation ) ); 135 136 return results; 137 } 138 139 public Iterator<FlowNode> getOrderedTopologicalIterator() 140 { 141 return super.getOrderedTopologicalIterator( FLOW_NODE_COMPARATOR ); 142 } 143 }