001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner;
022
023import java.io.IOException;
024import java.io.Serializable;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036
037import cascading.flow.Flow;
038import cascading.flow.FlowElement;
039import cascading.flow.FlowException;
040import cascading.flow.FlowNode;
041import cascading.flow.FlowProcess;
042import cascading.flow.FlowStep;
043import cascading.flow.FlowStepListener;
044import cascading.flow.planner.graph.AnnotatedGraph;
045import cascading.flow.planner.graph.ElementGraph;
046import cascading.flow.planner.graph.ElementGraphs;
047import cascading.flow.planner.process.FlowNodeGraph;
048import cascading.flow.stream.annotations.StreamMode;
049import cascading.management.CascadingServices;
050import cascading.management.state.ClientState;
051import cascading.operation.Operation;
052import cascading.pipe.Group;
053import cascading.pipe.Operator;
054import cascading.pipe.Pipe;
055import cascading.pipe.SubAssembly;
056import cascading.property.ConfigDef;
057import cascading.stats.FlowStepStats;
058import cascading.tap.Tap;
059import cascading.util.EnumMultiMap;
060import cascading.util.ProcessLogger;
061import cascading.util.Util;
062
063import static cascading.flow.planner.graph.ElementGraphs.findAllGroups;
064
065/**
 * Class BaseFlowStep is an internal representation of a given Job to be executed on a remote cluster. During
 * planning, pipe assemblies are broken down into "steps" and encapsulated in this class.
068 * <p/>
069 * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all
070 * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which
071 * all steps will be submitted for execution. The default submit priority is 5.
072 * <p/>
073 * This class is for internal use, there are no stable public methods.
074 */
075public abstract class BaseFlowStep<Config> implements FlowStep<Config>, ProcessLogger, Serializable
076  {
077  /** Field flow */
078  private transient Flow<Config> flow;
079  /** Field flowName */
080  private String flowName;
081  /** Field flowID */
082  private String flowID;
083
084  private transient Config flowStepConf;
085
086  /** Field submitPriority */
087  private int submitPriority = 5;
088
089  /** Field name */
090  String name;
091  private String id;
092  private int ordinal;
093  private Map<String, String> processAnnotations;
094
095  /** Field step listeners */
096  private List<SafeFlowStepListener> listeners;
097
098  /** Field graph */
099  private final ElementGraph elementGraph;
100
101  private FlowNodeGraph flowNodeGraph;
102
103  /** Field sources */
104  protected final Map<Tap, Set<String>> sources = new HashMap<>(); // all sources
  /** Field sinks */
106  protected final Map<Tap, Set<String>> sinks = new HashMap<>(); // all sinks
107
  /** Field traps */
109  private final Map<String, Tap> traps = new HashMap<>();
110
111  /** Field tempSink */
112  protected Tap tempSink; // used if we need to bypass the filesystem
113
114  /** Field groups */
115  private final List<Group> groups = new ArrayList<Group>();
116
117  protected transient FlowStepStats flowStepStats;
118
119  private transient FlowStepJob<Config> flowStepJob;
120
121  // for testing
122  protected BaseFlowStep( String name, int ordinal )
123    {
124    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
125    setName( name );
126    this.ordinal = ordinal;
127
128    this.elementGraph = null;
129    this.flowNodeGraph = null;
130    }
131
132  protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph )
133    {
134    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
135    this.elementGraph = elementStepGraph;
136    this.flowNodeGraph = flowNodeGraph; // TODO: verify no missing elements in the union of the node graphs
137
138    configure();
139    }
140
141  protected void configure()
142    {
143    // todo: remove once FlowMapper/FlowReducer aren't reliant
144    ElementGraphs.addSources( this, elementGraph, flowNodeGraph.getSourceTaps() );
145    ElementGraphs.addSinks( this, elementGraph, flowNodeGraph.getSinkTaps() );
146
147    addGroups( findAllGroups( elementGraph ) );
148
149    traps.putAll( flowNodeGraph.getTrapsMap() );
150    }
151
152  @Override
153  public String getID()
154    {
155    return id;
156    }
157
158  public void setOrdinal( int ordinal )
159    {
160    this.ordinal = ordinal;
161    }
162
163  @Override
164  public int getOrdinal()
165    {
166    return ordinal;
167    }
168
169  @Override
170  public String getName()
171    {
172    return name;
173    }
174
175  public void setName( String name )
176    {
177    if( name == null || name.isEmpty() )
178      throw new IllegalArgumentException( "step name may not be null or empty" );
179
180    this.name = name;
181    }
182
183  @Override
184  public Map<String, String> getProcessAnnotations()
185    {
186    if( processAnnotations == null )
187      return Collections.emptyMap();
188
189    return Collections.unmodifiableMap( processAnnotations );
190    }
191
192  @Override
193  public void addProcessAnnotation( Enum annotation )
194    {
195    if( annotation == null )
196      return;
197
198    addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
199    }
200
201  @Override
202  public void addProcessAnnotation( String key, String value )
203    {
204    if( processAnnotations == null )
205      processAnnotations = new HashMap<>();
206
207    processAnnotations.put( key, value );
208    }
209
210  public void setFlow( Flow<Config> flow )
211    {
212    this.flow = flow;
213    this.flowID = flow.getID();
214    this.flowName = flow.getName();
215    }
216
217  @Override
218  public Flow<Config> getFlow()
219    {
220    return flow;
221    }
222
223  @Override
224  public String getFlowID()
225    {
226    return flowID;
227    }
228
229  @Override
230  public String getFlowName()
231    {
232    return flowName;
233    }
234
235  protected void setFlowName( String flowName )
236    {
237    this.flowName = flowName;
238    }
239
240  @Override
241  public Config getConfig()
242    {
243    return flowStepConf;
244    }
245
246  @Override
247  public Map<Object, Object> getConfigAsProperties()
248    {
249    return Collections.emptyMap();
250    }
251
252  /**
253   * Set the initialized flowStepConf Config instance
254   *
255   * @param flowStepConf of type Config
256   */
257  protected void setConfig( Config flowStepConf )
258    {
259    this.flowStepConf = flowStepConf;
260    }
261
262  @Override
263  public String getStepDisplayName()
264    {
265    return getStepDisplayName( Util.ID_LENGTH );
266    }
267
268  protected String getStepDisplayName( int idLength )
269    {
270    if( idLength < 0 || idLength > Util.ID_LENGTH )
271      idLength = Util.ID_LENGTH;
272
273    if( idLength == 0 )
274      return String.format( "%s/%s", getFlowName(), getName() );
275
276    String flowID = getFlowID().substring( 0, idLength );
277    String stepID = getID().substring( 0, idLength );
278
279    return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() );
280    }
281
282  protected String getNodeDisplayName( FlowNode flowNode, int idLength )
283    {
284    if( idLength > Util.ID_LENGTH )
285      idLength = Util.ID_LENGTH;
286
287    String flowID = getFlowID().substring( 0, idLength );
288    String stepID = getID().substring( 0, idLength );
289    String nodeID = flowNode.getID().substring( 0, idLength );
290
291    return String.format( "[%s/%s/%s] %s/%s", flowID, stepID, nodeID, getFlowName(), getName() );
292    }
293
294  @Override
295  public int getSubmitPriority()
296    {
297    return submitPriority;
298    }
299
300  @Override
301  public void setSubmitPriority( int submitPriority )
302    {
303    if( submitPriority < 1 || submitPriority > 10 )
304      throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );
305
306    this.submitPriority = submitPriority;
307    }
308
309  @Override
310  public void setFlowStepStats( FlowStepStats flowStepStats )
311    {
312    this.flowStepStats = flowStepStats;
313    }
314
315  @Override
316  public FlowStepStats getFlowStepStats()
317    {
318    return flowStepStats;
319    }
320
321  @Override
322  public ElementGraph getElementGraph()
323    {
324    return elementGraph;
325    }
326
327  protected EnumMultiMap getAnnotations()
328    {
329    return ( (AnnotatedGraph) elementGraph ).getAnnotations();
330    }
331
332  @Override
333  public FlowNodeGraph getFlowNodeGraph()
334    {
335    return flowNodeGraph;
336    }
337
338  @Override
339  public int getNumFlowNodes()
340    {
341    return flowNodeGraph.vertexSet().size();
342    }
343
344  public Set<FlowElement> getSourceElements()
345    {
346    return ElementGraphs.findSources( getElementGraph(), FlowElement.class );
347    }
348
349  public Set<FlowElement> getSinkElements()
350    {
351    return ElementGraphs.findSinks( getElementGraph(), FlowElement.class );
352    }
353
354  @Override
355  public Group getGroup()
356    {
357    if( groups.isEmpty() )
358      return null;
359
360    if( groups.size() > 1 )
361      throw new IllegalStateException( "more than one group" );
362
363    return groups.get( 0 );
364    }
365
366  @Override
367  public Collection<Group> getGroups()
368    {
369    return groups;
370    }
371
372  public void addGroups( Collection<Group> groups )
373    {
374    for( Group group : groups )
375      addGroup( group );
376    }
377
378  public void addGroup( Group group )
379    {
380    if( !groups.contains( group ) )
381      groups.add( group );
382    }
383
384  public Set<Tap> getAllAccumulatedSources()
385    {
386    return Util.narrowSet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Accumulated ) );
387    }
388
389  public void addSource( String name, Tap source )
390    {
391    if( !sources.containsKey( source ) )
392      sources.put( source, new HashSet<String>() );
393
394    sources.get( source ).add( name );
395    }
396
397  public void addSink( String name, Tap sink )
398    {
399    if( !sinks.containsKey( sink ) )
400      sinks.put( sink, new HashSet<String>() );
401
402    sinks.get( sink ).add( name );
403    }
404
405  @Override
406  public Set<Tap> getSourceTaps()
407    {
408    return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) );
409    }
410
411  @Override
412  public Set<Tap> getSinkTaps()
413    {
414    return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) );
415    }
416
417  @Override
418  public Tap getSink()
419    {
420    if( sinks.size() != 1 )
421      throw new IllegalStateException( "more than one sink" );
422
423    return sinks.keySet().iterator().next();
424    }
425
426  @Override
427  public Set<String> getSourceName( Tap source )
428    {
429    return Collections.unmodifiableSet( sources.get( source ) );
430    }
431
432  @Override
433  public Set<String> getSinkName( Tap sink )
434    {
435    return Collections.unmodifiableSet( sinks.get( sink ) );
436    }
437
438  @Override
439  public Tap getSourceWith( String identifier )
440    {
441    for( Tap tap : sources.keySet() )
442      {
443      if( tap.getIdentifier().equalsIgnoreCase( identifier ) )
444        return tap;
445      }
446
447    return null;
448    }
449
450  @Override
451  public Tap getSinkWith( String identifier )
452    {
453    for( Tap tap : sinks.keySet() )
454      {
455      if( tap.getIdentifier().equalsIgnoreCase( identifier ) )
456        return tap;
457      }
458
459    return null;
460    }
461
462  @Override
463  public Map<String, Tap> getTrapMap()
464    {
465    return traps;
466    }
467
468  @Override
469  public Set<Tap> getTraps()
470    {
471    return Collections.unmodifiableSet( new HashSet<Tap>( traps.values() ) );
472    }
473
474  public Tap getTrap( String name )
475    {
476    return getTrapMap().get( name );
477    }
478
479  boolean allSourcesExist() throws IOException
480    {
481    for( Tap tap : sources.keySet() )
482      {
483      if( !tap.resourceExists( getConfig() ) )
484        return false;
485      }
486
487    return true;
488    }
489
490  boolean areSourcesNewer( long sinkModified ) throws IOException
491    {
492    Config config = getConfig();
493    Iterator<Tap> values = sources.keySet().iterator();
494
495    long sourceModified = 0;
496
497    try
498      {
499      sourceModified = Util.getSourceModified( config, values, sinkModified );
500
501      if( sinkModified < sourceModified )
502        return true;
503
504      return false;
505      }
506    finally
507      {
508      if( isInfoEnabled() )
509        logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
510      }
511    }
512
513  long getSinkModified() throws IOException
514    {
515    long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() );
516
517    if( isInfoEnabled() )
518      {
519      if( sinkModified == -1L )
520        logInfo( "at least one sink is marked for delete" );
521      if( sinkModified == 0L )
522        logInfo( "at least one sink does not exist" );
523      else
524        logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
525      }
526
527    return sinkModified;
528    }
529
530  protected Throwable prepareResources()
531    {
532    Throwable throwable = prepareResources( getSourceTaps(), false );
533
534    if( throwable == null )
535      throwable = prepareResources( getSinkTaps(), true );
536
537    if( throwable == null )
538      throwable = prepareResources( getTraps(), true );
539
540    return throwable;
541    }
542
543  private Throwable prepareResources( Collection<Tap> taps, boolean forWrite )
544    {
545    Throwable throwable = null;
546
547    for( Tap tap : taps )
548      {
549      throwable = prepareResource( tap, forWrite );
550
551      if( throwable != null )
552        break;
553      }
554
555    return throwable;
556    }
557
558  private Throwable prepareResource( Tap tap, boolean forWrite )
559    {
560    Throwable throwable = null;
561
562    try
563      {
564      boolean result;
565
566      if( forWrite )
567        result = tap.prepareResourceForWrite( getConfig() );
568      else
569        result = tap.prepareResourceForRead( getConfig() );
570
571      if( !result )
572        {
573        String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );
574
575        logError( message );
576
577        throwable = new FlowException( message );
578        }
579      }
580    catch( Throwable exception )
581      {
582      String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );
583
584      logError( message, exception );
585
586      throwable = new FlowException( message, exception );
587      }
588
589    return throwable;
590    }
591
592  protected Throwable commitSinks()
593    {
594    Throwable throwable = null;
595
596    for( Tap tap : sinks.keySet() )
597      {
598      if( throwable != null )
599        rollbackResource( tap );
600      else
601        throwable = commitResource( tap );
602      }
603
604    return throwable;
605    }
606
607  private Throwable commitResource( Tap tap )
608    {
609    Throwable throwable = null;
610
611    try
612      {
613      if( !tap.commitResource( getConfig() ) )
614        {
615        String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
616
617        logError( message );
618
619        throwable = new FlowException( message );
620        }
621      }
622    catch( Throwable exception )
623      {
624      String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
625
626      logError( message, exception );
627
628      throwable = new FlowException( message, exception );
629      }
630
631    return throwable;
632    }
633
634  private Throwable rollbackResource( Tap tap )
635    {
636    Throwable throwable = null;
637
638    try
639      {
640      if( !tap.rollbackResource( getConfig() ) )
641        {
642        String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
643
644        logError( message );
645
646        throwable = new FlowException( message );
647        }
648      }
649    catch( Throwable exception )
650      {
651      String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
652
653      logError( message, exception );
654
655      throwable = new FlowException( message, exception );
656      }
657
658    return throwable;
659    }
660
661  protected Throwable rollbackSinks()
662    {
663    Throwable throwable = null;
664
665    for( Tap tap : sinks.keySet() )
666      {
667      if( throwable != null )
668        rollbackResource( tap );
669      else
670        throwable = rollbackResource( tap );
671      }
672
673    return throwable;
674    }
675
  /**
   * Creates the Config instance for this step.
   * <p/>
   * {@link #getCreateFlowStepJob(FlowProcess, Object)} stores the returned instance as the step config
   * and passes it on when creating the step job.
   * <p/>
   * Public for testing.
   *
   * @param flowProcess  of type FlowProcess
   * @param parentConfig of type Config, presumably the configuration to derive the step config from
   * @return the Config instance for this step
   */
  public abstract Config createInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig );
