001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner;
023
024import java.io.IOException;
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.Date;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037
038import cascading.flow.Flow;
039import cascading.flow.FlowElement;
040import cascading.flow.FlowException;
041import cascading.flow.FlowNode;
042import cascading.flow.FlowProcess;
043import cascading.flow.FlowStep;
044import cascading.flow.FlowStepListener;
045import cascading.flow.planner.graph.AnnotatedGraph;
046import cascading.flow.planner.graph.ElementGraph;
047import cascading.flow.planner.graph.ElementGraphs;
048import cascading.flow.planner.process.FlowNodeGraph;
049import cascading.flow.stream.annotations.StreamMode;
050import cascading.management.CascadingServices;
051import cascading.management.state.ClientState;
052import cascading.operation.Operation;
053import cascading.pipe.Group;
054import cascading.pipe.Operator;
055import cascading.pipe.Pipe;
056import cascading.pipe.SubAssembly;
057import cascading.property.ConfigDef;
058import cascading.stats.FlowStepStats;
059import cascading.tap.Tap;
060import cascading.util.EnumMultiMap;
061import cascading.util.ProcessLogger;
062import cascading.util.Util;
063
064import static cascading.flow.planner.graph.ElementGraphs.findAllGroups;
065
066/**
067 * Class FlowStep is an internal representation of a given Job to be executed on a remote cluster. During
068 * planning, pipe assemblies are broken down into "steps" and encapsulated in this class.
069 * <p/>
070 * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all
071 * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which
072 * all steps will be submitted for execution. The default submit priority is 5.
073 * <p/>
074 * This class is for internal use, there are no stable public methods.
075 */
076public abstract class BaseFlowStep<Config> implements FlowStep<Config>, ProcessLogger, Serializable
077  {
  /** Field flow, the owning Flow; transient, so it is excluded from Java serialization */
  private transient Flow<Config> flow;
  /** Field flowName, copied from the Flow in setFlow() or set directly via setFlowName() */
  private String flowName;
  /** Field flowID, copied from the Flow in setFlow() */
  private String flowID;

  /** Field flowStepConf, the step Config handed back by getConfig(); transient */
  private transient Config flowStepConf;

  /** Field submitPriority, defaults to 5; setSubmitPriority() only accepts 1 through 10 */
  private int submitPriority = 5;

  /** Field name */
  String name;
  /** Field id, generated once by the no-arg constructor */
  private String id;
  /** Field ordinal */
  private int ordinal;
  /** Field processAnnotations, created lazily by the first addProcessAnnotation() call */
  private Map<String, String> processAnnotations;

  /** Field step listeners, created lazily by getListeners() */
  private List<SafeFlowStepListener> listeners;

  /** Field elementGraph */
  protected ElementGraph elementGraph;
  /** Field flowNodeGraph */
  protected FlowNodeGraph flowNodeGraph;

  /** Field sources, maps each source Tap to the names it was registered under */
  protected final Map<Tap, Set<String>> sources = new HashMap<>(); // all sources
  /** Field sinks, maps each sink Tap to the names it was registered under */
  protected final Map<Tap, Set<String>> sinks = new HashMap<>(); // all sinks
  /** Field traps, keyed by name; filled from the FlowNodeGraph in configure() */
  private final Map<String, Tap> traps = new HashMap<>();

  /** Field tempSink */
  protected Tap tempSink; // used if we need to bypass the filesystem

  /** Field groups, holds each Group at most once (see addGroup()) */
  private final List<Group> groups = new ArrayList<Group>();

  /** Field flowStepStats; transient */
  protected transient FlowStepStats flowStepStats;

  /** Field flowStepJob, cached by getCreateFlowStepJob(); transient */
  private transient FlowStepJob<Config> flowStepJob;

  /** optional metadata about the FlowStep */
  private Map<String, String> flowStepDescriptor = Collections.emptyMap();
123
124  protected BaseFlowStep( String name, int ordinal )
125    {
126    this( name, ordinal, null );
127    }
128
129  protected BaseFlowStep( String name, int ordinal, Map<String, String> flowStepDescriptor )
130    {
131    this( name, ordinal, null, flowStepDescriptor );
132    }
133
134  protected BaseFlowStep( String name, int ordinal, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor )
135    {
136    this();
137    setName( name );
138    this.ordinal = ordinal;
139
140    this.elementGraph = null;
141    this.flowNodeGraph = flowNodeGraph;
142
143    setFlowStepDescriptor( flowStepDescriptor );
144    }
145
146  protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph )
147    {
148    this( elementStepGraph, flowNodeGraph, null );
149    }
150
  /**
   * Constructs a step from its element graph and node graph, then populates sources, sinks,
   * groups and traps via {@link #configure()}.
   * <p/>
   * Note that no name is assigned by this constructor.
   *
   * @param elementStepGraph   of type ElementGraph
   * @param flowNodeGraph      of type FlowNodeGraph
   * @param flowStepDescriptor optional metadata, may be null
   */
  protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor )
    {
    this();
    this.elementGraph = elementStepGraph;
    this.flowNodeGraph = flowNodeGraph; // TODO: verify no missing elements in the union of the node graphs

    setFlowStepDescriptor( flowStepDescriptor );

    // must run last, it reads both graphs assigned above
    configure();
    }
