001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.io.File;
024    import java.io.FileWriter;
025    import java.io.IOException;
026    import java.io.Writer;
027    import java.util.ArrayList;
028    import java.util.Collection;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.LinkedList;
032    import java.util.List;
033    import java.util.Map;
034    import java.util.Set;
035    
036    import cascading.flow.FlowElement;
037    import cascading.operation.PlannedOperation;
038    import cascading.operation.PlannerLevel;
039    import cascading.pipe.Checkpoint;
040    import cascading.pipe.CoGroup;
041    import cascading.pipe.Each;
042    import cascading.pipe.Every;
043    import cascading.pipe.Group;
044    import cascading.pipe.Operator;
045    import cascading.pipe.Pipe;
046    import cascading.pipe.Splice;
047    import cascading.pipe.SubAssembly;
048    import cascading.tap.Tap;
049    import cascading.util.Util;
050    import cascading.util.Version;
051    import org.jgrapht.GraphPath;
052    import org.jgrapht.Graphs;
053    import org.jgrapht.alg.KShortestPaths;
054    import org.jgrapht.ext.EdgeNameProvider;
055    import org.jgrapht.ext.IntegerNameProvider;
056    import org.jgrapht.ext.VertexNameProvider;
057    import org.jgrapht.graph.SimpleDirectedGraph;
058    import org.jgrapht.traverse.DepthFirstIterator;
059    import org.jgrapht.traverse.TopologicalOrderIterator;
060    import org.slf4j.Logger;
061    import org.slf4j.LoggerFactory;
062    
063    /** Class ElementGraph represents the executable FlowElement graph. */
064    public class ElementGraph extends SimpleDirectedGraph<FlowElement, Scope>
065      {
  /** Field LOG is the logger used by this class. */
  private static final Logger LOG = LoggerFactory.getLogger( ElementGraph.class );

  /** Field head is the Extent vertex named "head", addExtents connects it to every source Tap. */
  public static final Extent head = new Extent( "head" );
  /** Field tail is the Extent vertex named "tail", addExtents connects every sink Tap to it. */
  public static final Extent tail = new Extent( "tail" );
  /** Field resolved is set to true once resolveFields() has completed. */
  private boolean resolved;

  /** Field platformInfo, when not null, is appended to the label of root vertices in the DOT output. */
  private PlatformInfo platformInfo;
  /** Field sources maps names to source Taps. */
  private Map<String, Tap> sources;
  /** Field sinks maps names to sink Taps. */
  private Map<String, Tap> sinks;
  /** Field traps maps pipe names to trap Taps, see addTraps. */
  private Map<String, Tap> traps;
  /** Field checkpoints maps names to checkpoint Taps. */
  private Map<String, Tap> checkpoints;
  /** Field requireUniqueCheckpoints, when true, makes verifyGraph reject duplicate Checkpoint names. */
  private boolean requireUniqueCheckpoints;
  /** Field plannerLevels holds the planner levels consulted by testPlannerLevel. */
  private PlannerLevel[] plannerLevels;
089    
  /** Constructor ElementGraph creates an empty graph using Scope as the edge type, no other field is initialized. */
  ElementGraph()
    {
    super( Scope.class );
    }
094    
095      public ElementGraph( ElementGraph elementGraph )
096        {
097        this();
098        this.platformInfo = elementGraph.platformInfo;
099        this.sources = elementGraph.sources;
100        this.sinks = elementGraph.sinks;
101        this.traps = elementGraph.traps;
102        this.checkpoints = elementGraph.checkpoints;
103        this.plannerLevels = elementGraph.plannerLevels;
104        this.requireUniqueCheckpoints = elementGraph.requireUniqueCheckpoints;
105    
106        Graphs.addAllVertices( this, elementGraph.vertexSet() );
107        Graphs.addAllEdges( this, elementGraph, elementGraph.edgeSet() );
108        }
109    
110      /**
111       * Constructor ElementGraph creates a new ElementGraph instance.
112       *
113       * @param pipes   of type Pipe[]
114       * @param sources of type Map<String, Tap>
115       * @param sinks   of type Map<String, Tap>
116       */
117      public ElementGraph( PlatformInfo platformInfo, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints, boolean requireUniqueCheckpoints, PlannerLevel... plannerLevels )
118        {
119        super( Scope.class );
120        this.platformInfo = platformInfo;
121        this.sources = sources;
122        this.sinks = sinks;
123        this.traps = traps;
124        this.checkpoints = checkpoints;
125        this.requireUniqueCheckpoints = requireUniqueCheckpoints;
126        this.plannerLevels = plannerLevels;
127    
128        assembleGraph( pipes, sources, sinks );
129    
130        verifyGraph();
131        }
132    
133      public Map<String, Tap> getSourceMap()
134        {
135        return sources;
136        }
137    
138      public Map<String, Tap> getSinkMap()
139        {
140        return sinks;
141        }
142    
143      public Map<String, Tap> getTrapMap()
144        {
145        return traps;
146        }
147    
148      public Map<String, Tap> getCheckpointsMap()
149        {
150        return checkpoints;
151        }
152    
153      public Collection<Tap> getSources()
154        {
155        return sources.values();
156        }
157    
158      public Collection<Tap> getSinks()
159        {
160        return sinks.values();
161        }
162    
163      public Collection<Tap> getTraps()
164        {
165        return traps.values();
166        }
167    
168      private void assembleGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks )
169        {
170        HashMap<String, Tap> sourcesCopy = new HashMap<String, Tap>( sources );
171        HashMap<String, Tap> sinksCopy = new HashMap<String, Tap>( sinks );
172    
173        for( Pipe pipe : pipes )
174          makeGraph( pipe, sourcesCopy, sinksCopy );
175    
176        addExtents( sources, sinks );
177        }
178    
179      /** Method verifyGraphConnections ... */
180      private void verifyGraph()
181        {
182        if( vertexSet().isEmpty() )
183          return;
184    
185        Set<String> checkpointNames = new HashSet<String>();
186    
187        // need to verify that only Extent instances are origins in this graph. Otherwise a Tap was not properly connected
188        TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator();
189    
190        FlowElement flowElement = null;
191    
192        while( iterator.hasNext() )
193          {
194          try
195            {
196            flowElement = iterator.next();
197            }
198          catch( IllegalArgumentException exception )
199            {
200            if( flowElement == null )
201              throw new ElementGraphException( "unable to traverse to the first element" );
202    
203            throw new ElementGraphException( flowElement, "unable to traverse to the next element after " + flowElement );
204            }
205    
206          if( requireUniqueCheckpoints && flowElement instanceof Checkpoint )
207            {
208            String name = ( (Checkpoint) flowElement ).getName();
209    
210            if( checkpointNames.contains( name ) )
211              throw new ElementGraphException( (Pipe) flowElement, "may not have duplicate checkpoint names in assembly, found: " + name );
212    
213            checkpointNames.add( name );
214            }
215    
216          if( incomingEdgesOf( flowElement ).size() != 0 && outgoingEdgesOf( flowElement ).size() != 0 )
217            continue;
218    
219          if( flowElement instanceof Extent )
220            continue;
221    
222          if( flowElement instanceof Pipe )
223            {
224            if( incomingEdgesOf( flowElement ).size() == 0 )
225              throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to head Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming heads" );
226            else
227              throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to tail Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" );
228            }
229    
230          if( flowElement instanceof Tap )
231            throw new ElementGraphException( (Tap) flowElement, "no Pipe connected to Tap: " + flowElement );
232          else
233            throw new ElementGraphException( flowElement, "unknown element type: " + flowElement );
234          }
235        }
236    
237      /**
238       * Method copyGraph returns a partial copy of the current ElementGraph. Only Vertices and Edges are copied.
239       *
240       * @return ElementGraph
241       */
242      public ElementGraph copyElementGraph()
243        {
244        ElementGraph copy = new ElementGraph();
245        Graphs.addGraph( copy, this );
246    
247        copy.traps = new HashMap<String, Tap>( this.traps );
248    
249        return copy;
250        }
251    
252      /**
253       * created to support the ability to generate all paths between the head and tail of the process.
254       *
255       * @param sources
256       * @param sinks
257       */
258      private void addExtents( Map<String, Tap> sources, Map<String, Tap> sinks )
259        {
260        addVertex( head );
261    
262        for( String source : sources.keySet() )
263          {
264          Scope scope = addEdge( head, sources.get( source ) );
265    
266          // edge may already exist, if so, above returns null
267          if( scope != null )
268            scope.setName( source );
269          }
270    
271        addVertex( tail );
272    
273        for( String sink : sinks.keySet() )
274          {
275          Scope scope;
276    
277          try
278            {
279            scope = addEdge( sinks.get( sink ), tail );
280            }
281          catch( IllegalArgumentException exception )
282            {
283            throw new ElementGraphException( "missing pipe for sink tap: [" + sink + "]" );
284            }
285    
286          if( scope == null )
287            throw new ElementGraphException( "cannot sink to the same path from multiple branches: [" + Util.join( sinks.values() ) + "]" );
288    
289          scope.setName( sink );
290          }
291        }
292    
293      /**
294       * Performs one rule check, verifies group does not join duplicate tap resources.
295       * <p/>
296       * Scopes are always named after the source side of the source -> target relationship
297       */
298      private void makeGraph( Pipe current, Map<String, Tap> sources, Map<String, Tap> sinks )
299        {
300        if( LOG.isDebugEnabled() )
301          LOG.debug( "adding pipe: " + current );
302    
303        if( current instanceof SubAssembly )
304          {
305          for( Pipe pipe : SubAssembly.unwind( current.getPrevious() ) )
306            makeGraph( pipe, sources, sinks );
307    
308          return;
309          }
310    
311        if( containsVertex( current ) )
312          return;
313    
314        addVertex( current );
315    
316        Tap sink = sinks.remove( current.getName() );
317    
318        if( sink != null )
319          {
320          if( LOG.isDebugEnabled() )
321            LOG.debug( "adding sink: " + sink );
322    
323          addVertex( sink );
324    
325          if( LOG.isDebugEnabled() )
326            LOG.debug( "adding edge: " + current + " -> " + sink );
327    
328          addEdge( current, sink ).setName( current.getName() ); // name scope after sink
329          }
330    
331        // PipeAssemblies should always have a previous
332        if( SubAssembly.unwind( current.getPrevious() ).length == 0 )
333          {
334          Tap source = sources.remove( current.getName() );
335    
336          if( source != null )
337            {
338            if( LOG.isDebugEnabled() )
339              LOG.debug( "adding source: " + source );
340    
341            addVertex( source );
342    
343            if( LOG.isDebugEnabled() )
344              LOG.debug( "adding edge: " + source + " -> " + current );
345    
346            addEdge( source, current ).setName( current.getName() ); // name scope after source
347            }
348          }
349    
350        for( Pipe previous : SubAssembly.unwind( current.getPrevious() ) )
351          {
352          makeGraph( previous, sources, sinks );
353    
354          if( LOG.isDebugEnabled() )
355            LOG.debug( "adding edge: " + previous + " -> " + current );
356    
357          if( getEdge( previous, current ) != null )
358            throw new ElementGraphException( previous, "cannot distinguish pipe branches, give pipe unique name: " + previous );
359    
360          addEdge( previous, current ).setName( previous.getName() ); // name scope after previous pipe
361          }
362        }
363    
364      /**
365       * Method getTopologicalIterator returns the topologicalIterator of this ElementGraph object.
366       *
367       * @return the topologicalIterator (type TopologicalOrderIterator<FlowElement, Scope>) of this ElementGraph object.
368       */
369      public TopologicalOrderIterator<FlowElement, Scope> getTopologicalIterator()
370        {
371        return new TopologicalOrderIterator<FlowElement, Scope>( this );
372        }
373    
374      /**
375       * Method getAllShortestPathsFrom ...
376       *
377       * @param flowElement of type FlowElement
378       * @return List<GraphPath<FlowElement, Scope>>
379       */
380      public List<GraphPath<FlowElement, Scope>> getAllShortestPathsFrom( FlowElement flowElement )
381        {
382        return ElementGraphs.getAllShortestPathsBetween( this, flowElement, tail );
383        }
384    
385      /**
386       * Method getAllShortestPathsTo ...
387       *
388       * @param flowElement of type FlowElement
389       * @return List<GraphPath<FlowElement, Scope>>
390       */
391      public List<GraphPath<FlowElement, Scope>> getAllShortestPathsTo( FlowElement flowElement )
392        {
393        return ElementGraphs.getAllShortestPathsBetween( this, head, flowElement );
394        }
395    
396      /**
397       * Method getAllShortestPathsBetweenExtents returns the allShortestPathsBetweenExtents of this ElementGraph object.
398       *
399       * @return the allShortestPathsBetweenExtents (type List<GraphPath<FlowElement, Scope>>) of this ElementGraph object.
400       */
401      public List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetweenExtents()
402        {
403        List<GraphPath<FlowElement, Scope>> paths = new KShortestPaths<FlowElement, Scope>( this, head, Integer.MAX_VALUE ).getPaths( tail );
404    
405        if( paths == null )
406          return new ArrayList<GraphPath<FlowElement, Scope>>();
407    
408        return paths;
409        }
410    
411      /**
412       * Method getDepthFirstIterator returns the depthFirstIterator of this ElementGraph object.
413       *
414       * @return the depthFirstIterator (type DepthFirstIterator<FlowElement, Scope>) of this ElementGraph object.
415       */
416      public DepthFirstIterator<FlowElement, Scope> getDepthFirstIterator()
417        {
418        return new DepthFirstIterator<FlowElement, Scope>( this, head );
419        }
420    
421      private SimpleDirectedGraph<FlowElement, Scope> copyWithTraps()
422        {
423        ElementGraph copy = this.copyElementGraph();
424    
425        copy.addTraps();
426    
427        return copy;
428        }
429    
430      private void addTraps()
431        {
432        DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator();
433    
434        while( iterator.hasNext() )
435          {
436          FlowElement element = iterator.next();
437    
438          if( !( element instanceof Pipe ) )
439            continue;
440    
441          Pipe pipe = (Pipe) element;
442          Tap trap = traps.get( pipe.getName() );
443    
444          if( trap == null )
445            continue;
446    
447          addVertex( trap );
448    
449          if( LOG.isDebugEnabled() )
450            LOG.debug( "adding trap edge: " + pipe + " -> " + trap );
451    
452          if( getEdge( pipe, trap ) != null )
453            continue;
454    
455          addEdge( pipe, trap ).setName( pipe.getName() ); // name scope after previous pipe
456          }
457        }
458    
459      /**
460       * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
461       *
462       * @param filename of type String
463       */
464      public void writeDOT( String filename )
465        {
466        printElementGraph( filename, this.copyWithTraps() );
467        }
468    
469      protected void printElementGraph( String filename, final SimpleDirectedGraph<FlowElement, Scope> graph )
470        {
471        try
472          {
473          File parentFile = new File( filename ).getParentFile();
474    
475          if( parentFile != null && !parentFile.exists() )
476            parentFile.mkdirs();
477    
478          Writer writer = new FileWriter( filename );
479    
480          Util.writeDOT( writer, graph, new IntegerNameProvider<FlowElement>(), new VertexNameProvider<FlowElement>()
481            {
482            public String getVertexName( FlowElement object )
483              {
484              if( graph.incomingEdgesOf( object ).isEmpty() )
485                {
486                String result = object.toString().replaceAll( "\"", "\'" );
487                String versionString = Version.getRelease();
488    
489                if( platformInfo != null )
490                  versionString = ( versionString == null ? "" : versionString + "\\n" ) + platformInfo;
491    
492                return versionString == null ? result : result + "\\n" + versionString;
493                }
494    
495              if( object instanceof Tap || object instanceof Extent )
496                return object.toString().replaceAll( "\"", "\'" );
497    
498              Scope scope = graph.outgoingEdgesOf( object ).iterator().next();
499    
500              return ( (Pipe) object ).print( scope ).replaceAll( "\"", "\'" );
501              }
502            }, new EdgeNameProvider<Scope>()
503            {
504            public String getEdgeName( Scope object )
505              {
506              return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
507              }
508            }
509          );
510    
511          writer.close();
512          }
513        catch( IOException exception )
514          {
515          LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
516          }
517        }
518    
519      /** Method removeEmptyPipes performs a depth first traversal and removes instance of {@link cascading.pipe.Pipe} or {@link cascading.pipe.SubAssembly}. */
520      public void removeUnnecessaryPipes()
521        {
522        while( !internalRemoveUnnecessaryPipes() )
523          ;
524    
525        int numPipes = 0;
526        for( FlowElement flowElement : vertexSet() )
527          {
528          if( flowElement instanceof Pipe )
529            numPipes++;
530          }
531    
532        if( numPipes == 0 )
533          throw new ElementGraphException( "resulting graph has no pipe elements after removing empty Pipe, assertions, and SubAssembly containers" );
534        }
535    
536      private boolean internalRemoveUnnecessaryPipes()
537        {
538        DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator();
539    
540        while( iterator.hasNext() )
541          {
542          FlowElement flowElement = iterator.next();
543    
544          if( flowElement.getClass() == Pipe.class || flowElement.getClass() == Checkpoint.class ||
545            flowElement instanceof SubAssembly || testPlannerLevel( flowElement ) )
546            {
547            // Pipe class is guaranteed to have one input
548            removeElement( flowElement );
549    
550            return false;
551            }
552          }
553    
554        return true;
555        }
556    
557      private void removeElement( FlowElement flowElement )
558        {
559        LOG.debug( "removing: " + flowElement );
560    
561        Set<Scope> incomingScopes = incomingEdgesOf( flowElement );
562    
563        if( incomingScopes.size() != 1 )
564          throw new IllegalStateException( "flow element:" + flowElement + ", has multiple input paths: " + incomingScopes.size() );
565    
566        Scope incoming = incomingScopes.iterator().next();
567        Set<Scope> outgoingScopes = outgoingEdgesOf( flowElement );
568    
569        // source -> incoming -> flowElement -> outgoing -> target
570        FlowElement source = getEdgeSource( incoming );
571    
572        for( Scope outgoing : outgoingScopes )
573          {
574          FlowElement target = getEdgeTarget( outgoing );
575    
576          addEdge( source, target, new Scope( outgoing ) );
577          }
578    
579        removeVertex( flowElement );
580        }
581    
582      private boolean testPlannerLevel( FlowElement flowElement )
583        {
584        if( !( flowElement instanceof Operator ) )
585          return false;
586    
587        Operator operator = (Operator) flowElement;
588    
589        if( !operator.hasPlannerLevel() )
590          return false;
591    
592        for( PlannerLevel plannerLevel : plannerLevels )
593          {
594          if( ( (PlannedOperation) operator.getOperation() ).supportsPlannerLevel( plannerLevel ) )
595            return operator.getPlannerLevel().isStricterThan( plannerLevel );
596          }
597    
598        throw new IllegalStateException( "encountered unsupported planner level: " + operator.getPlannerLevel().getClass().getName() );
599        }
600    
601      /** Method resolveFields performs a breadth first traversal and resolves the tuple fields between each Pipe instance. */
602      public void resolveFields()
603        {
604        if( resolved )
605          throw new IllegalStateException( "element graph already resolved" );
606    
607        TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator();
608    
609        while( iterator.hasNext() )
610          resolveFields( iterator.next() );
611    
612        resolved = true;
613        }
614    
615      private void resolveFields( FlowElement source )
616        {
617        if( source instanceof Extent )
618          return;
619    
620        Set<Scope> incomingScopes = incomingEdgesOf( source );
621        Set<Scope> outgoingScopes = outgoingEdgesOf( source );
622    
623        List<FlowElement> flowElements = Graphs.successorListOf( this, source );
624    
625        if( flowElements.size() == 0 )
626          throw new IllegalStateException( "unable to find next elements in pipeline from: " + source.toString() );
627    
628        Scope outgoingScope = source.outgoingScopeFor( incomingScopes );
629    
630        if( LOG.isDebugEnabled() && outgoingScope != null )
631          {
632          LOG.debug( "for modifier: " + source );
633          if( outgoingScope.getArgumentsSelector() != null )
634            LOG.debug( "setting outgoing arguments: " + outgoingScope.getArgumentsSelector() );
635          if( outgoingScope.getOperationDeclaredFields() != null )
636            LOG.debug( "setting outgoing declared: " + outgoingScope.getOperationDeclaredFields() );
637          if( outgoingScope.getKeySelectors() != null )
638            LOG.debug( "setting outgoing group: " + outgoingScope.getKeySelectors() );
639          if( outgoingScope.getOutValuesSelector() != null )
640            LOG.debug( "setting outgoing values: " + outgoingScope.getOutValuesSelector() );
641          }
642    
643        for( Scope scope : outgoingScopes )
644          scope.copyFields( outgoingScope );
645        }
646    
647      /**
648       * Finds all groups that merge/join streams. returned in topological order.
649       *
650       * @return a List fo Group instances
651       */
652      public List<Group> findAllMergeJoinGroups()
653        {
654        return findAllOfType( 2, 1, Group.class, new LinkedList<Group>() );
655        }
656    
657      /**
658       * Finds all splices that merge/join streams. returned in topological order.
659       *
660       * @return a List fo Group instances
661       */
662      public List<Splice> findAllMergeJoinSplices()
663        {
664        return findAllOfType( 2, 1, Splice.class, new LinkedList<Splice>() );
665        }
666    
667      public List<CoGroup> findAllCoGroups()
668        {
669        return findAllOfType( 2, 1, CoGroup.class, new LinkedList<CoGroup>() );
670        }
671    
672      /**
673       * Method findAllGroups ...
674       *
675       * @return List<Group>
676       */
677      public List<Group> findAllGroups()
678        {
679        return findAllOfType( 1, 1, Group.class, new LinkedList<Group>() );
680        }
681    
682      /**
683       * Method findAllEveries ...
684       *
685       * @return List<Every>
686       */
687      public List<Every> findAllEveries()
688        {
689        return findAllOfType( 1, 1, Every.class, new LinkedList<Every>() );
690        }
691    
692      /**
693       * Method findAllTaps ...
694       *
695       * @return List<Tap>
696       */
697      public List<Tap> findAllTaps()
698        {
699        return findAllOfType( 1, 1, Tap.class, new LinkedList<Tap>() );
700        }
701    
702      /**
703       * Method findAllSplits ...
704       *
705       * @return List<FlowElement>
706       */
707      public List<Each> findAllEachSplits()
708        {
709        return findAllOfType( 1, 2, Each.class, new LinkedList<Each>() );
710        }
711    
712      public List<Pipe> findAllPipeSplits()
713        {
714        return findAllOfType( 1, 2, Pipe.class, new LinkedList<Pipe>() );
715        }
716    
717      /**
718       * Method findAllOfType ...
719       *
720       * @param minInDegree  of type int
721       * @param minOutDegree
722       * @param type         of type Class<P>
723       * @param results      of type List<P>   @return List<P>
724       */
725      public <P> List<P> findAllOfType( int minInDegree, int minOutDegree, Class<P> type, List<P> results )
726        {
727        TopologicalOrderIterator<FlowElement, Scope> topoIterator = getTopologicalIterator();
728    
729        while( topoIterator.hasNext() )
730          {
731          FlowElement flowElement = topoIterator.next();
732    
733          if( type.isInstance( flowElement ) && inDegreeOf( flowElement ) >= minInDegree && outDegreeOf( flowElement ) >= minOutDegree )
734            results.add( (P) flowElement );
735          }
736    
737        return results;
738        }
739    
740      public void insertFlowElementAfter( FlowElement previousElement, FlowElement flowElement )
741        {
742        Set<Scope> outgoing = new HashSet<Scope>( outgoingEdgesOf( previousElement ) );
743    
744        addVertex( flowElement );
745    
746        String name = previousElement.toString();
747    
748        if( previousElement instanceof Pipe )
749          name = ( (Pipe) previousElement ).getName();
750    
751        addEdge( previousElement, flowElement, new Scope( name ) );
752    
753        for( Scope scope : outgoing )
754          {
755          FlowElement target = getEdgeTarget( scope );
756          removeEdge( previousElement, target ); // remove scope
757          addEdge( flowElement, target, scope ); // add scope back
758          }
759        }
760    
761      /** Simple class that acts in as the root of the graph */
762      /**
763       * Method makeTapGraph returns a directed graph of all taps in the current element graph.
764       *
765       * @return SimpleDirectedGraph<Tap, Integer>
766       */
767      public SimpleDirectedGraph<Tap, Integer> makeTapGraph()
768        {
769        SimpleDirectedGraph<Tap, Integer> tapGraph = new SimpleDirectedGraph<Tap, Integer>( Integer.class );
770        List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetweenExtents();
771        int count = 0;
772    
773        if( LOG.isDebugEnabled() )
774          LOG.debug( "found num paths: " + paths.size() );
775    
776        for( GraphPath<FlowElement, Scope> element : paths )
777          {
778          List<Scope> path = element.getEdgeList();
779          Tap lastTap = null;
780    
781          for( Scope scope : path )
782            {
783            FlowElement target = getEdgeTarget( scope );
784    
785            if( target instanceof Extent )
786              continue;
787    
788            if( !( target instanceof Tap ) )
789              continue;
790    
791            tapGraph.addVertex( (Tap) target );
792    
793            if( lastTap != null )
794              {
795              if( LOG.isDebugEnabled() )
796                LOG.debug( "adding tap edge: " + lastTap + " -> " + target );
797    
798              if( tapGraph.getEdge( lastTap, (Tap) target ) == null && !tapGraph.addEdge( lastTap, (Tap) target, count++ ) )
799                throw new ElementGraphException( "could not add graph edge: " + lastTap + " -> " + target );
800              }
801    
802            lastTap = (Tap) target;
803            }
804          }
805    
806        return tapGraph;
807        }
808    
809      public int getMaxNumPathsBetweenElementAndGroupingMergeJoin( FlowElement flowElement )
810        {
811        List<Group> groups = findAllMergeJoinGroups();
812    
813        int maxPaths = 0;
814    
815        if( groups == null )
816          return 0;
817    
818        for( Group group : groups )
819          {
820          if( flowElement != group )
821            {
822            List<GraphPath<FlowElement, Scope>> paths = ElementGraphs.getAllShortestPathsBetween( this, flowElement, group );
823    
824            if( paths != null )
825              maxPaths = Math.max( maxPaths, paths.size() );
826            }
827          }
828    
829        return maxPaths;
830        }
831    
832      public List<FlowElement> getAllSuccessors( FlowElement element )
833        {
834        return Graphs.successorListOf( this, element );
835        }
836    
837      public void replaceElementWith( FlowElement element, FlowElement replacement )
838        {
839        Set<Scope> incoming = new HashSet<Scope>( incomingEdgesOf( element ) );
840        Set<Scope> outgoing = new HashSet<Scope>( outgoingEdgesOf( element ) );
841    
842        if( !containsVertex( replacement ) )
843          addVertex( replacement );
844    
845        for( Scope scope : incoming )
846          {
847          FlowElement source = getEdgeSource( scope );
848          removeEdge( source, element ); // remove scope
849    
850          // drop edge between, if any
851          if( source != replacement )
852            addEdge( source, replacement, scope ); // add scope back
853          }
854    
855        for( Scope scope : outgoing )
856          {
857          FlowElement target = getEdgeTarget( scope );
858          removeEdge( element, target ); // remove scope
859    
860          // drop edge between, if any
861          if( target != replacement )
862            addEdge( replacement, target, scope ); // add scope back
863          }
864    
865        removeVertex( element );
866        }
867    
868      public <A extends FlowElement> Set<A> getAllChildrenOfType( FlowElement flowElement, Class<A> type )
869        {
870        Set<A> allChildren = new HashSet<A>();
871    
872        getAllChildrenOfType( allChildren, flowElement, type );
873    
874        return allChildren;
875        }
876    
877      private <A extends FlowElement> void getAllChildrenOfType( Set<A> allSuccessors, FlowElement flowElement, Class<A> type )
878        {
879        List<FlowElement> successors = getAllSuccessors( flowElement );
880    
881        for( FlowElement successor : successors )
882          {
883          if( type.isInstance( successor ) )
884            allSuccessors.add( (A) successor );
885          else
886            getAllChildrenOfType( allSuccessors, successor, type );
887          }
888        }
889    
890      public Set<FlowElement> getAllChildrenNotExactlyType( FlowElement flowElement, Class<? extends FlowElement> type )
891        {
892        Set<FlowElement> allChildren = new HashSet<FlowElement>();
893    
894        getAllChildrenNotExactlyType( allChildren, flowElement, type );
895    
896        return allChildren;
897        }
898    
899      private void getAllChildrenNotExactlyType( Set<FlowElement> allSuccessors, FlowElement flowElement, Class<? extends FlowElement> type )
900        {
901        List<FlowElement> successors = getAllSuccessors( flowElement );
902    
903        for( FlowElement successor : successors )
904          {
905          if( type != successor.getClass() )
906            allSuccessors.add( successor );
907          else
908            getAllChildrenNotExactlyType( allSuccessors, successor, type );
909          }
910        }
911    
912      public static class Extent extends Pipe
913        {
914    
915        /** @see cascading.pipe.Pipe#Pipe(String) */
916        public Extent( String name )
917          {
918          super( name );
919          }
920    
921        @Override
922        public Scope outgoingScopeFor( Set<Scope> scopes )
923          {
924          return new Scope();
925          }
926    
927        @Override
928        public String toString()
929          {
930          return "[" + getName() + "]";
931          }
932    
933        public boolean equals( Object object )
934          {
935          if( object == null )
936            return false;
937    
938          if( this == object )
939            return true;
940    
941          if( object.getClass() != this.getClass() )
942            return false;
943    
944          return this.getName().equals( ( (Pipe) object ).getName() );
945          }
946        }
947      }