001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.graph;
023
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.FlowElements;
import cascading.flow.planner.ElementGraphException;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.pipe.Checkpoint;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.pipe.SubAssembly;
import cascading.tap.Tap;
import cascading.util.EnumMultiMap;
import cascading.util.Util;
import org.jgrapht.Graphs;
import org.jgrapht.traverse.DepthFirstIterator;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
048
049/** Class ElementGraph represents the executable FlowElement graph. */
050public class FlowElementGraph extends ElementMultiGraph implements AnnotatedGraph
051  {
052  /** Field LOG */
053  private static final Logger LOG = LoggerFactory.getLogger( FlowElementGraph.class );
054
055  /** Field resolved */
056  private boolean resolved;
057  /** Field platformInfo */
058  protected PlatformInfo platformInfo;
059  /** Field sources */
060  protected Map<String, Tap> sources;
061  /** Field sinks */
062  protected Map<String, Tap> sinks;
063  /** Field traps */
064  protected Map<String, Tap> traps;
065  /** Field checkpoints */
066  protected Map<String, Tap> checkpoints;
067  /** Field requireUniqueCheckpoints */
068  private boolean requireUniqueCheckpoints;
069
070  // used for creating isolated test graphs
071  protected FlowElementGraph()
072    {
073    }
074
075  public FlowElementGraph( FlowElementGraph flowElementGraph )
076    {
077    this();
078    this.platformInfo = flowElementGraph.platformInfo;
079    this.sources = flowElementGraph.sources;
080    this.sinks = flowElementGraph.sinks;
081    this.traps = flowElementGraph.traps;
082    this.checkpoints = flowElementGraph.checkpoints;
083    this.requireUniqueCheckpoints = flowElementGraph.requireUniqueCheckpoints;
084
085    if( flowElementGraph.annotations != null )
086      this.annotations = new EnumMultiMap<>( flowElementGraph.annotations );
087
088    copyFrom( flowElementGraph );
089    }
090
091  public FlowElementGraph( PlatformInfo platformInfo, ElementGraph elementGraph, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints )
092    {
093    this();
094    this.platformInfo = platformInfo;
095
096    if( elementGraph == null )
097      elementGraph = BaseElementGraph.NULL;
098
099    if( sources == null || sources.isEmpty() )
100      throw new IllegalArgumentException( "sources may not be null or empty" );
101
102    if( sinks == null || sinks.isEmpty() )
103      throw new IllegalArgumentException( "sinks may not be null or empty" );
104
105    this.sources = new HashMap<>( sources );
106    this.sinks = new HashMap<>( sinks );
107    this.traps = new HashMap<>( traps == null ? Collections.<String, Tap>emptyMap() : traps );
108    this.checkpoints = new HashMap<>( checkpoints == null ? Collections.<String, Tap>emptyMap() : checkpoints );
109
110    EnumMultiMap<FlowElement> annotations = ElementGraphs.annotations( elementGraph );
111
112    if( annotations != null )
113      this.annotations = new EnumMultiMap<>( annotations );
114
115    // prevents multiple edge from head and to tail extents
116    copyFrom( ElementGraphs.asExtentMaskedSubGraph( elementGraph ) );
117
118    bindExtents();
119    }
120
121  public FlowElementGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks )
122    {
123    this( null, pipes, sources, sinks, Collections.<String, Tap>emptyMap(), Collections.<String, Tap>emptyMap(), false );
124    }
125
126  /**
127   * Constructor ElementGraph creates a new ElementGraph instance.
128   *
129   * @param pipes   of type Pipe[]
130   * @param sources of type Map<String, Tap>
131   * @param sinks   of type Map<String, Tap>
132   */
133  public FlowElementGraph( PlatformInfo platformInfo, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints, boolean requireUniqueCheckpoints )
134    {
135    this();
136    this.platformInfo = platformInfo;
137    this.sources = sources;
138    this.sinks = sinks;
139    this.traps = traps;
140    this.checkpoints = checkpoints;
141    this.requireUniqueCheckpoints = requireUniqueCheckpoints;
142
143    assembleGraph( pipes, sources, sinks );
144
145    verifyGraph();
146    }
147
148  public Map<String, Tap> getSourceMap()
149    {
150    return sources;
151    }
152
153  public Map<String, Tap> getSinkMap()
154    {
155    return sinks;
156    }
157
158  public Map<String, Tap> getTrapMap()
159    {
160    return traps;
161    }
162
163  public Map<String, Tap> getCheckpointsMap()
164    {
165    return checkpoints;
166    }
167
168  public Collection<Tap> getSources()
169    {
170    return sources.values();
171    }
172
173  public Collection<Tap> getSinks()
174    {
175    return sinks.values();
176    }
177
178  public Collection<Tap> getTraps()
179    {
180    return traps.values();
181    }
182
183  protected void initialize( Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails )
184    {
185    this.sources = sources;
186    this.sinks = sinks;
187    this.traps = Util.createHashMap();
188
189    assembleGraph( tails, sources, sinks );
190
191    verifyGraph();
192    }
193
194  private void assembleGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks )
195    {
196    HashMap<String, Tap> sourcesCopy = new HashMap<String, Tap>( sources );
197    HashMap<String, Tap> sinksCopy = new HashMap<String, Tap>( sinks );
198
199    for( Pipe pipe : pipes )
200      makeGraph( pipe, sourcesCopy, sinksCopy );
201
202    addExtents( sources, sinks );
203    }
204
205  private void verifyGraph()
206    {
207    if( vertexSet().isEmpty() )
208      return;
209
210    Set<String> checkpointNames = new HashSet<String>();
211
212    // need to verify that only Extent instances are origins in this graph. Otherwise a Tap was not properly connected
213    TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator();
214
215    FlowElement flowElement = null;
216
217    while( iterator.hasNext() )
218      {
219      try
220        {
221        flowElement = iterator.next();
222        }
223      catch( IllegalArgumentException exception )
224        {
225        if( flowElement == null )
226          throw new ElementGraphException( "unable to traverse to the first element" );
227
228        throw new ElementGraphException( flowElement, "unable to traverse to the next element after " + flowElement );
229        }
230
231      if( requireUniqueCheckpoints && flowElement instanceof Checkpoint )
232        {
233        String name = ( (Checkpoint) flowElement ).getName();
234
235        if( checkpointNames.contains( name ) )
236          throw new ElementGraphException( (Pipe) flowElement, "may not have duplicate checkpoint names in assembly, found: " + name );
237
238        checkpointNames.add( name );
239        }
240
241      if( incomingEdgesOf( flowElement ).size() != 0 && outgoingEdgesOf( flowElement ).size() != 0 )
242        continue;
243
244      if( flowElement instanceof Extent )
245        continue;
246
247      if( flowElement instanceof Pipe )
248        {
249        if( incomingEdgesOf( flowElement ).size() == 0 )
250          throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to head Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" );
251        else
252          throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to tail Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" );
253        }
254
255      if( flowElement instanceof Tap )
256        throw new ElementGraphException( (Tap) flowElement, "no Pipe connected to Tap: " + flowElement );
257      else
258        throw new ElementGraphException( flowElement, "unknown element type: " + flowElement );
259      }
260    }
261
262  protected FlowElementGraph shallowCopyElementGraph()
263    {
264    FlowElementGraph copy = new FlowElementGraph();
265    Graphs.addGraph( copy.graph, this.graph );
266
267    copy.traps = new HashMap<String, Tap>( this.traps );
268
269    return copy;
270    }
271
272  public boolean isResolved()
273    {
274    return resolved;
275    }
276
277  public void setResolved( boolean resolved )
278    {
279    this.resolved = resolved;
280    }
281
282  @Override
283  protected boolean allowMultipleExtentEdges()
284    {
285    return false;
286    }
287
288  /**
289   * created to support the ability to generate all paths between the head and tail of the process.
290   *
291   * @param sources
292   * @param sinks
293   */
294  private void addExtents( Map<String, Tap> sources, Map<String, Tap> sinks )
295    {
296    addVertex( Extent.head );
297
298    for( String source : sources.keySet() )
299      {
300      Scope scope = addEdge( Extent.head, sources.get( source ) );
301
302      // edge may already exist, if so, above returns null
303      if( scope != null )
304        scope.setName( source );
305      }
306
307    addVertex( Extent.tail );
308
309    for( String sink : sinks.keySet() )
310      {
311      Scope scope;
312
313      try
314        {
315        scope = addEdge( sinks.get( sink ), Extent.tail );
316        }
317      catch( IllegalArgumentException exception )
318        {
319        throw new ElementGraphException( "missing pipe for sink tap: [" + sink + "]" );
320        }
321
322      if( scope == null )
323        throw new ElementGraphException( "cannot sink to the same path from multiple branches: [" + Util.join( sinks.values() ) + "]" );
324
325      scope.setName( sink );
326      }
327    }
328
329  /**
330   * Performs one rule check, verifies group does not join duplicate tap resources.
331   * <p/>
332   * Scopes are always named after the source side of the source -> target relationship
333   */
334  private void makeGraph( Pipe current, Map<String, Tap> sources, Map<String, Tap> sinks )
335    {
336    LOG.debug( "adding pipe: {}", current );
337
338    if( current instanceof SubAssembly )
339      {
340      for( Pipe pipe : SubAssembly.unwind( current.getPrevious() ) )
341        makeGraph( pipe, sources, sinks );
342
343      return;
344      }
345
346    if( containsVertex( current ) )
347      return;
348
349    addVertex( current );
350
351    Tap sink = sinks.remove( current.getName() );
352
353    if( sink != null )
354      {
355      LOG.debug( "adding sink: {}", sink );
356
357      addVertex( sink );
358
359      LOG.debug( "adding edge: {} -> {}", current, sink );
360
361      addEdge( current, sink ).setName( current.getName() );
362      }
363
364    // PipeAssemblies should always have a previous
365    if( SubAssembly.unwind( current.getPrevious() ).length == 0 )
366      {
367      Tap source = sources.remove( current.getName() );
368
369      if( source != null )
370        {
371        LOG.debug( "adding source: {}", source );
372
373        addVertex( source );
374
375        LOG.debug( "adding edge: {} -> {}", source, current );
376
377        Scope scope = addEdge( source, current );
378
379        scope.setName( current.getName() );
380
381        setOrdinal( source, current, scope );
382        }
383      }
384
385    for( Pipe previous : SubAssembly.unwind( current.getPrevious() ) )
386      {
387      makeGraph( previous, sources, sinks );
388
389      LOG.debug( "adding edge: {} -> ", previous, current );
390
391      if( getEdge( previous, current ) != null )
392        throw new ElementGraphException( previous, "cannot distinguish pipe branches, give pipe unique name: " + previous );
393
394      Scope scope = addEdge( previous, current );
395
396      scope.setName( previous.getName() ); // name scope after previous pipe
397
398      setOrdinal( previous, current, scope );
399      }
400    }
401
402  private void setOrdinal( FlowElement previous, Pipe current, Scope scope )
403    {
404    if( current instanceof Splice )
405      {
406      Splice splice = (Splice) current;
407
408      Integer ordinal;
409
410      if( previous instanceof Tap ) // revert to pipe name
411        ordinal = splice.getPipePos().get( scope.getName() );
412      else // GroupBy allows for duplicate pipe names, this guarantees correct ordinality
413        ordinal = FlowElements.findOrdinal( splice, (Pipe) previous );
414
415      scope.setOrdinal( ordinal );
416
417      Set<Scope> scopes = new HashSet<>( incomingEdgesOf( current ) );
418
419      scopes.remove( scope );
420
421      for( Scope other : scopes )
422        {
423        if( other.getOrdinal() == scope.getOrdinal() )
424          throw new IllegalStateException( "duplicate ordinals" );
425        }
426
427      if( splice.isJoin() && ordinal != 0 )
428        scope.setNonBlocking( false );
429      }
430    }
431
432  /**
433   * Method getTopologicalIterator returns the topologicalIterator of this ElementGraph object.
434   *
435   * @return the topologicalIterator (type TopologicalOrderIterator<FlowElement, Scope>) of this ElementGraph object.
436   */
437  public TopologicalOrderIterator<FlowElement, Scope> getTopologicalIterator()
438    {
439    return new TopologicalOrderIterator<>( this.graph );
440    }
441
442  /**
443   * Method getDepthFirstIterator returns the depthFirstIterator of this ElementGraph object.
444   *
445   * @return the depthFirstIterator (type DepthFirstIterator<FlowElement, Scope>) of this ElementGraph object.
446   */
447  public DepthFirstIterator<FlowElement, Scope> getDepthFirstIterator()
448    {
449    return new DepthFirstIterator<>( this.graph, Extent.head );
450    }
451
452  private BaseElementGraph copyWithTraps()
453    {
454    FlowElementGraph copy = shallowCopyElementGraph();
455
456    copy.addTrapsToGraph();
457
458    return copy;
459    }
460
461  private void addTrapsToGraph()
462    {
463    DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator();
464
465    while( iterator.hasNext() )
466      {
467      FlowElement element = iterator.next();
468
469      if( !( element instanceof Pipe ) )
470        continue;
471
472      Pipe pipe = (Pipe) element;
473      Tap trap = traps.get( pipe.getName() );
474
475      if( trap == null )
476        continue;
477
478      addVertex( trap );
479
480      if( LOG.isDebugEnabled() )
481        LOG.debug( "adding trap edge: " + pipe + " -> " + trap );
482
483      if( getEdge( pipe, trap ) != null )
484        continue;
485
486      addEdge( pipe, trap ).setName( pipe.getName() ); // name scope after previous pipe
487      }
488    }
489
490  /**
491   * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
492   *
493   * @param filename of type String
494   */
495  @Override
496  public void writeDOT( String filename )
497    {
498    boolean success = ElementGraphs.printElementGraph( filename, this.copyWithTraps(), platformInfo );
499
500    if( success )
501      Util.writePDF( filename );
502    }
503
504  @Override
505  public ElementGraph copyElementGraph()
506    {
507    return new FlowElementGraph( this );
508    }
509  }