001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.graph;
022
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.IdentityHashMap;
027import java.util.Iterator;
028import java.util.LinkedList;
029import java.util.List;
030import java.util.ListIterator;
031import java.util.Map;
032import java.util.Set;
033
034import cascading.flow.stream.duct.CloseReducingDuct;
035import cascading.flow.stream.duct.Collapsing;
036import cascading.flow.stream.duct.Duct;
037import cascading.flow.stream.duct.DuctGraph;
038import cascading.flow.stream.duct.Fork;
039import cascading.flow.stream.duct.Gate;
040import cascading.flow.stream.duct.OpenDuct;
041import cascading.flow.stream.duct.OpenReducingDuct;
042import cascading.flow.stream.duct.OpenWindow;
043import cascading.flow.stream.duct.Reducing;
044import cascading.flow.stream.duct.Stage;
045import cascading.flow.stream.duct.Window;
046import cascading.util.Util;
047import org.jgrapht.DirectedGraph;
048import org.jgrapht.GraphPath;
049import org.jgrapht.Graphs;
050import org.jgrapht.alg.KShortestPaths;
051import org.jgrapht.traverse.TopologicalOrderIterator;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
/**
 * Class StreamGraph is the operation pipeline used during processing. This is an internal use only class.
 * <p/>
 * Under some circumstances it may make sense to see the actual graph plan. To do so, enable one or both dot file
 * properties, {@link #ERROR_DOT_FILE_NAME} and {@link #DOT_FILE_PATH}.
 */
