001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.graph;
022
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.FlowElements;
import cascading.flow.planner.ElementGraphException;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.pipe.Checkpoint;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.pipe.SubAssembly;
import cascading.tap.Tap;
import cascading.util.EnumMultiMap;
import cascading.util.Util;
import org.jgrapht.Graphs;
import org.jgrapht.traverse.DepthFirstIterator;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
047
048/** Class ElementGraph represents the executable FlowElement graph. */
049public class FlowElementGraph extends ElementDirectedGraph implements AnnotatedGraph
050  {
051  /** Field LOG */
052  private static final Logger LOG = LoggerFactory.getLogger( FlowElementGraph.class );
053
054  /** Field resolved */
055  private boolean resolved;
056  /** Field platformInfo */
057  protected PlatformInfo platformInfo;
058  /** Field sources */
059  protected Map<String, Tap> sources;
060  /** Field sinks */
061  protected Map<String, Tap> sinks;
062  /** Field traps */
063  protected Map<String, Tap> traps;
064  /** Field checkpoints */
065  protected Map<String, Tap> checkpoints;
066  /** Field requireUniqueCheckpoints */
067  private boolean requireUniqueCheckpoints;
068
069  // used for creating isolated test graphs
070  protected FlowElementGraph()
071    {
072    }
073
074  public FlowElementGraph( FlowElementGraph flowElementGraph )
075    {
076    this();
077    this.platformInfo = flowElementGraph.platformInfo;
078    this.sources = flowElementGraph.sources;
079    this.sinks = flowElementGraph.sinks;
080    this.traps = flowElementGraph.traps;
081    this.checkpoints = flowElementGraph.checkpoints;
082    this.requireUniqueCheckpoints = flowElementGraph.requireUniqueCheckpoints;
083
084    if( flowElementGraph.annotations != null )
085      this.annotations = new EnumMultiMap<>( flowElementGraph.annotations );
086
087    copyFrom( flowElementGraph );
088    }
089
090  public FlowElementGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks )
091    {
092    this( null, pipes, sources, sinks, Collections.<String, Tap>emptyMap(), Collections.<String, Tap>emptyMap(), false );
093    }
094
095  /**
096   * Constructor ElementGraph creates a new ElementGraph instance.
097   *
098   * @param pipes   of type Pipe[]
099   * @param sources of type Map<String, Tap>
100   * @param sinks   of type Map<String, Tap>
101   */
102  public FlowElementGraph( PlatformInfo platformInfo, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints, boolean requireUniqueCheckpoints )
103    {
104    this();
105    this.platformInfo = platformInfo;
106    this.sources = sources;
107    this.sinks = sinks;
108    this.traps = traps;
109    this.checkpoints = checkpoints;
110    this.requireUniqueCheckpoints = requireUniqueCheckpoints;
111
112    assembleGraph( pipes, sources, sinks );
113
114    verifyGraph();
115    }
116
117  public Map<String, Tap> getSourceMap()
118    {
119    return sources;
120    }
121
122  public Map<String, Tap> getSinkMap()
123    {
124    return sinks;
125    }
126
127  public Map<String, Tap> getTrapMap()
128    {
129    return traps;
130    }
131
132  public Map<String, Tap> getCheckpointsMap()
133    {
134    return checkpoints;
135    }
136
137  public Collection<Tap> getSources()
138    {
139    return sources.values();
140    }
141
142  public Collection<Tap> getSinks()
143    {
144    return sinks.values();
145    }
146
147  public Collection<Tap> getTraps()
148    {
149    return traps.values();
150    }
151
152  protected void initialize( Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails )
153    {
154    this.sources = sources;
155    this.sinks = sinks;
156    this.traps = Util.createHashMap();
157
158    assembleGraph( tails, sources, sinks );
159
160    verifyGraph();
161    }
162
163  private void assembleGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks )
164    {
165    HashMap<String, Tap> sourcesCopy = new HashMap<String, Tap>( sources );
166    HashMap<String, Tap> sinksCopy = new HashMap<String, Tap>( sinks );
167
168    for( Pipe pipe : pipes )
169      makeGraph( pipe, sourcesCopy, sinksCopy );
170
171    addExtents( sources, sinks );
172    }
173
174  private void verifyGraph()
175    {
176    if( vertexSet().isEmpty() )
177      return;
178
179    Set<String> checkpointNames = new HashSet<String>();
180
181    // need to verify that only Extent instances are origins in this graph. Otherwise a Tap was not properly connected
182    TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator();
183
184    FlowElement flowElement = null;
185
186    while( iterator.hasNext() )
187      {
188      try
189        {
190        flowElement = iterator.next();
191        }
192      catch( IllegalArgumentException exception )
193        {
194        if( flowElement == null )
195          throw new ElementGraphException( "unable to traverse to the first element" );
196
197        throw new ElementGraphException( flowElement, "unable to traverse to the next element after " + flowElement );
198        }
199
200      if( requireUniqueCheckpoints && flowElement instanceof Checkpoint )
201        {
202        String name = ( (Checkpoint) flowElement ).getName();
203
204        if( checkpointNames.contains( name ) )
205          throw new ElementGraphException( (Pipe) flowElement, "may not have duplicate checkpoint names in assembly, found: " + name );
206
207        checkpointNames.add( name );
208        }
209
210      if( incomingEdgesOf( flowElement ).size() != 0 && outgoingEdgesOf( flowElement ).size() != 0 )
211        continue;
212
213      if( flowElement instanceof Extent )
214        continue;
215
216      if( flowElement instanceof Pipe )
217        {
218        if( incomingEdgesOf( flowElement ).size() == 0 )
219          throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to head Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" );
220        else
221          throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to tail Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" );
222        }
223
224      if( flowElement instanceof Tap )
225        throw new ElementGraphException( (Tap) flowElement, "no Pipe connected to Tap: " + flowElement );
226      else
227        throw new ElementGraphException( flowElement, "unknown element type: " + flowElement );
228      }
229    }
230
231  protected FlowElementGraph shallowCopyElementGraph()
232    {
233    FlowElementGraph copy = new FlowElementGraph();
234    Graphs.addGraph( copy.graph, this.graph );
235
236    copy.traps = new HashMap<String, Tap>( this.traps );
237
238    return copy;
239    }
240
241  public boolean isResolved()
242    {
243    return resolved;
244    }
245
246  public void setResolved( boolean resolved )
247    {
248    this.resolved = resolved;
249    }
250
251  /**
252   * created to support the ability to generate all paths between the head and tail of the process.
253   *
254   * @param sources
255   * @param sinks
256   */
257  private void addExtents( Map<String, Tap> sources, Map<String, Tap> sinks )
258    {
259    addVertex( Extent.head );
260
261    for( String source : sources.keySet() )
262      {
263      Scope scope = addEdge( Extent.head, sources.get( source ) );
264
265      // edge may already exist, if so, above returns null
266      if( scope != null )
267        scope.setName( source );
268      }
269
270    addVertex( Extent.tail );
271
272    for( String sink : sinks.keySet() )
273      {
274      Scope scope;
275
276      try
277        {
278        scope = addEdge( sinks.get( sink ), Extent.tail );
279        }
280      catch( IllegalArgumentException exception )
281        {
282        throw new ElementGraphException( "missing pipe for sink tap: [" + sink + "]" );
283        }
284
285      if( scope == null )
286        throw new ElementGraphException( "cannot sink to the same path from multiple branches: [" + Util.join( sinks.values() ) + "]" );
287
288      scope.setName( sink );
289      }
290    }
291
292  /**
293   * Performs one rule check, verifies group does not join duplicate tap resources.
294   * <p/>
295   * Scopes are always named after the source side of the source -> target relationship
296   */
297  private void makeGraph( Pipe current, Map<String, Tap> sources, Map<String, Tap> sinks )
298    {
299    LOG.debug( "adding pipe: {}", current );
300
301    if( current instanceof SubAssembly )
302      {
303      for( Pipe pipe : SubAssembly.unwind( current.getPrevious() ) )
304        makeGraph( pipe, sources, sinks );
305
306      return;
307      }
308
309    if( containsVertex( current ) )
310      return;
311
312    addVertex( current );
313
314    Tap sink = sinks.remove( current.getName() );
315
316    if( sink != null )
317      {
318      LOG.debug( "adding sink: {}", sink );
319
320      addVertex( sink );
321
322      LOG.debug( "adding edge: {} -> {}", current, sink );
323
324      addEdge( current, sink ).setName( current.getName() );
325      }
326
327    // PipeAssemblies should always have a previous
328    if( SubAssembly.unwind( current.getPrevious() ).length == 0 )
329      {
330      Tap source = sources.remove( current.getName() );
331
332      if( source != null )
333        {
334        LOG.debug( "adding source: {}", source );
335
336        addVertex( source );
337
338        LOG.debug( "adding edge: {} -> {}", source, current );
339
340        Scope scope = addEdge( source, current );
341
342        scope.setName( current.getName() );
343
344        setOrdinal( source, current, scope );
345        }
346      }
347
348    for( Pipe previous : SubAssembly.unwind( current.getPrevious() ) )
349      {
350      makeGraph( previous, sources, sinks );
351
352      LOG.debug( "adding edge: {} -> ", previous, current );
353
354      if( getEdge( previous, current ) != null )
355        throw new ElementGraphException( previous, "cannot distinguish pipe branches, give pipe unique name: " + previous );
356
357      Scope scope = addEdge( previous, current );
358
359      scope.setName( previous.getName() ); // name scope after previous pipe
360
361      setOrdinal( previous, current, scope );
362      }
363    }
364
365  private void setOrdinal( FlowElement previous, Pipe current, Scope scope )
366    {
367    if( current instanceof Splice )
368      {
369      Splice splice = (Splice) current;
370
371      Integer ordinal;
372
373      if( previous instanceof Tap ) // revert to pipe name
374        ordinal = splice.getPipePos().get( scope.getName() );
375      else // GroupBy allows for duplicate pipe names, this guarantees correct ordinality
376        ordinal = FlowElements.findOrdinal( splice, (Pipe) previous );
377
378      scope.setOrdinal( ordinal );
379
380      Set<Scope> scopes = new HashSet<>( incomingEdgesOf( current ) );
381
382      scopes.remove( scope );
383
384      for( Scope other : scopes )
385        {
386        if( other.getOrdinal() == scope.getOrdinal() )
387          throw new IllegalStateException( "duplicate ordinals" );
388        }
389
390      if( splice.isJoin() && ordinal != 0 )
391        scope.setNonBlocking( false );
392      }
393    }
394
395  /**
396   * Method getTopologicalIterator returns the topologicalIterator of this ElementGraph object.
397   *
398   * @return the topologicalIterator (type TopologicalOrderIterator<FlowElement, Scope>) of this ElementGraph object.
399   */
400  public TopologicalOrderIterator<FlowElement, Scope> getTopologicalIterator()
401    {
402    return new TopologicalOrderIterator<>( this.graph );
403    }
404
405  /**
406   * Method getDepthFirstIterator returns the depthFirstIterator of this ElementGraph object.
407   *
408   * @return the depthFirstIterator (type DepthFirstIterator<FlowElement, Scope>) of this ElementGraph object.
409   */
410  public DepthFirstIterator<FlowElement, Scope> getDepthFirstIterator()
411    {
412    return new DepthFirstIterator<>( this.graph, Extent.head );
413    }
414
415  private BaseElementGraph copyWithTraps()
416    {
417    FlowElementGraph copy = shallowCopyElementGraph();
418
419    copy.addTrapsToGraph();
420
421    return copy;
422    }
423
424  private void addTrapsToGraph()
425    {
426    DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator();
427
428    while( iterator.hasNext() )
429      {
430      FlowElement element = iterator.next();
431
432      if( !( element instanceof Pipe ) )
433        continue;
434
435      Pipe pipe = (Pipe) element;
436      Tap trap = traps.get( pipe.getName() );
437
438      if( trap == null )
439        continue;
440
441      addVertex( trap );
442
443      if( LOG.isDebugEnabled() )
444        LOG.debug( "adding trap edge: " + pipe + " -> " + trap );
445
446      if( getEdge( pipe, trap ) != null )
447        continue;
448
449      addEdge( pipe, trap ).setName( pipe.getName() ); // name scope after previous pipe
450      }
451    }
452
453  /**
454   * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
455   *
456   * @param filename of type String
457   */
458  @Override
459  public void writeDOT( String filename )
460    {
461    boolean success = ElementGraphs.printElementGraph( filename, this.copyWithTraps(), platformInfo );
462
463    if( success )
464      Util.writePDF( filename );
465    }
466
467  @Override
468  public ElementGraph copyElementGraph()
469    {
470    return new FlowElementGraph( this );
471    }
472  }