161
162  protected BaseFlowStep()
163    {
164    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
165    }
166
167  protected void configure()
168    {
169    addSources( this, getElementGraph(), getFlowNodeGraph().getSourceTaps() );
170    addSinks( this, getElementGraph(), getFlowNodeGraph().getSinkTaps() );
171
172    addAllGroups();
173
174    traps.putAll( getFlowNodeGraph().getTrapsMap() );
175    }
176
177  protected void addAllGroups()
178    {
179    addGroups( findAllGroups( getElementGraph() ) );
180    }
181
  /**
   * Returns the ID generated for this step at construction.
   *
   * @return the step ID
   */
  @Override
  public String getID()
    {
    return id;
    }
187
  /**
   * Sets the ordinal of this step.
   *
   * @param ordinal of type int
   */
  public void setOrdinal( int ordinal )
    {
    this.ordinal = ordinal;
    }
192
  /**
   * Returns the ordinal of this step.
   *
   * @return the ordinal
   */
  @Override
  public int getOrdinal()
    {
    return ordinal;
    }
198
  /**
   * Returns the name of this step; may be null if no name was ever set.
   *
   * @return the step name
   */
  @Override
  public String getName()
    {
    return name;
    }
204
205  public void setName( String name )
206    {
207    if( name == null || name.isEmpty() )
208      throw new IllegalArgumentException( "step name may not be null or empty" );
209
210    this.name = name;
211    }
212
213  @Override
214  public Map<String, String> getFlowStepDescriptor()
215    {
216    return Collections.unmodifiableMap( flowStepDescriptor );
217    }
218
219  protected void setFlowStepDescriptor( Map<String, String> flowStepDescriptor )
220    {
221    if( flowStepDescriptor != null )
222      this.flowStepDescriptor = flowStepDescriptor;
223    }
224
225  @Override
226  public Map<String, String> getProcessAnnotations()
227    {
228    if( processAnnotations == null )
229      return Collections.emptyMap();
230
231    return Collections.unmodifiableMap( processAnnotations );
232    }
233
234  @Override
235  public void addProcessAnnotation( Enum annotation )
236    {
237    if( annotation == null )
238      return;
239
240    addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
241    }
242
243  @Override
244  public void addProcessAnnotation( String key, String value )
245    {
246    if( processAnnotations == null )
247      processAnnotations = new HashMap<>();
248
249    processAnnotations.put( key, value );
250    }
251
  /**
   * Binds this step to its parent Flow and captures the Flow's ID and name so they remain
   * available from the non-transient fields.
   *
   * @param flow of type Flow, must not be null since its ID and name are read here
   */
  public void setFlow( Flow<Config> flow )
    {
    this.flow = flow;
    this.flowID = flow.getID();
    this.flowName = flow.getName();
    }
258
  /**
   * Returns the parent Flow; null until {@link #setFlow(Flow)} is called.
   *
   * @return the parent Flow
   */
  @Override
  public Flow<Config> getFlow()
    {
    return flow;
    }
264
  /**
   * Returns the ID of the parent Flow as captured by {@link #setFlow(Flow)}.
   *
   * @return the Flow ID, null if no Flow was set
   */
  @Override
  public String getFlowID()
    {
    return flowID;
    }
270
  /**
   * Returns the Flow name, as captured by {@link #setFlow(Flow)} or set via
   * {@link #setFlowName(String)}.
   *
   * @return the Flow name, may be null
   */
  @Override
  public String getFlowName()
    {
    return flowName;
    }
276
  /**
   * Overrides the Flow name reported by this step without touching the Flow reference or ID.
   *
   * @param flowName of type String
   */
  protected void setFlowName( String flowName )
    {
    this.flowName = flowName;
    }
281
  /**
   * Returns the step Config last given to {@link #setConfig(Object)}; null until then.
   *
   * @return the step Config
   */
  @Override
  public Config getConfig()
    {
    return flowStepConf;
    }
287
  /**
   * Returns the step configuration as properties. This base implementation always returns an
   * empty Map.
   *
   * @return an empty Map
   */
  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return Collections.emptyMap();
    }
