001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.util.Collection;
024    import java.util.Collections;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.LinkedList;
028    import java.util.List;
029    import java.util.ListIterator;
030    import java.util.Set;
031    
032    import cascading.util.Util;
033    import org.jgrapht.DirectedGraph;
034    import org.jgrapht.GraphPath;
035    import org.jgrapht.Graphs;
036    import org.jgrapht.alg.KShortestPaths;
037    import org.jgrapht.traverse.TopologicalOrderIterator;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    /**
042     * Class StreamGraph is the operation pipeline used during processing. This an internal use only class.
043     * <p/>
044     * Under some circumstances it may make sense to see the actual graph plan. To do so, enable one or both dot file
045     * properties, {@link #ERROR_DOT_FILE_NAME} and {@link #DOT_FILE_PATH}.
046     */
047    public class StreamGraph
048      {
049      /** Property denoting the path and filename to write the failed stream graph dot file. */
050      public final static String ERROR_DOT_FILE_NAME = "cascading.stream.error.dotfile";
051    
052      /**
053       * Property denoting the path to write all stream graph dot files. The filename will be generated
054       * based on platform properties.
055       */
056      public final static String DOT_FILE_PATH = "cascading.stream.dotfile.path";
057    
058      private static final Logger LOG = LoggerFactory.getLogger( StreamGraph.class );
059    
060      private final Duct HEAD = new Extent( "head" );
061      private final Duct TAIL = new Extent( "tail" );
062    
063      private final DuctGraph graph = new DuctGraph();
064    
065      private class Extent extends Stage
066        {
067        final String name;
068    
069        private Extent( String name )
070          {
071          this.name = name;
072          }
073    
074        @Override
075        public String toString()
076          {
077          return name;
078          }
079        }
080    
081      public StreamGraph()
082        {
083        }
084    
085      protected Object getProperty( String name )
086        {
087        return null;
088        }
089    
090      Duct getHEAD()
091        {
092        return HEAD;
093        }
094    
095      Duct getTAIL()
096        {
097        return TAIL;
098        }
099    
100      public void addHead( Duct head )
101        {
102        addPath( getHEAD(), head );
103        }
104    
105      public void addTail( Duct tail )
106        {
107        addPath( tail, getTAIL() );
108        }
109    
110      public void addPath( Duct lhs, Duct rhs )
111        {
112        addPath( lhs, 0, rhs );
113        }
114    
115      public void addPath( Duct lhs, int ordinal, Duct rhs )
116        {
117        if( lhs == null && rhs == null )
118          throw new IllegalArgumentException( "both lhs and rhs may not be null" );
119    
120        if( lhs == getTAIL() )
121          throw new IllegalStateException( "lhs may not be a TAIL" );
122    
123        if( rhs == getHEAD() )
124          throw new IllegalStateException( "rhs may not be a HEAD" );
125    
126        if( lhs == null )
127          lhs = getHEAD();
128    
129        if( rhs == null )
130          rhs = getTAIL();
131    
132        try
133          {
134          graph.addVertex( lhs );
135          graph.addVertex( rhs );
136          graph.addEdge( lhs, rhs, graph.makeOrdinal( ordinal ) );
137          }
138        catch( RuntimeException exception )
139          {
140          LOG.error( "unable to add path", exception );
141          printGraphError();
142          throw exception;
143          }
144        }
145    
146      public void bind()
147        {
148        Iterator<Duct> iterator = getTopologicalOrderIterator();
149    
150        // build the actual processing graph
151        while( iterator.hasNext() )
152          iterator.next().bind( this );
153    
154        iterator = getReversedTopologicalOrderIterator();
155    
156        // initialize all the ducts
157        while( iterator.hasNext() )
158          iterator.next().initialize();
159        }
160    
161      /** Calls prepare starting at the tail and working backwards */
162      public void prepare()
163        {
164        TopologicalOrderIterator<Duct, Integer> iterator = getReversedTopologicalOrderIterator();
165    
166        while( iterator.hasNext() )
167          iterator.next().prepare();
168        }
169    
170      /** Calls cleanup starting at the head and working forwards */
171      public void cleanup()
172        {
173        TopologicalOrderIterator<Duct, Integer> iterator = getTopologicalOrderIterator();
174    
175        while( iterator.hasNext() )
176          iterator.next().cleanup();
177        }
178    
179      public Collection<Duct> getHeads()
180        {
181        return Graphs.successorListOf( graph, getHEAD() );
182        }
183    
184      public Collection<Duct> getTails()
185        {
186        return Graphs.predecessorListOf( graph, getTAIL() );
187        }
188    
189      public Duct[] findAllNextFor( Duct current )
190        {
191        LinkedList<Duct> successors = new LinkedList<Duct>( Graphs.successorListOf( graph, current ) );
192        ListIterator<Duct> iterator = successors.listIterator();
193    
194        while( iterator.hasNext() )
195          {
196          Duct successor = iterator.next();
197    
198          if( successor == getHEAD() )
199            throw new IllegalStateException( "HEAD may not be next" );
200    
201          if( successor == getTAIL() ) // tail is not included, its just a marker
202            iterator.remove();
203          }
204    
205        return successors.toArray( new Duct[ successors.size() ] );
206        }
207    
208      public Duct[] findAllPreviousFor( Duct current )
209        {
210        LinkedList<Duct> predecessors = new LinkedList<Duct>( Graphs.predecessorListOf( graph, current ) );
211        ListIterator<Duct> iterator = predecessors.listIterator();
212    
213        while( iterator.hasNext() )
214          {
215          Duct successor = iterator.next();
216    
217          if( successor == getTAIL() )
218            throw new IllegalStateException( "TAIL may not be successor" );
219    
220          if( successor == getHEAD() ) // head is not included, its just a marker
221            iterator.remove();
222          }
223    
224        return predecessors.toArray( new Duct[ predecessors.size() ] );
225        }
226    
227      public Duct createNextFor( Duct current )
228        {
229        if( current == getHEAD() || current == getTAIL() )
230          return null;
231    
232        Set<DuctGraph.Ordinal> edges = graph.outgoingEdgesOf( current );
233    
234        if( edges.size() == 0 )
235          throw new IllegalStateException( "ducts must have an outgoing edge, current: " + current );
236    
237        Duct next = graph.getEdgeTarget( edges.iterator().next() );
238    
239        if( current instanceof Gate )
240          {
241          if( next instanceof OpenWindow )
242            return next;
243    
244          if( edges.size() > 1 )
245            return createOpenWindow( createFork( findAllNextFor( current ) ) );
246    
247          if( next instanceof Reducing )
248            return createOpenReducingWindow( next );
249    
250          return createOpenWindow( next );
251          }
252    
253        if( current instanceof Reducing )
254          {
255          if( next instanceof Reducing )
256            return next;
257    
258          if( edges.size() > 1 )
259            return createCloseWindow( createFork( findAllNextFor( current ) ) );
260    
261          return createCloseWindow( next );
262          }
263    
264        if( edges.size() > 1 )
265          return createFork( findAllNextFor( current ) );
266    
267        if( next == getTAIL() ) // tail is not included, its just a marker
268          throw new IllegalStateException( "tail ducts should not bind to next" );
269    
270        return next;
271        }
272    
273      private Duct createCloseWindow( Duct next )
274        {
275        return new CloseReducingDuct( next );
276        }
277    
278      protected Duct createOpenWindow( Duct next )
279        {
280        return new OpenDuct( next );
281        }
282    
283      protected Duct createOpenReducingWindow( Duct next )
284        {
285        return new OpenReducingDuct( next );
286        }
287    
288      protected Duct createFork( Duct[] allNext )
289        {
290        return new Fork( allNext );
291        }
292    
293      /**
294       * Returns all free paths to the current duct, usually a GroupGate.
295       * <p/>
296       * Paths all unique paths are counted, minus any immediate prior GroupGates as they
297       * block incoming paths into a single path
298       *
299       * @param duct of type Duct
300       * @return an int
301       */
302      public int countAllEventingPathsTo( Duct duct )
303        {
304        // find all immediate prior groups/ collapsed
305        LinkedList<List<Duct>> allPaths = asPathList( allPathsBetweenInclusive( getHEAD(), duct ) );
306    
307        Set<Duct> nearestCollapsed = new HashSet<Duct>();
308    
309        for( List<Duct> path : allPaths )
310          {
311          Collections.reverse( path );
312    
313          path.remove( 0 ); // remove the duct param
314    
315          for( Duct element : path )
316            {
317            if( !( element instanceof Collapsing ) )
318              continue;
319    
320            nearestCollapsed.add( element );
321            break;
322            }
323          }
324    
325        // find all paths
326        // remove all paths containing prior groups
327        LinkedList<List<Duct>> collapsedPaths = new LinkedList<List<Duct>>( allPaths );
328        ListIterator<List<Duct>> iterator = collapsedPaths.listIterator();
329        while( iterator.hasNext() )
330          {
331          List<Duct> path = iterator.next();
332    
333          if( Collections.disjoint( path, nearestCollapsed ) )
334            iterator.remove();
335          }
336    
337        int collapsedPathsCount = 0;
338        for( Duct collapsed : nearestCollapsed )
339          {
340          LinkedList<List<Duct>> subPaths = asPathList( allPathsBetweenInclusive( collapsed, duct ) );
341          for( List<Duct> subPath : subPaths )
342            {
343            subPath.remove( 0 ); // remove collapsed duct
344            if( Collections.disjoint( subPath, nearestCollapsed ) )
345              collapsedPathsCount += 1;
346            }
347          }
348    
349        int nonCollapsedPathsCount = allPaths.size() - collapsedPaths.size();
350    
351        // incoming == paths + prior
352        return nonCollapsedPathsCount + collapsedPathsCount;
353        }
354    
355      public int ordinalBetween( Duct lhs, Duct rhs )
356        {
357        return graph.getEdge( lhs, rhs ).ordinal;
358        }
359    
360      private List<GraphPath<Duct, DuctGraph.Ordinal>> allPathsBetweenInclusive( Duct from, Duct to )
361        {
362        return new KShortestPaths<Duct, DuctGraph.Ordinal>( graph, from, Integer.MAX_VALUE ).getPaths( to );
363        }
364    
365      public static LinkedList<List<Duct>> asPathList( List<GraphPath<Duct, DuctGraph.Ordinal>> paths )
366        {
367        LinkedList<List<Duct>> results = new LinkedList<List<Duct>>();
368    
369        if( paths == null )
370          return results;
371    
372        for( GraphPath<Duct, DuctGraph.Ordinal> path : paths )
373          results.add( Graphs.getPathVertexList( path ) );
374    
375        return results;
376        }
377    
378      public TopologicalOrderIterator<Duct, Integer> getTopologicalOrderIterator()
379        {
380        try
381          {
382          return new TopologicalOrderIterator( graph );
383          }
384        catch( RuntimeException exception )
385          {
386          LOG.error( "failed creating topological iterator", exception );
387          printGraphError();
388    
389          throw exception;
390          }
391        }
392    
393      public TopologicalOrderIterator<Duct, Integer> getReversedTopologicalOrderIterator()
394        {
395        try
396          {
397          return new TopologicalOrderIterator( getReversedGraph() );
398          }
399        catch( RuntimeException exception )
400          {
401          LOG.error( "failed creating reversed topological iterator", exception );
402          printGraphError();
403    
404          throw exception;
405          }
406        }
407    
408      public DirectedGraph getReversedGraph()
409        {
410        DuctGraph reversedGraph = new DuctGraph();
411    
412        Graphs.addGraphReversed( reversedGraph, graph );
413    
414        return reversedGraph;
415        }
416    
417      public Collection<Duct> getAllDucts()
418        {
419        return graph.vertexSet();
420        }
421    
422      public void printGraphError()
423        {
424        String filename = (String) getProperty( ERROR_DOT_FILE_NAME );
425    
426        if( filename == null )
427          return;
428    
429        printGraph( filename );
430        }
431    
432      public void printGraph( String id, String classifier, int discriminator )
433        {
434        String path = (String) getProperty( DOT_FILE_PATH );
435    
436        if( path == null )
437          return;
438    
439        path = String.format( "%s/streamgraph-%s-%s-%s.dot", path, id, classifier, discriminator );
440    
441        printGraph( path );
442        }
443    
444      public void printGraph( String filename )
445        {
446        LOG.info( "writing stream graph to {}", filename );
447        Util.printGraph( filename, graph );
448        }
449      }