001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.Date;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Iterator;
031import java.util.LinkedHashMap;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Map;
035import java.util.Properties;
036import java.util.Set;
037import java.util.concurrent.Callable;
038import java.util.concurrent.Future;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.locks.ReentrantLock;
041
042import cascading.CascadingException;
043import cascading.cascade.Cascade;
044import cascading.flow.planner.BaseFlowNode;
045import cascading.flow.planner.BaseFlowStep;
046import cascading.flow.planner.FlowStepJob;
047import cascading.flow.planner.PlannerInfo;
048import cascading.flow.planner.PlatformInfo;
049import cascading.flow.planner.graph.ElementGraphs;
050import cascading.flow.planner.graph.FlowElementGraph;
051import cascading.flow.planner.process.FlowStepGraph;
052import cascading.flow.planner.process.ProcessGraphs;
053import cascading.management.CascadingServices;
054import cascading.management.UnitOfWorkExecutorStrategy;
055import cascading.management.UnitOfWorkSpawnStrategy;
056import cascading.management.state.ClientState;
057import cascading.property.AppProps;
058import cascading.property.PropertyUtil;
059import cascading.stats.FlowStats;
060import cascading.tap.Tap;
061import cascading.tuple.Fields;
062import cascading.tuple.TupleEntryCollector;
063import cascading.tuple.TupleEntryIterator;
064import cascading.util.ProcessLogger;
065import cascading.util.ShutdownUtil;
066import cascading.util.Update;
067import cascading.util.Util;
068import cascading.util.Version;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071import riffle.process.DependencyIncoming;
072import riffle.process.DependencyOutgoing;
073import riffle.process.ProcessCleanup;
074import riffle.process.ProcessComplete;
075import riffle.process.ProcessPrepare;
076import riffle.process.ProcessStart;
077import riffle.process.ProcessStop;
078
079import static cascading.util.Util.formatDurationFromMillis;
080
081@riffle.process.Process
082public abstract class BaseFlow<Config> implements Flow<Config>, ProcessLogger
083  {
084  /** Field LOG */
085  private static final Logger LOG = LoggerFactory.getLogger( Flow.class ); // wrapped by ProcessLogger interface methods
086  private static final int LOG_FLOW_NAME_MAX = 25;
087
088  private PlannerInfo plannerInfo = PlannerInfo.NULL;
089  protected PlatformInfo platformInfo = PlatformInfo.NULL;
090
091  /** Field id */
092  private String id;
093  /** Field name */
094  private String name;
095  /** Fields runID */
096  private String runID;
097  /** Fields classpath */
098  private List<String> classPath; // may remain null
099  /** Field tags */
100  private String tags;
101  /** Field listeners */
102  private List<SafeFlowListener> listeners;
103  /** Field skipStrategy */
104  private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale();
105  /** Field flowStats */
106  protected FlowStats flowStats; // don't use a listener to set values
107  /** Field sources */
108  protected Map<String, Tap> sources = Collections.emptyMap();
109  /** Field sinks */
110  protected Map<String, Tap> sinks = Collections.emptyMap();
111  /** Field traps */
112  private Map<String, Tap> traps = Collections.emptyMap();
113  /** Field checkpoints */
114  private Map<String, Tap> checkpoints = Collections.emptyMap();
115  /** Field stopJobsOnExit */
116  protected boolean stopJobsOnExit = true;
117  /** Field submitPriority */
118  private int submitPriority = 5;
119
120  /** Field stepGraph */
121  protected FlowStepGraph flowStepGraph;
122  /** Field thread */
123  protected transient Thread thread;
124  /** Field throwable */
125  private Throwable throwable;
126  /** Field stop */
127  protected boolean stop;
128
129  /** Field flowCanonicalHash */
130  protected String flowCanonicalHash;
131  /** Field flowElementGraph */
132  protected FlowElementGraph flowElementGraph; // only used for documentation purposes
133
134  private transient CascadingServices cascadingServices;
135
136  private FlowStepStrategy<Config> flowStepStrategy = null;
137  /** Field steps */
138  protected transient List<FlowStep<Config>> steps;
139  /** Field jobsMap */
140  private transient Map<String, FlowStepJob<Config>> jobsMap;
141  private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy();
142
143  private transient ReentrantLock stopLock = new ReentrantLock( true );
144  protected ShutdownUtil.Hook shutdownHook;
145
146  protected HashMap<String, String> flowDescriptor;
147
148  /**
149   * Returns property stopJobsOnExit.
150   *
151   * @param properties of type Map
152   * @return a boolean
153   */
154  static boolean getStopJobsOnExit( Map<Object, Object> properties )
155    {
156    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, FlowProps.STOP_JOBS_ON_EXIT, "true" ) );
157    }
158
159  /** Used for testing. */
160  protected BaseFlow()
161    {
162    this.name = "NA";
163    this.flowStats = createPrepareFlowStats();
164    }
165
166  /**
167   * Does not initialize stats
168   *
169   * @param name
170   */
171  protected BaseFlow( PlatformInfo platformInfo, String name )
172    {
173    if( platformInfo != null )
174      this.platformInfo = platformInfo;
175
176    this.name = name;
177    }
178
179  protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name, Map<String, String> flowDescriptor )
180    {
181    if( platformInfo != null )
182      this.platformInfo = platformInfo;
183
184    this.name = name;
185
186    if( flowDescriptor != null )
187      this.flowDescriptor = new LinkedHashMap<>( flowDescriptor );
188
189    addSessionProperties( properties );
190    initConfig( properties, defaultConfig );
191    }
192
193  protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef )
194    {
195    properties = PropertyUtil.asFlatMap( properties );
196
197    if( platformInfo != null )
198      this.platformInfo = platformInfo;
199
200    this.name = flowDef.getName();
201    this.tags = flowDef.getTags();
202    this.runID = flowDef.getRunID();
203    this.classPath = flowDef.getClassPath();
204
205    if( !flowDef.getFlowDescriptor().isEmpty() )
206      this.flowDescriptor = new LinkedHashMap<>( flowDef.getFlowDescriptor() );
207
208    addSessionProperties( properties );
209    initConfig( properties, defaultConfig );
210    setSources( flowDef.getSourcesCopy() );
211    setSinks( flowDef.getSinksCopy() );
212    setTraps( flowDef.getTrapsCopy() );
213    setCheckpoints( flowDef.getCheckpointsCopy() );
214    initFromTaps();
215
216    retrieveSourceFields();
217    retrieveSinkFields();
218    }
219
220  public void setPlannerInfo( PlannerInfo plannerInfo )
221    {
222    this.plannerInfo = plannerInfo;
223    }
224
225  @Override
226  public PlannerInfo getPlannerInfo()
227    {
228    return plannerInfo;
229    }
230
231  @Override
232  public PlatformInfo getPlatformInfo()
233    {
234    return platformInfo;
235    }
236
237  public void initialize( FlowElementGraph flowElementGraph, FlowStepGraph flowStepGraph )
238    {
239    addPlannerProperties();
240    this.flowElementGraph = flowElementGraph;
241    this.flowStepGraph = flowStepGraph;
242
243    initSteps();
244
245    this.flowStats = createPrepareFlowStats();
246
247    initializeNewJobsMap();
248
249    initializeChildStats();
250    }
251
252  public FlowElementGraph updateSchemes( FlowElementGraph pipeGraph )
253    {
254    presentSourceFields( pipeGraph );
255
256    presentSinkFields( pipeGraph );
257
258    return new FlowElementGraph( pipeGraph );
259    }
260
261  /** Force a Scheme to fetch any fields from a meta-data store */
262  protected void retrieveSourceFields()
263    {
264    for( Tap tap : sources.values() )
265      tap.retrieveSourceFields( getFlowProcess() );
266    }
267
268  /**
269   * Present the current resolved fields for the Tap
270   *
271   * @param pipeGraph
272   */
273  protected void presentSourceFields( FlowElementGraph pipeGraph )
274    {
275    for( Tap tap : sources.values() )
276      {
277      if( pipeGraph.containsVertex( tap ) )
278        tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
279      }
280
281    for( Tap tap : checkpoints.values() )
282      {
283      if( pipeGraph.containsVertex( tap ) )
284        tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
285      }
286    }
287
288  /** Force a Scheme to fetch any fields from a meta-data store */
289  protected void retrieveSinkFields()
290    {
291    for( Tap tap : sinks.values() )
292      tap.retrieveSinkFields( getFlowProcess() );
293    }
294
295  /**
296   * Present the current resolved fields for the Tap
297   *
298   * @param pipeGraph
299   */
300  protected void presentSinkFields( FlowElementGraph pipeGraph )
301    {
302    for( Tap tap : sinks.values() )
303      {
304      if( pipeGraph.containsVertex( tap ) )
305        tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
306      }
307
308    for( Tap tap : checkpoints.values() )
309      {
310      if( pipeGraph.containsVertex( tap ) )
311        tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
312      }
313    }
314
  /**
   * Returns the out-values fields of the first outgoing edge of the given Tap in the given graph.
   * <p/>
   * NOTE(review): only the first edge returned by the iterator is consulted, and the call fails if the
   * Tap has no outgoing edges - presumably every Tap passed in has at least one; confirm against callers.
   *
   * @param pipeGraph of type FlowElementGraph
   * @param tap       of type Tap
   * @return the Fields declared by the first outgoing edge
   */
  protected Fields getFieldsFor( FlowElementGraph pipeGraph, Tap tap )
    {
    return pipeGraph.outgoingEdgesOf( tap ).iterator().next().getOutValuesFields();
    }