293
  /**
   * Set the initialized flowStepConf Config instance. The value is what {@link #getConfig()}
   * subsequently returns.
   *
   * @param flowStepConf of type Config
   */
  protected void setConfig( Config flowStepConf )
    {
    this.flowStepConf = flowStepConf;
    }
303
  /**
   * Returns the display name of this step using IDs of length {@code Util.ID_LENGTH}.
   *
   * @return the display name
   */
  @Override
  public String getStepDisplayName()
    {
    return getStepDisplayName( Util.ID_LENGTH );
    }
309
310  protected String getStepDisplayName( int idLength )
311    {
312    if( idLength < 0 || idLength > Util.ID_LENGTH )
313      idLength = Util.ID_LENGTH;
314
315    if( idLength == 0 )
316      return String.format( "%s/%s", getFlowName(), getName() );
317
318    String flowID = getFlowID().substring( 0, idLength );
319    String stepID = getID().substring( 0, idLength );
320
321    return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() );
322    }
323
324  protected String getNodeDisplayName( FlowNode flowNode, int idLength )
325    {
326    if( idLength > Util.ID_LENGTH )
327      idLength = Util.ID_LENGTH;
328
329    String flowID = getFlowID().substring( 0, idLength );
330    String stepID = getID().substring( 0, idLength );
331    String nodeID = flowNode.getID().substring( 0, idLength );
332
333    return String.format( "[%s/%s/%s] %s/%s", flowID, stepID, nodeID, getFlowName(), getName() );
334    }
335
  /**
   * Returns the submit priority of this step; 5 unless changed via
   * {@link #setSubmitPriority(int)}.
   *
   * @return the submit priority
   */
  @Override
  public int getSubmitPriority()
    {
    return submitPriority;
    }
341
342  @Override
343  public void setSubmitPriority( int submitPriority )
344    {
345    if( submitPriority < 1 || submitPriority > 10 )
346      throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );
347
348    this.submitPriority = submitPriority;
349    }
350
  /**
   * Sets the stats instance associated with this step.
   *
   * @param flowStepStats of type FlowStepStats
   */
  @Override
  public void setFlowStepStats( FlowStepStats flowStepStats )
    {
    this.flowStepStats = flowStepStats;
    }
356
  /**
   * Returns the stats instance associated with this step; null until one is set.
   *
   * @return the FlowStepStats
   */
  @Override
  public FlowStepStats getFlowStepStats()
    {
    return flowStepStats;
    }
362
  /**
   * Returns the element graph of this step; null when the step was created from a name and
   * ordinal only.
   *
   * @return the ElementGraph
   */
  @Override
  public ElementGraph getElementGraph()
    {
    return elementGraph;
    }
368
  /**
   * Returns the annotations carried by the element graph.
   * <p/>
   * The element graph is cast to AnnotatedGraph without a check, so this fails with a
   * ClassCastException if the graph is of another type.
   *
   * @return the annotations of the element graph
   */
  protected EnumMultiMap getAnnotations()
    {
    return ( (AnnotatedGraph) elementGraph ).getAnnotations();
    }
373
  /**
   * Returns the node graph of this step; may be null depending on the constructor used.
   *
   * @return the FlowNodeGraph
   */
  @Override
  public FlowNodeGraph getFlowNodeGraph()
    {
    return flowNodeGraph;
    }
379
  /**
   * Returns the number of vertices in the node graph. Requires a non-null node graph.
   *
   * @return the node count
   */
  @Override
  public int getNumFlowNodes()
    {
    return flowNodeGraph.vertexSet().size();
    }
385
  /**
   * Returns the result of {@code ElementGraphs.findSources} over the element graph of this step.
   *
   * @return Set of source FlowElement instances
   */
  public Set<FlowElement> getSourceElements()
    {
    return ElementGraphs.findSources( getElementGraph(), FlowElement.class );
    }
390
  /**
   * Returns the result of {@code ElementGraphs.findSinks} over the element graph of this step.
   *
   * @return Set of sink FlowElement instances
   */
  public Set<FlowElement> getSinkElements()
    {
    return ElementGraphs.findSinks( getElementGraph(), FlowElement.class );
    }
395
396  @Override
397  public Group getGroup()
398    {
399    if( groups.isEmpty() )
400      return null;
401
402    if( groups.size() > 1 )
403      throw new IllegalStateException( "more than one group" );
404
405    return groups.get( 0 );
406    }
407
  /**
   * Returns the Groups of this step. The internal list itself is returned, not a copy, so
   * changes made through it affect this step.
   *
   * @return Collection of Group instances
   */
  @Override
  public Collection<Group> getGroups()
    {
    return groups;
    }
