001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.graph; 022 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030 031import cascading.flow.FlowElement; 032import cascading.flow.FlowElements; 033import cascading.flow.planner.ElementGraphException; 034import cascading.flow.planner.PlatformInfo; 035import cascading.flow.planner.Scope; 036import cascading.pipe.Checkpoint; 037import cascading.pipe.Pipe; 038import cascading.pipe.Splice; 039import cascading.pipe.SubAssembly; 040import cascading.tap.Tap; 041import cascading.util.EnumMultiMap; 042import cascading.util.Util; 043import org.jgrapht.Graphs; 044import org.jgrapht.traverse.DepthFirstIterator; 045import org.jgrapht.traverse.TopologicalOrderIterator; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** Class ElementGraph represents the executable FlowElement graph. */ 050public class FlowElementGraph extends ElementDirectedGraph implements AnnotatedGraph 051 { 052 /** Field LOG */ 053 private static final Logger LOG = LoggerFactory.getLogger( FlowElementGraph.class ); 054 055 /** Field resolved */ 056 private boolean resolved; 057 /** Field platformInfo */ 058 private PlatformInfo platformInfo; 059 /** Field sources */ 060 private Map<String, Tap> sources; 061 /** Field sinks */ 062 private Map<String, Tap> sinks; 063 /** Field traps */ 064 private Map<String, Tap> traps; 065 /** Field checkpoints */ 066 private Map<String, Tap> checkpoints; 067 /** Field requireUniqueCheckpoints */ 068 private boolean requireUniqueCheckpoints; 069 070 // used for creating isolated test graphs 071 protected FlowElementGraph() 072 { 073 } 074 075 public FlowElementGraph( FlowElementGraph flowElementGraph ) 076 { 077 this(); 078 this.platformInfo = flowElementGraph.platformInfo; 079 this.sources = flowElementGraph.sources; 080 this.sinks = flowElementGraph.sinks; 081 this.traps = flowElementGraph.traps; 082 this.checkpoints = flowElementGraph.checkpoints; 083 this.requireUniqueCheckpoints = flowElementGraph.requireUniqueCheckpoints; 084 085 if( flowElementGraph.annotations != null ) 086 this.annotations = new EnumMultiMap( flowElementGraph.annotations ); 087 088 Graphs.addAllVertices( this.graph, flowElementGraph.vertexSet() ); 089 Graphs.addAllEdges( this.graph, flowElementGraph.graph, flowElementGraph.edgeSet() ); 090 } 091 092 public FlowElementGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks ) 093 { 094 this( null, pipes, sources, sinks, Collections.<String, Tap>emptyMap(), Collections.<String, Tap>emptyMap(), false ); 095 } 096 097 /** 098 * Constructor ElementGraph creates a new ElementGraph instance. 099 * 100 * @param pipes of type Pipe[] 101 * @param sources of type Map<String, Tap> 102 * @param sinks of type Map<String, Tap> 103 */ 104 public FlowElementGraph( PlatformInfo platformInfo, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints, boolean requireUniqueCheckpoints ) 105 { 106 this(); 107 this.platformInfo = platformInfo; 108 this.sources = sources; 109 this.sinks = sinks; 110 this.traps = traps; 111 this.checkpoints = checkpoints; 112 this.requireUniqueCheckpoints = requireUniqueCheckpoints; 113 114 assembleGraph( pipes, sources, sinks ); 115 116 verifyGraph(); 117 } 118 119 public Map<String, Tap> getSourceMap() 120 { 121 return sources; 122 } 123 124 public Map<String, Tap> getSinkMap() 125 { 126 return sinks; 127 } 128 129 public Map<String, Tap> getTrapMap() 130 { 131 return traps; 132 } 133 134 public Map<String, Tap> getCheckpointsMap() 135 { 136 return checkpoints; 137 } 138 139 public Collection<Tap> getSources() 140 { 141 return sources.values(); 142 } 143 144 public Collection<Tap> getSinks() 145 { 146 return sinks.values(); 147 } 148 149 public Collection<Tap> getTraps() 150 { 151 return traps.values(); 152 } 153 154 protected void initialize( Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails ) 155 { 156 this.sources = sources; 157 this.sinks = sinks; 158 this.traps = Util.createHashMap(); 159 160 assembleGraph( tails, sources, sinks ); 161 162 verifyGraph(); 163 } 164 165 private void assembleGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks ) 166 { 167 HashMap<String, Tap> sourcesCopy = new HashMap<String, Tap>( sources ); 168 HashMap<String, Tap> sinksCopy = new HashMap<String, Tap>( sinks ); 169 170 for( Pipe pipe : pipes ) 171 makeGraph( pipe, sourcesCopy, sinksCopy ); 172 173 addExtents( sources, sinks ); 174 } 175 176 private void verifyGraph() 177 { 178 if( vertexSet().isEmpty() ) 179 return; 180 181 Set<String> checkpointNames = new HashSet<String>(); 182 183 // need to verify that only Extent instances are origins in this graph. Otherwise a Tap was not properly connected 184 TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator(); 185 186 FlowElement flowElement = null; 187 188 while( iterator.hasNext() ) 189 { 190 try 191 { 192 flowElement = iterator.next(); 193 } 194 catch( IllegalArgumentException exception ) 195 { 196 if( flowElement == null ) 197 throw new ElementGraphException( "unable to traverse to the first element" ); 198 199 throw new ElementGraphException( flowElement, "unable to traverse to the next element after " + flowElement ); 200 } 201 202 if( requireUniqueCheckpoints && flowElement instanceof Checkpoint ) 203 { 204 String name = ( (Checkpoint) flowElement ).getName(); 205 206 if( checkpointNames.contains( name ) ) 207 throw new ElementGraphException( (Pipe) flowElement, "may not have duplicate checkpoint names in assembly, found: " + name ); 208 209 checkpointNames.add( name ); 210 } 211 212 if( incomingEdgesOf( flowElement ).size() != 0 && outgoingEdgesOf( flowElement ).size() != 0 ) 213 continue; 214 215 if( flowElement instanceof Extent ) 216 continue; 217 218 if( flowElement instanceof Pipe ) 219 { 220 if( incomingEdgesOf( flowElement ).size() == 0 ) 221 throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to head Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" ); 222 else 223 throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to tail Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" ); 224 } 225 226 if( flowElement instanceof Tap ) 227 throw new ElementGraphException( (Tap) flowElement, "no Pipe connected to Tap: " + flowElement ); 228 else 229 throw new ElementGraphException( flowElement, "unknown element type: " + flowElement ); 230 } 231 } 232 233 protected FlowElementGraph shallowCopyElementGraph() 234 { 235 FlowElementGraph copy = new FlowElementGraph(); 236 Graphs.addGraph( copy.graph, this.graph ); 237 238 copy.traps = new HashMap<String, Tap>( this.traps ); 239 240 return copy; 241 } 242 243 /** 244 * created to support the ability to generate all paths between the head and tail of the process. 245 * 246 * @param sources 247 * @param sinks 248 */ 249 private void addExtents( Map<String, Tap> sources, Map<String, Tap> sinks ) 250 { 251 addVertex( Extent.head ); 252 253 for( String source : sources.keySet() ) 254 { 255 Scope scope = addEdge( Extent.head, sources.get( source ) ); 256 257 // edge may already exist, if so, above returns null 258 if( scope != null ) 259 scope.setName( source ); 260 } 261 262 addVertex( Extent.tail ); 263 264 for( String sink : sinks.keySet() ) 265 { 266 Scope scope; 267 268 try 269 { 270 scope = addEdge( sinks.get( sink ), Extent.tail ); 271 } 272 catch( IllegalArgumentException exception ) 273 { 274 throw new ElementGraphException( "missing pipe for sink tap: [" + sink + "]" ); 275 } 276 277 if( scope == null ) 278 throw new ElementGraphException( "cannot sink to the same path from multiple branches: [" + Util.join( sinks.values() ) + "]" ); 279 280 scope.setName( sink ); 281 } 282 } 283 284 /** 285 * Performs one rule check, verifies group does not join duplicate tap resources. 286 * <p/> 287 * Scopes are always named after the source side of the source -> target relationship 288 */ 289 private void makeGraph( Pipe current, Map<String, Tap> sources, Map<String, Tap> sinks ) 290 { 291 LOG.debug( "adding pipe: {}", current ); 292 293 if( current instanceof SubAssembly ) 294 { 295 for( Pipe pipe : SubAssembly.unwind( current.getPrevious() ) ) 296 makeGraph( pipe, sources, sinks ); 297 298 return; 299 } 300 301 if( containsVertex( current ) ) 302 return; 303 304 addVertex( current ); 305 306 Tap sink = sinks.remove( current.getName() ); 307 308 if( sink != null ) 309 { 310 LOG.debug( "adding sink: {}", sink ); 311 312 addVertex( sink ); 313 314 LOG.debug( "adding edge: {} -> {}", current, sink ); 315 316 addEdge( current, sink ).setName( current.getName() ); 317 } 318 319 // PipeAssemblies should always have a previous 320 if( SubAssembly.unwind( current.getPrevious() ).length == 0 ) 321 { 322 Tap source = sources.remove( current.getName() ); 323 324 if( source != null ) 325 { 326 LOG.debug( "adding source: {}", source ); 327 328 addVertex( source ); 329 330 LOG.debug( "adding edge: {} -> {}", source, current ); 331 332 Scope scope = addEdge( source, current ); 333 334 scope.setName( current.getName() ); 335 336 setOrdinal( source, current, scope ); 337 } 338 } 339 340 for( Pipe previous : SubAssembly.unwind( current.getPrevious() ) ) 341 { 342 makeGraph( previous, sources, sinks ); 343 344 LOG.debug( "adding edge: {} -> ", previous, current ); 345 346 if( getEdge( previous, current ) != null ) 347 throw new ElementGraphException( previous, "cannot distinguish pipe branches, give pipe unique name: " + previous ); 348 349 Scope scope = addEdge( previous, current ); 350 351 scope.setName( previous.getName() ); // name scope after previous pipe 352 353 setOrdinal( previous, current, scope ); 354 } 355 } 356 357 private void setOrdinal( FlowElement previous, Pipe current, Scope scope ) 358 { 359 if( current instanceof Splice ) 360 { 361 Splice splice = (Splice) current; 362 363 Integer ordinal; 364 365 if( previous instanceof Tap ) // revert to pipe name 366 ordinal = splice.getPipePos().get( scope.getName() ); 367 else // GroupBy allows for duplicate pipe names, this guarantees correct ordinality 368 ordinal = FlowElements.findOrdinal( splice, (Pipe) previous ); 369 370 scope.setOrdinal( ordinal ); 371 372 Set<Scope> scopes = new HashSet<>( incomingEdgesOf( current ) ); 373 374 scopes.remove( scope ); 375 376 for( Scope other : scopes ) 377 { 378 if( other.getOrdinal() == scope.getOrdinal() ) 379 throw new IllegalStateException( "duplicate ordinals" ); 380 } 381 382 if( splice.isJoin() && ordinal != 0 ) 383 scope.setNonBlocking( false ); 384 } 385 } 386 387 /** 388 * Method getTopologicalIterator returns the topologicalIterator of this ElementGraph object. 389 * 390 * @return the topologicalIterator (type TopologicalOrderIterator<FlowElement, Scope>) of this ElementGraph object. 391 */ 392 public TopologicalOrderIterator<FlowElement, Scope> getTopologicalIterator() 393 { 394 return new TopologicalOrderIterator<>( this.graph ); 395 } 396 397 /** 398 * Method getDepthFirstIterator returns the depthFirstIterator of this ElementGraph object. 399 * 400 * @return the depthFirstIterator (type DepthFirstIterator<FlowElement, Scope>) of this ElementGraph object. 401 */ 402 public DepthFirstIterator<FlowElement, Scope> getDepthFirstIterator() 403 { 404 return new DepthFirstIterator<>( this.graph, Extent.head ); 405 } 406 407 private BaseElementGraph copyWithTraps() 408 { 409 FlowElementGraph copy = shallowCopyElementGraph(); 410 411 copy.addTrapsToGraph(); 412 413 return copy; 414 } 415 416 private void addTrapsToGraph() 417 { 418 DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator(); 419 420 while( iterator.hasNext() ) 421 { 422 FlowElement element = iterator.next(); 423 424 if( !( element instanceof Pipe ) ) 425 continue; 426 427 Pipe pipe = (Pipe) element; 428 Tap trap = traps.get( pipe.getName() ); 429 430 if( trap == null ) 431 continue; 432 433 addVertex( trap ); 434 435 if( LOG.isDebugEnabled() ) 436 LOG.debug( "adding trap edge: " + pipe + " -> " + trap ); 437 438 if( getEdge( pipe, trap ) != null ) 439 continue; 440 441 addEdge( pipe, trap ).setName( pipe.getName() ); // name scope after previous pipe 442 } 443 } 444 445 /** 446 * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging. 447 * 448 * @param filename of type String 449 */ 450 @Override 451 public void writeDOT( String filename ) 452 { 453 boolean success = ElementGraphs.printElementGraph( filename, this.copyWithTraps(), platformInfo ); 454 455 if( success ) 456 Util.writePDF( filename ); 457 } 458 459 /** Method resolveFields performs a breadth first traversal and resolves the tuple fields between each Pipe instance. */ 460 public void resolveFields() 461 { 462 if( resolved ) 463 throw new IllegalStateException( "element graph already resolved" ); 464 465 TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator(); 466 467 while( iterator.hasNext() ) 468 resolveFields( iterator.next() ); 469 470 resolved = true; 471 } 472 473 private void resolveFields( FlowElement source ) 474 { 475 if( source instanceof Extent ) 476 return; 477 478 Set<Scope> incomingScopes = incomingEdgesOf( source ); 479 Set<Scope> outgoingScopes = outgoingEdgesOf( source ); 480 481 List<FlowElement> flowElements = successorListOf( source ); 482 483 if( flowElements.size() == 0 ) 484 throw new IllegalStateException( "unable to find next elements in pipeline from: " + source.toString() ); 485 486 Scope outgoingScope = source.outgoingScopeFor( incomingScopes ); 487 488 if( LOG.isDebugEnabled() && outgoingScope != null ) 489 { 490 LOG.debug( "for modifier: " + source ); 491 if( outgoingScope.getArgumentsSelector() != null ) 492 LOG.debug( "setting outgoing arguments: " + outgoingScope.getArgumentsSelector() ); 493 if( outgoingScope.getOperationDeclaredFields() != null ) 494 LOG.debug( "setting outgoing declared: " + outgoingScope.getOperationDeclaredFields() ); 495 if( outgoingScope.getKeySelectors() != null ) 496 LOG.debug( "setting outgoing group: " + outgoingScope.getKeySelectors() ); 497 if( outgoingScope.getOutValuesSelector() != null ) 498 LOG.debug( "setting outgoing values: " + outgoingScope.getOutValuesSelector() ); 499 } 500 501 for( Scope scope : outgoingScopes ) 502 scope.copyFields( outgoingScope ); 503 } 504 505 @Override 506 public ElementGraph copyElementGraph() 507 { 508 return new FlowElementGraph( this ); 509 } 510 }