319
320  protected void addSessionProperties( Map<Object, Object> properties )
321    {
322    if( properties == null )
323      return;
324
325    PropertyUtil.setProperty( properties, CASCADING_FLOW_ID, getID() );
326    PropertyUtil.setProperty( properties, "cascading.flow.tags", getTags() );
327    AppProps.setApplicationID( properties );
328    PropertyUtil.setProperty( properties, "cascading.app.name", makeAppName( properties ) );
329    PropertyUtil.setProperty( properties, "cascading.app.version", makeAppVersion( properties ) );
330    }
331
332  protected void addPlannerProperties()
333    {
334    setConfigProperty( getConfig(), "cascading.flow.planner", getPlannerInfo().name );
335    setConfigProperty( getConfig(), "cascading.flow.platform", getPlannerInfo().platform );
336    setConfigProperty( getConfig(), "cascading.flow.registry", getPlannerInfo().registry );
337    }
338
339  private String makeAppName( Map<Object, Object> properties )
340    {
341    if( properties == null )
342      return null;
343
344    String name = AppProps.getApplicationName( properties );
345
346    if( name != null )
347      return name;
348
349    return Util.findName( AppProps.getApplicationJarPath( properties ) );
350    }
351
352  private String makeAppVersion( Map<Object, Object> properties )
353    {
354    if( properties == null )
355      return null;
356
357    String name = AppProps.getApplicationVersion( properties );
358
359    if( name != null )
360      return name;
361
362    return Util.findVersion( AppProps.getApplicationJarPath( properties ) );
363    }
364
365  protected FlowStats createPrepareFlowStats()
366    {
367    FlowStats flowStats = createFlowStats();
368
369    flowStats.prepare();
370    flowStats.markPending();
371
372    return flowStats;
373    }
374
375  protected FlowStats createFlowStats()
376    {
377    return new FlowStats( this, getClientState() );
378    }
379
380  public CascadingServices getCascadingServices()
381    {
382    if( cascadingServices == null )
383      cascadingServices = new CascadingServices( getConfigAsProperties() );
384
385    return cascadingServices;
386    }
387
388  protected ClientState getClientState()
389    {
390    return getFlowSession().getCascadingServices().createClientState( getID() );
391    }
392
393  protected void initSteps()
394    {
395    if( flowStepGraph == null )
396      return;
397
398    Set<FlowStep> flowSteps = flowStepGraph.vertexSet();
399
400    for( FlowStep flowStep : flowSteps )
401      {
402      ( (BaseFlowStep) flowStep ).setFlow( this );
403
404      Set<FlowNode> flowNodes = flowStep.getFlowNodeGraph().vertexSet();
405
406      for( FlowNode flowNode : flowNodes )
407        ( (BaseFlowNode) flowNode ).setFlowStep( flowStep );
408      }
409    }
410
411  private void initFromTaps()
412    {
413    initFromTaps( sources );
414    initFromTaps( sinks );
415    initFromTaps( traps );
416    }
417
418  private void initFromTaps( Map<String, Tap> taps )
419    {
420    for( Tap tap : taps.values() )
421      tap.flowConfInit( this );
422    }
423
424  @Override
425  public String getName()
426    {
427    return name;
428    }
429
430  protected void setName( String name )
431    {
432    this.name = name;
433    }
434
435  @Override
436  public String getID()
437    {
438    if( id == null )
439      id = Util.createUniqueID();
440
441    return id;
442    }
443
444  @Override
445  public String getTags()
446    {
447    return tags;
448    }
449
450  @Override
451  public int getSubmitPriority()
452    {
453    return submitPriority;
454    }
455
456  @Override
457  public void setSubmitPriority( int submitPriority )
458    {
459    if( submitPriority < 1 || submitPriority > 10 )
460      throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );
461
462    this.submitPriority = submitPriority;
463    }
464
465  /**
466   * The hash value can be used to determine if two unique Flow instances performed the same work.
467   * <p/>
468   * The source and sink taps are not relevant to the hash.
469   *
470   * @return a String value
471   */
472  public String getFlowCanonicalHash()
473    {
474    if( flowCanonicalHash != null || flowElementGraph == null )
475      return flowCanonicalHash;
476
477    // synchronize on flowElementGraph to prevent duplicate hash creation - can be high overhead
478    // and to prevent deadlocks if the DocumentService calls back into the flow when transitioning
479    // into a running state
480    synchronized( flowElementGraph )
481      {
482      flowCanonicalHash = createFlowCanonicalHash( flowElementGraph );
483      }
484
485    return flowCanonicalHash;
486    }
487
488  // allow for sub-class overrides
489  protected String createFlowCanonicalHash( FlowElementGraph flowElementGraph )
490    {
491    if( flowElementGraph == null )
492      return null;
493
494    return ElementGraphs.canonicalHash( flowElementGraph );
495    }
496
497  FlowElementGraph getFlowElementGraph()
498    {
499    return flowElementGraph;
500    }
501
502  FlowStepGraph getFlowStepGraph()
503    {
504    return flowStepGraph;
505    }
506
507  protected void setSources( Map<String, Tap> sources )
508    {
509    if( sources == null )
510      return;
511
512    addListeners( sources.values() );
513    this.sources = sources;
514    }
515
516  protected void setSinks( Map<String, Tap> sinks )
517    {
518    if( sinks == null )
519      return;
520
521    addListeners( sinks.values() );
522    this.sinks = sinks;
523    }
524
525  protected void setTraps( Map<String, Tap> traps )
526    {
527    addListeners( traps.values() );
528    this.traps = traps;
529    }
530
531  protected void setCheckpoints( Map<String, Tap> checkpoints )
532    {
533    addListeners( checkpoints.values() );
534    this.checkpoints = checkpoints;
535    }
536
537  protected void setFlowStepGraph( FlowStepGraph flowStepGraph )
538    {
539    this.flowStepGraph = flowStepGraph;
540    }
541
  /**
   * This method creates a new internal Config with the parentConfig as defaults using the properties to override
   * the defaults.
   * <p/>
   * Called from the constructors after the session properties have been added to the given properties.
   *
   * @param properties   of type Map
   * @param parentConfig of type Config
   */
  protected abstract void initConfig( Map<Object, Object> properties, Config parentConfig );