684
685  /**
686   * Method getPreviousScopes returns the previous Scope instances. If the flowElement is a Group (specifically a CoGroup),
687   * there will be more than one instance.
688   *
689   * @param flowElement of type FlowElement
690   * @return Set<Scope>
691   */
692  public Set<Scope> getPreviousScopes( FlowElement flowElement )
693    {
694    return getElementGraph().incomingEdgesOf( flowElement );
695    }
696
697  /**
698   * Method getNextScope returns the next Scope instance in the graph. There will always only be one next.
699   *
700   * @param flowElement of type FlowElement
701   * @return Scope
702   */
703  public Scope getNextScope( FlowElement flowElement )
704    {
705    Set<Scope> set = getElementGraph().outgoingEdgesOf( flowElement );
706
707    if( set.size() != 1 )
708      throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() );
709
710    return set.iterator().next();
711    }
712
713  public FlowElement getNextFlowElement( Scope scope )
714    {
715    return getElementGraph().getEdgeTarget( scope );
716    }
717
718  public Collection<Operation> getAllOperations()
719    {
720    Set<FlowElement> vertices = getElementGraph().vertexSet();
721    List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same
722
723    for( FlowElement vertex : vertices )
724      {
725      if( vertex instanceof Operator )
726        operations.add( ( (Operator) vertex ).getOperation() );
727      }
728
729    return operations;
730    }
731
732  @Override
733  public boolean containsPipeNamed( String pipeName )
734    {
735    Set<FlowElement> vertices = getElementGraph().vertexSet();
736
737    for( FlowElement vertex : vertices )
738      {
739      if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) )
740        return true;
741      }
742
743    return false;
744    }
745
746  public void clean()
747    {
748    // use step config by default
749    clean( getConfig() );
750    }
751
  /**
   * Cleans this step using the given config. What is cleaned is defined by the implementing subclass.
   * <p/>
   * The no-argument {@link #clean()} calls this method with the step's own config.
   *
   * @param config the config to clean with
   */
  public abstract void clean( Config config );