413
414  public void addGroups( Collection<Group> groups )
415    {
416    for( Group group : groups )
417      addGroup( group );
418    }
419
420  public void addGroup( Group group )
421    {
422    if( !groups.contains( group ) )
423      groups.add( group );
424    }
425
  /**
   * Returns all source Tap instances annotated by the planner as being {@link StreamMode#Accumulated}.
   * <p/>
   * The elements are looked up on the node graph and narrowed to Tap instances.
   *
   * @return Set of accumulated Tap instances
   */
  public Set<Tap> getAllAccumulatedSources()
    {
    return Util.narrowIdentitySet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Accumulated ) );
    }
435
  /**
   * Returns all source Tap instances annotated by the planner as being {@link StreamMode#Streamed}.
   * <p/>
   * The elements are looked up on the node graph and narrowed to Tap instances.
   *
   * @return Set of streamed Tap instances
   */
  public Set<Tap> getAllStreamedSources()
    {
    return Util.narrowIdentitySet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Streamed ) );
    }
445
446  public void addSource( String name, Tap source )
447    {
448    if( !sources.containsKey( source ) )
449      sources.put( source, new HashSet<String>() );
450
451    sources.get( source ).add( name );
452    }
453
454  public void addSink( String name, Tap sink )
455    {
456    if( !sinks.containsKey( sink ) )
457      sinks.put( sink, new HashSet<String>() );
458
459    sinks.get( sink ).add( name );
460    }
461
462  @Override
463  public Set<Tap> getSourceTaps()
464    {
465    return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) );
466    }
467
468  @Override
469  public Set<Tap> getSinkTaps()
470    {
471    return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) );
472    }
473
474  @Override
475  public Tap getSink()
476    {
477    if( sinks.size() == 0 )
478      return null;
479
480    if( sinks.size() > 1 )
481      throw new IllegalStateException( "more than one sink" );
482
483    return sinks.keySet().iterator().next();
484    }
485
486  @Override
487  public Set<String> getSourceName( Tap source )
488    {
489    return Collections.unmodifiableSet( sources.get( source ) );
490    }
491
492  @Override
493  public Set<String> getSinkName( Tap sink )
494    {
495    return Collections.unmodifiableSet( sinks.get( sink ) );
496    }
497
498  @Override
499  public Tap getSourceWith( String identifier )
500    {
501    if( Util.isEmpty( identifier ) )
502      return null;
503
504    for( Tap tap : sources.keySet() )
505      {
506      if( identifier.equalsIgnoreCase( tap.getIdentifier() ) )
507        return tap;
508      }
509
510    return null;
511    }
512
513  @Override
514  public Tap getSinkWith( String identifier )
515    {
516    if( Util.isEmpty( identifier ) )
517      return null;
518
519    for( Tap tap : sinks.keySet() )
520      {
521      if( identifier.equalsIgnoreCase( tap.getIdentifier() ) )
522        return tap;
523      }
524
525    return null;
526    }
527
  /**
   * Returns the traps of this step keyed by name. The internal Map itself is returned, not a
   * copy, so changes made through it affect this step.
   *
   * @return Map of trap name to Tap
   */
  @Override
  public Map<String, Tap> getTrapMap()
    {
    return traps;
    }
533
534  @Override
535  public Set<Tap> getTraps()
536    {
537    return Collections.unmodifiableSet( new HashSet<Tap>( traps.values() ) );
538    }
539
  /**
   * Looks up a trap by name in {@link #getTrapMap()}.
   *
   * @param name the trap name
   * @return the trap Tap, or null if no trap has that name
   */
  public Tap getTrap( String name )
    {
    return getTrapMap().get( name );
    }