550
551  public Config createConfig( Map<Object, Object> properties, Config defaultConfig )
552    {
553    Config config = newConfig( defaultConfig );
554
555    if( properties == null )
556      return config;
557
558    Set<Object> keys = new HashSet<>( properties.keySet() );
559
560    // keys will only be grabbed if both key/value are String, so keep orig keys
561    if( properties instanceof Properties )
562      keys.addAll( ( (Properties) properties ).stringPropertyNames() );
563
564    for( Object key : keys )
565      {
566      Object value = properties.get( key );
567
568      if( value == null && properties instanceof Properties && key instanceof String )
569        value = ( (Properties) properties ).getProperty( (String) key );
570
571      if( value == null ) // don't stuff null values
572        continue;
573
574      setConfigProperty( config, key, value );
575      }
576
577    return config;
578    }
579
  /**
   * Sets a single key/value pair on the given Config. Called by {@link #createConfig} for each non-null
   * property value and by {@link #addPlannerProperties()} for the planner properties.
   *
   * @param config of type Config
   * @param key    of type Object
   * @param value  of type Object
   */
  protected abstract void setConfigProperty( Config config, Object key, Object value );
581
  /**
   * Returns the Config instance {@link #createConfig} applies the given properties to.
   * NOTE(review): presumably a copy of, or a new Config backed by, the defaultConfig - confirm in sub-classes.
   *
   * @param defaultConfig of type Config
   * @return a Config instance
   */
  protected abstract Config newConfig( Config defaultConfig );