753
754  List<SafeFlowStepListener> getListeners()
755    {
756    if( listeners == null )
757      listeners = new LinkedList<SafeFlowStepListener>();
758
759    return listeners;
760    }
761
762  @Override
763  public boolean hasListeners()
764    {
765    return listeners != null && !listeners.isEmpty();
766    }
767
768  @Override
769  public void addListener( FlowStepListener flowStepListener )
770    {
771    getListeners().add( new SafeFlowStepListener( flowStepListener ) );
772    }
773
774  @Override
775  public boolean removeListener( FlowStepListener flowStepListener )
776    {
777    return getListeners().remove( new SafeFlowStepListener( flowStepListener ) );
778    }
779
780  protected void fireOnCompleted()
781    {
782    if( hasListeners() )
783      {
784      if( isDebugEnabled() )
785        logDebug( "firing onCompleted event: " + getListeners().size() );
786
787      for( Object flowStepListener : getListeners() )
788        ( (FlowStepListener) flowStepListener ).onStepCompleted( this );
789      }
790    }
791
792  protected void fireOnThrowable( Throwable throwable )
793    {
794    if( hasListeners() )
795      {
796      if( isDebugEnabled() )
797        logDebug( "firing onThrowable event: " + getListeners().size() );
798
799      for( Object flowStepListener : getListeners() )
800        ( (FlowStepListener) flowStepListener ).onStepThrowable( this, throwable );
801      }
802    }
803
804  protected void fireOnStopping()
805    {
806    if( hasListeners() )
807      {
808      if( isDebugEnabled() )
809        logDebug( "firing onStopping event: " + getListeners() );
810
811      for( Object flowStepListener : getListeners() )
812        ( (FlowStepListener) flowStepListener ).onStepStopping( this );
813      }
814    }
815
816  protected void fireOnStarting()
817    {
818    if( hasListeners() )
819      {
820      if( isDebugEnabled() )
821        logDebug( "firing onStarting event: " + getListeners().size() );
822
823      for( Object flowStepListener : getListeners() )
824        ( (FlowStepListener) flowStepListener ).onStepStarting( this );
825      }
826    }
827
828  protected void fireOnRunning()
829    {
830    if( hasListeners() )
831      {
832      if( isDebugEnabled() )
833        logDebug( "firing onRunning event: " + getListeners().size() );
834
835      for( Object flowStepListener : getListeners() )
836        ( (FlowStepListener) flowStepListener ).onStepRunning( this );
837      }
838    }
839
840  protected ClientState createClientState( FlowProcess flowProcess )
841    {
842    CascadingServices services = flowProcess.getCurrentSession().getCascadingServices();
843
844    return services.createClientState( getID() );
845    }
846
847  public FlowStepJob<Config> getFlowStepJob()
848    {
849    return flowStepJob;
850    }
851
852  public FlowStepJob<Config> getCreateFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig )
853    {
854    if( flowStepJob != null )
855      return flowStepJob;
856
857    if( flowProcess == null )
858      return null;
859
860    Config initializedConfig = createInitializedConfig( flowProcess, parentConfig );
861
862    setConfig( initializedConfig );
863
864    ClientState clientState = createClientState( flowProcess );
865
866    flowStepJob = createFlowStepJob( clientState, flowProcess, initializedConfig );
867
868    return flowStepJob;
869    }
870
  /**
   * Creates the job that executes this step.
   * <p/>
   * Called by {@link #getCreateFlowStepJob(FlowProcess, Object)} after the step config has been
   * initialized and stored.
   *
   * @param clientState           the client state created for this step
   * @param flowProcess           the flow process the job is created with
   * @param initializedStepConfig the initialized step config
   * @return the step job
   */
  protected abstract FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<Config> flowProcess, Config initializedStepConfig );
