001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner;
022
023import java.io.IOException;
024import java.io.Serializable;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036
037import cascading.flow.Flow;
038import cascading.flow.FlowElement;
039import cascading.flow.FlowException;
040import cascading.flow.FlowNode;
041import cascading.flow.FlowProcess;
042import cascading.flow.FlowStep;
043import cascading.flow.FlowStepListener;
044import cascading.flow.planner.graph.AnnotatedGraph;
045import cascading.flow.planner.graph.ElementGraph;
046import cascading.flow.planner.graph.ElementGraphs;
047import cascading.flow.planner.process.FlowNodeGraph;
048import cascading.flow.stream.annotations.StreamMode;
049import cascading.management.CascadingServices;
050import cascading.management.state.ClientState;
051import cascading.operation.Operation;
052import cascading.pipe.Group;
053import cascading.pipe.Operator;
054import cascading.pipe.Pipe;
055import cascading.pipe.SubAssembly;
056import cascading.property.ConfigDef;
057import cascading.stats.FlowStepStats;
058import cascading.tap.Tap;
059import cascading.util.EnumMultiMap;
060import cascading.util.ProcessLogger;
061import cascading.util.Util;
062
063import static cascading.flow.planner.graph.ElementGraphs.findAllGroups;
064
065/**
066 * Class FlowStep is an internal representation of a given Job to be executed on a remote cluster. During
067 * planning, pipe assemblies are broken down into "steps" and encapsulated in this class.
068 * <p/>
069 * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all
070 * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which
071 * all steps will be submitted for execution. The default submit priority is 5.
072 * <p/>
073 * This class is for internal use, there are no stable public methods.
074 */
075public abstract class BaseFlowStep<Config> implements FlowStep<Config>, ProcessLogger, Serializable
076  {
077  /** Field flow */
078  private transient Flow<Config> flow;
079  /** Field flowName */
080  private String flowName;
081  /** Field flowID */
082  private String flowID;
083
084  private transient Config flowStepConf;
085
086  /** Field submitPriority */
087  private int submitPriority = 5;
088
089  /** Field name */
090  String name;
091  private String id;
092  private int ordinal;
093  private Map<String, String> processAnnotations;
094
095  /** Field step listeners */
096  private List<SafeFlowStepListener> listeners;
097
098  /** Field graph */
099  private final ElementGraph elementGraph;
100
101  private FlowNodeGraph flowNodeGraph;
102
103  /** Field sources */
104  protected final Map<Tap, Set<String>> sources = new HashMap<>(); // all sources
105  /** Field sink */
106  protected final Map<Tap, Set<String>> sinks = new HashMap<>(); // all sinks
107
108  /** Field mapperTraps */
109  private final Map<String, Tap> traps = new HashMap<>();
110
111  /** Field tempSink */
112  protected Tap tempSink; // used if we need to bypass the filesystem
113
114  /** Field groups */
115  private final List<Group> groups = new ArrayList<Group>();
116
117  protected transient FlowStepStats flowStepStats;
118
119  private transient FlowStepJob<Config> flowStepJob;
120
121  /** optional metadata about the FlowStep */
122  private Map<String, String> flowStepDescriptor = Collections.emptyMap();
123
124  protected BaseFlowStep( String name, int ordinal )
125    {
126    this( name, ordinal, null );
127    }
128
129  protected BaseFlowStep( String name, int ordinal, Map<String, String> flowStepDescriptor )
130    {
131    this( name, ordinal, null, flowStepDescriptor );
132    }
133
134  protected BaseFlowStep( String name, int ordinal, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor )
135    {
136    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
137    setName( name );
138    this.ordinal = ordinal;
139
140    this.elementGraph = null;
141    this.flowNodeGraph = flowNodeGraph;
142
143    setFlowStepDescriptor( flowStepDescriptor );
144    }
145
146  protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph )
147    {
148    this( elementStepGraph, flowNodeGraph, null );
149    }
150
151  protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor )
152    {
153    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
154    this.elementGraph = elementStepGraph;
155    this.flowNodeGraph = flowNodeGraph; // TODO: verify no missing elements in the union of the node graphs
156
157    setFlowStepDescriptor( flowStepDescriptor );
158
159    configure();
160    }
161
162  protected void configure()
163    {
164    // todo: remove once FlowMapper/FlowReducer aren't reliant
165    addSources( this, elementGraph, flowNodeGraph.getSourceTaps() );
166    addSinks( this, elementGraph, flowNodeGraph.getSinkTaps() );
167
168    addAllGroups();
169
170    traps.putAll( flowNodeGraph.getTrapsMap() );
171    }
172
173  protected void addAllGroups()
174    {
175    addGroups( findAllGroups( elementGraph ) );
176    }
177
178  @Override
179  public String getID()
180    {
181    return id;
182    }
183
184  public void setOrdinal( int ordinal )
185    {
186    this.ordinal = ordinal;
187    }
188
189  @Override
190  public int getOrdinal()
191    {
192    return ordinal;
193    }
194
195  @Override
196  public String getName()
197    {
198    return name;
199    }
200
201  public void setName( String name )
202    {
203    if( name == null || name.isEmpty() )
204      throw new IllegalArgumentException( "step name may not be null or empty" );
205
206    this.name = name;
207    }
208
209  @Override
210  public Map<String, String> getFlowStepDescriptor()
211    {
212    return Collections.unmodifiableMap( flowStepDescriptor );
213    }
214
215  protected void setFlowStepDescriptor( Map<String, String> flowStepDescriptor )
216    {
217    if( flowStepDescriptor != null )
218      this.flowStepDescriptor = flowStepDescriptor;
219    }
220
221  @Override
222  public Map<String, String> getProcessAnnotations()
223    {
224    if( processAnnotations == null )
225      return Collections.emptyMap();
226
227    return Collections.unmodifiableMap( processAnnotations );
228    }
229
230  @Override
231  public void addProcessAnnotation( Enum annotation )
232    {
233    if( annotation == null )
234      return;
235
236    addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
237    }
238
239  @Override
240  public void addProcessAnnotation( String key, String value )
241    {
242    if( processAnnotations == null )
243      processAnnotations = new HashMap<>();
244
245    processAnnotations.put( key, value );
246    }
247
248  public void setFlow( Flow<Config> flow )
249    {
250    this.flow = flow;
251    this.flowID = flow.getID();
252    this.flowName = flow.getName();
253    }
254
255  @Override
256  public Flow<Config> getFlow()
257    {
258    return flow;
259    }
260
261  @Override
262  public String getFlowID()
263    {
264    return flowID;
265    }
266
267  @Override
268  public String getFlowName()
269    {
270    return flowName;
271    }
272
273  protected void setFlowName( String flowName )
274    {
275    this.flowName = flowName;
276    }
277
278  @Override
279  public Config getConfig()
280    {
281    return flowStepConf;
282    }
283
284  @Override
285  public Map<Object, Object> getConfigAsProperties()
286    {
287    return Collections.emptyMap();
288    }
289
290  /**
291   * Set the initialized flowStepConf Config instance
292   *
293   * @param flowStepConf of type Config
294   */
295  protected void setConfig( Config flowStepConf )
296    {
297    this.flowStepConf = flowStepConf;
298    }
299
300  @Override
301  public String getStepDisplayName()
302    {
303    return getStepDisplayName( Util.ID_LENGTH );
304    }
305
306  protected String getStepDisplayName( int idLength )
307    {
308    if( idLength < 0 || idLength > Util.ID_LENGTH )
309      idLength = Util.ID_LENGTH;
310
311    if( idLength == 0 )
312      return String.format( "%s/%s", getFlowName(), getName() );
313
314    String flowID = getFlowID().substring( 0, idLength );
315    String stepID = getID().substring( 0, idLength );
316
317    return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() );
318    }
319
320  protected String getNodeDisplayName( FlowNode flowNode, int idLength )
321    {
322    if( idLength > Util.ID_LENGTH )
323      idLength = Util.ID_LENGTH;
324
325    String flowID = getFlowID().substring( 0, idLength );
326    String stepID = getID().substring( 0, idLength );
327    String nodeID = flowNode.getID().substring( 0, idLength );
328
329    return String.format( "[%s/%s/%s] %s/%s", flowID, stepID, nodeID, getFlowName(), getName() );
330    }
331
332  @Override
333  public int getSubmitPriority()
334    {
335    return submitPriority;
336    }
337
338  @Override
339  public void setSubmitPriority( int submitPriority )
340    {
341    if( submitPriority < 1 || submitPriority > 10 )
342      throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );
343
344    this.submitPriority = submitPriority;
345    }
346
347  @Override
348  public void setFlowStepStats( FlowStepStats flowStepStats )
349    {
350    this.flowStepStats = flowStepStats;
351    }
352
353  @Override
354  public FlowStepStats getFlowStepStats()
355    {
356    return flowStepStats;
357    }
358
359  @Override
360  public ElementGraph getElementGraph()
361    {
362    return elementGraph;
363    }
364
365  protected EnumMultiMap getAnnotations()
366    {
367    return ( (AnnotatedGraph) elementGraph ).getAnnotations();
368    }
369
370  @Override
371  public FlowNodeGraph getFlowNodeGraph()
372    {
373    return flowNodeGraph;
374    }
375
376  @Override
377  public int getNumFlowNodes()
378    {
379    return flowNodeGraph.vertexSet().size();
380    }
381
382  public Set<FlowElement> getSourceElements()
383    {
384    return ElementGraphs.findSources( getElementGraph(), FlowElement.class );
385    }
386
387  public Set<FlowElement> getSinkElements()
388    {
389    return ElementGraphs.findSinks( getElementGraph(), FlowElement.class );
390    }
391
392  @Override
393  public Group getGroup()
394    {
395    if( groups.isEmpty() )
396      return null;
397
398    if( groups.size() > 1 )
399      throw new IllegalStateException( "more than one group" );
400
401    return groups.get( 0 );
402    }
403
404  @Override
405  public Collection<Group> getGroups()
406    {
407    return groups;
408    }
409
410  public void addGroups( Collection<Group> groups )
411    {
412    for( Group group : groups )
413      addGroup( group );
414    }
415
416  public void addGroup( Group group )
417    {
418    if( !groups.contains( group ) )
419      groups.add( group );
420    }
421
422  public Set<Tap> getAllAccumulatedSources()
423    {
424    return Util.narrowIdentitySet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Accumulated ) );
425    }
426
427  public void addSource( String name, Tap source )
428    {
429    if( !sources.containsKey( source ) )
430      sources.put( source, new HashSet<String>() );
431
432    sources.get( source ).add( name );
433    }
434
435  public void addSink( String name, Tap sink )
436    {
437    if( !sinks.containsKey( sink ) )
438      sinks.put( sink, new HashSet<String>() );
439
440    sinks.get( sink ).add( name );
441    }
442
443  @Override
444  public Set<Tap> getSourceTaps()
445    {
446    return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) );
447    }
448
449  @Override
450  public Set<Tap> getSinkTaps()
451    {
452    return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) );
453    }
454
455  @Override
456  public Tap getSink()
457    {
458    if( sinks.size() == 0 )
459      return null;
460
461    if( sinks.size() > 1 )
462      throw new IllegalStateException( "more than one sink" );
463
464    return sinks.keySet().iterator().next();
465    }
466
467  @Override
468  public Set<String> getSourceName( Tap source )
469    {
470    return Collections.unmodifiableSet( sources.get( source ) );
471    }
472
473  @Override
474  public Set<String> getSinkName( Tap sink )
475    {
476    return Collections.unmodifiableSet( sinks.get( sink ) );
477    }
478
479  @Override
480  public Tap getSourceWith( String identifier )
481    {
482    if( Util.isEmpty( identifier ) )
483      return null;
484
485    for( Tap tap : sources.keySet() )
486      {
487      if( identifier.equalsIgnoreCase( tap.getIdentifier() ) )
488        return tap;
489      }
490
491    return null;
492    }
493
494  @Override
495  public Tap getSinkWith( String identifier )
496    {
497    if( Util.isEmpty( identifier ) )
498      return null;
499
500    for( Tap tap : sinks.keySet() )
501      {
502      if( identifier.equalsIgnoreCase( tap.getIdentifier() ) )
503        return tap;
504      }
505
506    return null;
507    }
508
509  @Override
510  public Map<String, Tap> getTrapMap()
511    {
512    return traps;
513    }
514
515  @Override
516  public Set<Tap> getTraps()
517    {
518    return Collections.unmodifiableSet( new HashSet<Tap>( traps.values() ) );
519    }
520
521  public Tap getTrap( String name )
522    {
523    return getTrapMap().get( name );
524    }
525
526  boolean allSourcesExist() throws IOException
527    {
528    for( Tap tap : sources.keySet() )
529      {
530      if( !tap.resourceExists( getConfig() ) )
531        return false;
532      }
533
534    return true;
535    }
536
537  boolean areSourcesNewer( long sinkModified ) throws IOException
538    {
539    Config config = getConfig();
540    Iterator<Tap> values = sources.keySet().iterator();
541
542    long sourceModified = 0;
543
544    try
545      {
546      sourceModified = Util.getSourceModified( config, values, sinkModified );
547
548      if( sinkModified < sourceModified )
549        return true;
550
551      return false;
552      }
553    finally
554      {
555      if( isInfoEnabled() )
556        logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
557      }
558    }
559
560  long getSinkModified() throws IOException
561    {
562    long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() );
563
564    if( isInfoEnabled() )
565      {
566      if( sinkModified == -1L )
567        logInfo( "at least one sink is marked for delete" );
568      if( sinkModified == 0L )
569        logInfo( "at least one sink does not exist" );
570      else
571        logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
572      }
573
574    return sinkModified;
575    }
576
577  protected Throwable prepareResources()
578    {
579    Throwable throwable = prepareResources( getSourceTaps(), false );
580
581    if( throwable == null )
582      throwable = prepareResources( getSinkTaps(), true );
583
584    if( throwable == null )
585      throwable = prepareResources( getTraps(), true );
586
587    return throwable;
588    }
589
590  private Throwable prepareResources( Collection<Tap> taps, boolean forWrite )
591    {
592    Throwable throwable = null;
593
594    for( Tap tap : taps )
595      {
596      throwable = prepareResource( tap, forWrite );
597
598      if( throwable != null )
599        break;
600      }
601
602    return throwable;
603    }
604
605  private Throwable prepareResource( Tap tap, boolean forWrite )
606    {
607    Throwable throwable = null;
608
609    try
610      {
611      boolean result;
612
613      if( forWrite )
614        result = tap.prepareResourceForWrite( getConfig() );
615      else
616        result = tap.prepareResourceForRead( getConfig() );
617
618      if( !result )
619        {
620        String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );
621
622        logError( message );
623
624        throwable = new FlowException( message );
625        }
626      }
627    catch( Throwable exception )
628      {
629      String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );
630
631      logError( message, exception );
632
633      throwable = new FlowException( message, exception );
634      }
635
636    return throwable;
637    }
638
639  protected Throwable commitSinks()
640    {
641    Throwable throwable = null;
642
643    for( Tap tap : sinks.keySet() )
644      {
645      if( throwable != null )
646        rollbackResource( tap );
647      else
648        throwable = commitResource( tap );
649      }
650
651    return throwable;
652    }
653
654  private Throwable commitResource( Tap tap )
655    {
656    Throwable throwable = null;
657
658    try
659      {
660      if( !tap.commitResource( getConfig() ) )
661        {
662        String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
663
664        logError( message );
665
666        throwable = new FlowException( message );
667        }
668      }
669    catch( Throwable exception )
670      {
671      String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
672
673      logError( message, exception );
674
675      throwable = new FlowException( message, exception );
676      }
677
678    return throwable;
679    }
680
681  private Throwable rollbackResource( Tap tap )
682    {
683    Throwable throwable = null;
684
685    try
686      {
687      if( !tap.rollbackResource( getConfig() ) )
688        {
689        String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
690
691        logError( message );
692
693        throwable = new FlowException( message );
694        }
695      }
696    catch( Throwable exception )
697      {
698      String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
699
700      logError( message, exception );
701
702      throwable = new FlowException( message, exception );
703      }
704
705    return throwable;
706    }
707
708  protected Throwable rollbackSinks()
709    {
710    Throwable throwable = null;
711
712    for( Tap tap : sinks.keySet() )
713      {
714      if( throwable != null )
715        rollbackResource( tap );
716      else
717        throwable = rollbackResource( tap );
718      }
719
720    return throwable;
721    }
722
723  /**
724   * Public for testing.
725   *
726   * @param flowProcess
727   * @param parentConfig
728   * @return
729   */
730  public abstract Config createInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig );
731
732  /**
733   * Method getPreviousScopes returns the previous Scope instances. If the flowElement is a Group (specifically a CoGroup),
734   * there will be more than one instance.
735   *
736   * @param flowElement of type FlowElement
737   * @return Set<Scope>
738   */
739  public Set<Scope> getPreviousScopes( FlowElement flowElement )
740    {
741    return getElementGraph().incomingEdgesOf( flowElement );
742    }
743
744  /**
745   * Method getNextScope returns the next Scope instance in the graph. There will always only be one next.
746   *
747   * @param flowElement of type FlowElement
748   * @return Scope
749   */
750  public Scope getNextScope( FlowElement flowElement )
751    {
752    Set<Scope> set = getElementGraph().outgoingEdgesOf( flowElement );
753
754    if( set.size() != 1 )
755      throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() );
756
757    return set.iterator().next();
758    }
759
760  public FlowElement getNextFlowElement( Scope scope )
761    {
762    return getElementGraph().getEdgeTarget( scope );
763    }
764
765  public Collection<Operation> getAllOperations()
766    {
767    Set<FlowElement> vertices = getElementGraph().vertexSet();
768    List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same
769
770    for( FlowElement vertex : vertices )
771      {
772      if( vertex instanceof Operator )
773        operations.add( ( (Operator) vertex ).getOperation() );
774      }
775
776    return operations;
777    }
778
779  @Override
780  public boolean containsPipeNamed( String pipeName )
781    {
782    Set<FlowElement> vertices = getElementGraph().vertexSet();
783
784    for( FlowElement vertex : vertices )
785      {
786      if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) )
787        return true;
788      }
789
790    return false;
791    }
792
793  public void clean()
794    {
795    // use step config by default
796    clean( getConfig() );
797    }
798
799  public abstract void clean( Config config );
800
801  List<SafeFlowStepListener> getListeners()
802    {
803    if( listeners == null )
804      listeners = new LinkedList<SafeFlowStepListener>();
805
806    return listeners;
807    }
808
809  @Override
810  public boolean hasListeners()
811    {
812    return listeners != null && !listeners.isEmpty();
813    }
814
815  @Override
816  public void addListener( FlowStepListener flowStepListener )
817    {
818    getListeners().add( new SafeFlowStepListener( flowStepListener ) );
819    }
820
821  @Override
822  public boolean removeListener( FlowStepListener flowStepListener )
823    {
824    return getListeners().remove( new SafeFlowStepListener( flowStepListener ) );
825    }
826
827  protected void fireOnCompleted()
828    {
829    if( hasListeners() )
830      {
831      if( isDebugEnabled() )
832        logDebug( "firing onCompleted event: " + getListeners().size() );
833
834      for( Object flowStepListener : getListeners() )
835        ( (FlowStepListener) flowStepListener ).onStepCompleted( this );
836      }
837    }
838
839  protected void fireOnThrowable( Throwable throwable )
840    {
841    if( hasListeners() )
842      {
843      if( isDebugEnabled() )
844        logDebug( "firing onThrowable event: " + getListeners().size() );
845
846      for( Object flowStepListener : getListeners() )
847        ( (FlowStepListener) flowStepListener ).onStepThrowable( this, throwable );
848      }
849    }
850
851  protected void fireOnStopping()
852    {
853    if( hasListeners() )
854      {
855      if( isDebugEnabled() )
856        logDebug( "firing onStopping event: " + getListeners() );
857
858      for( Object flowStepListener : getListeners() )
859        ( (FlowStepListener) flowStepListener ).onStepStopping( this );
860      }
861    }
862
863  protected void fireOnStarting()
864    {
865    if( hasListeners() )
866      {
867      if( isDebugEnabled() )
868        logDebug( "firing onStarting event: " + getListeners().size() );
869
870      for( Object flowStepListener : getListeners() )
871        ( (FlowStepListener) flowStepListener ).onStepStarting( this );
872      }
873    }
874
875  protected void fireOnRunning()
876    {
877    if( hasListeners() )
878      {
879      if( isDebugEnabled() )
880        logDebug( "firing onRunning event: " + getListeners().size() );
881
882      for( Object flowStepListener : getListeners() )
883        ( (FlowStepListener) flowStepListener ).onStepRunning( this );
884      }
885    }
886
887  protected ClientState createClientState( FlowProcess flowProcess )
888    {
889    CascadingServices services = flowProcess.getCurrentSession().getCascadingServices();
890
891    if( services == null )
892      return ClientState.NULL;
893
894    return services.createClientState( getID() );
895    }
896
897  public FlowStepJob<Config> getFlowStepJob()
898    {
899    return flowStepJob;
900    }
901
902  public FlowStepJob<Config> getCreateFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig )
903    {
904    if( flowStepJob != null )
905      return flowStepJob;
906
907    if( flowProcess == null )
908      return null;
909
910    Config initializedConfig = createInitializedConfig( flowProcess, parentConfig );
911
912    setConfig( initializedConfig );
913
914    ClientState clientState = createClientState( flowProcess );
915
916    flowStepJob = createFlowStepJob( clientState, flowProcess, initializedConfig );
917
918    return flowStepJob;
919    }
920
921  protected abstract FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<Config> flowProcess, Config initializedStepConfig );
922
923  protected void initConfFromNodeConfigDef( ElementGraph nodeElementGraph, ConfigDef.Setter setter )
924    {
925    nodeElementGraph = ElementGraphs.asExtentMaskedSubGraph( nodeElementGraph );
926
927    ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() );
928
929    // applies each mode in order, topologically
930    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
931      {
932      Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( nodeElementGraph );
933
934      while( iterator.hasNext() )
935        {
936        FlowElement element = iterator.next();
937
938        while( element != null )
939          {
940          // intentionally skip any element that spans downstream nodes, like a GroupBy
941          // this way GroupBy is applied on the inbound side (where partitioning happens)
942          // not the outbound side.
943          // parent sub-assemblies (like Unique) will be applied if they have leading Pipes to the current spanning Pipe
944          if( elementSpansDownStream( stepElementGraph, nodeElementGraph, element ) )
945            {
946            element = null;
947            continue;
948            }
949
950          if( element instanceof ScopedElement && ( (ScopedElement) element ).hasNodeConfigDef() )
951            ( (ScopedElement) element ).getNodeConfigDef().apply( mode, setter );
952
953          // walk up the sub-assembly parent hierarchy
954          if( element instanceof Pipe )
955            element = ( (Pipe) element ).getParent();
956          else
957            element = null;
958          }
959        }
960      }
961    }
962
963  private boolean elementSpansDownStream( ElementGraph stepElementGraph, ElementGraph nodeElementGraph, FlowElement element )
964    {
965    boolean spansNodes = !( element instanceof SubAssembly );
966
967    if( spansNodes )
968      spansNodes = nodeElementGraph.outDegreeOf( element ) == 0 && stepElementGraph.outDegreeOf( element ) > 0;
969
970    return spansNodes;
971    }
972
973  protected void initConfFromStepConfigDef( ConfigDef.Setter setter )
974    {
975    ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() );
976
977    // applies each mode in order, topologically
978    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
979      {
980      Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( stepElementGraph );
981
982      while( iterator.hasNext() )
983        {
984        FlowElement element = iterator.next();
985
986        while( element != null )
987          {
988          if( element instanceof ScopedElement && ( (ScopedElement) element ).hasStepConfigDef() )
989            ( (ScopedElement) element ).getStepConfigDef().apply( mode, setter );
990
991          // walk up the sub-assembly parent hierarchy
992          if( element instanceof Pipe )
993            element = ( (Pipe) element ).getParent();
994          else
995            element = null;
996          }
997        }
998      }
999    }
1000
1001  protected static void addSources( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sources )
1002    {
1003    for( Tap tap : sources )
1004      {
1005      for( Scope scope : elementGraph.outgoingEdgesOf( tap ) )
1006        flowStep.addSource( scope.getName(), tap );
1007      }
1008    }
1009
1010  protected static void addSinks( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sinks )
1011    {
1012    for( Tap tap : sinks )
1013      {
1014      for( Scope scope : elementGraph.incomingEdgesOf( tap ) )
1015        flowStep.addSink( scope.getName(), tap );
1016      }
1017    }
1018
1019  @Override
1020  public boolean equals( Object object )
1021    {
1022    if( this == object )
1023      return true;
1024    if( object == null || getClass() != object.getClass() )
1025      return false;
1026
1027    BaseFlowStep flowStep = (BaseFlowStep) object;
1028
1029    if( id != null ? !id.equals( flowStep.id ) : flowStep.id != null )
1030      return false;
1031
1032    return true;
1033    }
1034
1035  @Override
1036  public int hashCode()
1037    {
1038    return id != null ? id.hashCode() : 0;
1039    }
1040
1041  @Override
1042  public String toString()
1043    {
1044    StringBuffer buffer = new StringBuffer();
1045
1046    buffer.append( getClass().getSimpleName() );
1047    buffer.append( "[name: " ).append( getName() ).append( "]" );
1048
1049    return buffer.toString();
1050    }
1051
1052  @Override
1053  public final boolean isInfoEnabled()
1054    {
1055    return getLogger().isInfoEnabled();
1056    }
1057
1058  private ProcessLogger getLogger()
1059    {
1060    if( flow != null && flow instanceof ProcessLogger )
1061      return (ProcessLogger) flow;
1062
1063    return ProcessLogger.NULL;
1064    }
1065
1066  @Override
1067  public final boolean isDebugEnabled()
1068    {
1069    return ( getLogger() ).isDebugEnabled();
1070    }
1071
1072  @Override
1073  public void logDebug( String message, Object... arguments )
1074    {
1075    getLogger().logDebug( message, arguments );
1076    }
1077
1078  @Override
1079  public void logInfo( String message, Object... arguments )
1080    {
1081    getLogger().logInfo( message, arguments );
1082    }
1083
1084  @Override
1085  public void logWarn( String message )
1086    {
1087    getLogger().logWarn( message );
1088    }
1089
1090  @Override
1091  public void logWarn( String message, Throwable throwable )
1092    {
1093    getLogger().logWarn( message, throwable );
1094    }
1095
1096  @Override
1097  public void logWarn( String message, Object... arguments )
1098    {
1099    getLogger().logWarn( message, arguments );
1100    }
1101
1102  @Override
1103  public void logError( String message, Object... arguments )
1104    {
1105    getLogger().logError( message, arguments );
1106    }
1107
1108  @Override
1109  public void logError( String message, Throwable throwable )
1110    {
1111    getLogger().logError( message, throwable );
1112    }
1113
1114  /**
1115   * Class SafeFlowStepListener safely calls a wrapped FlowStepListener.
1116   * <p/>
1117   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1118   * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
1119   * which in turn is run in a new Thread.
1120   */
1121  private class SafeFlowStepListener implements FlowStepListener
1122    {
1123    /** Field flowListener */
1124    final FlowStepListener flowStepListener;
1125    /** Field throwable */
1126    Throwable throwable;
1127
1128    private SafeFlowStepListener( FlowStepListener flowStepListener )
1129      {
1130      this.flowStepListener = flowStepListener;
1131      }
1132
1133    public void onStepStarting( FlowStep flowStep )
1134      {
1135      try
1136        {
1137        flowStepListener.onStepStarting( flowStep );
1138        }
1139      catch( Throwable throwable )
1140        {
1141        handleThrowable( throwable );
1142        }
1143      }
1144
1145    public void onStepStopping( FlowStep flowStep )
1146      {
1147      try
1148        {
1149        flowStepListener.onStepStopping( flowStep );
1150        }
1151      catch( Throwable throwable )
1152        {
1153        handleThrowable( throwable );
1154        }
1155      }
1156
1157    public void onStepCompleted( FlowStep flowStep )
1158      {
1159      try
1160        {
1161        flowStepListener.onStepCompleted( flowStep );
1162        }
1163      catch( Throwable throwable )
1164        {
1165        handleThrowable( throwable );
1166        }
1167      }
1168
1169    public void onStepRunning( FlowStep flowStep )
1170      {
1171      try
1172        {
1173        flowStepListener.onStepRunning( flowStep );
1174        }
1175      catch( Throwable throwable )
1176        {
1177        handleThrowable( throwable );
1178        }
1179      }
1180
1181    public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable )
1182      {
1183      try
1184        {
1185        return flowStepListener.onStepThrowable( flowStep, flowStepThrowable );
1186        }
1187      catch( Throwable throwable )
1188        {
1189        handleThrowable( throwable );
1190        }
1191
1192      return false;
1193      }
1194
1195    private void handleThrowable( Throwable throwable )
1196      {
1197      this.throwable = throwable;
1198
1199      logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable );
1200      }
1201
1202    public boolean equals( Object object )
1203      {
1204      if( object instanceof BaseFlowStep.SafeFlowStepListener )
1205        return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener );
1206
1207      return flowStepListener.equals( object );
1208      }
1209
1210    public int hashCode()
1211      {
1212      return flowStepListener.hashCode();
1213      }
1214    }
1215  }