583
584  protected void initFromProperties( Map<Object, Object> properties )
585    {
586    stopJobsOnExit = getStopJobsOnExit( properties );
587    }
588
589  public FlowSession getFlowSession()
590    {
591    return new FlowSession( getCascadingServices() );
592    }
593
594  @Override
595  public FlowStats getFlowStats()
596    {
597    return flowStats;
598    }
599
600  @Override
601  public Map<String, String> getFlowDescriptor()
602    {
603    if( flowDescriptor == null )
604      return Collections.emptyMap();
605
606    return Collections.unmodifiableMap( flowDescriptor );
607    }
608
609  @Override
610  public FlowStats getStats()
611    {
612    return getFlowStats();
613    }
614
615  void addListeners( Collection listeners )
616    {
617    for( Object listener : listeners )
618      {
619      if( listener instanceof FlowListener )
620        addListener( (FlowListener) listener );
621      }
622    }
623
624  List<SafeFlowListener> getListeners()
625    {
626    if( listeners == null )
627      listeners = new LinkedList<SafeFlowListener>();
628
629    return listeners;
630    }
631
632  @Override
633  public boolean hasListeners()
634    {
635    return listeners != null && !listeners.isEmpty();
636    }
637
638  @Override
639  public void addListener( FlowListener flowListener )
640    {
641    getListeners().add( new SafeFlowListener( flowListener ) );
642    }
643
644  @Override
645  public boolean removeListener( FlowListener flowListener )
646    {
647    return getListeners().remove( new SafeFlowListener( flowListener ) );
648    }
649
650  @Override
651  public boolean hasStepListeners()
652    {
653    boolean hasStepListeners = false;
654
655    for( FlowStep step : getFlowSteps() )
656      hasStepListeners |= step.hasListeners();
657
658    return hasStepListeners;
659    }
660
661  @Override
662  public void addStepListener( FlowStepListener flowStepListener )
663    {
664    for( FlowStep step : getFlowSteps() )
665      step.addListener( flowStepListener );
666    }
667
668  @Override
669  public boolean removeStepListener( FlowStepListener flowStepListener )
670    {
671    boolean listenerRemoved = true;
672
673    for( FlowStep step : getFlowSteps() )
674      listenerRemoved &= step.removeListener( flowStepListener );
675
676    return listenerRemoved;
677    }
678
679  @Override
680  public Map<String, Tap> getSources()
681    {
682    return Collections.unmodifiableMap( sources );
683    }
684
685  @Override
686  public List<String> getSourceNames()
687    {
688    return new ArrayList<String>( sources.keySet() );
689    }
690
691  @Override
692  public Tap getSource( String name )
693    {
694    return sources.get( name );
695    }
696
697  @Override
698  @DependencyIncoming
699  public Collection<Tap> getSourcesCollection()
700    {
701    return getSources().values();
702    }
703
704  @Override
705  public Map<String, Tap> getSinks()
706    {
707    return Collections.unmodifiableMap( sinks );
708    }
709
710  @Override
711  public List<String> getSinkNames()
712    {
713    return new ArrayList<String>( sinks.keySet() );
714    }
715
716  @Override
717  public Tap getSink( String name )
718    {
719    return sinks.get( name );
720    }
721
722  @Override
723  @DependencyOutgoing
724  public Collection<Tap> getSinksCollection()
725    {
726    return getSinks().values();
727    }
728
729  @Override
730  public Tap getSink()
731    {
732    return sinks.values().iterator().next();
733    }
734
735  @Override
736  public Map<String, Tap> getTraps()
737    {
738    return Collections.unmodifiableMap( traps );
739    }
740
741  @Override
742  public List<String> getTrapNames()
743    {
744    return new ArrayList<String>( traps.keySet() );
745    }
746
747  @Override
748  public Collection<Tap> getTrapsCollection()
749    {
750    return getTraps().values();
751    }
752
753  @Override
754  public Map<String, Tap> getCheckpoints()
755    {
756    return Collections.unmodifiableMap( checkpoints );
757    }
758
759  @Override
760  public List<String> getCheckpointNames()
761    {
762    return new ArrayList<String>( checkpoints.keySet() );
763    }
764
765  @Override
766  public Collection<Tap> getCheckpointsCollection()
767    {
768    return getCheckpoints().values();
769    }
770
771  @Override
772  public boolean isStopJobsOnExit()
773    {
774    return stopJobsOnExit;
775    }
776
777  @Override
778  public FlowSkipStrategy getFlowSkipStrategy()
779    {
780    return flowSkipStrategy;
781    }
782
783  @Override
784  public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy )
785    {
786    if( flowSkipStrategy == null )
787      throw new IllegalArgumentException( "flowSkipStrategy may not be null" );
788
789    try
790      {
791      return this.flowSkipStrategy;
792      }
793    finally
794      {
795      this.flowSkipStrategy = flowSkipStrategy;
796      }
797    }
798
799  @Override
800  public boolean isSkipFlow() throws IOException
801    {
802    return flowSkipStrategy.skipFlow( this );
803    }
804
805  @Override
806  public boolean areSinksStale() throws IOException
807    {
808    return areSourcesNewer( getSinkModified() );
809    }
810
811  @Override
812  public boolean areSourcesNewer( long sinkModified ) throws IOException
813    {
814    Config config = getConfig();
815    Iterator<Tap> values = sources.values().iterator();
816
817    long sourceModified = 0;
818
819    try
820      {
821      sourceModified = Util.getSourceModified( config, values, sinkModified );
822
823      if( sinkModified < sourceModified )
824        return true;
825
826      return false;
827      }
828    finally
829      {
830      if( LOG.isInfoEnabled() )
831        logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
832      }
833    }
834
835  @Override
836  public long getSinkModified() throws IOException
837    {
838    long sinkModified = Util.getSinkModified( getConfig(), sinks.values() );
839
840    if( LOG.isInfoEnabled() )
841      {
842      if( sinkModified == -1L )
843        logInfo( "at least one sink is marked for delete" );
844      if( sinkModified == 0L )
845        logInfo( "at least one sink does not exist" );
846      else
847        logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
848      }
849
850    return sinkModified;
851    }
852
853  @Override
854  public FlowStepStrategy getFlowStepStrategy()
855    {
856    return flowStepStrategy;
857    }
858
859  @Override
860  public void setFlowStepStrategy( FlowStepStrategy flowStepStrategy )
861    {
862    this.flowStepStrategy = flowStepStrategy;
863    }
864
865  @Override
866  public List<FlowStep<Config>> getFlowSteps()
867    {
868    if( steps != null )
869      return steps;
870
871    if( flowStepGraph == null )
872      return Collections.emptyList();
873
874    Iterator<FlowStep> topoIterator = flowStepGraph.getTopologicalIterator();
875
876    steps = new ArrayList<>();
877
878    while( topoIterator.hasNext() )
879      steps.add( topoIterator.next() );
880
881    return steps;
882    }
883
884  @Override
885  @ProcessPrepare
886  public void prepare()
887    {
888    try
889      {
890      deleteSinksIfNotUpdate();
891      deleteTrapsIfNotUpdate();
892      deleteCheckpointsIfNotUpdate();
893      }
894    catch( IOException exception )
895      {
896      throw new FlowException( "unable to prepare flow", exception );
897      }
898    }
899
900  @Override
901  @ProcessStart
902  public synchronized void start()
903    {
904    if( thread != null )
905      return;
906
907    if( stop )
908      return;
909
910    registerShutdownHook();
911
912    internalStart();
913
914    String threadName = ( "flow " + Util.toNull( getName() ) ).trim();
915
916    thread = createFlowThread( threadName );
917
918    thread.start();
919    }
920
921  protected Thread createFlowThread( String threadName )
922    {
923    return new Thread( new Runnable()
924      {
925      @Override
926      public void run()
927        {
928        BaseFlow.this.run();
929        }
930      }, threadName );
931    }
932
  /**
   * Platform specific start logic. Called by {@link #start()} after the shutdown hook has been
   * registered and before the flow thread is created.
   */
  protected abstract void internalStart();