872
873  protected void initConfFromNodeConfigDef( ElementGraph nodeElementGraph, ConfigDef.Setter setter )
874    {
875    nodeElementGraph = ElementGraphs.asExtentMaskedSubGraph( nodeElementGraph );
876
877    ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() );
878
879    // applies each mode in order, topologically
880    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
881      {
882      Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( nodeElementGraph );
883
884      while( iterator.hasNext() )
885        {
886        FlowElement element = iterator.next();
887
888        while( element != null )
889          {
890          // intentionally skip any element that spans downstream nodes, like a GroupBy
891          // this way GroupBy is applied on the inbound side (where partitioning happens)
892          // not the outbound side.
893          // parent sub-assemblies (like Unique) will be applied if they have leading Pipes to the current spanning Pipe
894          if( elementSpansDownStream( stepElementGraph, nodeElementGraph, element ) )
895            {
896            element = null;
897            continue;
898            }
899
900          if( element.hasNodeConfigDef() )
901            element.getNodeConfigDef().apply( mode, setter );
902
903          // walk up the sub-assembly parent hierarchy
904          if( element instanceof Pipe )
905            element = ( (Pipe) element ).getParent();
906          else
907            element = null;
908          }
909        }
910      }
911    }
912
913  private boolean elementSpansDownStream( ElementGraph stepElementGraph, ElementGraph nodeElementGraph, FlowElement element )
914    {
915    boolean spansNodes = !( element instanceof SubAssembly );
916
917    if( spansNodes )
918      spansNodes = nodeElementGraph.outDegreeOf( element ) == 0 && stepElementGraph.outDegreeOf( element ) > 0;
919
920    return spansNodes;
921    }
922
923  protected void initConfFromStepConfigDef( ConfigDef.Setter setter )
924    {
925    ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() );
926
927    // applies each mode in order, topologically
928    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
929      {
930      Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( stepElementGraph );
931
932      while( iterator.hasNext() )
933        {
934        FlowElement element = iterator.next();
935
936        while( element != null )
937          {
938          if( element.hasStepConfigDef() )
939            element.getStepConfigDef().apply( mode, setter );
940
941          // walk up the sub-assembly parent hierarchy
942          if( element instanceof Pipe )
943            element = ( (Pipe) element ).getParent();
944          else
945            element = null;
946          }
947        }
948      }
949    }
950
951  @Override
952  public boolean equals( Object object )
953    {
954    if( this == object )
955      return true;
956    if( object == null || getClass() != object.getClass() )
957      return false;
958
959    BaseFlowStep flowStep = (BaseFlowStep) object;
960
961    if( id != null ? !id.equals( flowStep.id ) : flowStep.id != null )
962      return false;
963
964    return true;
965    }
966
967  @Override
968  public int hashCode()
969    {
970    return id != null ? id.hashCode() : 0;
971    }
972
973  @Override
974  public String toString()
975    {
976    StringBuffer buffer = new StringBuffer();
977
978    buffer.append( getClass().getSimpleName() );
979    buffer.append( "[name: " ).append( getName() ).append( "]" );
980
981    return buffer.toString();
982    }
983
984  @Override
985  public final boolean isInfoEnabled()
986    {
987    return getLogger().isInfoEnabled();
988    }
989
990  private ProcessLogger getLogger()
991    {
992    if( flow != null && flow instanceof ProcessLogger )
993      return (ProcessLogger) flow;
994
995    return ProcessLogger.NULL;
996    }
997
998  @Override
999  public final boolean isDebugEnabled()
1000    {
1001    return ( getLogger() ).isDebugEnabled();
1002    }
1003
1004  @Override
1005  public void logDebug( String message, Object... arguments )
1006    {
1007    getLogger().logDebug( message, arguments );
1008    }
1009
1010  @Override
1011  public void logInfo( String message, Object... arguments )
1012    {
1013    getLogger().logInfo( message, arguments );
1014    }
1015
1016  @Override
1017  public void logWarn( String message )
1018    {
1019    getLogger().logWarn( message );
1020    }
1021
1022  @Override
1023  public void logWarn( String message, Throwable throwable )
1024    {
1025    getLogger().logWarn( message, throwable );
1026    }
1027
1028  @Override
1029  public void logWarn( String message, Object... arguments )
1030    {
1031    getLogger().logWarn( message, arguments );
1032    }
1033
1034  @Override
1035  public void logError( String message, Object... arguments )
1036    {
1037    getLogger().logError( message, arguments );
1038    }
1039
1040  @Override
1041  public void logError( String message, Throwable throwable )
1042    {
1043    getLogger().logError( message, throwable );
1044    }
1045
1046  /**
1047   * Class SafeFlowStepListener safely calls a wrapped FlowStepListener.
1048   * <p/>
1049   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1050   * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
1051   * which in turn is run in a new Thread.
1052   */
1053  private class SafeFlowStepListener implements FlowStepListener
1054    {
1055    /** Field flowListener */
1056    final FlowStepListener flowStepListener;
1057    /** Field throwable */
1058    Throwable throwable;
1059
1060    private SafeFlowStepListener( FlowStepListener flowStepListener )
1061      {
1062      this.flowStepListener = flowStepListener;
1063      }
1064
1065    public void onStepStarting( FlowStep flowStep )
1066      {
1067      try
1068        {
1069        flowStepListener.onStepStarting( flowStep );
1070        }
1071      catch( Throwable throwable )
1072        {
1073        handleThrowable( throwable );
1074        }
1075      }
1076
1077    public void onStepStopping( FlowStep flowStep )
1078      {
1079      try
1080        {
1081        flowStepListener.onStepStopping( flowStep );
1082        }
1083      catch( Throwable throwable )
1084        {
1085        handleThrowable( throwable );
1086        }
1087      }
1088
1089    public void onStepCompleted( FlowStep flowStep )
1090      {
1091      try
1092        {
1093        flowStepListener.onStepCompleted( flowStep );
1094        }
1095      catch( Throwable throwable )
1096        {
1097        handleThrowable( throwable );
1098        }
1099      }
1100
1101    public void onStepRunning( FlowStep flowStep )
1102      {
1103      try
1104        {
1105        flowStepListener.onStepRunning( flowStep );
1106        }
1107      catch( Throwable throwable )
1108        {
1109        handleThrowable( throwable );
1110        }
1111      }
1112
1113    public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable )
1114      {
1115      try
1116        {
1117        return flowStepListener.onStepThrowable( flowStep, flowStepThrowable );
1118        }
1119      catch( Throwable throwable )
1120        {
1121        handleThrowable( throwable );
1122        }
1123
1124      return false;
1125      }
1126
1127    private void handleThrowable( Throwable throwable )
1128      {
1129      this.throwable = throwable;
1130
1131      logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable );
1132      }
1133
1134    public boolean equals( Object object )
1135      {
1136      if( object instanceof BaseFlowStep.SafeFlowStepListener )
1137        return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener );
1138
1139      return flowStepListener.equals( object );
1140      }
1141
1142    public int hashCode()
1143      {
1144      return flowStepListener.hashCode();
1145      }
1146    }
1147  }