001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.graph;
022
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.FlowElements;
import cascading.flow.planner.ElementGraphException;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.pipe.Checkpoint;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.pipe.SubAssembly;
import cascading.tap.Tap;
import cascading.util.EnumMultiMap;
import cascading.util.Util;
import org.jgrapht.Graphs;
import org.jgrapht.traverse.DepthFirstIterator;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
048
049/** Class ElementGraph represents the executable FlowElement graph. */
050public class FlowElementGraph extends ElementDirectedGraph implements AnnotatedGraph
051  {
052  /** Field LOG */
053  private static final Logger LOG = LoggerFactory.getLogger( FlowElementGraph.class );
054
055  /** Field resolved */
056  private boolean resolved;
057  /** Field platformInfo */
058  private PlatformInfo platformInfo;
059  /** Field sources */
060  private Map<String, Tap> sources;
061  /** Field sinks */
062  private Map<String, Tap> sinks;
063  /** Field traps */
064  private Map<String, Tap> traps;
065  /** Field checkpoints */
066  private Map<String, Tap> checkpoints;
067  /** Field requireUniqueCheckpoints */
068  private boolean requireUniqueCheckpoints;
069
070  // used for creating isolated test graphs
071  protected FlowElementGraph()
072    {
073    }
074
075  public FlowElementGraph( FlowElementGraph flowElementGraph )
076    {
077    this();
078    this.platformInfo = flowElementGraph.platformInfo;
079    this.sources = flowElementGraph.sources;
080    this.sinks = flowElementGraph.sinks;
081    this.traps = flowElementGraph.traps;
082    this.checkpoints = flowElementGraph.checkpoints;
083    this.requireUniqueCheckpoints = flowElementGraph.requireUniqueCheckpoints;
084
085    if( flowElementGraph.annotations != null )
086      this.annotations = new EnumMultiMap( flowElementGraph.annotations );
087
088    Graphs.addAllVertices( this.graph, flowElementGraph.vertexSet() );
089    Graphs.addAllEdges( this.graph, flowElementGraph.graph, flowElementGraph.edgeSet() );
090    }
091
092  public FlowElementGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks )
093    {
094    this( null, pipes, sources, sinks, Collections.<String, Tap>emptyMap(), Collections.<String, Tap>emptyMap(), false );
095    }
096
097  /**
098   * Constructor ElementGraph creates a new ElementGraph instance.
099   *
100   * @param pipes   of type Pipe[]
101   * @param sources of type Map<String, Tap>
102   * @param sinks   of type Map<String, Tap>
103   */
104  public FlowElementGraph( PlatformInfo platformInfo, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints, boolean requireUniqueCheckpoints )
105    {
106    this();
107    this.platformInfo = platformInfo;
108    this.sources = sources;
109    this.sinks = sinks;
110    this.traps = traps;
111    this.checkpoints = checkpoints;
112    this.requireUniqueCheckpoints = requireUniqueCheckpoints;
113
114    assembleGraph( pipes, sources, sinks );
115
116    verifyGraph();
117    }
118
119  public Map<String, Tap> getSourceMap()
120    {
121    return sources;
122    }
123
124  public Map<String, Tap> getSinkMap()
125    {
126    return sinks;
127    }
128
129  public Map<String, Tap> getTrapMap()
130    {
131    return traps;
132    }
133
134  public Map<String, Tap> getCheckpointsMap()
135    {
136    return checkpoints;
137    }
138
139  public Collection<Tap> getSources()
140    {
141    return sources.values();
142    }
143
144  public Collection<Tap> getSinks()
145    {
146    return sinks.values();
147    }
148
149  public Collection<Tap> getTraps()
150    {
151    return traps.values();
152    }
153
154  protected void initialize( Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails )
155    {
156    this.sources = sources;
157    this.sinks = sinks;
158    this.traps = Util.createHashMap();
159
160    assembleGraph( tails, sources, sinks );
161
162    verifyGraph();
163    }
164
165  private void assembleGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks )
166    {
167    HashMap<String, Tap> sourcesCopy = new HashMap<String, Tap>( sources );
168    HashMap<String, Tap> sinksCopy = new HashMap<String, Tap>( sinks );
169
170    for( Pipe pipe : pipes )
171      makeGraph( pipe, sourcesCopy, sinksCopy );
172
173    addExtents( sources, sinks );
174    }
175
176  private void verifyGraph()
177    {
178    if( vertexSet().isEmpty() )
179      return;
180
181    Set<String> checkpointNames = new HashSet<String>();
182
183    // need to verify that only Extent instances are origins in this graph. Otherwise a Tap was not properly connected
184    TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator();
185
186    FlowElement flowElement = null;
187
188    while( iterator.hasNext() )
189      {
190      try
191        {
192        flowElement = iterator.next();
193        }
194      catch( IllegalArgumentException exception )
195        {
196        if( flowElement == null )
197          throw new ElementGraphException( "unable to traverse to the first element" );
198
199        throw new ElementGraphException( flowElement, "unable to traverse to the next element after " + flowElement );
200        }
201
202      if( requireUniqueCheckpoints && flowElement instanceof Checkpoint )
203        {
204        String name = ( (Checkpoint) flowElement ).getName();
205
206        if( checkpointNames.contains( name ) )
207          throw new ElementGraphException( (Pipe) flowElement, "may not have duplicate checkpoint names in assembly, found: " + name );
208
209        checkpointNames.add( name );
210        }
211
212      if( incomingEdgesOf( flowElement ).size() != 0 && outgoingEdgesOf( flowElement ).size() != 0 )
213        continue;
214
215      if( flowElement instanceof Extent )
216        continue;
217
218      if( flowElement instanceof Pipe )
219        {
220        if( incomingEdgesOf( flowElement ).size() == 0 )
221          throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to head Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" );
222        else
223          throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to tail Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" );
224        }
225
226      if( flowElement instanceof Tap )
227        throw new ElementGraphException( (Tap) flowElement, "no Pipe connected to Tap: " + flowElement );
228      else
229        throw new ElementGraphException( flowElement, "unknown element type: " + flowElement );
230      }
231    }
232
233  protected FlowElementGraph shallowCopyElementGraph()
234    {
235    FlowElementGraph copy = new FlowElementGraph();
236    Graphs.addGraph( copy.graph, this.graph );
237
238    copy.traps = new HashMap<String, Tap>( this.traps );
239
240    return copy;
241    }
242
243  /**
244   * created to support the ability to generate all paths between the head and tail of the process.
245   *
246   * @param sources
247   * @param sinks
248   */
249  private void addExtents( Map<String, Tap> sources, Map<String, Tap> sinks )
250    {
251    addVertex( Extent.head );
252
253    for( String source : sources.keySet() )
254      {
255      Scope scope = addEdge( Extent.head, sources.get( source ) );
256
257      // edge may already exist, if so, above returns null
258      if( scope != null )
259        scope.setName( source );
260      }
261
262    addVertex( Extent.tail );
263
264    for( String sink : sinks.keySet() )
265      {
266      Scope scope;
267
268      try
269        {
270        scope = addEdge( sinks.get( sink ), Extent.tail );
271        }
272      catch( IllegalArgumentException exception )
273        {
274        throw new ElementGraphException( "missing pipe for sink tap: [" + sink + "]" );
275        }
276
277      if( scope == null )
278        throw new ElementGraphException( "cannot sink to the same path from multiple branches: [" + Util.join( sinks.values() ) + "]" );
279
280      scope.setName( sink );
281      }
282    }
283
284  /**
285   * Performs one rule check, verifies group does not join duplicate tap resources.
286   * <p/>
287   * Scopes are always named after the source side of the source -> target relationship
288   */
289  private void makeGraph( Pipe current, Map<String, Tap> sources, Map<String, Tap> sinks )
290    {
291    LOG.debug( "adding pipe: {}", current );
292
293    if( current instanceof SubAssembly )
294      {
295      for( Pipe pipe : SubAssembly.unwind( current.getPrevious() ) )
296        makeGraph( pipe, sources, sinks );
297
298      return;
299      }
300
301    if( containsVertex( current ) )
302      return;
303
304    addVertex( current );
305
306    Tap sink = sinks.remove( current.getName() );
307
308    if( sink != null )
309      {
310      LOG.debug( "adding sink: {}", sink );
311
312      addVertex( sink );
313
314      LOG.debug( "adding edge: {} -> {}", current, sink );
315
316      addEdge( current, sink ).setName( current.getName() );
317      }
318
319    // PipeAssemblies should always have a previous
320    if( SubAssembly.unwind( current.getPrevious() ).length == 0 )
321      {
322      Tap source = sources.remove( current.getName() );
323
324      if( source != null )
325        {
326        LOG.debug( "adding source: {}", source );
327
328        addVertex( source );
329
330        LOG.debug( "adding edge: {} -> {}", source, current );
331
332        Scope scope = addEdge( source, current );
333
334        scope.setName( current.getName() );
335
336        setOrdinal( source, current, scope );
337        }
338      }
339
340    for( Pipe previous : SubAssembly.unwind( current.getPrevious() ) )
341      {
342      makeGraph( previous, sources, sinks );
343
344      LOG.debug( "adding edge: {} -> ", previous, current );
345
346      if( getEdge( previous, current ) != null )
347        throw new ElementGraphException( previous, "cannot distinguish pipe branches, give pipe unique name: " + previous );
348
349      Scope scope = addEdge( previous, current );
350
351      scope.setName( previous.getName() ); // name scope after previous pipe
352
353      setOrdinal( previous, current, scope );
354      }
355    }
356
357  private void setOrdinal( FlowElement previous, Pipe current, Scope scope )
358    {
359    if( current instanceof Splice )
360      {
361      Splice splice = (Splice) current;
362
363      Integer ordinal;
364
365      if( previous instanceof Tap ) // revert to pipe name
366        ordinal = splice.getPipePos().get( scope.getName() );
367      else // GroupBy allows for duplicate pipe names, this guarantees correct ordinality
368        ordinal = FlowElements.findOrdinal( splice, (Pipe) previous );
369
370      scope.setOrdinal( ordinal );
371
372      Set<Scope> scopes = new HashSet<>( incomingEdgesOf( current ) );
373
374      scopes.remove( scope );
375
376      for( Scope other : scopes )
377        {
378        if( other.getOrdinal() == scope.getOrdinal() )
379          throw new IllegalStateException( "duplicate ordinals" );
380        }
381
382      if( splice.isJoin() && ordinal != 0 )
383        scope.setNonBlocking( false );
384      }
385    }
386
387  /**
388   * Method getTopologicalIterator returns the topologicalIterator of this ElementGraph object.
389   *
390   * @return the topologicalIterator (type TopologicalOrderIterator<FlowElement, Scope>) of this ElementGraph object.
391   */
392  public TopologicalOrderIterator<FlowElement, Scope> getTopologicalIterator()
393    {
394    return new TopologicalOrderIterator<>( this.graph );
395    }
396
397  /**
398   * Method getDepthFirstIterator returns the depthFirstIterator of this ElementGraph object.
399   *
400   * @return the depthFirstIterator (type DepthFirstIterator<FlowElement, Scope>) of this ElementGraph object.
401   */
402  public DepthFirstIterator<FlowElement, Scope> getDepthFirstIterator()
403    {
404    return new DepthFirstIterator<>( this.graph, Extent.head );
405    }
406
407  private BaseElementGraph copyWithTraps()
408    {
409    FlowElementGraph copy = shallowCopyElementGraph();
410
411    copy.addTrapsToGraph();
412
413    return copy;
414    }
415
416  private void addTrapsToGraph()
417    {
418    DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator();
419
420    while( iterator.hasNext() )
421      {
422      FlowElement element = iterator.next();
423
424      if( !( element instanceof Pipe ) )
425        continue;
426
427      Pipe pipe = (Pipe) element;
428      Tap trap = traps.get( pipe.getName() );
429
430      if( trap == null )
431        continue;
432
433      addVertex( trap );
434
435      if( LOG.isDebugEnabled() )
436        LOG.debug( "adding trap edge: " + pipe + " -> " + trap );
437
438      if( getEdge( pipe, trap ) != null )
439        continue;
440
441      addEdge( pipe, trap ).setName( pipe.getName() ); // name scope after previous pipe
442      }
443    }
444
445  /**
446   * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
447   *
448   * @param filename of type String
449   */
450  @Override
451  public void writeDOT( String filename )
452    {
453    boolean success = ElementGraphs.printElementGraph( filename, this.copyWithTraps(), platformInfo );
454
455    if( success )
456      Util.writePDF( filename );
457    }
458
459  /** Method resolveFields performs a breadth first traversal and resolves the tuple fields between each Pipe instance. */
460  public void resolveFields()
461    {
462    if( resolved )
463      throw new IllegalStateException( "element graph already resolved" );
464
465    TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator();
466
467    while( iterator.hasNext() )
468      resolveFields( iterator.next() );
469
470    resolved = true;
471    }
472
473  private void resolveFields( FlowElement source )
474    {
475    if( source instanceof Extent )
476      return;
477
478    Set<Scope> incomingScopes = incomingEdgesOf( source );
479    Set<Scope> outgoingScopes = outgoingEdgesOf( source );
480
481    List<FlowElement> flowElements = successorListOf( source );
482
483    if( flowElements.size() == 0 )
484      throw new IllegalStateException( "unable to find next elements in pipeline from: " + source.toString() );
485
486    Scope outgoingScope = source.outgoingScopeFor( incomingScopes );
487
488    if( LOG.isDebugEnabled() && outgoingScope != null )
489      {
490      LOG.debug( "for modifier: " + source );
491      if( outgoingScope.getArgumentsSelector() != null )
492        LOG.debug( "setting outgoing arguments: " + outgoingScope.getArgumentsSelector() );
493      if( outgoingScope.getOperationDeclaredFields() != null )
494        LOG.debug( "setting outgoing declared: " + outgoingScope.getOperationDeclaredFields() );
495      if( outgoingScope.getKeySelectors() != null )
496        LOG.debug( "setting outgoing group: " + outgoingScope.getKeySelectors() );
497      if( outgoingScope.getOutValuesSelector() != null )
498        LOG.debug( "setting outgoing values: " + outgoingScope.getOutValuesSelector() );
499      }
500
501    for( Scope scope : outgoingScopes )
502      scope.copyFields( outgoingScope );
503    }
504
505  @Override
506  public ElementGraph copyElementGraph()
507    {
508    return new FlowElementGraph( this );
509    }
510  }