934
935  @Override
936  @ProcessStop
937  public synchronized void stop()
938    {
939    stopLock.lock();
940
941    try
942      {
943      if( stop )
944        return;
945
946      stop = true;
947
948      fireOnStopping();
949
950      if( !flowStats.isFinished() )
951        flowStats.markStopped();
952
953      internalStopAllJobs();
954
955      handleExecutorShutdown();
956
957      internalClean( true );
958      }
959    finally
960      {
961      flowStats.cleanup();
962      stopLock.unlock();
963      }
964    }
965
  /**
   * Platform specific clean up logic. Called by {@link #stop()} with true after the jobs have been
   * stopped and the executor shutdown has been handled.
   *
   * @param stop of type boolean
   */
  protected abstract void internalClean( boolean stop );
967
968  @Override
969  @ProcessComplete
970  public void complete()
971    {
972    start();
973
974    try
975      {
976      try
977        {
978        synchronized( this ) // prevent NPE on quick stop() & complete() after start()
979          {
980          while( thread == null && !stop )
981            Util.safeSleep( 10 );
982          }
983
984        if( thread != null )
985          thread.join();
986        }
987      catch( InterruptedException exception )
988        {
989        throw new FlowException( getName(), "thread interrupted", exception );
990        }
991
992      // if in #stop and stopping, lets wait till its done in this thread
993      try
994        {
995        stopLock.lock();
996        }
997      finally
998        {
999        stopLock.unlock();
1000        }
1001
1002      if( throwable instanceof FlowException )
1003        ( (FlowException) throwable ).setFlowName( getName() );
1004
1005      if( throwable instanceof CascadingException )
1006        throw (CascadingException) throwable;
1007
1008      if( throwable instanceof OutOfMemoryError )
1009        throw (OutOfMemoryError) throwable;
1010
1011      if( throwable != null )
1012        throw new FlowException( getName(), "unhandled exception", throwable );
1013
1014      if( hasListeners() )
1015        {
1016        for( SafeFlowListener safeFlowListener : getListeners() )
1017          {
1018          if( safeFlowListener.throwable != null )
1019            throw new FlowException( getName(), "unhandled listener exception", throwable );
1020          }
1021        }
1022      }
1023    finally
1024      {
1025      thread = null;
1026      throwable = null;
1027      }
1028    }
1029
1030  private void commitTraps()
1031    {
1032    // commit all the traps, don't fail on an error
1033    for( Tap tap : traps.values() )
1034      {
1035      try
1036        {
1037        if( !tap.commitResource( getConfig() ) )
1038          logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ) );
1039        }
1040      catch( IOException exception )
1041        {
1042        logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), exception );
1043        }
1044      }
1045    }
1046
  /** Process cleanup callback; this implementation intentionally performs no work. */
  @Override
  @ProcessCleanup
  public void cleanup()
    {
    // do nothing
    }