061public class StreamGraph
062  {
063  /** Property denoting the path and filename to write the failed stream graph dot file. */
064  public final static String ERROR_DOT_FILE_NAME = "cascading.stream.error.dotfile";
065
066  /**
067   * Property denoting the path to write all stream graph dot files. The filename will be generated
068   * based on platform properties.
069   */
070  public final static String DOT_FILE_PATH = "cascading.stream.dotfile.path";
071
072  private static final Logger LOG = LoggerFactory.getLogger( StreamGraph.class );
073
074  private final Duct HEAD = new Extent( "head" );
075  private final Duct TAIL = new Extent( "tail" );
076
077  private final DuctGraph ductGraph = new DuctGraph();
078
079  private class Extent extends Stage
080    {
081    final String name;
082
083    private Extent( String name )
084      {
085      this.name = name;
086      }
087
088    @Override
089    public String toString()
090      {
091      return name;
092      }
093    }
094
095  public StreamGraph()
096    {
097    }
098
099  protected Object getProperty( String name )
100    {
101    return null;
102    }
103
104  Duct getHEAD()
105    {
106    return HEAD;
107    }
108
109  Duct getTAIL()
110    {
111    return TAIL;
112    }
113
114  public void addHead( Duct head )
115    {
116    addPath( getHEAD(), head );
117    }
118
119  public void addTail( Duct tail )
120    {
121    addPath( tail, getTAIL() );
122    }
123
124  public void addPath( Duct lhs, Duct rhs )
125    {
126    addPath( lhs, 0, rhs );
127    }
128
129  public void addPath( Duct lhs, int ordinal, Duct rhs )
130    {
131    if( lhs == null && rhs == null )
132      throw new IllegalArgumentException( "both lhs and rhs may not be null" );
133
134    if( lhs == getTAIL() )
135      throw new IllegalStateException( "lhs may not be a TAIL" );
136
137    if( rhs == getHEAD() )
138      throw new IllegalStateException( "rhs may not be a HEAD" );
139
140    if( lhs == null )
141      lhs = getHEAD();
142
143    if( rhs == null )
144      rhs = getTAIL();
145
146    try
147      {
148      ductGraph.addVertex( lhs );
149      ductGraph.addVertex( rhs );
150      ductGraph.addEdge( lhs, rhs, ductGraph.makeOrdinal( ordinal ) );
151      }
152    catch( RuntimeException exception )
153      {
154      LOG.error( "unable to add path", exception );
155      printGraphError();
156      throw exception;
157      }
158    }
159
160  public void bind()
161    {
162    Iterator<Duct> iterator = getTopologicalOrderIterator();
163
164    // build the actual processing graph
165    while( iterator.hasNext() )
166      iterator.next().bind( this );
167
168    iterator = getReversedTopologicalOrderIterator();
169
170    // initialize all the ducts
171    while( iterator.hasNext() )
172      iterator.next().initialize();
173    }
174
175  /** Calls prepare starting at the tail and working backwards */
176  public void prepare()
177    {
178    TopologicalOrderIterator<Duct, Integer> iterator = getReversedTopologicalOrderIterator();
179
180    while( iterator.hasNext() )
181      iterator.next().prepare();
182    }
183
184  /** Calls cleanup starting at the head and working forwards */
185  public void cleanup()
186    {
187    TopologicalOrderIterator<Duct, Integer> iterator = getTopologicalOrderIterator();
188
189    while( iterator.hasNext() )
190      iterator.next().cleanup();
191    }
192
193  public Collection<Duct> getHeads()
194    {
195    return Graphs.successorListOf( ductGraph, getHEAD() );
196    }
197
198  public Collection<Duct> getTails()
199    {
200    return Graphs.predecessorListOf( ductGraph, getTAIL() );
201    }
202
203  public Duct[] findAllNextFor( Duct current )
204    {
205    LinkedList<Duct> successors = new LinkedList<Duct>( Graphs.successorListOf( ductGraph, current ) );
206    ListIterator<Duct> iterator = successors.listIterator();
207
208    while( iterator.hasNext() )
209      {
210      Duct successor = iterator.next();
211
212      if( successor == getHEAD() )
213        throw new IllegalStateException( "HEAD may not be next" );
214
215      if( successor == getTAIL() ) // tail is not included, its just a marker
216        iterator.remove();
217      }
218
219    return successors.toArray( new Duct[ successors.size() ] );
220    }
221
222  public Duct[] findAllPreviousFor( Duct current )
223    {
224    LinkedList<Duct> predecessors = new LinkedList<Duct>( Graphs.predecessorListOf( ductGraph, current ) );
225    ListIterator<Duct> iterator = predecessors.listIterator();
226
227    while( iterator.hasNext() )
228      {
229      Duct successor = iterator.next();
230
231      if( successor == getTAIL() )
232        throw new IllegalStateException( "TAIL may not be successor" );
233
234      if( successor == getHEAD() ) // head is not included, its just a marker
235        iterator.remove();
236      }
237
238    return predecessors.toArray( new Duct[ predecessors.size() ] );
239    }
240
241  public Map<Duct, Integer> getOrdinalMap( Duct current )
242    {
243    Duct[] allPrevious = findAllPreviousFor( current );
244
245    Map<Duct, Integer> results = new IdentityHashMap<>();
246
247    for( Duct previous : allPrevious )
248      results.put( previous, ductGraph.getEdge( previous, current ).getOrdinal() );
249
250    return results;
251    }
252
253  public Duct createNextFor( Duct current )
254    {
255    if( current == getHEAD() || current == getTAIL() )
256      return null;
257
258    Set<DuctGraph.Ordinal> edges = ductGraph.outgoingEdgesOf( current );
259
260    if( edges.size() == 0 )
261      throw new IllegalStateException( "ducts must have an outgoing edge, current: " + current );
262
263    Duct next = ductGraph.getEdgeTarget( edges.iterator().next() );
264
265    if( current instanceof Gate )
266      {
267      if( !( current instanceof Window ) ) // just collapse - Merge
268        {
269        if( edges.size() > 1 )
270          return createFork( findAllNextFor( current ) );
271
272        return next;
273        }
274
275      if( next instanceof OpenWindow )
276        return next;
277
278      if( edges.size() > 1 )
279        return createOpenWindow( createFork( findAllNextFor( current ) ) );
280
281      if( next instanceof Reducing )
282        return createOpenReducingWindow( next );
283
284      return createOpenWindow( next );
285      }
286
287    if( current instanceof Reducing )
288      {
289      if( next instanceof Reducing )
290        return next;
291
292      if( edges.size() > 1 )
293        return createCloseWindow( createFork( findAllNextFor( current ) ) );
294
295      return createCloseWindow( next );
296      }
297
298    if( edges.size() > 1 )
299      return createFork( findAllNextFor( current ) );
300
301    if( next == getTAIL() ) // tail is not included, its just a marker
302      throw new IllegalStateException( "tail ducts should not bind to next" );
303
304    return next;
305    }
306
307  private Duct createCloseWindow( Duct next )
308    {
309    return new CloseReducingDuct( next );
310    }
311
312  protected Duct createOpenWindow( Duct next )
313    {
314    return new OpenDuct( next );
315    }
316
317  protected Duct createOpenReducingWindow( Duct next )
318    {
319    return new OpenReducingDuct( next );
320    }
321
322  protected Duct createFork( Duct[] allNext )
323    {
324    return new Fork( allNext );
325    }
326
327  /**
328   * Returns all free paths to the current duct, usually a GroupGate.
329   * <p/>
330   * Paths all unique paths are counted, minus any immediate prior GroupGates as they
331   * block incoming paths into a single path
332   *
333   * @param duct of type Duct
334   * @return an int
335   */
336  public int countAllEventingPathsTo( Duct duct )
337    {
338    // find all immediate prior groups/ collapsed
339    LinkedList<List<Duct>> allPaths = asPathList( allPathsBetweenInclusive( getHEAD(), duct ) );
340
341    Set<Duct> nearestCollapsed = new HashSet<Duct>();
342
343    for( List<Duct> path : allPaths )
344      {
345      Collections.reverse( path );
346
347      path.remove( 0 ); // remove the duct param
348
349      for( Duct element : path )
350        {
351        if( !( element instanceof Collapsing ) )
352          continue;
353
354        nearestCollapsed.add( element );
355        break;
356        }
357      }
358
359    // find all paths
360    // remove all paths containing prior groups
361    LinkedList<List<Duct>> collapsedPaths = new LinkedList<List<Duct>>( allPaths );
362    ListIterator<List<Duct>> iterator = collapsedPaths.listIterator();
363    while( iterator.hasNext() )
364      {
365      List<Duct> path = iterator.next();
366
367      if( Collections.disjoint( path, nearestCollapsed ) )
368        iterator.remove();
369      }
370
371    int collapsedPathsCount = 0;
372    for( Duct collapsed : nearestCollapsed )
373      {
374      LinkedList<List<Duct>> subPaths = asPathList( allPathsBetweenInclusive( collapsed, duct ) );
375      for( List<Duct> subPath : subPaths )
376        {
377        subPath.remove( 0 ); // remove collapsed duct
378        if( Collections.disjoint( subPath, nearestCollapsed ) )
379          collapsedPathsCount += 1;
380        }
381      }
382
383    int nonCollapsedPathsCount = allPaths.size() - collapsedPaths.size();
384
385    // incoming == paths + prior
386    return nonCollapsedPathsCount + collapsedPathsCount;
387    }
388
389  public int ordinalBetween( Duct lhs, Duct rhs )
390    {
391    return ductGraph.getEdge( lhs, rhs ).getOrdinal();
392    }
393
394  private List<GraphPath<Duct, DuctGraph.Ordinal>> allPathsBetweenInclusive( Duct from, Duct to )
395    {
396    return new KShortestPaths<>( ductGraph, from, Integer.MAX_VALUE ).getPaths( to );
397    }
398
399  public static LinkedList<List<Duct>> asPathList( List<GraphPath<Duct, DuctGraph.Ordinal>> paths )
400    {
401    LinkedList<List<Duct>> results = new LinkedList<List<Duct>>();
402
403    if( paths == null )
404      return results;
405
406    for( GraphPath<Duct, DuctGraph.Ordinal> path : paths )
407      results.add( Graphs.getPathVertexList( path ) );
408
409    return results;
410    }
411
412  public TopologicalOrderIterator<Duct, Integer> getTopologicalOrderIterator()
413    {
414    try
415      {
416      return new TopologicalOrderIterator( ductGraph );
417      }
418    catch( RuntimeException exception )
419      {
420      LOG.error( "failed creating topological iterator", exception );
421      printGraphError();
422
423      throw exception;
424      }
425    }
426
427  public TopologicalOrderIterator<Duct, Integer> getReversedTopologicalOrderIterator()
428    {
429    try
430      {
431      return new TopologicalOrderIterator( getReversedGraph() );
432      }
433    catch( RuntimeException exception )
434      {
435      LOG.error( "failed creating reversed topological iterator", exception );
436      printGraphError();
437
438      throw exception;
439      }
440    }
441
442  public DirectedGraph getReversedGraph()
443    {
444    DuctGraph reversedGraph = new DuctGraph();
445
446    Graphs.addGraphReversed( reversedGraph, ductGraph );
447
448    return reversedGraph;
449    }
450
451  public Collection<Duct> getAllDucts()
452    {
453    return ductGraph.vertexSet();
454    }
455
456  public void printGraphError()
457    {
458    String filename = (String) getProperty( ERROR_DOT_FILE_NAME );
459
460    if( filename == null )
461      return;
462
463    printGraph( filename );
464    }
465
466  public void printGraph( String id, String classifier, int discriminator )
467    {
468    String path = (String) getProperty( DOT_FILE_PATH );
469
470    if( path == null )
471      return;
472
473    classifier = Util.cleansePathName( classifier );
474
475    path = String.format( "%s/streamgraph-%s-%s-%s.dot", path, id, classifier, discriminator );
476
477    printGraph( path );
478    }
479
480  public void printGraph( String filename )
481    {
482    LOG.info( "writing stream graph to {}", filename );
483    Util.printGraph( filename, ductGraph );
484    }
485  }