001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.graph;
022
023import java.io.File;
024import java.io.FileWriter;
025import java.io.IOException;
026import java.io.Writer;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.IdentityHashMap;
033import java.util.Iterator;
034import java.util.LinkedHashSet;
035import java.util.LinkedList;
036import java.util.List;
037import java.util.ListIterator;
038import java.util.Map;
039import java.util.Set;
040
041import cascading.flow.FlowElement;
042import cascading.flow.FlowElements;
043import cascading.flow.planner.BaseFlowStep;
044import cascading.flow.planner.PlatformInfo;
045import cascading.flow.planner.Scope;
046import cascading.flow.planner.iso.expression.ElementCapture;
047import cascading.flow.planner.iso.expression.ExpressionGraph;
048import cascading.flow.planner.iso.expression.FlowElementExpression;
049import cascading.flow.planner.iso.expression.TypeExpression;
050import cascading.flow.planner.iso.finder.SearchOrder;
051import cascading.flow.planner.iso.subgraph.SubGraphIterator;
052import cascading.flow.planner.iso.subgraph.iterator.ExpressionSubGraphIterator;
053import cascading.flow.planner.process.ProcessGraph;
054import cascading.flow.planner.process.ProcessModel;
055import cascading.pipe.Group;
056import cascading.pipe.HashJoin;
057import cascading.pipe.Operator;
058import cascading.pipe.Pipe;
059import cascading.pipe.Splice;
060import cascading.tap.Tap;
061import cascading.util.DOTProcessGraphWriter;
062import cascading.util.Murmur3;
063import cascading.util.Pair;
064import cascading.util.Util;
065import cascading.util.Version;
066import cascading.util.jgrapht.ComponentAttributeProvider;
067import cascading.util.jgrapht.EdgeNameProvider;
068import cascading.util.jgrapht.IntegerNameProvider;
069import cascading.util.jgrapht.VertexNameProvider;
070import org.jgrapht.DirectedGraph;
071import org.jgrapht.Graph;
072import org.jgrapht.GraphPath;
073import org.jgrapht.Graphs;
074import org.jgrapht.alg.DijkstraShortestPath;
075import org.jgrapht.alg.FloydWarshallShortestPaths;
076import org.jgrapht.alg.KShortestPaths;
077import org.jgrapht.graph.AbstractGraph;
078import org.jgrapht.graph.EdgeReversedGraph;
079import org.jgrapht.graph.SimpleDirectedGraph;
080import org.jgrapht.traverse.TopologicalOrderIterator;
081import org.jgrapht.util.TypeUtil;
082import org.slf4j.Logger;
083import org.slf4j.LoggerFactory;
084
085import static cascading.util.Util.*;
086import static java.lang.Double.POSITIVE_INFINITY;
087
088/**
089 * Class ElementGraphs maintains a collection of operations that can be performed on an {@link ElementGraph}.
090 */
091public class ElementGraphs
092  {
093  private static final Logger LOG = LoggerFactory.getLogger( ElementGraphs.class );
094
095  // not for instantiation
096  private ElementGraphs()
097    {
098    }
099
100  public static DirectedGraph<FlowElement, Scope> directed( ElementGraph elementGraph )
101    {
102    if( elementGraph == null )
103      return null;
104
105    if( elementGraph instanceof DecoratedElementGraph )
106      return directed( ( (DecoratedElementGraph) elementGraph ).getDecorated() );
107
108    return ( (BaseElementGraph) elementGraph ).graph;
109    }
110
111  public static int hashCodeIgnoreAnnotations( ElementGraph elementGraph )
112    {
113    return hashCodeIgnoreAnnotations( directed( elementGraph ) );
114    }
115
116  public static <V, E> int hashCodeIgnoreAnnotations( Graph<V, E> graph )
117    {
118    int hash = graph.vertexSet().hashCode();
119
120    for( E e : graph.edgeSet() )
121      {
122      int part = e.hashCode();
123
124      int source = graph.getEdgeSource( e ).hashCode();
125      int target = graph.getEdgeTarget( e ).hashCode();
126
127      int pairing = pair( source, target );
128
129      part = ( 27 * part ) + pairing;
130
131      long weight = (long) graph.getEdgeWeight( e );
132      part = ( 27 * part ) + (int) ( weight ^ ( weight >>> 32 ) );
133
134      hash += part;
135      }
136
137    return hash;
138    }
139
140  public static boolean equalsIgnoreAnnotations( ElementGraph lhs, ElementGraph rhs )
141    {
142    return equalsIgnoreAnnotations( directed( lhs ), directed( rhs ) );
143    }
144
145  public static <V, E> boolean equalsIgnoreAnnotations( Graph<V, E> lhs, Graph<V, E> rhs )
146    {
147    if( lhs == rhs )
148      return true;
149
150    TypeUtil<Graph<V, E>> typeDecl = null;
151    Graph<V, E> lhsGraph = TypeUtil.uncheckedCast( lhs, typeDecl );
152    Graph<V, E> rhsGraph = TypeUtil.uncheckedCast( rhs, typeDecl );
153
154    if( !lhsGraph.vertexSet().equals( rhsGraph.vertexSet() ) )
155      return false;
156
157    if( lhsGraph.edgeSet().size() != rhsGraph.edgeSet().size() )
158      return false;
159
160    for( E e : lhsGraph.edgeSet() )
161      {
162      V source = lhsGraph.getEdgeSource( e );
163      V target = lhsGraph.getEdgeTarget( e );
164
165      if( !rhsGraph.containsEdge( e ) )
166        return false;
167
168      if( !rhsGraph.getEdgeSource( e ).equals( source ) || !rhsGraph.getEdgeTarget( e ).equals( target ) )
169        return false;
170
171      if( Math.abs( lhsGraph.getEdgeWeight( e ) - rhsGraph.getEdgeWeight( e ) ) > 10e-7 )
172        return false;
173      }
174
175    return true;
176    }
177
178  public static boolean equals( ElementGraph lhs, ElementGraph rhs )
179    {
180    if( !equalsIgnoreAnnotations( lhs, rhs ) )
181      return false;
182
183    if( !( lhs instanceof AnnotatedGraph ) && !( rhs instanceof AnnotatedGraph ) )
184      return true;
185
186    if( !( lhs instanceof AnnotatedGraph ) || !( rhs instanceof AnnotatedGraph ) )
187      return false;
188
189    AnnotatedGraph lhsAnnotated = (AnnotatedGraph) lhs;
190    AnnotatedGraph rhsAnnotated = (AnnotatedGraph) rhs;
191
192    if( lhsAnnotated.hasAnnotations() != rhsAnnotated.hasAnnotations() )
193      return false;
194
195    if( !lhsAnnotated.hasAnnotations() )
196      return true;
197
198    return lhsAnnotated.getAnnotations().equals( rhsAnnotated.getAnnotations() );
199    }
200
201  public static String canonicalHash( ElementGraph graph )
202    {
203    return canonicalHash( directed( graph ) );
204    }
205
206  public static String canonicalHash( Graph<FlowElement, Scope> graph )
207    {
208    int hash = Murmur3.SEED;
209
210    int edges = 0;
211    boolean hasExtents = false;
212
213    for( Scope e : graph.edgeSet() )
214      {
215      FlowElement edgeSource = graph.getEdgeSource( e );
216      FlowElement edgeTarget = graph.getEdgeTarget( e );
217
218      // simpler to ignore extents here
219      if( edgeSource instanceof Extent || edgeTarget instanceof Extent )
220        {
221        hasExtents = true;
222        continue;
223        }
224
225      int source = hash( edgeSource );
226      int target = hash( edgeTarget );
227      int pairing = pair( source, target );
228
229      hash += pairing; // don't make edge traversal order significant
230      edges++;
231      }
232
233    int vertexes = graph.vertexSet().size() - ( hasExtents ? 2 : 0 );
234
235    hash = Murmur3.fmix( hash, vertexes * edges );
236
237    return Util.getHex( Util.intToByteArray( hash ) );
238    }
239
240  private static int hash( FlowElement flowElement )
241    {
242    int lhs = flowElement.getClass().getName().hashCode();
243    int rhs = 0;
244
245    if( flowElement instanceof Operator && ( (Operator) flowElement ).getOperation() != null )
246      rhs = ( (Operator) flowElement ).getOperation().getClass().getName().hashCode();
247    else if( flowElement instanceof Tap && ( (Tap) flowElement ).getScheme() != null )
248      rhs = ( (Tap) flowElement ).getScheme().getClass().getName().hashCode();
249    else if( flowElement instanceof Splice )
250      rhs = ( (Splice) flowElement ).getJoiner().getClass().getName().hashCode() + 31 * ( (Splice) flowElement ).getNumSelfJoins();
251
252    return pair( lhs, rhs );
253    }
254
255  protected static int pair( int lhs, int rhs )
256    {
257    if( rhs == 0 )
258      return lhs;
259
260    // see http://en.wikipedia.org/wiki/Pairing_function
261    return ( ( lhs + rhs ) * ( lhs + rhs + 1 ) / 2 ) + rhs;
262    }
263
264  public static Iterator<FlowElement> getTopologicalIterator( ElementGraph graph )
265    {
266    return new TopologicalOrderIterator<>( directed( graph ) );
267    }
268
269  public static Iterator<FlowElement> getReverseTopologicalIterator( ElementGraph graph )
270    {
271    return new TopologicalOrderIterator<>( new EdgeReversedGraph<>( directed( graph ) ) );
272    }
273
274  public static List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetween( ElementGraph graph, FlowElement from, FlowElement to )
275    {
276    return getAllShortestPathsBetween( directed( graph ), from, to );
277    }
278
279  public static <V, E> List<GraphPath<V, E>> getAllShortestPathsBetween( DirectedGraph<V, E> graph, V from, V to )
280    {
281    List<GraphPath<V, E>> paths = new KShortestPaths<>( graph, from, Integer.MAX_VALUE ).getPaths( to );
282
283    if( paths == null )
284      return new ArrayList<>();
285
286    return paths;
287    }
288
289  public static ElementSubGraph asSubGraph( ElementGraph elementGraph, ElementGraph contractedGraph, Set<FlowElement> excludes )
290    {
291    elementGraph = asExtentMaskedSubGraph( elementGraph ); // returns same instance if not bounded
292
293    Pair<Set<FlowElement>, Set<Scope>> pair = findClosureViaFloydWarshall( directed( elementGraph ), directed( contractedGraph ), excludes );
294    Set<FlowElement> vertices = pair.getLhs();
295    Set<Scope> excludeEdges = pair.getRhs();
296
297    Set<Scope> scopes = new HashSet<>( elementGraph.edgeSet() );
298    scopes.removeAll( excludeEdges );
299
300    return new ElementSubGraph( elementGraph, vertices, scopes );
301    }
302
303  /**
304   * Returns a new ElementGraph (a MaskedSubGraph) of the given ElementGraph that will not contain the {@link Extent}
305   * head or tail instances.
306   * <p/>
307   * If the given ElementGraph does not contain head or tail, it will be returned unchanged.
308   *
309   * @param elementGraph
310   * @return
311   */
312  public static ElementGraph asExtentMaskedSubGraph( ElementGraph elementGraph )
313    {
314    if( elementGraph.containsVertex( Extent.head ) || elementGraph.containsVertex( Extent.tail ) )
315      return new ElementMaskSubGraph( elementGraph, Extent.head, Extent.tail );
316
317    return elementGraph;
318    }
319
320  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted )
321    {
322    return findClosureViaFloydWarshall( full, contracted, null );
323    }
324
325  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted, Set<V> excludes )
326    {
327    Set<V> vertices = new HashSet<>( contracted.vertexSet() );
328    LinkedList<V> allVertices = new LinkedList<>( full.vertexSet() );
329
330    allVertices.removeAll( vertices );
331
332    Set<E> excludeEdges = new HashSet<>();
333
334    // prevent distinguished elements from being included inside the sub-graph
335    if( excludes != null )
336      {
337      for( V v : excludes )
338        {
339        if( !full.containsVertex( v ) )
340          continue;
341
342        excludeEdges.addAll( full.incomingEdgesOf( v ) );
343        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
344        }
345      }
346
347    for( V v : contracted.vertexSet() )
348      {
349      if( contracted.inDegreeOf( v ) == 0 )
350        excludeEdges.addAll( full.incomingEdgesOf( v ) );
351      }
352
353    for( V v : contracted.vertexSet() )
354      {
355      if( contracted.outDegreeOf( v ) == 0 )
356        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
357      }
358
359    DirectedGraph<V, E> disconnected = disconnectExtentsAndExclude( full, excludeEdges );
360
361    FloydWarshallShortestPaths<V, E> paths = new FloydWarshallShortestPaths<>( disconnected );
362
363    for( E edge : contracted.edgeSet() )
364      {
365      V edgeSource = contracted.getEdgeSource( edge );
366      V edgeTarget = contracted.getEdgeTarget( edge );
367
368      ListIterator<V> iterator = allVertices.listIterator();
369      while( iterator.hasNext() )
370        {
371        V vertex = iterator.next();
372
373        if( !isBetween( paths, edgeSource, edgeTarget, vertex ) )
374          continue;
375
376        vertices.add( vertex );
377        iterator.remove();
378        }
379      }
380
381    return new Pair<>( vertices, excludeEdges );
382    }
383
384  private static <V, E> DirectedGraph<V, E> disconnectExtentsAndExclude( DirectedGraph<V, E> full, Set<E> withoutEdges )
385    {
386    DirectedGraph<V, E> copy = (DirectedGraph<V, E>) new SimpleDirectedGraph<>( Object.class );
387
388    Graphs.addAllVertices( copy, full.vertexSet() );
389
390    copy.removeVertex( (V) Extent.head );
391    copy.removeVertex( (V) Extent.tail );
392
393    Set<E> edges = full.edgeSet();
394
395    if( !withoutEdges.isEmpty() )
396      {
397      edges = new HashSet<>( edges );
398      edges.removeAll( withoutEdges );
399      }
400
401    Graphs.addAllEdges( copy, full, edges );
402
403    return copy;
404    }
405
406  private static <V, E> boolean isBetween( FloydWarshallShortestPaths<V, E> paths, V edgeSource, V edgeTarget, V vertex )
407    {
408    return paths.shortestDistance( edgeSource, vertex ) != POSITIVE_INFINITY && paths.shortestDistance( vertex, edgeTarget ) != POSITIVE_INFINITY;
409    }
410
411  public static void removeAndContract( ElementGraph elementGraph, FlowElement flowElement )
412    {
413    LOG.debug( "removing element, contracting edge for: {}", flowElement );
414
415    Set<Scope> incomingScopes = elementGraph.incomingEdgesOf( flowElement );
416
417    boolean contractIncoming = true;
418
419    if( !contractIncoming )
420      {
421      if( incomingScopes.size() != 1 )
422        throw new IllegalStateException( "flow element:" + flowElement + ", has multiple input paths: " + incomingScopes.size() );
423      }
424
425    boolean isJoin = flowElement instanceof Splice && ( (Splice) flowElement ).isJoin();
426
427    for( Scope incoming : incomingScopes )
428      {
429      Set<Scope> outgoingScopes = elementGraph.outgoingEdgesOf( flowElement );
430
431      // source -> incoming -> flowElement -> outgoing -> target
432      FlowElement source = elementGraph.getEdgeSource( incoming );
433
434      for( Scope outgoing : outgoingScopes )
435        {
436        FlowElement target = elementGraph.getEdgeTarget( outgoing );
437
438        boolean isNonBlocking = outgoing.isNonBlocking();
439
440        if( isJoin )
441          isNonBlocking = isNonBlocking && incoming.isNonBlocking();
442
443        Scope scope = new Scope( outgoing );
444
445        // unsure if necessary since we track blocking independently
446        // when removing a pipe, pull ordinal up to tap
447        // when removing a Splice retain ordinal
448        if( flowElement instanceof Splice )
449          scope.setOrdinal( incoming.getOrdinal() );
450        else
451          scope.setOrdinal( outgoing.getOrdinal() );
452
453        scope.setNonBlocking( isNonBlocking );
454        scope.addPriorNames( incoming, outgoing ); // not copied
455        elementGraph.addEdge( source, target, scope );
456        }
457      }
458
459    elementGraph.removeVertex( flowElement );
460    }
461
462  public static boolean printElementGraph( String filename, final ElementGraph graph, final PlatformInfo platformInfo )
463    {
464    try
465      {
466      File parentFile = new File( filename ).getParentFile();
467
468      if( parentFile != null && !parentFile.exists() )
469        parentFile.mkdirs();
470
471      Writer writer = new FileWriter( filename );
472
473      Util.writeDOT( writer, ElementGraphs.directed( graph ),
474        new IntegerNameProvider<FlowElement>(),
475        new FlowElementVertexNameProvider( graph, platformInfo ),
476        new ScopeEdgeNameProvider(),
477        new VertexAttributeProvider(), new EdgeAttributeProvider() );
478
479      writer.close();
480      return true;
481      }
482    catch( IOException exception )
483      {
484      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
485      }
486
487    return false;
488    }
489
490  public static boolean printProcessGraph( String filename, final ElementGraph graph, final ProcessGraph<? extends ProcessModel> processGraph )
491    {
492    try
493      {
494      File parentFile = new File( filename ).getParentFile();
495
496      if( parentFile != null && !parentFile.exists() )
497        parentFile.mkdirs();
498
499      Writer writer = new FileWriter( filename );
500
501      DOTProcessGraphWriter graphWriter = new DOTProcessGraphWriter(
502        new IntegerNameProvider<Pair<ElementGraph, FlowElement>>(),
503        new FlowElementVertexNameProvider( graph, null ),
504        new ScopeEdgeNameProvider(),
505        new VertexAttributeProvider(), new EdgeAttributeProvider(),
506        new ProcessGraphNameProvider(), new ProcessGraphLabelProvider()
507      );
508
509      graphWriter.writeGraph( writer, graph, processGraph );
510
511      writer.close();
512      return true;
513      }
514    catch( IOException exception )
515      {
516      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
517      }
518
519    return false;
520    }
521
522  public static void insertFlowElementAfter( ElementGraph elementGraph, FlowElement previousElement, FlowElement flowElement )
523    {
524    Set<Scope> outgoing = new HashSet<>( elementGraph.outgoingEdgesOf( previousElement ) );
525
526    elementGraph.addVertex( flowElement );
527
528    String name = previousElement.toString();
529
530    if( previousElement instanceof Pipe )
531      name = ( (Pipe) previousElement ).getName();
532
533    elementGraph.addEdge( previousElement, flowElement, new Scope( name ) );
534
535    for( Scope scope : outgoing )
536      {
537      FlowElement target = elementGraph.getEdgeTarget( scope );
538      Scope foundScope = elementGraph.removeEdge( previousElement, target ); // remove scope
539
540      if( foundScope != scope )
541        throw new IllegalStateException( "did not remove proper scope" );
542
543      elementGraph.addEdge( flowElement, target, scope ); // add scope back
544      }
545    }
546
547  public static void insertFlowElementBefore( ElementGraph graph, FlowElement nextElement, FlowElement flowElement )
548    {
549    Set<Scope> incoming = new HashSet<>( graph.incomingEdgesOf( nextElement ) );
550
551    graph.addVertex( flowElement );
552
553    String name = nextElement.toString();
554
555    if( nextElement instanceof Pipe )
556      name = ( (Pipe) nextElement ).getName();
557
558    graph.addEdge( flowElement, nextElement, new Scope( name ) );
559
560    for( Scope scope : incoming )
561      {
562      FlowElement target = graph.getEdgeSource( scope );
563      Scope foundScope = graph.removeEdge( target, nextElement ); // remove scope
564
565      if( foundScope != scope )
566        throw new IllegalStateException( "did not remove proper scope" );
567
568      graph.addEdge( target, flowElement, scope ); // add scope back
569      }
570    }
571
572  public static void addSources( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sources )
573    {
574    for( Tap tap : sources )
575      {
576      for( Scope scope : elementGraph.outgoingEdgesOf( tap ) )
577        flowStep.addSource( scope.getName(), tap );
578      }
579    }
580
581  public static Set<Tap> findSources( ElementGraph elementGraph )
582    {
583    return findSources( elementGraph, Tap.class );
584    }
585
586  public static <F extends FlowElement> Set<F> findSources( ElementGraph elementGraph, Class<F> type )
587    {
588    if( elementGraph == null )
589      return Collections.emptySet();
590
591    if( elementGraph.containsVertex( Extent.head ) )
592      return narrowSet( type, elementGraph.successorListOf( Extent.head ) );
593
594    SubGraphIterator iterator = new ExpressionSubGraphIterator(
595      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Head ) ),
596      elementGraph
597    );
598
599    return narrowSet( type, getAllVertices( iterator ) );
600    }
601
602  public static <F extends FlowElement> Set<F> findSinks( ElementGraph elementGraph, Class<F> type )
603    {
604    if( elementGraph == null )
605      return Collections.emptySet();
606
607    if( elementGraph.containsVertex( Extent.tail ) )
608      return narrowSet( type, elementGraph.predecessorListOf( Extent.tail ) );
609
610    SubGraphIterator iterator = new ExpressionSubGraphIterator(
611      new ExpressionGraph( SearchOrder.ReverseTopological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Tail ) ),
612      elementGraph
613    );
614
615    return narrowSet( type, getAllVertices( iterator ) );
616    }
617
618  public static void addSinks( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sinks )
619    {
620    for( Tap tap : sinks )
621      {
622      for( Scope scope : elementGraph.incomingEdgesOf( tap ) )
623        flowStep.addSink( scope.getName(), tap );
624      }
625    }
626
627  public static Set<Tap> findSinks( ElementGraph elementGraph )
628    {
629    return findSinks( elementGraph, Tap.class );
630    }
631
632  public static Set<Group> findAllGroups( ElementGraph elementGraph )
633    {
634    SubGraphIterator iterator = new ExpressionSubGraphIterator(
635      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, Group.class ) ),
636      elementGraph
637    );
638
639    return narrowSet( Group.class, getAllVertices( iterator ) );
640    }
641
642  public static Set<HashJoin> findAllHashJoins( ElementGraph elementGraph )
643    {
644    SubGraphIterator iterator = new ExpressionSubGraphIterator(
645      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, HashJoin.class ) ),
646      elementGraph
647    );
648
649    return narrowSet( HashJoin.class, getAllVertices( iterator ) );
650    }
651
652  private static Set<FlowElement> getAllVertices( SubGraphIterator iterator )
653    {
654    Set<FlowElement> vertices = createIdentitySet();
655
656    while( iterator.hasNext() )
657      vertices.addAll( iterator.next().vertexSet() );
658
659    return vertices;
660    }
661
662  public static void replaceElementWith( ElementGraph elementGraph, FlowElement replace, FlowElement replaceWith )
663    {
664    Set<Scope> incoming = new HashSet<Scope>( elementGraph.incomingEdgesOf( replace ) );
665    Set<Scope> outgoing = new HashSet<Scope>( elementGraph.outgoingEdgesOf( replace ) );
666
667    if( !elementGraph.containsVertex( replaceWith ) )
668      elementGraph.addVertex( replaceWith );
669
670    for( Scope scope : incoming )
671      {
672      FlowElement source = elementGraph.getEdgeSource( scope );
673      elementGraph.removeEdge( source, replace ); // remove scope
674
675      // drop edge between, if any
676      if( source != replaceWith )
677        elementGraph.addEdge( source, replaceWith, scope ); // add scope back
678      }
679
680    for( Scope scope : outgoing )
681      {
682      FlowElement target = elementGraph.getEdgeTarget( scope );
683      elementGraph.removeEdge( replace, target ); // remove scope
684
685      // drop edge between, if any
686      if( target != replaceWith )
687        elementGraph.addEdge( replaceWith, target, scope ); // add scope back
688      }
689
690    elementGraph.removeVertex( replace );
691    }
692
693  public static Pipe findFirstPipeNamed( ElementGraph elementGraph, String name )
694    {
695    Iterator<FlowElement> iterator = getTopologicalIterator( elementGraph );
696
697    return find( name, iterator );
698    }
699
700  public static Pipe findLastPipeNamed( ElementGraph elementGraph, String name )
701    {
702    Iterator<FlowElement> iterator = getReverseTopologicalIterator( elementGraph );
703
704    return find( name, iterator );
705    }
706
707  private static Pipe find( String name, Iterator<FlowElement> iterator )
708    {
709    while( iterator.hasNext() )
710      {
711      FlowElement flowElement = iterator.next();
712
713      if( flowElement instanceof Pipe && ( (Pipe) flowElement ).getName().equals( name ) )
714        return (Pipe) flowElement;
715      }
716
717    return null;
718    }
719
720  public static boolean removeBranchContaining( ElementGraph elementGraph, FlowElement flowElement )
721    {
722    Set<FlowElement> branch = new LinkedHashSet<>();
723
724    walkUp( branch, elementGraph, flowElement );
725
726    walkDown( branch, elementGraph, flowElement );
727
728    if( branch.isEmpty() )
729      return false;
730
731    for( FlowElement element : branch )
732      elementGraph.removeVertex( element );
733
734    return true;
735    }
736
737  public static boolean removeBranchBetween( ElementGraph elementGraph, FlowElement first, FlowElement second, boolean inclusive )
738    {
739    Set<FlowElement> branch = new LinkedHashSet<>( Arrays.asList( first, second ) );
740
741    walkDown( branch, elementGraph, first );
742
743    if( !inclusive )
744      {
745      branch.remove( first );
746      branch.remove( second );
747      }
748
749    if( branch.isEmpty() )
750      return false;
751
752    for( FlowElement element : branch )
753      elementGraph.removeVertex( element );
754
755    return true;
756    }
757
758  private static void walkDown( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
759    {
760    FlowElement current;
761    current = flowElement;
762
763    while( true )
764      {
765      if( !branch.contains( current ) && ( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 ) )
766        break;
767
768      branch.add( current );
769
770      FlowElement element = elementGraph.getEdgeTarget( getFirst( elementGraph.outgoingEdgesOf( current ) ) );
771
772      if( element instanceof Extent || branch.contains( element ) )
773        break;
774
775      current = element;
776      }
777    }
778
779  private static void walkUp( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
780    {
781    FlowElement current = flowElement;
782
783    while( true )
784      {
785      if( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 )
786        break;
787
788      branch.add( current );
789
790      FlowElement element = elementGraph.getEdgeSource( getFirst( elementGraph.incomingEdgesOf( current ) ) );
791
792      if( element instanceof Extent || branch.contains( element ) )
793        break;
794
795      current = element;
796      }
797    }
798
799  /**
800   * Returns the number of edges found on the shortest distance between the lhs and rhs.
801   */
802  public static int shortestDistance( ElementGraph graph, FlowElement lhs, FlowElement rhs )
803    {
804    return DijkstraShortestPath.findPathBetween( directed( graph ), lhs, rhs ).size();
805    }
806
807  private static class FlowElementVertexNameProvider implements VertexNameProvider<FlowElement>
808    {
809    private final ElementGraph elementGraph;
810    private final PlatformInfo platformInfo;
811
812    public FlowElementVertexNameProvider( ElementGraph elementGraph, PlatformInfo platformInfo )
813      {
814      this.elementGraph = elementGraph;
815      this.platformInfo = platformInfo;
816      }
817
818    public String getVertexName( FlowElement object )
819      {
820      if( object instanceof Extent ) // is head/tail
821        {
822        String result = object.toString().replaceAll( "\"", "\'" );
823
824        if( object == Extent.tail )
825          return result;
826
827        result = result + "|hash: " + canonicalHash( elementGraph );
828
829        String versionString = Version.getRelease();
830
831        if( platformInfo != null )
832          versionString = ( versionString == null ? "" : versionString + "|" ) + platformInfo;
833
834        return "{" + ( versionString == null ? result : result + "|" + versionString ) + "}";
835        }
836
837      String label;
838
839      Iterator<Scope> iterator = elementGraph.outgoingEdgesOf( object ).iterator();
840
841      if( object instanceof Tap || !iterator.hasNext() )
842        {
843        label = object.toString().replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" );
844        }
845      else
846        {
847        Scope scope = iterator.next();
848
849        label = ( (Pipe) object ).print( scope ).replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" );
850        }
851
852      label = label.replaceFirst( "([^|]+)\\|(.*)", "$1 : " + getID( object ) + "|$2" ); // insert id
853
854      label = "{" + label.replaceAll( "\\{", "\\\\{" ).replaceAll( "\\}", "\\\\}" ).replaceAll( ">", "\\\\>" ) + "}";
855
856      if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
857        return label;
858
859      Set<Enum> annotations = ( (AnnotatedGraph) elementGraph ).getAnnotations().getKeysFor( object );
860
861      if( !annotations.isEmpty() )
862        label += "|{" + Util.join( annotations, "|" ) + "}";
863
864      return label;
865      }
866
867    protected String getID( FlowElement object )
868      {
869      return FlowElements.id( object ).substring( 0, 5 );
870      }
871    }
872
873  private static class ScopeEdgeNameProvider implements EdgeNameProvider<Scope>
874    {
875    public String getEdgeName( Scope object )
876      {
877      return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
878      }
879    }
880
881  private static class VertexAttributeProvider implements ComponentAttributeProvider<FlowElement>
882    {
883    static Map<String, String> defaultNode = new HashMap<String, String>()
884    {
885    {put( "shape", "Mrecord" );}
886    };
887
888    public VertexAttributeProvider()
889      {
890      }
891
892    @Override
893    public Map<String, String> getComponentAttributes( FlowElement object )
894      {
895      return defaultNode;
896      }
897    }
898
899  private static class EdgeAttributeProvider implements ComponentAttributeProvider<Scope>
900    {
901    static Map<String, String> attributes = new HashMap<String, String>()
902    {
903    {put( "style", "dotted" );}
904
905    {put( "arrowhead", "dot" );}
906    };
907
908    @Override
909    public Map<String, String> getComponentAttributes( Scope scope )
910      {
911      if( scope.isNonBlocking() )
912        return null;
913
914      return attributes;
915      }
916    }
917
918  private static class ProcessGraphNameProvider implements VertexNameProvider<ProcessModel>
919    {
920    @Override
921    public String getVertexName( ProcessModel processModel )
922      {
923      return "" + processModel.getOrdinal();
924      }
925    }
926
927  private static class ProcessGraphLabelProvider implements VertexNameProvider<ProcessModel>
928    {
929    @Override
930    public String getVertexName( ProcessModel processModel )
931      {
932      return "ordinal: " + processModel.getOrdinal() +
933        "\\nid: " + processModel.getID() +
934        "\\nhash: " + canonicalHash( processModel.getElementGraph() );
935      }
936    }
937
938  static void injectIdentityMap( AbstractGraph graph )
939    {
940    // this overcomes jgrapht 0.9.0 using a LinkedHashMap vs an IdentityHashMap
941    // vertex not found errors will be thrown if this fails
942    Object specifics = Util.returnInstanceFieldIfExistsSafe( graph, "specifics" );
943
944    if( specifics == null )
945      {
946      LOG.warn( "unable to get jgrapht Specifics for identity map injection, may be using an incompatible jgrapht version" );
947      return;
948      }
949
950    boolean success = Util.setInstanceFieldIfExistsSafe( specifics, "vertexMapDirected", new IdentityHashMap<>() );
951
952    if( !success )
953      LOG.warn( "unable to set IdentityHashMap on jgrapht Specifics, may be using an incompatible jgrapht version" );
954    }
955  }