1053
1054  @Override
1055  public TupleEntryIterator openSource() throws IOException
1056    {
1057    return sources.values().iterator().next().openForRead( getFlowProcess() );
1058    }
1059
1060  @Override
1061  public TupleEntryIterator openSource( String name ) throws IOException
1062    {
1063    if( !sources.containsKey( name ) )
1064      throw new IllegalArgumentException( "source does not exist: " + name );
1065
1066    return sources.get( name ).openForRead( getFlowProcess() );
1067    }
1068
1069  @Override
1070  public TupleEntryIterator openSink() throws IOException
1071    {
1072    return sinks.values().iterator().next().openForRead( getFlowProcess() );
1073    }
1074
1075  @Override
1076  public TupleEntryIterator openSink( String name ) throws IOException
1077    {
1078    if( !sinks.containsKey( name ) )
1079      throw new IllegalArgumentException( "sink does not exist: " + name );
1080
1081    return sinks.get( name ).openForRead( getFlowProcess() );
1082    }
1083
1084  @Override
1085  public TupleEntryIterator openTrap() throws IOException
1086    {
1087    return traps.values().iterator().next().openForRead( getFlowProcess() );
1088    }
1089
1090  @Override
1091  public TupleEntryIterator openTrap( String name ) throws IOException
1092    {
1093    if( !traps.containsKey( name ) )
1094      throw new IllegalArgumentException( "trap does not exist: " + name );
1095
1096    return traps.get( name ).openForRead( getFlowProcess() );
1097    }
1098
1099  /**
1100   * Method deleteSinks deletes all sinks, whether or not they are configured for {@link cascading.tap.SinkMode#UPDATE}.
1101   * <p/>
1102   * Use with caution.
1103   *
1104   * @throws IOException when
1105   * @see BaseFlow#deleteSinksIfNotUpdate()
1106   */
1107  public void deleteSinks() throws IOException
1108    {
1109    for( Tap tap : sinks.values() )
1110      deleteOrFail( tap );
1111    }
1112
1113  private void deleteOrFail( Tap tap ) throws IOException
1114    {
1115    if( !tap.resourceExists( getConfig() ) )
1116      return;
1117
1118    if( !tap.deleteResource( getConfig() ) )
1119      throw new FlowException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) );
1120    }
1121
1122  /**
1123   * Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the {@link cascading.tap.SinkMode#UPDATE} flag.
1124   * <p/>
1125   * Typically used by a {@link Cascade} before executing the flow if the sinks are stale.
1126   * <p/>
1127   * Use with caution.
1128   *
1129   * @throws IOException when
1130   */
1131  public void deleteSinksIfNotUpdate() throws IOException
1132    {
1133    for( Tap tap : sinks.values() )
1134      {
1135      if( !tap.isUpdate() )
1136        deleteOrFail( tap );
1137      }
1138    }
1139
1140  public void deleteSinksIfReplace() throws IOException
1141    {
1142    for( Tap tap : sinks.values() )
1143      {
1144      if( tap.isReplace() )
1145        deleteOrFail( tap );
1146      }
1147    }
1148
1149  public void deleteTrapsIfNotUpdate() throws IOException
1150    {
1151    for( Tap tap : traps.values() )
1152      {
1153      if( !tap.isUpdate() )
1154        deleteOrFail( tap );
1155      }
1156    }
1157
1158  public void deleteCheckpointsIfNotUpdate() throws IOException
1159    {
1160    for( Tap tap : checkpoints.values() )
1161      {
1162      if( !tap.isUpdate() )
1163        deleteOrFail( tap );
1164      }
1165    }
1166
1167  public void deleteTrapsIfReplace() throws IOException
1168    {
1169    for( Tap tap : traps.values() )
1170      {
1171      if( tap.isReplace() )
1172        deleteOrFail( tap );
1173      }
1174    }
1175
1176  public void deleteCheckpointsIfReplace() throws IOException
1177    {
1178    for( Tap tap : checkpoints.values() )
1179      {
1180      if( tap.isReplace() )
1181        deleteOrFail( tap );
1182      }
1183    }
1184
1185  @Override
1186  public boolean resourceExists( Tap tap ) throws IOException
1187    {
1188    return tap.resourceExists( getConfig() );
1189    }
1190
1191  @Override
1192  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
1193    {
1194    return tap.openForRead( getFlowProcess() );
1195    }
1196
1197  @Override
1198  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
1199    {
1200    return tap.openForWrite( getFlowProcess() );
1201    }
1202
1203  /** Method run implements the Runnable run method and should not be called by users. */
1204  private void run()
1205    {
1206    if( thread == null )
1207      throw new IllegalStateException( "to start a Flow call start() or complete(), not Runnable#run()" );
1208
1209    Version.printBanner();
1210    Update.checkForUpdate( getPlatformInfo() );
1211
1212    try
1213      {
1214      if( stop )
1215        return;
1216
1217      flowStats.markStarted();
1218
1219      fireOnStarting();
1220
1221      if( LOG.isInfoEnabled() )
1222        {
1223        logInfo( "starting" );
1224
1225        for( Tap source : getSourcesCollection() )
1226          logInfo( " source: " + source );
1227        for( Tap sink : getSinksCollection() )
1228          logInfo( " sink: " + sink );
1229        }
1230
1231      // if jobs are run local, then only use one thread to force execution serially
1232      //int numThreads = jobsAreLocal() ? 1 : getMaxConcurrentSteps( getJobConf() );
1233      int numThreads = getMaxNumParallelSteps();
1234
1235      if( numThreads == 0 )
1236        numThreads = jobsMap.size();
1237
1238      if( numThreads == 0 )
1239        throw new IllegalStateException( "no jobs rendered for flow: " + getName() );
1240
1241      if( LOG.isInfoEnabled() )
1242        {
1243        logInfo( " parallel execution of steps is enabled: " + ( getMaxNumParallelSteps() != 1 ) );
1244        logInfo( " executing total steps: " + jobsMap.size() );
1245        logInfo( " allocating management threads: " + numThreads );
1246        }
1247
1248      List<Future<Throwable>> futures = spawnJobs( numThreads );
1249
1250      for( Future<Throwable> future : futures )
1251        {
1252        throwable = future.get();
1253
1254        if( throwable != null )
1255          {
1256          if( !stop )
1257            internalStopAllJobs();
1258
1259          handleExecutorShutdown();
1260          break;
1261          }
1262        }
1263      }
1264    catch( Throwable throwable )
1265      {
1266      this.throwable = throwable;
1267      }
1268    finally
1269      {
1270      handleThrowableAndMarkFailed();
1271
1272      if( !stop && !flowStats.isFinished() )
1273        flowStats.markSuccessful();
1274
1275      internalClean( stop ); // cleaning temp taps may be determined by success/failure
1276
1277      commitTraps();
1278
1279      try
1280        {
1281        fireOnCompleted();
1282        }
1283      finally
1284        {
1285        if( LOG.isInfoEnabled() )
1286          {
1287          long totalSliceCPUSeconds = getTotalSliceCPUMilliSeconds();
1288
1289          if( totalSliceCPUSeconds == -1 )
1290            logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) );
1291          else
1292            logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) + ", using cpu time: " + formatDurationFromMillis( totalSliceCPUSeconds ) );
1293          }
1294
1295        flowStats.cleanup();
1296        internalShutdown();
1297        deregisterShutdownHook();
1298        }
1299      }
1300    }
1301
1302  protected long getTotalSliceCPUMilliSeconds()
1303    {
1304    return -1;
1305    }
1306
  /**
   * Method getMaxNumParallelSteps returns the number of threads used to manage step jobs while this flow runs.
   * <p/>
   * When the flow runs, a value of 0 is replaced by the total number of jobs, and any value other than 1
   * is logged as parallel step execution being enabled.
   *
   * @return int
   */
  protected abstract int getMaxNumParallelSteps();