544
545  boolean allSourcesExist() throws IOException
546    {
547    for( Tap tap : sources.keySet() )
548      {
549      if( !tap.resourceExists( getConfig() ) )
550        return false;
551      }
552
553    return true;
554    }
555
556  boolean areSourcesNewer( long sinkModified ) throws IOException
557    {
558    Config config = getConfig();
559    Iterator<Tap> values = sources.keySet().iterator();
560
561    long sourceModified = 0;
562
563    try
564      {
565      sourceModified = Util.getSourceModified( config, values, sinkModified );
566
567      if( sinkModified < sourceModified )
568        return true;
569
570      return false;
571      }
572    finally
573      {
574      if( isInfoEnabled() )
575        logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
576      }
577    }
578
579  long getSinkModified() throws IOException
580    {
581    long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() );
582
583    if( isInfoEnabled() )
584      {
585      if( sinkModified == -1L )
586        logInfo( "at least one sink is marked for delete" );
587      if( sinkModified == 0L )
588        logInfo( "at least one sink does not exist" );
589      else
590        logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
591      }
592
593    return sinkModified;
594    }
595
596  protected Throwable prepareResources()
597    {
598    Throwable throwable = prepareResources( getSourceTaps(), false );
599
600    if( throwable == null )
601      throwable = prepareResources( getSinkTaps(), true );
602
603    if( throwable == null )
604      throwable = prepareResources( getTraps(), true );
605
606    return throwable;
607    }
608
609  private Throwable prepareResources( Collection<Tap> taps, boolean forWrite )
610    {
611    Throwable throwable = null;
612
613    for( Tap tap : taps )
614      {
615      throwable = prepareResource( tap, forWrite );
616
617      if( throwable != null )
618        break;
619      }
620
621    return throwable;
622    }
623
624  private Throwable prepareResource( Tap tap, boolean forWrite )
625    {
626    Throwable throwable = null;
627
628    try
629      {
630      boolean result;
631
632      if( forWrite )
633        result = tap.prepareResourceForWrite( getConfig() );
634      else
635        result = tap.prepareResourceForRead( getConfig() );
636
637      if( !result )
638        {
639        String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );
640
641        logError( message );
642
643        throwable = new FlowException( message );
644        }
645      }
646    catch( Throwable exception )
647      {
648      String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );
649
650      logError( message, exception );
651
652      throwable = new FlowException( message, exception );
653      }
654
655    return throwable;
656    }
657
658  protected Throwable commitSinks()
659    {
660    Throwable throwable = null;
661
662    for( Tap tap : sinks.keySet() )
663      {
664      if( throwable != null )
665        rollbackResource( tap );
666      else
667        throwable = commitResource( tap );
668      }
669
670    return throwable;
671    }
672
673  private Throwable commitResource( Tap tap )
674    {
675    Throwable throwable = null;
676
677    try
678      {
679      if( !tap.commitResource( getConfig() ) )
680        {
681        String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
682
683        logError( message );
684
685        throwable = new FlowException( message );
686        }
687      }
688    catch( Throwable exception )
689      {
690      String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
691
692      logError( message, exception );
693
694      throwable = new FlowException( message, exception );
695      }
696
697    return throwable;
698    }
699
700  private Throwable rollbackResource( Tap tap )
701    {
702    Throwable throwable = null;
703
704    try
705      {
706      if( !tap.rollbackResource( getConfig() ) )
707        {
708        String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
709
710        logError( message );
711
712        throwable = new FlowException( message );
713        }
714      }
715    catch( Throwable exception )
716      {
717      String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
718
719      logError( message, exception );
720
721      throwable = new FlowException( message, exception );
722      }
723
724    return throwable;
725    }
726
727  protected Throwable rollbackSinks()
728    {
729    Throwable throwable = null;
730
731    for( Tap tap : sinks.keySet() )
732      {
733      if( throwable != null )
734        rollbackResource( tap );
735      else
736        throwable = rollbackResource( tap );
737      }
738
739    return throwable;
740    }
741
  /**
   * Creates the Config instance for this step. Public for testing.
   * <p/>
   * The value returned here is stored as the step Config and handed to
   * {@code createFlowStepJob} by {@code getCreateFlowStepJob}.
   *
   * @param flowProcess  the FlowProcess the step is being initialized for
   * @param parentConfig the Config passed in by the caller of {@code getCreateFlowStepJob}
   * @return the initialized step Config
   */
  public abstract Config createInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig );
750
  /**
   * Method getPreviousScopes returns the previous Scope instances, that is the incoming edges
   * of the given element in the element graph. If the flowElement is a Group (specifically a
   * CoGroup), there will be more than one instance.
   *
   * @param flowElement of type FlowElement
   * @return Set of incoming Scope instances
   */
  public Set<Scope> getPreviousScopes( FlowElement flowElement )
    {
    return getElementGraph().incomingEdgesOf( flowElement );
    }
762
763  /**
764   * Method getNextScope returns the next Scope instance in the graph. There will always only be one next.
765   *
766   * @param flowElement of type FlowElement
767   * @return Scope
768   */
769  public Scope getNextScope( FlowElement flowElement )
770    {
771    Set<Scope> set = getElementGraph().outgoingEdgesOf( flowElement );
772
773    if( set.size() != 1 )
774      throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() );
775
776    return set.iterator().next();
777    }
778
  /**
   * Returns the element the given Scope points to, that is the edge target in the element graph.
   *
   * @param scope of type Scope
   * @return the target FlowElement
   */
  public FlowElement getNextFlowElement( Scope scope )
    {
    return getElementGraph().getEdgeTarget( scope );
    }
