001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.graph; 022 023import java.io.File; 024import java.io.FileWriter; 025import java.io.IOException; 026import java.io.Writer; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.HashSet; 032import java.util.IdentityHashMap; 033import java.util.Iterator; 034import java.util.LinkedHashSet; 035import java.util.LinkedList; 036import java.util.List; 037import java.util.ListIterator; 038import java.util.Map; 039import java.util.Set; 040 041import cascading.flow.FlowElement; 042import cascading.flow.FlowElements; 043import cascading.flow.planner.BaseFlowStep; 044import cascading.flow.planner.PlatformInfo; 045import cascading.flow.planner.Scope; 046import cascading.flow.planner.iso.expression.ElementCapture; 047import cascading.flow.planner.iso.expression.ExpressionGraph; 048import cascading.flow.planner.iso.expression.FlowElementExpression; 049import cascading.flow.planner.iso.expression.TypeExpression; 050import cascading.flow.planner.iso.finder.SearchOrder; 051import cascading.flow.planner.iso.subgraph.SubGraphIterator; 052import cascading.flow.planner.iso.subgraph.iterator.ExpressionSubGraphIterator; 053import cascading.flow.planner.process.ProcessGraph; 054import cascading.flow.planner.process.ProcessModel; 055import cascading.pipe.Group; 056import cascading.pipe.HashJoin; 057import cascading.pipe.Operator; 058import cascading.pipe.Pipe; 059import cascading.pipe.Splice; 060import cascading.tap.Tap; 061import cascading.util.DOTProcessGraphWriter; 062import cascading.util.Murmur3; 063import cascading.util.Pair; 064import cascading.util.Util; 065import cascading.util.Version; 066import cascading.util.jgrapht.ComponentAttributeProvider; 067import cascading.util.jgrapht.EdgeNameProvider; 068import cascading.util.jgrapht.IntegerNameProvider; 069import cascading.util.jgrapht.VertexNameProvider; 070import org.jgrapht.DirectedGraph; 071import org.jgrapht.Graph; 072import org.jgrapht.GraphPath; 073import org.jgrapht.Graphs; 074import org.jgrapht.alg.DijkstraShortestPath; 075import org.jgrapht.alg.FloydWarshallShortestPaths; 076import org.jgrapht.alg.KShortestPaths; 077import org.jgrapht.graph.AbstractGraph; 078import org.jgrapht.graph.EdgeReversedGraph; 079import org.jgrapht.graph.SimpleDirectedGraph; 080import org.jgrapht.traverse.TopologicalOrderIterator; 081import org.jgrapht.util.TypeUtil; 082import org.slf4j.Logger; 083import org.slf4j.LoggerFactory; 084 085import static cascading.util.Util.*; 086import static java.lang.Double.POSITIVE_INFINITY; 087 088/** 089 * Class ElementGraphs maintains a collection of operations that can be performed on an {@link ElementGraph}. 090 */ 091public class ElementGraphs 092 { 093 private static final Logger LOG = LoggerFactory.getLogger( ElementGraphs.class ); 094 095 // not for instantiation 096 private ElementGraphs() 097 { 098 } 099 100 public static DirectedGraph<FlowElement, Scope> directed( ElementGraph elementGraph ) 101 { 102 if( elementGraph == null ) 103 return null; 104 105 if( elementGraph instanceof DecoratedElementGraph ) 106 return directed( ( (DecoratedElementGraph) elementGraph ).getDecorated() ); 107 108 return ( (BaseElementGraph) elementGraph ).graph; 109 } 110 111 public static int hashCodeIgnoreAnnotations( ElementGraph elementGraph ) 112 { 113 return hashCodeIgnoreAnnotations( directed( elementGraph ) ); 114 } 115 116 public static <V, E> int hashCodeIgnoreAnnotations( Graph<V, E> graph ) 117 { 118 int hash = graph.vertexSet().hashCode(); 119 120 for( E e : graph.edgeSet() ) 121 { 122 int part = e.hashCode(); 123 124 int source = graph.getEdgeSource( e ).hashCode(); 125 int target = graph.getEdgeTarget( e ).hashCode(); 126 127 int pairing = pair( source, target ); 128 129 part = ( 27 * part ) + pairing; 130 131 long weight = (long) graph.getEdgeWeight( e ); 132 part = ( 27 * part ) + (int) ( weight ^ ( weight >>> 32 ) ); 133 134 hash += part; 135 } 136 137 return hash; 138 } 139 140 public static boolean equalsIgnoreAnnotations( ElementGraph lhs, ElementGraph rhs ) 141 { 142 return equalsIgnoreAnnotations( directed( lhs ), directed( rhs ) ); 143 } 144 145 public static <V, E> boolean equalsIgnoreAnnotations( Graph<V, E> lhs, Graph<V, E> rhs ) 146 { 147 if( lhs == rhs ) 148 return true; 149 150 TypeUtil<Graph<V, E>> typeDecl = null; 151 Graph<V, E> lhsGraph = TypeUtil.uncheckedCast( lhs, typeDecl ); 152 Graph<V, E> rhsGraph = TypeUtil.uncheckedCast( rhs, typeDecl ); 153 154 if( !lhsGraph.vertexSet().equals( rhsGraph.vertexSet() ) ) 155 return false; 156 157 if( lhsGraph.edgeSet().size() != rhsGraph.edgeSet().size() ) 158 return false; 159 160 for( E e : lhsGraph.edgeSet() ) 161 { 162 V source = lhsGraph.getEdgeSource( e ); 163 V target = lhsGraph.getEdgeTarget( e ); 164 165 if( !rhsGraph.containsEdge( e ) ) 166 return false; 167 168 if( !rhsGraph.getEdgeSource( e ).equals( source ) || !rhsGraph.getEdgeTarget( e ).equals( target ) ) 169 return false; 170 171 if( Math.abs( lhsGraph.getEdgeWeight( e ) - rhsGraph.getEdgeWeight( e ) ) > 10e-7 ) 172 return false; 173 } 174 175 return true; 176 } 177 178 public static boolean equals( ElementGraph lhs, ElementGraph rhs ) 179 { 180 if( !equalsIgnoreAnnotations( lhs, rhs ) ) 181 return false; 182 183 if( !( lhs instanceof AnnotatedGraph ) && !( rhs instanceof AnnotatedGraph ) ) 184 return true; 185 186 if( !( lhs instanceof AnnotatedGraph ) || !( rhs instanceof AnnotatedGraph ) ) 187 return false; 188 189 AnnotatedGraph lhsAnnotated = (AnnotatedGraph) lhs; 190 AnnotatedGraph rhsAnnotated = (AnnotatedGraph) rhs; 191 192 if( lhsAnnotated.hasAnnotations() != rhsAnnotated.hasAnnotations() ) 193 return false; 194 195 if( !lhsAnnotated.hasAnnotations() ) 196 return true; 197 198 return lhsAnnotated.getAnnotations().equals( rhsAnnotated.getAnnotations() ); 199 } 200 201 public static String canonicalHash( ElementGraph graph ) 202 { 203 return canonicalHash( directed( graph ) ); 204 } 205 206 public static String canonicalHash( Graph<FlowElement, Scope> graph ) 207 { 208 int hash = Murmur3.SEED; 209 210 int edges = 0; 211 boolean hasExtents = false; 212 213 for( Scope e : graph.edgeSet() ) 214 { 215 FlowElement edgeSource = graph.getEdgeSource( e ); 216 FlowElement edgeTarget = graph.getEdgeTarget( e ); 217 218 // simpler to ignore extents here 219 if( edgeSource instanceof Extent || edgeTarget instanceof Extent ) 220 { 221 hasExtents = true; 222 continue; 223 } 224 225 int source = hash( edgeSource ); 226 int target = hash( edgeTarget ); 227 int pairing = pair( source, target ); 228 229 hash += pairing; // don't make edge traversal order significant 230 edges++; 231 } 232 233 int vertexes = graph.vertexSet().size() - ( hasExtents ? 2 : 0 ); 234 235 hash = Murmur3.fmix( hash, vertexes * edges ); 236 237 return Util.getHex( Util.intToByteArray( hash ) ); 238 } 239 240 private static int hash( FlowElement flowElement ) 241 { 242 int lhs = flowElement.getClass().getName().hashCode(); 243 int rhs = 0; 244 245 if( flowElement instanceof Operator && ( (Operator) flowElement ).getOperation() != null ) 246 rhs = ( (Operator) flowElement ).getOperation().getClass().getName().hashCode(); 247 else if( flowElement instanceof Tap && ( (Tap) flowElement ).getScheme() != null ) 248 rhs = ( (Tap) flowElement ).getScheme().getClass().getName().hashCode(); 249 else if( flowElement instanceof Splice ) 250 rhs = ( (Splice) flowElement ).getJoiner().getClass().getName().hashCode() + 31 * ( (Splice) flowElement ).getNumSelfJoins(); 251 252 return pair( lhs, rhs ); 253 } 254 255 protected static int pair( int lhs, int rhs ) 256 { 257 if( rhs == 0 ) 258 return lhs; 259 260 // see http://en.wikipedia.org/wiki/Pairing_function 261 return ( ( lhs + rhs ) * ( lhs + rhs + 1 ) / 2 ) + rhs; 262 } 263 264 public static Iterator<FlowElement> getTopologicalIterator( ElementGraph graph ) 265 { 266 return new TopologicalOrderIterator<>( directed( graph ) ); 267 } 268 269 public static Iterator<FlowElement> getReverseTopologicalIterator( ElementGraph graph ) 270 { 271 return new TopologicalOrderIterator<>( new EdgeReversedGraph<>( directed( graph ) ) ); 272 } 273 274 public static List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetween( ElementGraph graph, FlowElement from, FlowElement to ) 275 { 276 return getAllShortestPathsBetween( directed( graph ), from, to ); 277 } 278 279 public static <V, E> List<GraphPath<V, E>> getAllShortestPathsBetween( DirectedGraph<V, E> graph, V from, V to ) 280 { 281 List<GraphPath<V, E>> paths = new KShortestPaths<>( graph, from, Integer.MAX_VALUE ).getPaths( to ); 282 283 if( paths == null ) 284 return new ArrayList<>(); 285 286 return paths; 287 } 288 289 public static ElementSubGraph asSubGraph( ElementGraph elementGraph, ElementGraph contractedGraph, Set<FlowElement> excludes ) 290 { 291 elementGraph = asExtentMaskedSubGraph( elementGraph ); // returns same instance if not bounded 292 293 Pair<Set<FlowElement>, Set<Scope>> pair = findClosureViaFloydWarshall( directed( elementGraph ), directed( contractedGraph ), excludes ); 294 Set<FlowElement> vertices = pair.getLhs(); 295 Set<Scope> excludeEdges = pair.getRhs(); 296 297 Set<Scope> scopes = new HashSet<>( elementGraph.edgeSet() ); 298 scopes.removeAll( excludeEdges ); 299 300 return new ElementSubGraph( elementGraph, vertices, scopes ); 301 } 302 303 /** 304 * Returns a new ElementGraph (a MaskedSubGraph) of the given ElementGraph that will not contain the {@link Extent} 305 * head or tail instances. 306 * <p/> 307 * If the given ElementGraph does not contain head or tail, it will be returned unchanged. 308 * 309 * @param elementGraph 310 * @return 311 */ 312 public static ElementGraph asExtentMaskedSubGraph( ElementGraph elementGraph ) 313 { 314 if( elementGraph.containsVertex( Extent.head ) || elementGraph.containsVertex( Extent.tail ) ) 315 return new ElementMaskSubGraph( elementGraph, Extent.head, Extent.tail ); 316 317 return elementGraph; 318 } 319 320 public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted ) 321 { 322 return findClosureViaFloydWarshall( full, contracted, null ); 323 } 324 325 public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted, Set<V> excludes ) 326 { 327 Set<V> vertices = new HashSet<>( contracted.vertexSet() ); 328 LinkedList<V> allVertices = new LinkedList<>( full.vertexSet() ); 329 330 allVertices.removeAll( vertices ); 331 332 Set<E> excludeEdges = new HashSet<>(); 333 334 // prevent distinguished elements from being included inside the sub-graph 335 if( excludes != null ) 336 { 337 for( V v : excludes ) 338 { 339 if( !full.containsVertex( v ) ) 340 continue; 341 342 excludeEdges.addAll( full.incomingEdgesOf( v ) ); 343 excludeEdges.addAll( full.outgoingEdgesOf( v ) ); 344 } 345 } 346 347 for( V v : contracted.vertexSet() ) 348 { 349 if( contracted.inDegreeOf( v ) == 0 ) 350 excludeEdges.addAll( full.incomingEdgesOf( v ) ); 351 } 352 353 for( V v : contracted.vertexSet() ) 354 { 355 if( contracted.outDegreeOf( v ) == 0 ) 356 excludeEdges.addAll( full.outgoingEdgesOf( v ) ); 357 } 358 359 DirectedGraph<V, E> disconnected = disconnectExtentsAndExclude( full, excludeEdges ); 360 361 FloydWarshallShortestPaths<V, E> paths = new FloydWarshallShortestPaths<>( disconnected ); 362 363 for( E edge : contracted.edgeSet() ) 364 { 365 V edgeSource = contracted.getEdgeSource( edge ); 366 V edgeTarget = contracted.getEdgeTarget( edge ); 367 368 ListIterator<V> iterator = allVertices.listIterator(); 369 while( iterator.hasNext() ) 370 { 371 V vertex = iterator.next(); 372 373 if( !isBetween( paths, edgeSource, edgeTarget, vertex ) ) 374 continue; 375 376 vertices.add( vertex ); 377 iterator.remove(); 378 } 379 } 380 381 return new Pair<>( vertices, excludeEdges ); 382 } 383 384 private static <V, E> DirectedGraph<V, E> disconnectExtentsAndExclude( DirectedGraph<V, E> full, Set<E> withoutEdges ) 385 { 386 DirectedGraph<V, E> copy = (DirectedGraph<V, E>) new SimpleDirectedGraph<>( Object.class ); 387 388 Graphs.addAllVertices( copy, full.vertexSet() ); 389 390 copy.removeVertex( (V) Extent.head ); 391 copy.removeVertex( (V) Extent.tail ); 392 393 Set<E> edges = full.edgeSet(); 394 395 if( !withoutEdges.isEmpty() ) 396 { 397 edges = new HashSet<>( edges ); 398 edges.removeAll( withoutEdges ); 399 } 400 401 Graphs.addAllEdges( copy, full, edges ); 402 403 return copy; 404 } 405 406 private static <V, E> boolean isBetween( FloydWarshallShortestPaths<V, E> paths, V edgeSource, V edgeTarget, V vertex ) 407 { 408 return paths.shortestDistance( edgeSource, vertex ) != POSITIVE_INFINITY && paths.shortestDistance( vertex, edgeTarget ) != POSITIVE_INFINITY; 409 } 410 411 public static void removeAndContract( ElementGraph elementGraph, FlowElement flowElement ) 412 { 413 LOG.debug( "removing element, contracting edge for: {}", flowElement ); 414 415 Set<Scope> incomingScopes = elementGraph.incomingEdgesOf( flowElement ); 416 417 boolean contractIncoming = true; 418 419 if( !contractIncoming ) 420 { 421 if( incomingScopes.size() != 1 ) 422 throw new IllegalStateException( "flow element:" + flowElement + ", has multiple input paths: " + incomingScopes.size() ); 423 } 424 425 boolean isJoin = flowElement instanceof Splice && ( (Splice) flowElement ).isJoin(); 426 427 for( Scope incoming : incomingScopes ) 428 { 429 Set<Scope> outgoingScopes = elementGraph.outgoingEdgesOf( flowElement ); 430 431 // source -> incoming -> flowElement -> outgoing -> target 432 FlowElement source = elementGraph.getEdgeSource( incoming ); 433 434 for( Scope outgoing : outgoingScopes ) 435 { 436 FlowElement target = elementGraph.getEdgeTarget( outgoing ); 437 438 boolean isNonBlocking = outgoing.isNonBlocking(); 439 440 if( isJoin ) 441 isNonBlocking = isNonBlocking && incoming.isNonBlocking(); 442 443 Scope scope = new Scope( outgoing ); 444 445 // unsure if necessary since we track blocking independently 446 // when removing a pipe, pull ordinal up to tap 447 // when removing a Splice retain ordinal 448 if( flowElement instanceof Splice ) 449 scope.setOrdinal( incoming.getOrdinal() ); 450 else 451 scope.setOrdinal( outgoing.getOrdinal() ); 452 453 scope.setNonBlocking( isNonBlocking ); 454 scope.addPriorNames( incoming, outgoing ); // not copied 455 elementGraph.addEdge( source, target, scope ); 456 } 457 } 458 459 elementGraph.removeVertex( flowElement ); 460 } 461 462 public static boolean printElementGraph( String filename, final ElementGraph graph, final PlatformInfo platformInfo ) 463 { 464 try 465 { 466 File parentFile = new File( filename ).getParentFile(); 467 468 if( parentFile != null && !parentFile.exists() ) 469 parentFile.mkdirs(); 470 471 Writer writer = new FileWriter( filename ); 472 473 Util.writeDOT( writer, ElementGraphs.directed( graph ), 474 new IntegerNameProvider<FlowElement>(), 475 new FlowElementVertexNameProvider( graph, platformInfo ), 476 new ScopeEdgeNameProvider(), 477 new VertexAttributeProvider(), new EdgeAttributeProvider() ); 478 479 writer.close(); 480 return true; 481 } 482 catch( IOException exception ) 483 { 484 LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception ); 485 } 486 487 return false; 488 } 489 490 public static boolean printProcessGraph( String filename, final ElementGraph graph, final ProcessGraph<? extends ProcessModel> processGraph ) 491 { 492 try 493 { 494 File parentFile = new File( filename ).getParentFile(); 495 496 if( parentFile != null && !parentFile.exists() ) 497 parentFile.mkdirs(); 498 499 Writer writer = new FileWriter( filename ); 500 501 DOTProcessGraphWriter graphWriter = new DOTProcessGraphWriter( 502 new IntegerNameProvider<Pair<ElementGraph, FlowElement>>(), 503 new FlowElementVertexNameProvider( graph, null ), 504 new ScopeEdgeNameProvider(), 505 new VertexAttributeProvider(), new EdgeAttributeProvider(), 506 new ProcessGraphNameProvider(), new ProcessGraphLabelProvider() 507 ); 508 509 graphWriter.writeGraph( writer, graph, processGraph ); 510 511 writer.close(); 512 return true; 513 } 514 catch( IOException exception ) 515 { 516 LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception ); 517 } 518 519 return false; 520 } 521 522 public static void insertFlowElementAfter( ElementGraph elementGraph, FlowElement previousElement, FlowElement flowElement ) 523 { 524 Set<Scope> outgoing = new HashSet<>( elementGraph.outgoingEdgesOf( previousElement ) ); 525 526 elementGraph.addVertex( flowElement ); 527 528 String name = previousElement.toString(); 529 530 if( previousElement instanceof Pipe ) 531 name = ( (Pipe) previousElement ).getName(); 532 533 elementGraph.addEdge( previousElement, flowElement, new Scope( name ) ); 534 535 for( Scope scope : outgoing ) 536 { 537 FlowElement target = elementGraph.getEdgeTarget( scope ); 538 Scope foundScope = elementGraph.removeEdge( previousElement, target ); // remove scope 539 540 if( foundScope != scope ) 541 throw new IllegalStateException( "did not remove proper scope" ); 542 543 elementGraph.addEdge( flowElement, target, scope ); // add scope back 544 } 545 } 546 547 public static void insertFlowElementBefore( ElementGraph graph, FlowElement nextElement, FlowElement flowElement ) 548 { 549 Set<Scope> incoming = new HashSet<>( graph.incomingEdgesOf( nextElement ) ); 550 551 graph.addVertex( flowElement ); 552 553 String name = nextElement.toString(); 554 555 if( nextElement instanceof Pipe ) 556 name = ( (Pipe) nextElement ).getName(); 557 558 graph.addEdge( flowElement, nextElement, new Scope( name ) ); 559 560 for( Scope scope : incoming ) 561 { 562 FlowElement target = graph.getEdgeSource( scope ); 563 Scope foundScope = graph.removeEdge( target, nextElement ); // remove scope 564 565 if( foundScope != scope ) 566 throw new IllegalStateException( "did not remove proper scope" ); 567 568 graph.addEdge( target, flowElement, scope ); // add scope back 569 } 570 } 571 572 public static void addSources( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sources ) 573 { 574 for( Tap tap : sources ) 575 { 576 for( Scope scope : elementGraph.outgoingEdgesOf( tap ) ) 577 flowStep.addSource( scope.getName(), tap ); 578 } 579 } 580 581 public static Set<Tap> findSources( ElementGraph elementGraph ) 582 { 583 return findSources( elementGraph, Tap.class ); 584 } 585 586 public static <F extends FlowElement> Set<F> findSources( ElementGraph elementGraph, Class<F> type ) 587 { 588 if( elementGraph == null ) 589 return Collections.emptySet(); 590 591 if( elementGraph.containsVertex( Extent.head ) ) 592 return narrowSet( type, elementGraph.successorListOf( Extent.head ) ); 593 594 SubGraphIterator iterator = new ExpressionSubGraphIterator( 595 new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Head ) ), 596 elementGraph 597 ); 598 599 return narrowSet( type, getAllVertices( iterator ) ); 600 } 601 602 public static <F extends FlowElement> Set<F> findSinks( ElementGraph elementGraph, Class<F> type ) 603 { 604 if( elementGraph == null ) 605 return Collections.emptySet(); 606 607 if( elementGraph.containsVertex( Extent.tail ) ) 608 return narrowSet( type, elementGraph.predecessorListOf( Extent.tail ) ); 609 610 SubGraphIterator iterator = new ExpressionSubGraphIterator( 611 new ExpressionGraph( SearchOrder.ReverseTopological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Tail ) ), 612 elementGraph 613 ); 614 615 return narrowSet( type, getAllVertices( iterator ) ); 616 } 617 618 public static void addSinks( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sinks ) 619 { 620 for( Tap tap : sinks ) 621 { 622 for( Scope scope : elementGraph.incomingEdgesOf( tap ) ) 623 flowStep.addSink( scope.getName(), tap ); 624 } 625 } 626 627 public static Set<Tap> findSinks( ElementGraph elementGraph ) 628 { 629 return findSinks( elementGraph, Tap.class ); 630 } 631 632 public static Set<Group> findAllGroups( ElementGraph elementGraph ) 633 { 634 SubGraphIterator iterator = new ExpressionSubGraphIterator( 635 new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, Group.class ) ), 636 elementGraph 637 ); 638 639 return narrowSet( Group.class, getAllVertices( iterator ) ); 640 } 641 642 public static Set<HashJoin> findAllHashJoins( ElementGraph elementGraph ) 643 { 644 SubGraphIterator iterator = new ExpressionSubGraphIterator( 645 new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, HashJoin.class ) ), 646 elementGraph 647 ); 648 649 return narrowSet( HashJoin.class, getAllVertices( iterator ) ); 650 } 651 652 private static Set<FlowElement> getAllVertices( SubGraphIterator iterator ) 653 { 654 Set<FlowElement> vertices = createIdentitySet(); 655 656 while( iterator.hasNext() ) 657 vertices.addAll( iterator.next().vertexSet() ); 658 659 return vertices; 660 } 661 662 public static void replaceElementWith( ElementGraph elementGraph, FlowElement replace, FlowElement replaceWith ) 663 { 664 Set<Scope> incoming = new HashSet<Scope>( elementGraph.incomingEdgesOf( replace ) ); 665 Set<Scope> outgoing = new HashSet<Scope>( elementGraph.outgoingEdgesOf( replace ) ); 666 667 if( !elementGraph.containsVertex( replaceWith ) ) 668 elementGraph.addVertex( replaceWith ); 669 670 for( Scope scope : incoming ) 671 { 672 FlowElement source = elementGraph.getEdgeSource( scope ); 673 elementGraph.removeEdge( source, replace ); // remove scope 674 675 // drop edge between, if any 676 if( source != replaceWith ) 677 elementGraph.addEdge( source, replaceWith, scope ); // add scope back 678 } 679 680 for( Scope scope : outgoing ) 681 { 682 FlowElement target = elementGraph.getEdgeTarget( scope ); 683 elementGraph.removeEdge( replace, target ); // remove scope 684 685 // drop edge between, if any 686 if( target != replaceWith ) 687 elementGraph.addEdge( replaceWith, target, scope ); // add scope back 688 } 689 690 elementGraph.removeVertex( replace ); 691 } 692 693 public static Pipe findFirstPipeNamed( ElementGraph elementGraph, String name ) 694 { 695 Iterator<FlowElement> iterator = getTopologicalIterator( elementGraph ); 696 697 return find( name, iterator ); 698 } 699 700 public static Pipe findLastPipeNamed( ElementGraph elementGraph, String name ) 701 { 702 Iterator<FlowElement> iterator = getReverseTopologicalIterator( elementGraph ); 703 704 return find( name, iterator ); 705 } 706 707 private static Pipe find( String name, Iterator<FlowElement> iterator ) 708 { 709 while( iterator.hasNext() ) 710 { 711 FlowElement flowElement = iterator.next(); 712 713 if( flowElement instanceof Pipe && ( (Pipe) flowElement ).getName().equals( name ) ) 714 return (Pipe) flowElement; 715 } 716 717 return null; 718 } 719 720 public static boolean removeBranchContaining( ElementGraph elementGraph, FlowElement flowElement ) 721 { 722 Set<FlowElement> branch = new LinkedHashSet<>(); 723 724 walkUp( branch, elementGraph, flowElement ); 725 726 walkDown( branch, elementGraph, flowElement ); 727 728 if( branch.isEmpty() ) 729 return false; 730 731 for( FlowElement element : branch ) 732 elementGraph.removeVertex( element ); 733 734 return true; 735 } 736 737 public static boolean removeBranchBetween( ElementGraph elementGraph, FlowElement first, FlowElement second, boolean inclusive ) 738 { 739 Set<FlowElement> branch = new LinkedHashSet<>( Arrays.asList( first, second ) ); 740 741 walkDown( branch, elementGraph, first ); 742 743 if( !inclusive ) 744 { 745 branch.remove( first ); 746 branch.remove( second ); 747 } 748 749 if( branch.isEmpty() ) 750 return false; 751 752 for( FlowElement element : branch ) 753 elementGraph.removeVertex( element ); 754 755 return true; 756 } 757 758 private static void walkDown( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement ) 759 { 760 FlowElement current; 761 current = flowElement; 762 763 while( true ) 764 { 765 if( !branch.contains( current ) && ( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 ) ) 766 break; 767 768 branch.add( current ); 769 770 FlowElement element = elementGraph.getEdgeTarget( getFirst( elementGraph.outgoingEdgesOf( current ) ) ); 771 772 if( element instanceof Extent || branch.contains( element ) ) 773 break; 774 775 current = element; 776 } 777 } 778 779 private static void walkUp( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement ) 780 { 781 FlowElement current = flowElement; 782 783 while( true ) 784 { 785 if( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 ) 786 break; 787 788 branch.add( current ); 789 790 FlowElement element = elementGraph.getEdgeSource( getFirst( elementGraph.incomingEdgesOf( current ) ) ); 791 792 if( element instanceof Extent || branch.contains( element ) ) 793 break; 794 795 current = element; 796 } 797 } 798 799 /** 800 * Returns the number of edges found on the shortest distance between the lhs and rhs. 801 */ 802 public static int shortestDistance( ElementGraph graph, FlowElement lhs, FlowElement rhs ) 803 { 804 return DijkstraShortestPath.findPathBetween( directed( graph ), lhs, rhs ).size(); 805 } 806 807 private static class FlowElementVertexNameProvider implements VertexNameProvider<FlowElement> 808 { 809 private final ElementGraph elementGraph; 810 private final PlatformInfo platformInfo; 811 812 public FlowElementVertexNameProvider( ElementGraph elementGraph, PlatformInfo platformInfo ) 813 { 814 this.elementGraph = elementGraph; 815 this.platformInfo = platformInfo; 816 } 817 818 public String getVertexName( FlowElement object ) 819 { 820 if( object instanceof Extent ) // is head/tail 821 { 822 String result = object.toString().replaceAll( "\"", "\'" ); 823 824 if( object == Extent.tail ) 825 return result; 826 827 result = result + "|hash: " + canonicalHash( elementGraph ); 828 829 String versionString = Version.getRelease(); 830 831 if( platformInfo != null ) 832 versionString = ( versionString == null ? "" : versionString + "|" ) + platformInfo; 833 834 return "{" + ( versionString == null ? result : result + "|" + versionString ) + "}"; 835 } 836 837 String label; 838 839 Iterator<Scope> iterator = elementGraph.outgoingEdgesOf( object ).iterator(); 840 841 if( object instanceof Tap || !iterator.hasNext() ) 842 { 843 label = object.toString().replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" ); 844 } 845 else 846 { 847 Scope scope = iterator.next(); 848 849 label = ( (Pipe) object ).print( scope ).replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" ); 850 } 851 852 label = label.replaceFirst( "([^|]+)\\|(.*)", "$1 : " + getID( object ) + "|$2" ); // insert id 853 854 label = "{" + label.replaceAll( "\\{", "\\\\{" ).replaceAll( "\\}", "\\\\}" ).replaceAll( ">", "\\\\>" ) + "}"; 855 856 if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() ) 857 return label; 858 859 Set<Enum> annotations = ( (AnnotatedGraph) elementGraph ).getAnnotations().getKeysFor( object ); 860 861 if( !annotations.isEmpty() ) 862 label += "|{" + Util.join( annotations, "|" ) + "}"; 863 864 return label; 865 } 866 867 protected String getID( FlowElement object ) 868 { 869 return FlowElements.id( object ).substring( 0, 5 ); 870 } 871 } 872 873 private static class ScopeEdgeNameProvider implements EdgeNameProvider<Scope> 874 { 875 public String getEdgeName( Scope object ) 876 { 877 return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz 878 } 879 } 880 881 private static class VertexAttributeProvider implements ComponentAttributeProvider<FlowElement> 882 { 883 static Map<String, String> defaultNode = new HashMap<String, String>() 884 { 885 {put( "shape", "Mrecord" );} 886 }; 887 888 public VertexAttributeProvider() 889 { 890 } 891 892 @Override 893 public Map<String, String> getComponentAttributes( FlowElement object ) 894 { 895 return defaultNode; 896 } 897 } 898 899 private static class EdgeAttributeProvider implements ComponentAttributeProvider<Scope> 900 { 901 static Map<String, String> attributes = new HashMap<String, String>() 902 { 903 {put( "style", "dotted" );} 904 905 {put( "arrowhead", "dot" );} 906 }; 907 908 @Override 909 public Map<String, String> getComponentAttributes( Scope scope ) 910 { 911 if( scope.isNonBlocking() ) 912 return null; 913 914 return attributes; 915 } 916 } 917 918 private static class ProcessGraphNameProvider implements VertexNameProvider<ProcessModel> 919 { 920 @Override 921 public String getVertexName( ProcessModel processModel ) 922 { 923 return "" + processModel.getOrdinal(); 924 } 925 } 926 927 private static class ProcessGraphLabelProvider implements VertexNameProvider<ProcessModel> 928 { 929 @Override 930 public String getVertexName( ProcessModel processModel ) 931 { 932 return "ordinal: " + processModel.getOrdinal() + 933 "\\nid: " + processModel.getID() + 934 "\\nhash: " + canonicalHash( processModel.getElementGraph() ); 935 } 936 } 937 938 static void injectIdentityMap( AbstractGraph graph ) 939 { 940 // this overcomes jgrapht 0.9.0 using a LinkedHashMap vs an IdentityHashMap 941 // vertex not found errors will be thrown if this fails 942 Object specifics = Util.returnInstanceFieldIfExistsSafe( graph, "specifics" ); 943 944 if( specifics == null ) 945 { 946 LOG.warn( "unable to get jgrapht Specifics for identity map injection, may be using an incompatible jgrapht version" ); 947 return; 948 } 949 950 boolean success = Util.setInstanceFieldIfExistsSafe( specifics, "vertexMapDirected", new IdentityHashMap<>() ); 951 952 if( !success ) 953 LOG.warn( "unable to set IdentityHashMap on jgrapht Specifics, may be using an incompatible jgrapht version" ); 954 } 955 }