1308
  /**
   * Method internalShutdown is invoked at the end of a flow run, after the flow stats have been cleaned up
   * and before the shutdown hook is deregistered.
   * <p/>
   * NOTE(review): presumably releases platform specific resources, confirm against the implementations.
   */
  protected abstract void internalShutdown();
1310
1311  private List<Future<Throwable>> spawnJobs( int numThreads ) throws InterruptedException
1312    {
1313    if( spawnStrategy == null )
1314      {
1315      logError( "no spawnStrategy set" );
1316      return new ArrayList<>();
1317      }
1318
1319    if( stop )
1320      return new ArrayList<>();
1321
1322    List<Callable<Throwable>> list = new ArrayList<>();
1323
1324    for( FlowStepJob<Config> job : jobsMap.values() )
1325      list.add( job );
1326
1327    return spawnStrategy.start( this, numThreads, list );
1328    }
1329
1330  private void handleThrowableAndMarkFailed()
1331    {
1332    if( throwable != null && !stop )
1333      {
1334      flowStats.markFailed( throwable );
1335
1336      fireOnThrowable();
1337      }
1338    }
1339
1340  Map<String, FlowStepJob<Config>> getJobsMap()
1341    {
1342    return jobsMap;
1343    }
1344
1345  protected void initializeNewJobsMap()
1346    {
1347    jobsMap = new LinkedHashMap<>(); // keep topo order
1348    Iterator<FlowStep> topoIterator = flowStepGraph.getTopologicalIterator();
1349
1350    while( topoIterator.hasNext() )
1351      {
1352      BaseFlowStep<Config> step = (BaseFlowStep) topoIterator.next();
1353      FlowStepJob<Config> flowStepJob = step.getCreateFlowStepJob( getFlowProcess(), getConfig() );
1354
1355      jobsMap.put( step.getName(), flowStepJob );
1356
1357      List<FlowStepJob<Config>> predecessors = new ArrayList<FlowStepJob<Config>>();
1358
1359      for( Object flowStep : ProcessGraphs.predecessorListOf( flowStepGraph, step ) )
1360        predecessors.add( jobsMap.get( ( (FlowStep) flowStep ).getName() ) );
1361
1362      flowStepJob.setPredecessors( predecessors );
1363      }
1364    }
1365
1366  protected void initializeChildStats()
1367    {
1368    for( FlowStepJob<Config> flowStepJob : jobsMap.values() )
1369      flowStats.addStepStats( flowStepJob.getStepStats() );
1370    }
1371
1372  protected void internalStopAllJobs()
1373    {
1374    logInfo( "stopping all jobs" );
1375
1376    try
1377      {
1378      if( jobsMap == null )
1379        return;
1380
1381      List<FlowStepJob<Config>> jobs = new ArrayList<FlowStepJob<Config>>( jobsMap.values() );
1382
1383      Collections.reverse( jobs );
1384
1385      for( FlowStepJob<Config> job : jobs )
1386        job.stop();
1387      }
1388    finally
1389      {
1390      logInfo( "stopped all jobs" );
1391      }
1392    }
1393
1394  protected void handleExecutorShutdown()
1395    {
1396    if( spawnStrategy == null )
1397      return;
1398
1399    if( spawnStrategy.isCompleted( this ) )
1400      return;
1401
1402    logDebug( "shutting down job executor" );
1403
1404    try
1405      {
1406      spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS );
1407      }
1408    catch( InterruptedException exception )
1409      {
1410      // ignore
1411      }
1412
1413    logDebug( "shutdown of job executor complete" );
1414    }
1415
1416  protected void fireOnCompleted()
1417    {
1418    if( hasListeners() )
1419      {
1420      if( isDebugEnabled() )
1421        logDebug( "firing onCompleted event: " + getListeners().size() );
1422
1423      for( FlowListener flowListener : getListeners() )
1424        flowListener.onCompleted( this );
1425      }
1426    }
1427
1428  protected void fireOnThrowable( Throwable throwable )
1429    {
1430    this.throwable = throwable;
1431    fireOnThrowable();
1432    }
1433
1434  protected void fireOnThrowable()
1435    {
1436    if( hasListeners() )
1437      {
1438      if( isDebugEnabled() )
1439        logDebug( "firing onThrowable event: " + getListeners().size() );
1440
1441      boolean isHandled = false;
1442
1443      for( FlowListener flowListener : getListeners() )
1444        isHandled = flowListener.onThrowable( this, throwable ) || isHandled;
1445
1446      if( isHandled )
1447        throwable = null;
1448      }
1449    }
1450
1451  protected void fireOnStopping()
1452    {
1453    if( hasListeners() )
1454      {
1455      if( isDebugEnabled() )
1456        logDebug( "firing onStopping event: " + getListeners().size() );
1457
1458      for( FlowListener flowListener : getListeners() )
1459        flowListener.onStopping( this );
1460      }
1461    }
1462
1463  protected void fireOnStarting()
1464    {
1465    if( hasListeners() )
1466      {
1467      if( isDebugEnabled() )
1468        logDebug( "firing onStarting event: " + getListeners().size() );
1469
1470      for( FlowListener flowListener : getListeners() )
1471        flowListener.onStarting( this );
1472      }
1473    }
1474
1475  @Override
1476  public String toString()
1477    {
1478    StringBuffer buffer = new StringBuffer();
1479
1480    if( getName() != null )
1481      buffer.append( getName() ).append( ": " );
1482
1483    for( FlowStep step : getFlowSteps() )
1484      buffer.append( step );
1485
1486    return buffer.toString();
1487    }
1488
1489  @Override
1490  public final boolean isInfoEnabled()
1491    {
1492    return LOG.isInfoEnabled();
1493    }
1494
1495  @Override
1496  public final boolean isDebugEnabled()
1497    {
1498    return LOG.isDebugEnabled();
1499    }
1500
1501  @Override
1502  public void logInfo( String message, Object... arguments )
1503    {
1504    LOG.info( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
1505    }
1506
1507  @Override
1508  public void logDebug( String message, Object... arguments )
1509    {
1510    LOG.debug( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
1511    }
1512
1513  @Override
1514  public void logWarn( String message )
1515    {
1516    LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message );
1517    }
1518
1519  @Override
1520  public void logWarn( String message, Throwable throwable )
1521    {
1522    LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable );
1523    }
1524
1525  @Override
1526  public void logWarn( String message, Object... arguments )
1527    {
1528    LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
1529    }
1530
1531  @Override
1532  public void logError( String message, Object... arguments )
1533    {
1534    LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
1535    }
1536
1537  @Override
1538  public void logError( String message, Throwable throwable )
1539    {
1540    LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable );
1541    }
1542
1543  @Override
1544  public void writeDOT( String filename )
1545    {
1546    if( flowElementGraph == null )
1547      throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" );
1548
1549    flowElementGraph.writeDOT( filename );
1550    }
1551
1552  @Override
1553  public void writeStepsDOT( String filename )
1554    {
1555    if( flowStepGraph == null )
1556      throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" );
1557
1558    flowStepGraph.writeDOT( filename );
1559    }
1560
1561  /**
1562   * Used to return a simple wrapper for use as an edge in a graph where there can only be
1563   * one instance of every edge.
1564   *
1565   * @return FlowHolder
1566   */
1567  public FlowHolder getHolder()
1568    {
1569    return new FlowHolder( this );
1570    }
1571
1572  public void setCascade( Cascade cascade )
1573    {
1574    setConfigProperty( getConfig(), "cascading.cascade.id", cascade.getID() );
1575    flowStats.recordInfo();
1576    }
1577
1578  @Override
1579  public String getCascadeID()
1580    {
1581    return getProperty( "cascading.cascade.id" );
1582    }
1583
1584  @Override
1585  public String getRunID()
1586    {
1587    return runID;
1588    }
1589
1590  public List<String> getClassPath()
1591    {
1592    return classPath;
1593    }
1594
1595  @Override
1596  public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy )
1597    {
1598    this.spawnStrategy = spawnStrategy;
1599    }
1600
1601  @Override
1602  public UnitOfWorkSpawnStrategy getSpawnStrategy()
1603    {
1604    return spawnStrategy;
1605    }
1606
1607  protected void registerShutdownHook()
1608    {
1609    if( !isStopJobsOnExit() )
1610      return;
1611
1612    shutdownHook = new ShutdownUtil.Hook()
1613      {
1614      @Override
1615      public Priority priority()
1616        {
1617        return Priority.WORK_CHILD;
1618        }
1619
1620      @Override
1621      public void execute()
1622        {
1623        logInfo( "shutdown hook calling stop on flow" );
1624
1625        BaseFlow.this.stop();
1626        }
1627      };
1628
1629    ShutdownUtil.addHook( shutdownHook );
1630    }
1631
1632  private void deregisterShutdownHook()
1633    {
1634    if( !isStopJobsOnExit() || stop )
1635      return;
1636
1637    ShutdownUtil.removeHook( shutdownHook );
1638    }
1639
1640  /** Class FlowHolder is a helper class for wrapping Flow instances. */
1641  public static class FlowHolder
1642    {
1643    /** Field flow */
1644    public Flow flow;
1645
1646    public FlowHolder()
1647      {
1648      }
1649
1650    public FlowHolder( Flow flow )
1651      {
1652      this.flow = flow;
1653      }
1654    }
1655
1656  /**
1657   * Class SafeFlowListener safely calls a wrapped FlowListener.
1658   * <p/>
1659   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1660   * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
1661   * which in turn is run in a new Thread.
1662   */
1663  private class SafeFlowListener implements FlowListener
1664    {
1665    /** Field flowListener */
1666    final FlowListener flowListener;
1667    /** Field throwable */
1668    Throwable throwable;
1669
1670    private SafeFlowListener( FlowListener flowListener )
1671      {
1672      this.flowListener = flowListener;
1673      }
1674
1675    public void onStarting( Flow flow )
1676      {
1677      try
1678        {
1679        flowListener.onStarting( flow );
1680        }
1681      catch( Throwable throwable )
1682        {
1683        handleThrowable( throwable );
1684        }
1685      }
1686
1687    public void onStopping( Flow flow )
1688      {
1689      try
1690        {
1691        flowListener.onStopping( flow );
1692        }
1693      catch( Throwable throwable )
1694        {
1695        handleThrowable( throwable );
1696        }
1697      }
1698
1699    public void onCompleted( Flow flow )
1700      {
1701      try
1702        {
1703        flowListener.onCompleted( flow );
1704        }
1705      catch( Throwable throwable )
1706        {
1707        handleThrowable( throwable );
1708        }
1709      }
1710
1711    public boolean onThrowable( Flow flow, Throwable flowThrowable )
1712      {
1713      try
1714        {
1715        return flowListener.onThrowable( flow, flowThrowable );
1716        }
1717      catch( Throwable throwable )
1718        {
1719        handleThrowable( throwable );
1720        }
1721
1722      return false;
1723      }
1724
1725    private void handleThrowable( Throwable throwable )
1726      {
1727      this.throwable = throwable;
1728
1729      logWarn( String.format( "flow listener %s threw throwable", flowListener ), throwable );
1730
1731      // stop this flow
1732      stop();
1733      }
1734
1735    public boolean equals( Object object )
1736      {
1737      if( object instanceof BaseFlow.SafeFlowListener )
1738        return flowListener.equals( ( (BaseFlow.SafeFlowListener) object ).flowListener );
1739
1740      return flowListener.equals( object );
1741      }
1742
1743    public int hashCode()
1744      {
1745      return flowListener.hashCode();
1746      }
1747    }
1748  }