783
784  public Collection<Operation> getAllOperations()
785    {
786    Set<FlowElement> vertices = getElementGraph().vertexSet();
787    List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same
788
789    for( FlowElement vertex : vertices )
790      {
791      if( vertex instanceof Operator )
792        operations.add( ( (Operator) vertex ).getOperation() );
793      }
794
795    return operations;
796    }
797
798  @Override
799  public boolean containsPipeNamed( String pipeName )
800    {
801    Set<FlowElement> vertices = getElementGraph().vertexSet();
802
803    for( FlowElement vertex : vertices )
804      {
805      if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) )
806        return true;
807      }
808
809    return false;
810    }
811
  /** Cleans this step by calling {@link #clean(Object)} with the current step Config. */
  public void clean()
    {
    // use step config by default
    clean( getConfig() );
    }
817
  /**
   * Cleans this step using the given Config. What is cleaned is left to the implementation.
   *
   * @param config the Config to use; {@code clean()} passes the current step Config
   */
  public abstract void clean( Config config );
819
820  List<SafeFlowStepListener> getListeners()
821    {
822    if( listeners == null )
823      listeners = new LinkedList<SafeFlowStepListener>();
824
825    return listeners;
826    }
827
828  @Override
829  public boolean hasListeners()
830    {
831    return listeners != null && !listeners.isEmpty();
832    }
833
834  @Override
835  public void addListener( FlowStepListener flowStepListener )
836    {
837    getListeners().add( new SafeFlowStepListener( flowStepListener ) );
838    }
839
840  @Override
841  public boolean removeListener( FlowStepListener flowStepListener )
842    {
843    return getListeners().remove( new SafeFlowStepListener( flowStepListener ) );
844    }
845
846  protected void fireOnCompleted()
847    {
848    if( hasListeners() )
849      {
850      if( isDebugEnabled() )
851        logDebug( "firing onCompleted event: " + getListeners().size() );
852
853      for( Object flowStepListener : getListeners() )
854        ( (FlowStepListener) flowStepListener ).onStepCompleted( this );
855      }
856    }
857
858  protected void fireOnThrowable( Throwable throwable )
859    {
860    if( hasListeners() )
861      {
862      if( isDebugEnabled() )
863        logDebug( "firing onThrowable event: " + getListeners().size() );
864
865      for( Object flowStepListener : getListeners() )
866        ( (FlowStepListener) flowStepListener ).onStepThrowable( this, throwable );
867      }
868    }
869
870  protected void fireOnStopping()
871    {
872    if( hasListeners() )
873      {
874      if( isDebugEnabled() )
875        logDebug( "firing onStopping event: " + getListeners() );
876
877      for( Object flowStepListener : getListeners() )
878        ( (FlowStepListener) flowStepListener ).onStepStopping( this );
879      }
880    }
881
882  protected void fireOnStarting()
883    {
884    if( hasListeners() )
885      {
886      if( isDebugEnabled() )
887        logDebug( "firing onStarting event: " + getListeners().size() );
888
889      for( Object flowStepListener : getListeners() )
890        ( (FlowStepListener) flowStepListener ).onStepStarting( this );
891      }
892    }
893
894  protected void fireOnRunning()
895    {
896    if( hasListeners() )
897      {
898      if( isDebugEnabled() )
899        logDebug( "firing onRunning event: " + getListeners().size() );
900
901      for( Object flowStepListener : getListeners() )
902        ( (FlowStepListener) flowStepListener ).onStepRunning( this );
903      }
904    }
905
906  protected ClientState createClientState( FlowProcess flowProcess )
907    {
908    CascadingServices services = flowProcess.getCurrentSession().getCascadingServices();
909
910    if( services == null )
911      return ClientState.NULL;
912
913    return services.createClientState( getID() );
914    }
915
  /**
   * Returns the cached FlowStepJob without creating one.
   *
   * @return the FlowStepJob, or null if none was created yet
   */
  public FlowStepJob<Config> getFlowStepJob()
    {
    return flowStepJob;
    }
920
921  public FlowStepJob<Config> getCreateFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig )
922    {
923    if( flowStepJob != null )
924      return flowStepJob;
925
926    if( flowProcess == null )
927      return null;
928
929    Config initializedConfig = createInitializedConfig( flowProcess, parentConfig );
930
931    setConfig( initializedConfig );
932
933    ClientState clientState = createClientState( flowProcess );
934
935    flowStepJob = createFlowStepJob( clientState, flowProcess, initializedConfig );
936
937    return flowStepJob;
938    }
939
  /**
   * Creates the FlowStepJob for this step. Called at most once per step by
   * {@code getCreateFlowStepJob}, which caches the result.
   *
   * @param clientState           the ClientState created for this step
   * @param flowProcess           the FlowProcess the job is created for
   * @param initializedStepConfig the Config returned by {@code createInitializedConfig}
   * @return the new FlowStepJob
   */
  protected abstract FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<Config> flowProcess, Config initializedStepConfig );
