001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.planner; 022 023 import java.io.File; 024 import java.io.FileWriter; 025 import java.io.IOException; 026 import java.io.Writer; 027 import java.util.ArrayList; 028 import java.util.Collection; 029 import java.util.HashMap; 030 import java.util.HashSet; 031 import java.util.LinkedList; 032 import java.util.List; 033 import java.util.Map; 034 import java.util.Set; 035 036 import cascading.flow.FlowElement; 037 import cascading.operation.PlannedOperation; 038 import cascading.operation.PlannerLevel; 039 import cascading.pipe.Checkpoint; 040 import cascading.pipe.CoGroup; 041 import cascading.pipe.Each; 042 import cascading.pipe.Every; 043 import cascading.pipe.Group; 044 import cascading.pipe.Operator; 045 import cascading.pipe.Pipe; 046 import cascading.pipe.Splice; 047 import cascading.pipe.SubAssembly; 048 import cascading.tap.Tap; 049 import cascading.util.Util; 050 import cascading.util.Version; 051 import org.jgrapht.GraphPath; 052 import org.jgrapht.Graphs; 053 import org.jgrapht.alg.KShortestPaths; 054 import org.jgrapht.ext.EdgeNameProvider; 055 import org.jgrapht.ext.IntegerNameProvider; 056 import org.jgrapht.ext.VertexNameProvider; 057 import org.jgrapht.graph.SimpleDirectedGraph; 058 import org.jgrapht.traverse.DepthFirstIterator; 059 import org.jgrapht.traverse.TopologicalOrderIterator; 060 import org.slf4j.Logger; 061 import org.slf4j.LoggerFactory; 062 063 /** Class ElementGraph represents the executable FlowElement graph. */ 064 public class ElementGraph extends SimpleDirectedGraph<FlowElement, Scope> 065 { 066 /** Field LOG */ 067 private static final Logger LOG = LoggerFactory.getLogger( ElementGraph.class ); 068 069 /** Field head */ 070 public static final Extent head = new Extent( "head" ); 071 /** Field tail */ 072 public static final Extent tail = new Extent( "tail" ); 073 /** Field resolved */ 074 private boolean resolved; 075 076 private PlatformInfo platformInfo; 077 /** Field sources */ 078 private Map<String, Tap> sources; 079 /** Field sinks */ 080 private Map<String, Tap> sinks; 081 /** Field traps */ 082 private Map<String, Tap> traps; 083 /** Field checkpoints */ 084 private Map<String, Tap> checkpoints; 085 /** Field requireUniqueCheckpoints */ 086 private boolean requireUniqueCheckpoints; 087 /** Field assertionLevel */ 088 private PlannerLevel[] plannerLevels; 089 090 ElementGraph() 091 { 092 super( Scope.class ); 093 } 094 095 public ElementGraph( ElementGraph elementGraph ) 096 { 097 this(); 098 this.platformInfo = elementGraph.platformInfo; 099 this.sources = elementGraph.sources; 100 this.sinks = elementGraph.sinks; 101 this.traps = elementGraph.traps; 102 this.checkpoints = elementGraph.checkpoints; 103 this.plannerLevels = elementGraph.plannerLevels; 104 this.requireUniqueCheckpoints = elementGraph.requireUniqueCheckpoints; 105 106 Graphs.addAllVertices( this, elementGraph.vertexSet() ); 107 Graphs.addAllEdges( this, elementGraph, elementGraph.edgeSet() ); 108 } 109 110 /** 111 * Constructor ElementGraph creates a new ElementGraph instance. 112 * 113 * @param pipes of type Pipe[] 114 * @param sources of type Map<String, Tap> 115 * @param sinks of type Map<String, Tap> 116 */ 117 public ElementGraph( PlatformInfo platformInfo, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints, boolean requireUniqueCheckpoints, PlannerLevel... plannerLevels ) 118 { 119 super( Scope.class ); 120 this.platformInfo = platformInfo; 121 this.sources = sources; 122 this.sinks = sinks; 123 this.traps = traps; 124 this.checkpoints = checkpoints; 125 this.requireUniqueCheckpoints = requireUniqueCheckpoints; 126 this.plannerLevels = plannerLevels; 127 128 assembleGraph( pipes, sources, sinks ); 129 130 verifyGraph(); 131 } 132 133 public Map<String, Tap> getSourceMap() 134 { 135 return sources; 136 } 137 138 public Map<String, Tap> getSinkMap() 139 { 140 return sinks; 141 } 142 143 public Map<String, Tap> getTrapMap() 144 { 145 return traps; 146 } 147 148 public Map<String, Tap> getCheckpointsMap() 149 { 150 return checkpoints; 151 } 152 153 public Collection<Tap> getSources() 154 { 155 return sources.values(); 156 } 157 158 public Collection<Tap> getSinks() 159 { 160 return sinks.values(); 161 } 162 163 public Collection<Tap> getTraps() 164 { 165 return traps.values(); 166 } 167 168 private void assembleGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks ) 169 { 170 HashMap<String, Tap> sourcesCopy = new HashMap<String, Tap>( sources ); 171 HashMap<String, Tap> sinksCopy = new HashMap<String, Tap>( sinks ); 172 173 for( Pipe pipe : pipes ) 174 makeGraph( pipe, sourcesCopy, sinksCopy ); 175 176 addExtents( sources, sinks ); 177 } 178 179 /** Method verifyGraphConnections ... */ 180 private void verifyGraph() 181 { 182 if( vertexSet().isEmpty() ) 183 return; 184 185 Set<String> checkpointNames = new HashSet<String>(); 186 187 // need to verify that only Extent instances are origins in this graph. Otherwise a Tap was not properly connected 188 TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator(); 189 190 FlowElement flowElement = null; 191 192 while( iterator.hasNext() ) 193 { 194 try 195 { 196 flowElement = iterator.next(); 197 } 198 catch( IllegalArgumentException exception ) 199 { 200 if( flowElement == null ) 201 throw new ElementGraphException( "unable to traverse to the first element" ); 202 203 throw new ElementGraphException( flowElement, "unable to traverse to the next element after " + flowElement ); 204 } 205 206 if( requireUniqueCheckpoints && flowElement instanceof Checkpoint ) 207 { 208 String name = ( (Checkpoint) flowElement ).getName(); 209 210 if( checkpointNames.contains( name ) ) 211 throw new ElementGraphException( (Pipe) flowElement, "may not have duplicate checkpoint names in assembly, found: " + name ); 212 213 checkpointNames.add( name ); 214 } 215 216 if( incomingEdgesOf( flowElement ).size() != 0 && outgoingEdgesOf( flowElement ).size() != 0 ) 217 continue; 218 219 if( flowElement instanceof Extent ) 220 continue; 221 222 if( flowElement instanceof Pipe ) 223 { 224 if( incomingEdgesOf( flowElement ).size() == 0 ) 225 throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to head Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming heads" ); 226 else 227 throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to tail Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" ); 228 } 229 230 if( flowElement instanceof Tap ) 231 throw new ElementGraphException( (Tap) flowElement, "no Pipe connected to Tap: " + flowElement ); 232 else 233 throw new ElementGraphException( flowElement, "unknown element type: " + flowElement ); 234 } 235 } 236 237 /** 238 * Method copyGraph returns a partial copy of the current ElementGraph. Only Vertices and Edges are copied. 239 * 240 * @return ElementGraph 241 */ 242 public ElementGraph copyElementGraph() 243 { 244 ElementGraph copy = new ElementGraph(); 245 Graphs.addGraph( copy, this ); 246 247 copy.traps = new HashMap<String, Tap>( this.traps ); 248 249 return copy; 250 } 251 252 /** 253 * created to support the ability to generate all paths between the head and tail of the process. 254 * 255 * @param sources 256 * @param sinks 257 */ 258 private void addExtents( Map<String, Tap> sources, Map<String, Tap> sinks ) 259 { 260 addVertex( head ); 261 262 for( String source : sources.keySet() ) 263 { 264 Scope scope = addEdge( head, sources.get( source ) ); 265 266 // edge may already exist, if so, above returns null 267 if( scope != null ) 268 scope.setName( source ); 269 } 270 271 addVertex( tail ); 272 273 for( String sink : sinks.keySet() ) 274 { 275 Scope scope; 276 277 try 278 { 279 scope = addEdge( sinks.get( sink ), tail ); 280 } 281 catch( IllegalArgumentException exception ) 282 { 283 throw new ElementGraphException( "missing pipe for sink tap: [" + sink + "]" ); 284 } 285 286 if( scope == null ) 287 throw new ElementGraphException( "cannot sink to the same path from multiple branches: [" + Util.join( sinks.values() ) + "]" ); 288 289 scope.setName( sink ); 290 } 291 } 292 293 /** 294 * Performs one rule check, verifies group does not join duplicate tap resources. 295 * <p/> 296 * Scopes are always named after the source side of the source -> target relationship 297 */ 298 private void makeGraph( Pipe current, Map<String, Tap> sources, Map<String, Tap> sinks ) 299 { 300 if( LOG.isDebugEnabled() ) 301 LOG.debug( "adding pipe: " + current ); 302 303 if( current instanceof SubAssembly ) 304 { 305 for( Pipe pipe : SubAssembly.unwind( current.getPrevious() ) ) 306 makeGraph( pipe, sources, sinks ); 307 308 return; 309 } 310 311 if( containsVertex( current ) ) 312 return; 313 314 addVertex( current ); 315 316 Tap sink = sinks.remove( current.getName() ); 317 318 if( sink != null ) 319 { 320 if( LOG.isDebugEnabled() ) 321 LOG.debug( "adding sink: " + sink ); 322 323 addVertex( sink ); 324 325 if( LOG.isDebugEnabled() ) 326 LOG.debug( "adding edge: " + current + " -> " + sink ); 327 328 addEdge( current, sink ).setName( current.getName() ); // name scope after sink 329 } 330 331 // PipeAssemblies should always have a previous 332 if( SubAssembly.unwind( current.getPrevious() ).length == 0 ) 333 { 334 Tap source = sources.remove( current.getName() ); 335 336 if( source != null ) 337 { 338 if( LOG.isDebugEnabled() ) 339 LOG.debug( "adding source: " + source ); 340 341 addVertex( source ); 342 343 if( LOG.isDebugEnabled() ) 344 LOG.debug( "adding edge: " + source + " -> " + current ); 345 346 addEdge( source, current ).setName( current.getName() ); // name scope after source 347 } 348 } 349 350 for( Pipe previous : SubAssembly.unwind( current.getPrevious() ) ) 351 { 352 makeGraph( previous, sources, sinks ); 353 354 if( LOG.isDebugEnabled() ) 355 LOG.debug( "adding edge: " + previous + " -> " + current ); 356 357 if( getEdge( previous, current ) != null ) 358 throw new ElementGraphException( previous, "cannot distinguish pipe branches, give pipe unique name: " + previous ); 359 360 addEdge( previous, current ).setName( previous.getName() ); // name scope after previous pipe 361 } 362 } 363 364 /** 365 * Method getTopologicalIterator returns the topologicalIterator of this ElementGraph object. 366 * 367 * @return the topologicalIterator (type TopologicalOrderIterator<FlowElement, Scope>) of this ElementGraph object. 368 */ 369 public TopologicalOrderIterator<FlowElement, Scope> getTopologicalIterator() 370 { 371 return new TopologicalOrderIterator<FlowElement, Scope>( this ); 372 } 373 374 /** 375 * Method getAllShortestPathsFrom ... 376 * 377 * @param flowElement of type FlowElement 378 * @return List<GraphPath<FlowElement, Scope>> 379 */ 380 public List<GraphPath<FlowElement, Scope>> getAllShortestPathsFrom( FlowElement flowElement ) 381 { 382 return ElementGraphs.getAllShortestPathsBetween( this, flowElement, tail ); 383 } 384 385 /** 386 * Method getAllShortestPathsTo ... 387 * 388 * @param flowElement of type FlowElement 389 * @return List<GraphPath<FlowElement, Scope>> 390 */ 391 public List<GraphPath<FlowElement, Scope>> getAllShortestPathsTo( FlowElement flowElement ) 392 { 393 return ElementGraphs.getAllShortestPathsBetween( this, head, flowElement ); 394 } 395 396 /** 397 * Method getAllShortestPathsBetweenExtents returns the allShortestPathsBetweenExtents of this ElementGraph object. 398 * 399 * @return the allShortestPathsBetweenExtents (type List<GraphPath<FlowElement, Scope>>) of this ElementGraph object. 400 */ 401 public List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetweenExtents() 402 { 403 List<GraphPath<FlowElement, Scope>> paths = new KShortestPaths<FlowElement, Scope>( this, head, Integer.MAX_VALUE ).getPaths( tail ); 404 405 if( paths == null ) 406 return new ArrayList<GraphPath<FlowElement, Scope>>(); 407 408 return paths; 409 } 410 411 /** 412 * Method getDepthFirstIterator returns the depthFirstIterator of this ElementGraph object. 413 * 414 * @return the depthFirstIterator (type DepthFirstIterator<FlowElement, Scope>) of this ElementGraph object. 415 */ 416 public DepthFirstIterator<FlowElement, Scope> getDepthFirstIterator() 417 { 418 return new DepthFirstIterator<FlowElement, Scope>( this, head ); 419 } 420 421 private SimpleDirectedGraph<FlowElement, Scope> copyWithTraps() 422 { 423 ElementGraph copy = this.copyElementGraph(); 424 425 copy.addTraps(); 426 427 return copy; 428 } 429 430 private void addTraps() 431 { 432 DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator(); 433 434 while( iterator.hasNext() ) 435 { 436 FlowElement element = iterator.next(); 437 438 if( !( element instanceof Pipe ) ) 439 continue; 440 441 Pipe pipe = (Pipe) element; 442 Tap trap = traps.get( pipe.getName() ); 443 444 if( trap == null ) 445 continue; 446 447 addVertex( trap ); 448 449 if( LOG.isDebugEnabled() ) 450 LOG.debug( "adding trap edge: " + pipe + " -> " + trap ); 451 452 if( getEdge( pipe, trap ) != null ) 453 continue; 454 455 addEdge( pipe, trap ).setName( pipe.getName() ); // name scope after previous pipe 456 } 457 } 458 459 /** 460 * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging. 461 * 462 * @param filename of type String 463 */ 464 public void writeDOT( String filename ) 465 { 466 printElementGraph( filename, this.copyWithTraps() ); 467 } 468 469 protected void printElementGraph( String filename, final SimpleDirectedGraph<FlowElement, Scope> graph ) 470 { 471 try 472 { 473 File parentFile = new File( filename ).getParentFile(); 474 475 if( parentFile != null && !parentFile.exists() ) 476 parentFile.mkdirs(); 477 478 Writer writer = new FileWriter( filename ); 479 480 Util.writeDOT( writer, graph, new IntegerNameProvider<FlowElement>(), new VertexNameProvider<FlowElement>() 481 { 482 public String getVertexName( FlowElement object ) 483 { 484 if( graph.incomingEdgesOf( object ).isEmpty() ) 485 { 486 String result = object.toString().replaceAll( "\"", "\'" ); 487 String versionString = Version.getRelease(); 488 489 if( platformInfo != null ) 490 versionString = ( versionString == null ? "" : versionString + "\\n" ) + platformInfo; 491 492 return versionString == null ? result : result + "\\n" + versionString; 493 } 494 495 if( object instanceof Tap || object instanceof Extent ) 496 return object.toString().replaceAll( "\"", "\'" ); 497 498 Scope scope = graph.outgoingEdgesOf( object ).iterator().next(); 499 500 return ( (Pipe) object ).print( scope ).replaceAll( "\"", "\'" ); 501 } 502 }, new EdgeNameProvider<Scope>() 503 { 504 public String getEdgeName( Scope object ) 505 { 506 return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz 507 } 508 } 509 ); 510 511 writer.close(); 512 } 513 catch( IOException exception ) 514 { 515 LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception ); 516 } 517 } 518 519 /** Method removeEmptyPipes performs a depth first traversal and removes instance of {@link cascading.pipe.Pipe} or {@link cascading.pipe.SubAssembly}. */ 520 public void removeUnnecessaryPipes() 521 { 522 while( !internalRemoveUnnecessaryPipes() ) 523 ; 524 525 int numPipes = 0; 526 for( FlowElement flowElement : vertexSet() ) 527 { 528 if( flowElement instanceof Pipe ) 529 numPipes++; 530 } 531 532 if( numPipes == 0 ) 533 throw new ElementGraphException( "resulting graph has no pipe elements after removing empty Pipe, assertions, and SubAssembly containers" ); 534 } 535 536 private boolean internalRemoveUnnecessaryPipes() 537 { 538 DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator(); 539 540 while( iterator.hasNext() ) 541 { 542 FlowElement flowElement = iterator.next(); 543 544 if( flowElement.getClass() == Pipe.class || flowElement.getClass() == Checkpoint.class || 545 flowElement instanceof SubAssembly || testPlannerLevel( flowElement ) ) 546 { 547 // Pipe class is guaranteed to have one input 548 removeElement( flowElement ); 549 550 return false; 551 } 552 } 553 554 return true; 555 } 556 557 private void removeElement( FlowElement flowElement ) 558 { 559 LOG.debug( "removing: " + flowElement ); 560 561 Set<Scope> incomingScopes = incomingEdgesOf( flowElement ); 562 563 if( incomingScopes.size() != 1 ) 564 throw new IllegalStateException( "flow element:" + flowElement + ", has multiple input paths: " + incomingScopes.size() ); 565 566 Scope incoming = incomingScopes.iterator().next(); 567 Set<Scope> outgoingScopes = outgoingEdgesOf( flowElement ); 568 569 // source -> incoming -> flowElement -> outgoing -> target 570 FlowElement source = getEdgeSource( incoming ); 571 572 for( Scope outgoing : outgoingScopes ) 573 { 574 FlowElement target = getEdgeTarget( outgoing ); 575 576 addEdge( source, target, new Scope( outgoing ) ); 577 } 578 579 removeVertex( flowElement ); 580 } 581 582 private boolean testPlannerLevel( FlowElement flowElement ) 583 { 584 if( !( flowElement instanceof Operator ) ) 585 return false; 586 587 Operator operator = (Operator) flowElement; 588 589 if( !operator.hasPlannerLevel() ) 590 return false; 591 592 for( PlannerLevel plannerLevel : plannerLevels ) 593 { 594 if( ( (PlannedOperation) operator.getOperation() ).supportsPlannerLevel( plannerLevel ) ) 595 return operator.getPlannerLevel().isStricterThan( plannerLevel ); 596 } 597 598 throw new IllegalStateException( "encountered unsupported planner level: " + operator.getPlannerLevel().getClass().getName() ); 599 } 600 601 /** Method resolveFields performs a breadth first traversal and resolves the tuple fields between each Pipe instance. */ 602 public void resolveFields() 603 { 604 if( resolved ) 605 throw new IllegalStateException( "element graph already resolved" ); 606 607 TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator(); 608 609 while( iterator.hasNext() ) 610 resolveFields( iterator.next() ); 611 612 resolved = true; 613 } 614 615 private void resolveFields( FlowElement source ) 616 { 617 if( source instanceof Extent ) 618 return; 619 620 Set<Scope> incomingScopes = incomingEdgesOf( source ); 621 Set<Scope> outgoingScopes = outgoingEdgesOf( source ); 622 623 List<FlowElement> flowElements = Graphs.successorListOf( this, source ); 624 625 if( flowElements.size() == 0 ) 626 throw new IllegalStateException( "unable to find next elements in pipeline from: " + source.toString() ); 627 628 Scope outgoingScope = source.outgoingScopeFor( incomingScopes ); 629 630 if( LOG.isDebugEnabled() && outgoingScope != null ) 631 { 632 LOG.debug( "for modifier: " + source ); 633 if( outgoingScope.getArgumentsSelector() != null ) 634 LOG.debug( "setting outgoing arguments: " + outgoingScope.getArgumentsSelector() ); 635 if( outgoingScope.getOperationDeclaredFields() != null ) 636 LOG.debug( "setting outgoing declared: " + outgoingScope.getOperationDeclaredFields() ); 637 if( outgoingScope.getKeySelectors() != null ) 638 LOG.debug( "setting outgoing group: " + outgoingScope.getKeySelectors() ); 639 if( outgoingScope.getOutValuesSelector() != null ) 640 LOG.debug( "setting outgoing values: " + outgoingScope.getOutValuesSelector() ); 641 } 642 643 for( Scope scope : outgoingScopes ) 644 scope.copyFields( outgoingScope ); 645 } 646 647 /** 648 * Finds all groups that merge/join streams. returned in topological order. 649 * 650 * @return a List fo Group instances 651 */ 652 public List<Group> findAllMergeJoinGroups() 653 { 654 return findAllOfType( 2, 1, Group.class, new LinkedList<Group>() ); 655 } 656 657 /** 658 * Finds all splices that merge/join streams. returned in topological order. 659 * 660 * @return a List fo Group instances 661 */ 662 public List<Splice> findAllMergeJoinSplices() 663 { 664 return findAllOfType( 2, 1, Splice.class, new LinkedList<Splice>() ); 665 } 666 667 public List<CoGroup> findAllCoGroups() 668 { 669 return findAllOfType( 2, 1, CoGroup.class, new LinkedList<CoGroup>() ); 670 } 671 672 /** 673 * Method findAllGroups ... 674 * 675 * @return List<Group> 676 */ 677 public List<Group> findAllGroups() 678 { 679 return findAllOfType( 1, 1, Group.class, new LinkedList<Group>() ); 680 } 681 682 /** 683 * Method findAllEveries ... 684 * 685 * @return List<Every> 686 */ 687 public List<Every> findAllEveries() 688 { 689 return findAllOfType( 1, 1, Every.class, new LinkedList<Every>() ); 690 } 691 692 /** 693 * Method findAllTaps ... 694 * 695 * @return List<Tap> 696 */ 697 public List<Tap> findAllTaps() 698 { 699 return findAllOfType( 1, 1, Tap.class, new LinkedList<Tap>() ); 700 } 701 702 /** 703 * Method findAllSplits ... 704 * 705 * @return List<FlowElement> 706 */ 707 public List<Each> findAllEachSplits() 708 { 709 return findAllOfType( 1, 2, Each.class, new LinkedList<Each>() ); 710 } 711 712 public List<Pipe> findAllPipeSplits() 713 { 714 return findAllOfType( 1, 2, Pipe.class, new LinkedList<Pipe>() ); 715 } 716 717 /** 718 * Method findAllOfType ... 719 * 720 * @param minInDegree of type int 721 * @param minOutDegree 722 * @param type of type Class<P> 723 * @param results of type List<P> @return List<P> 724 */ 725 public <P> List<P> findAllOfType( int minInDegree, int minOutDegree, Class<P> type, List<P> results ) 726 { 727 TopologicalOrderIterator<FlowElement, Scope> topoIterator = getTopologicalIterator(); 728 729 while( topoIterator.hasNext() ) 730 { 731 FlowElement flowElement = topoIterator.next(); 732 733 if( type.isInstance( flowElement ) && inDegreeOf( flowElement ) >= minInDegree && outDegreeOf( flowElement ) >= minOutDegree ) 734 results.add( (P) flowElement ); 735 } 736 737 return results; 738 } 739 740 public void insertFlowElementAfter( FlowElement previousElement, FlowElement flowElement ) 741 { 742 Set<Scope> outgoing = new HashSet<Scope>( outgoingEdgesOf( previousElement ) ); 743 744 addVertex( flowElement ); 745 746 String name = previousElement.toString(); 747 748 if( previousElement instanceof Pipe ) 749 name = ( (Pipe) previousElement ).getName(); 750 751 addEdge( previousElement, flowElement, new Scope( name ) ); 752 753 for( Scope scope : outgoing ) 754 { 755 FlowElement target = getEdgeTarget( scope ); 756 removeEdge( previousElement, target ); // remove scope 757 addEdge( flowElement, target, scope ); // add scope back 758 } 759 } 760 761 /** Simple class that acts in as the root of the graph */ 762 /** 763 * Method makeTapGraph returns a directed graph of all taps in the current element graph. 764 * 765 * @return SimpleDirectedGraph<Tap, Integer> 766 */ 767 public SimpleDirectedGraph<Tap, Integer> makeTapGraph() 768 { 769 SimpleDirectedGraph<Tap, Integer> tapGraph = new SimpleDirectedGraph<Tap, Integer>( Integer.class ); 770 List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetweenExtents(); 771 int count = 0; 772 773 if( LOG.isDebugEnabled() ) 774 LOG.debug( "found num paths: " + paths.size() ); 775 776 for( GraphPath<FlowElement, Scope> element : paths ) 777 { 778 List<Scope> path = element.getEdgeList(); 779 Tap lastTap = null; 780 781 for( Scope scope : path ) 782 { 783 FlowElement target = getEdgeTarget( scope ); 784 785 if( target instanceof Extent ) 786 continue; 787 788 if( !( target instanceof Tap ) ) 789 continue; 790 791 tapGraph.addVertex( (Tap) target ); 792 793 if( lastTap != null ) 794 { 795 if( LOG.isDebugEnabled() ) 796 LOG.debug( "adding tap edge: " + lastTap + " -> " + target ); 797 798 if( tapGraph.getEdge( lastTap, (Tap) target ) == null && !tapGraph.addEdge( lastTap, (Tap) target, count++ ) ) 799 throw new ElementGraphException( "could not add graph edge: " + lastTap + " -> " + target ); 800 } 801 802 lastTap = (Tap) target; 803 } 804 } 805 806 return tapGraph; 807 } 808 809 public int getMaxNumPathsBetweenElementAndGroupingMergeJoin( FlowElement flowElement ) 810 { 811 List<Group> groups = findAllMergeJoinGroups(); 812 813 int maxPaths = 0; 814 815 if( groups == null ) 816 return 0; 817 818 for( Group group : groups ) 819 { 820 if( flowElement != group ) 821 { 822 List<GraphPath<FlowElement, Scope>> paths = ElementGraphs.getAllShortestPathsBetween( this, flowElement, group ); 823 824 if( paths != null ) 825 maxPaths = Math.max( maxPaths, paths.size() ); 826 } 827 } 828 829 return maxPaths; 830 } 831 832 public List<FlowElement> getAllSuccessors( FlowElement element ) 833 { 834 return Graphs.successorListOf( this, element ); 835 } 836 837 public void replaceElementWith( FlowElement element, FlowElement replacement ) 838 { 839 Set<Scope> incoming = new HashSet<Scope>( incomingEdgesOf( element ) ); 840 Set<Scope> outgoing = new HashSet<Scope>( outgoingEdgesOf( element ) ); 841 842 if( !containsVertex( replacement ) ) 843 addVertex( replacement ); 844 845 for( Scope scope : incoming ) 846 { 847 FlowElement source = getEdgeSource( scope ); 848 removeEdge( source, element ); // remove scope 849 850 // drop edge between, if any 851 if( source != replacement ) 852 addEdge( source, replacement, scope ); // add scope back 853 } 854 855 for( Scope scope : outgoing ) 856 { 857 FlowElement target = getEdgeTarget( scope ); 858 removeEdge( element, target ); // remove scope 859 860 // drop edge between, if any 861 if( target != replacement ) 862 addEdge( replacement, target, scope ); // add scope back 863 } 864 865 removeVertex( element ); 866 } 867 868 public <A extends FlowElement> Set<A> getAllChildrenOfType( FlowElement flowElement, Class<A> type ) 869 { 870 Set<A> allChildren = new HashSet<A>(); 871 872 getAllChildrenOfType( allChildren, flowElement, type ); 873 874 return allChildren; 875 } 876 877 private <A extends FlowElement> void getAllChildrenOfType( Set<A> allSuccessors, FlowElement flowElement, Class<A> type ) 878 { 879 List<FlowElement> successors = getAllSuccessors( flowElement ); 880 881 for( FlowElement successor : successors ) 882 { 883 if( type.isInstance( successor ) ) 884 allSuccessors.add( (A) successor ); 885 else 886 getAllChildrenOfType( allSuccessors, successor, type ); 887 } 888 } 889 890 public Set<FlowElement> getAllChildrenNotExactlyType( FlowElement flowElement, Class<? extends FlowElement> type ) 891 { 892 Set<FlowElement> allChildren = new HashSet<FlowElement>(); 893 894 getAllChildrenNotExactlyType( allChildren, flowElement, type ); 895 896 return allChildren; 897 } 898 899 private void getAllChildrenNotExactlyType( Set<FlowElement> allSuccessors, FlowElement flowElement, Class<? extends FlowElement> type ) 900 { 901 List<FlowElement> successors = getAllSuccessors( flowElement ); 902 903 for( FlowElement successor : successors ) 904 { 905 if( type != successor.getClass() ) 906 allSuccessors.add( successor ); 907 else 908 getAllChildrenNotExactlyType( allSuccessors, successor, type ); 909 } 910 } 911 912 public static class Extent extends Pipe 913 { 914 915 /** @see cascading.pipe.Pipe#Pipe(String) */ 916 public Extent( String name ) 917 { 918 super( name ); 919 } 920 921 @Override 922 public Scope outgoingScopeFor( Set<Scope> scopes ) 923 { 924 return new Scope(); 925 } 926 927 @Override 928 public String toString() 929 { 930 return "[" + getName() + "]"; 931 } 932 933 public boolean equals( Object object ) 934 { 935 if( object == null ) 936 return false; 937 938 if( this == object ) 939 return true; 940 941 if( object.getClass() != this.getClass() ) 942 return false; 943 944 return this.getName().equals( ( (Pipe) object ).getName() ); 945 } 946 } 947 }