941
942  protected void initConfFromNodeConfigDef( ElementGraph nodeElementGraph, ConfigDef.Setter setter )
943    {
944    nodeElementGraph = ElementGraphs.asExtentMaskedSubGraph( nodeElementGraph );
945
946    ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() );
947
948    // applies each mode in order, topologically
949    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
950      {
951      Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( nodeElementGraph );
952
953      while( iterator.hasNext() )
954        {
955        FlowElement element = iterator.next();
956
957        while( element != null )
958          {
959          // intentionally skip any element that spans downstream nodes, like a GroupBy
960          // this way GroupBy is applied on the inbound side (where partitioning happens)
961          // not the outbound side.
962          // parent sub-assemblies (like Unique) will be applied if they have leading Pipes to the current spanning Pipe
963          if( elementSpansDownStream( stepElementGraph, nodeElementGraph, element ) )
964            {
965            element = null;
966            continue;
967            }
968
969          if( element instanceof ScopedElement && ( (ScopedElement) element ).hasNodeConfigDef() )
970            ( (ScopedElement) element ).getNodeConfigDef().apply( mode, setter );
971
972          // walk up the sub-assembly parent hierarchy
973          if( element instanceof Pipe )
974            element = ( (Pipe) element ).getParent();
975          else
976            element = null;
977          }
978        }
979      }
980    }
981
982  private boolean elementSpansDownStream( ElementGraph stepElementGraph, ElementGraph nodeElementGraph, FlowElement element )
983    {
984    boolean spansNodes = !( element instanceof SubAssembly );
985
986    if( spansNodes )
987      spansNodes = nodeElementGraph.outDegreeOf( element ) == 0 && stepElementGraph.outDegreeOf( element ) > 0;
988
989    return spansNodes;
990    }
991
992  protected void initConfFromStepConfigDef( ConfigDef.Setter setter )
993    {
994    ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() );
995
996    // applies each mode in order, topologically
997    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
998      {
999      Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( stepElementGraph );
1000
1001      while( iterator.hasNext() )
1002        {
1003        FlowElement element = iterator.next();
1004
1005        while( element != null )
1006          {
1007          if( element instanceof ScopedElement && ( (ScopedElement) element ).hasStepConfigDef() )
1008            ( (ScopedElement) element ).getStepConfigDef().apply( mode, setter );
1009
1010          // walk up the sub-assembly parent hierarchy
1011          if( element instanceof Pipe )
1012            element = ( (Pipe) element ).getParent();
1013          else
1014            element = null;
1015          }
1016        }
1017      }
1018    }
1019
1020  protected static void addSources( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sources )
1021    {
1022    for( Tap tap : sources )
1023      {
1024      for( Scope scope : elementGraph.outgoingEdgesOf( tap ) )
1025        flowStep.addSource( scope.getName(), tap );
1026      }
1027    }
1028
1029  protected static void addSinks( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sinks )
1030    {
1031    for( Tap tap : sinks )
1032      {
1033      for( Scope scope : elementGraph.incomingEdgesOf( tap ) )
1034        flowStep.addSink( scope.getName(), tap );
1035      }
1036    }
1037
1038  @Override
1039  public boolean equals( Object object )
1040    {
1041    if( this == object )
1042      return true;
1043    if( object == null || getClass() != object.getClass() )
1044      return false;
1045
1046    BaseFlowStep flowStep = (BaseFlowStep) object;
1047
1048    if( id != null ? !id.equals( flowStep.id ) : flowStep.id != null )
1049      return false;
1050
1051    return true;
1052    }
1053
1054  @Override
1055  public int hashCode()
1056    {
1057    return id != null ? id.hashCode() : 0;
1058    }
1059
1060  @Override
1061  public String toString()
1062    {
1063    StringBuffer buffer = new StringBuffer();
1064
1065    buffer.append( getClass().getSimpleName() );
1066    buffer.append( "[name: " ).append( getName() ).append( "]" );
1067
1068    return buffer.toString();
1069    }
1070
  /**
   * Reports whether info logging is enabled on the logger returned by getLogger().
   *
   * @return true if info logging is enabled
   */
  @Override
  public final boolean isInfoEnabled()
    {
    return getLogger().isInfoEnabled();
    }
1076
1077  private ProcessLogger getLogger()
1078    {
1079    if( flow != null && flow instanceof ProcessLogger )
1080      return (ProcessLogger) flow;
1081
1082    return ProcessLogger.NULL;
1083    }
1084
  /**
   * Reports whether debug logging is enabled on the logger returned by getLogger().
   *
   * @return true if debug logging is enabled
   */
  @Override
  public final boolean isDebugEnabled()
    {
    return ( getLogger() ).isDebugEnabled();
    }
1090
  /**
   * Delegates a debug message to the logger returned by getLogger().
   *
   * @param message   the message, passed through unchanged
   * @param arguments the message arguments, passed through unchanged
   */
  @Override
  public void logDebug( String message, Object... arguments )
    {
    getLogger().logDebug( message, arguments );
    }
1096
  /**
   * Delegates an info message to the logger returned by getLogger().
   *
   * @param message   the message, passed through unchanged
   * @param arguments the message arguments, passed through unchanged
   */
  @Override
  public void logInfo( String message, Object... arguments )
    {
    getLogger().logInfo( message, arguments );
    }
1102
  /**
   * Delegates a warning message to the logger returned by getLogger().
   *
   * @param message the message, passed through unchanged
   */
  @Override
  public void logWarn( String message )
    {
    getLogger().logWarn( message );
    }
1108
  /**
   * Delegates a warning message and its cause to the logger returned by getLogger().
   *
   * @param message   the message, passed through unchanged
   * @param throwable the cause to log with the message
   */
  @Override
  public void logWarn( String message, Throwable throwable )
    {
    getLogger().logWarn( message, throwable );
    }
1114
  /**
   * Delegates a warning message with arguments to the logger returned by getLogger().
   *
   * @param message   the message, passed through unchanged
   * @param arguments the message arguments, passed through unchanged
   */
  @Override
  public void logWarn( String message, Object... arguments )
    {
    getLogger().logWarn( message, arguments );
    }
1120
  /**
   * Delegates an error message with arguments to the logger returned by getLogger().
   *
   * @param message   the message, passed through unchanged
   * @param arguments the message arguments, passed through unchanged
   */
  @Override
  public void logError( String message, Object... arguments )
    {
    getLogger().logError( message, arguments );
    }
1126
  /**
   * Delegates an error message and its cause to the logger returned by getLogger().
   *
   * @param message   the message, passed through unchanged
   * @param throwable the cause to log with the message
   */
  @Override
  public void logError( String message, Throwable throwable )
    {
    getLogger().logError( message, throwable );
    }
1132
1133  /**
1134   * Class SafeFlowStepListener safely calls a wrapped FlowStepListener.
1135   * <p/>
1136   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1137   * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
1138   * which in turn is run in a new Thread.
1139   */
1140  private class SafeFlowStepListener implements FlowStepListener
1141    {
1142    /** Field flowListener */
1143    final FlowStepListener flowStepListener;
1144    /** Field throwable */
1145    Throwable throwable;
1146
1147    private SafeFlowStepListener( FlowStepListener flowStepListener )
1148      {
1149      this.flowStepListener = flowStepListener;
1150      }
1151
1152    public void onStepStarting( FlowStep flowStep )
1153      {
1154      try
1155        {
1156        flowStepListener.onStepStarting( flowStep );
1157        }
1158      catch( Throwable throwable )
1159        {
1160        handleThrowable( throwable );
1161        }
1162      }
1163
1164    public void onStepStopping( FlowStep flowStep )
1165      {
1166      try
1167        {
1168        flowStepListener.onStepStopping( flowStep );
1169        }
1170      catch( Throwable throwable )
1171        {
1172        handleThrowable( throwable );
1173        }
1174      }
1175
1176    public void onStepCompleted( FlowStep flowStep )
1177      {
1178      try
1179        {
1180        flowStepListener.onStepCompleted( flowStep );
1181        }
1182      catch( Throwable throwable )
1183        {
1184        handleThrowable( throwable );
1185        }
1186      }
1187
1188    public void onStepRunning( FlowStep flowStep )
1189      {
1190      try
1191        {
1192        flowStepListener.onStepRunning( flowStep );
1193        }
1194      catch( Throwable throwable )
1195        {
1196        handleThrowable( throwable );
1197        }
1198      }
1199
1200    public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable )
1201      {
1202      try
1203        {
1204        return flowStepListener.onStepThrowable( flowStep, flowStepThrowable );
1205        }
1206      catch( Throwable throwable )
1207        {
1208        handleThrowable( throwable );
1209        }
1210
1211      return false;
1212      }
1213
1214    private void handleThrowable( Throwable throwable )
1215      {
1216      this.throwable = throwable;
1217
1218      logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable );
1219      }
1220
1221    public boolean equals( Object object )
1222      {
1223      if( object instanceof BaseFlowStep.SafeFlowStepListener )
1224        return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener );
1225
1226      return flowStepListener.equals( object );
1227      }
1228
1229    public int hashCode()
1230      {
1231      return flowStepListener.hashCode();
1232      }
1233    }
1234  }