001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.Date;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Iterator;
031import java.util.LinkedHashMap;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Map;
035import java.util.Properties;
036import java.util.Set;
037import java.util.concurrent.Callable;
038import java.util.concurrent.Future;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.locks.ReentrantLock;
041
042import cascading.CascadingException;
043import cascading.cascade.Cascade;
044import cascading.flow.planner.BaseFlowNode;
045import cascading.flow.planner.BaseFlowStep;
046import cascading.flow.planner.FlowStepJob;
047import cascading.flow.planner.PlannerInfo;
048import cascading.flow.planner.PlatformInfo;
049import cascading.flow.planner.graph.FlowElementGraph;
050import cascading.flow.planner.process.FlowStepGraph;
051import cascading.flow.planner.process.ProcessGraphs;
052import cascading.management.CascadingServices;
053import cascading.management.UnitOfWorkExecutorStrategy;
054import cascading.management.UnitOfWorkSpawnStrategy;
055import cascading.management.state.ClientState;
056import cascading.property.AppProps;
057import cascading.property.PropertyUtil;
058import cascading.stats.FlowStats;
059import cascading.tap.Tap;
060import cascading.tuple.Fields;
061import cascading.tuple.TupleEntryCollector;
062import cascading.tuple.TupleEntryIterator;
063import cascading.util.ProcessLogger;
064import cascading.util.ShutdownUtil;
065import cascading.util.Update;
066import cascading.util.Util;
067import cascading.util.Version;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070import riffle.process.DependencyIncoming;
071import riffle.process.DependencyOutgoing;
072import riffle.process.ProcessCleanup;
073import riffle.process.ProcessComplete;
074import riffle.process.ProcessPrepare;
075import riffle.process.ProcessStart;
076import riffle.process.ProcessStop;
077
078import static cascading.util.Util.formatDurationFromMillis;
079
080@riffle.process.Process
081public abstract class BaseFlow<Config> implements Flow<Config>, ProcessLogger
082  {
  /** Shared logger, named after the Flow interface; wrapped by the ProcessLogger methods such as logInfo/logError. */
  private static final Logger LOG = LoggerFactory.getLogger( Flow.class ); // wrapped by ProcessLogger interface methods
  /** Presumably a length limit for the flow name in log output; not referenced in this view — TODO confirm. */
  private static final int LOG_FLOW_NAME_MAX = 25;

  /** Planner that produced this flow; assigned by setPlannerInfo() and dereferenced by addPlannerProperties(). */
  private PlannerInfo plannerInfo;
  /** Platform description passed to the constructors; used by the update check in run(). */
  private PlatformInfo platformInfo;

  /** Unique flow id, created lazily by getID(). */
  private String id;
  /** Flow name; "NA" for the test constructor, otherwise supplied by the caller or FlowDef. */
  private String name;
  /** Run id copied from the FlowDef. */
  private String runID;
  /** Classpath entries copied from the FlowDef. */
  private List<String> classPath; // may remain null
  /** Tags copied from the FlowDef; also published as the "cascading.flow.tags" property. */
  private String tags;
  /** Registered flow listeners, each wrapped in a SafeFlowListener; created lazily by getListeners(). */
  private List<SafeFlowListener> listeners;
  /** Strategy consulted by isSkipFlow(); never null (setFlowSkipStrategy rejects null). */
  private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale();
  /** Statistics for this flow; replaced by the constructors and initialize(). */
  protected FlowStats flowStats; // don't use a listener to set values
  /** Source taps keyed by name; empty until setSources() is called. */
  protected Map<String, Tap> sources = Collections.emptyMap();
  /** Sink taps keyed by name; empty until setSinks() is called. */
  protected Map<String, Tap> sinks = Collections.emptyMap();
  /** Trap taps keyed by name; empty until setTraps() is called. */
  private Map<String, Tap> traps = Collections.emptyMap();
  /** Checkpoint taps keyed by name; empty until setCheckpoints() is called. */
  private Map<String, Tap> checkpoints = Collections.emptyMap();
  /** Value of the STOP_JOBS_ON_EXIT property, assigned by initFromProperties(); defaults to true. */
  protected boolean stopJobsOnExit = true;
  /** Submit priority, validated by setSubmitPriority() to lie in [1, 10]; defaults to 5. */
  private int submitPriority = 5;

  /** Graph of planned steps; set by initialize() or setFlowStepGraph(). */
  private FlowStepGraph flowStepGraph;
  /** Thread executing run(); created by start() and cleared by complete(). */
  protected transient Thread thread;
  /** Failure recorded while executing steps; rethrown and cleared by complete(). */
  private Throwable throwable;
  /** Set by stop(); start() and run() check it so a stopped flow does not begin executing. */
  protected boolean stop;

  /** Element graph handed to initialize(). */
  private FlowElementGraph flowElementGraph; // only used for documentation purposes

  /** Lazily created by getCascadingServices() from the configuration properties. */
  private transient CascadingServices cascadingServices;

  /** Optional per-step strategy; null unless setFlowStepStrategy() is called. */
  private FlowStepStrategy<Config> flowStepStrategy = null;
  /** Cached topological ordering of steps built by getFlowSteps(). */
  private transient List<FlowStep<Config>> steps;
  /** Jobs keyed by name; run() reads its size to size the management thread pool. */
  private transient Map<String, FlowStepJob<Config>> jobsMap;
  /** Strategy used to spawn step jobs; presumably used by spawnJobs() outside this view — TODO confirm. */
  private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy();

  /** Fair lock held for the duration of stop(); complete() acquires it to wait for an in-progress stop. */
  private transient ReentrantLock stopLock = new ReentrantLock( true );
  /** Shutdown hook handle; presumably assigned by registerShutdownHook() outside this view. */
  protected ShutdownUtil.Hook shutdownHook;

  /** Descriptive key/value metadata; copied by the constructors and exposed read-only by getFlowDescriptor(). */
  private HashMap<String, String> flowDescriptor;
144
145  /**
146   * Returns property stopJobsOnExit.
147   *
148   * @param properties of type Map
149   * @return a boolean
150   */
151  static boolean getStopJobsOnExit( Map<Object, Object> properties )
152    {
153    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, FlowProps.STOP_JOBS_ON_EXIT, "true" ) );
154    }
155
  /**
   * Used for testing.
   * <p/>
   * Names the flow "NA" and creates a prepared FlowStats; no configuration or taps are initialized.
   */
  protected BaseFlow()
    {
    this.name = "NA";
    this.flowStats = createPrepareFlowStats();
    }

  /**
   * Creates a named flow with an empty flow descriptor.
   *
   * @param platformInfo   the platform this flow targets
   * @param properties     properties to augment with session values and build the config from; may be null
   * @param defaultConfig  parent configuration passed to initConfig()
   * @param name           the flow name
   */
  protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name )
    {
    this( platformInfo, properties, defaultConfig, name, new LinkedHashMap<String, String>() );
    }

  /**
   * Creates a named flow with the given descriptor metadata.
   *
   * @param platformInfo   the platform this flow targets
   * @param properties     properties to augment with session values and build the config from; may be null
   * @param defaultConfig  parent configuration passed to initConfig()
   * @param name           the flow name
   * @param flowDescriptor descriptor metadata to copy; when null the descriptor stays unset and
   *                       getFlowDescriptor() returns an empty map
   */
  protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name, Map<String, String> flowDescriptor )
    {
    this.platformInfo = platformInfo;
    this.name = name;

    if( flowDescriptor != null )
      this.flowDescriptor = new LinkedHashMap<>( flowDescriptor );

    // session values (flow id, tags, app id/name/version) are written into properties before the config is built
    addSessionProperties( properties );
    initConfig( properties, defaultConfig );

    // presumably last because FlowStats creation reads configuration initialized above — confirm
    this.flowStats = createPrepareFlowStats(); // must be last
    }

  /**
   * Creates a flow from a FlowDef, copying its name, tags, run id, classpath, descriptor and taps.
   * <p/>
   * The properties are flattened via PropertyUtil.asFlatMap() before use. Unlike the other constructors this
   * one does not assign flowStats; initialize() assigns it.
   *
   * @param platformInfo  the platform this flow targets
   * @param properties    properties to flatten, augment with session values and build the config from
   * @param defaultConfig parent configuration passed to initConfig()
   * @param flowDef       the flow definition to copy from
   */
  protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef )
    {
    properties = PropertyUtil.asFlatMap( properties );

    this.platformInfo = platformInfo;
    this.name = flowDef.getName();
    this.tags = flowDef.getTags();
    this.runID = flowDef.getRunID();
    this.classPath = flowDef.getClassPath();

    // an empty descriptor leaves the field null; getFlowDescriptor() then returns an empty map
    if( !flowDef.getFlowDescriptor().isEmpty() )
      this.flowDescriptor = new LinkedHashMap<String, String>( flowDef.getFlowDescriptor() );

    // tags are assigned above so addSessionProperties() can publish them
    addSessionProperties( properties );
    initConfig( properties, defaultConfig );
    // each setter registers any tap that is also a FlowListener
    setSources( flowDef.getSourcesCopy() );
    setSinks( flowDef.getSinksCopy() );
    setTraps( flowDef.getTrapsCopy() );
    setCheckpoints( flowDef.getCheckpointsCopy() );
    initFromTaps();

    retrieveSourceFields();
    retrieveSinkFields();
    }
206
207  public void setPlannerInfo( PlannerInfo plannerInfo )
208    {
209    this.plannerInfo = plannerInfo;
210    }
211
212  @Override
213  public PlannerInfo getPlannerInfo()
214    {
215    return plannerInfo;
216    }
217
218  @Override
219  public PlatformInfo getPlatformInfo()
220    {
221    return platformInfo;
222    }
223
  /**
   * Wires the planned graphs into this flow and resets its statistics and job map.
   * <p/>
   * addPlannerProperties() dereferences getPlannerInfo(), so setPlannerInfo() is expected to have been
   * called first.
   *
   * @param flowElementGraph the element graph, retained for documentation purposes
   * @param flowStepGraph    the step graph whose steps and nodes are back-linked to this flow by initSteps()
   */
  public void initialize( FlowElementGraph flowElementGraph, FlowStepGraph flowStepGraph )
    {
    addPlannerProperties();
    this.flowElementGraph = flowElementGraph;
    this.flowStepGraph = flowStepGraph;

    initSteps();

    // NOTE(review): initializeNewJobsMap() still runs afterwards; "last" presumably means after the step graph is wired
    this.flowStats = createPrepareFlowStats(); // must be last

    initializeNewJobsMap();
    }
236
237  public FlowElementGraph updateSchemes( FlowElementGraph pipeGraph )
238    {
239    presentSourceFields( pipeGraph );
240
241    presentSinkFields( pipeGraph );
242
243    return new FlowElementGraph( pipeGraph );
244    }
245
246  /** Force a Scheme to fetch any fields from a meta-data store */
247  protected void retrieveSourceFields()
248    {
249    for( Tap tap : sources.values() )
250      tap.retrieveSourceFields( getFlowProcess() );
251    }
252
253  /**
254   * Present the current resolved fields for the Tap
255   *
256   * @param pipeGraph
257   */
258  protected void presentSourceFields( FlowElementGraph pipeGraph )
259    {
260    for( Tap tap : sources.values() )
261      {
262      if( pipeGraph.containsVertex( tap ) )
263        tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
264      }
265
266    for( Tap tap : checkpoints.values() )
267      {
268      if( pipeGraph.containsVertex( tap ) )
269        tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
270      }
271    }
272
273  /** Force a Scheme to fetch any fields from a meta-data store */
274  protected void retrieveSinkFields()
275    {
276    for( Tap tap : sinks.values() )
277      tap.retrieveSinkFields( getFlowProcess() );
278    }
279
280  /**
281   * Present the current resolved fields for the Tap
282   *
283   * @param pipeGraph
284   */
285  protected void presentSinkFields( FlowElementGraph pipeGraph )
286    {
287    for( Tap tap : sinks.values() )
288      {
289      if( pipeGraph.containsVertex( tap ) )
290        tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
291      }
292
293    for( Tap tap : checkpoints.values() )
294      {
295      if( pipeGraph.containsVertex( tap ) )
296        tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
297      }
298    }
299
300  protected Fields getFieldsFor( FlowElementGraph pipeGraph, Tap tap )
301    {
302    return pipeGraph.outgoingEdgesOf( tap ).iterator().next().getOutValuesFields();
303    }
304
305  private void addSessionProperties( Map<Object, Object> properties )
306    {
307    if( properties == null )
308      return;
309
310    PropertyUtil.setProperty( properties, CASCADING_FLOW_ID, getID() );
311    PropertyUtil.setProperty( properties, "cascading.flow.tags", getTags() );
312    AppProps.setApplicationID( properties );
313    PropertyUtil.setProperty( properties, "cascading.app.name", makeAppName( properties ) );
314    PropertyUtil.setProperty( properties, "cascading.app.version", makeAppVersion( properties ) );
315    }
316
317  protected void addPlannerProperties()
318    {
319    setConfigProperty( getConfig(), "cascading.flow.planner", getPlannerInfo().name );
320    setConfigProperty( getConfig(), "cascading.flow.platform", getPlannerInfo().platform );
321    setConfigProperty( getConfig(), "cascading.flow.registry", getPlannerInfo().registry );
322    }
323
324  private String makeAppName( Map<Object, Object> properties )
325    {
326    if( properties == null )
327      return null;
328
329    String name = AppProps.getApplicationName( properties );
330
331    if( name != null )
332      return name;
333
334    return Util.findName( AppProps.getApplicationJarPath( properties ) );
335    }
336
337  private String makeAppVersion( Map<Object, Object> properties )
338    {
339    if( properties == null )
340      return null;
341
342    String name = AppProps.getApplicationVersion( properties );
343
344    if( name != null )
345      return name;
346
347    return Util.findVersion( AppProps.getApplicationJarPath( properties ) );
348    }
349
350  private FlowStats createPrepareFlowStats()
351    {
352    FlowStats flowStats = new FlowStats( this, getClientState() );
353
354    flowStats.prepare();
355    flowStats.markPending();
356
357    return flowStats;
358    }
359
360  public CascadingServices getCascadingServices()
361    {
362    if( cascadingServices == null )
363      cascadingServices = new CascadingServices( getConfigAsProperties() );
364
365    return cascadingServices;
366    }
367
368  private ClientState getClientState()
369    {
370    return getFlowSession().getCascadingServices().createClientState( getID() );
371    }
372
373  protected void initSteps()
374    {
375    if( flowStepGraph == null )
376      return;
377
378    Set<FlowStep> flowSteps = flowStepGraph.vertexSet();
379
380    for( FlowStep flowStep : flowSteps )
381      {
382      ( (BaseFlowStep) flowStep ).setFlow( this );
383
384      Set<FlowNode> flowNodes = flowStep.getFlowNodeGraph().vertexSet();
385
386      for( FlowNode flowNode : flowNodes )
387        ( (BaseFlowNode) flowNode ).setFlowStep( flowStep );
388      }
389    }
390
391  private void initFromTaps()
392    {
393    initFromTaps( sources );
394    initFromTaps( sinks );
395    initFromTaps( traps );
396    }
397
398  private void initFromTaps( Map<String, Tap> taps )
399    {
400    for( Tap tap : taps.values() )
401      tap.flowConfInit( this );
402    }
403
404  @Override
405  public String getName()
406    {
407    return name;
408    }
409
410  protected void setName( String name )
411    {
412    this.name = name;
413    }
414
415  @Override
416  public String getID()
417    {
418    if( id == null )
419      id = Util.createUniqueID();
420
421    return id;
422    }
423
424  @Override
425  public String getTags()
426    {
427    return tags;
428    }
429
430  @Override
431  public int getSubmitPriority()
432    {
433    return submitPriority;
434    }
435
436  @Override
437  public void setSubmitPriority( int submitPriority )
438    {
439    if( submitPriority < 1 || submitPriority > 10 )
440      throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );
441
442    this.submitPriority = submitPriority;
443    }
444
445  FlowElementGraph getFlowElementGraph()
446    {
447    return flowElementGraph;
448    }
449
450  FlowStepGraph getFlowStepGraph()
451    {
452    return flowStepGraph;
453    }
454
455  protected void setSources( Map<String, Tap> sources )
456    {
457    addListeners( sources.values() );
458    this.sources = sources;
459    }
460
461  protected void setSinks( Map<String, Tap> sinks )
462    {
463    addListeners( sinks.values() );
464    this.sinks = sinks;
465    }
466
467  protected void setTraps( Map<String, Tap> traps )
468    {
469    addListeners( traps.values() );
470    this.traps = traps;
471    }
472
473  protected void setCheckpoints( Map<String, Tap> checkpoints )
474    {
475    addListeners( checkpoints.values() );
476    this.checkpoints = checkpoints;
477    }
478
479  protected void setFlowStepGraph( FlowStepGraph flowStepGraph )
480    {
481    this.flowStepGraph = flowStepGraph;
482    }
483
484  /**
485   * This method creates a new internal Config with the parentConfig as defaults using the properties to override
486   * the defaults.
487   *
488   * @param properties   of type Map
489   * @param parentConfig of type Config
490   */
491  protected abstract void initConfig( Map<Object, Object> properties, Config parentConfig );
492
493  public Config createConfig( Map<Object, Object> properties, Config defaultConfig )
494    {
495    Config config = newConfig( defaultConfig );
496
497    if( properties == null )
498      return config;
499
500    Set<Object> keys = new HashSet<>( properties.keySet() );
501
502    // keys will only be grabbed if both key/value are String, so keep orig keys
503    if( properties instanceof Properties )
504      keys.addAll( ( (Properties) properties ).stringPropertyNames() );
505
506    for( Object key : keys )
507      {
508      Object value = properties.get( key );
509
510      if( value == null && properties instanceof Properties && key instanceof String )
511        value = ( (Properties) properties ).getProperty( (String) key );
512
513      if( value == null ) // don't stuff null values
514        continue;
515
516      setConfigProperty( config, key, value );
517      }
518
519    return config;
520    }
521
522  protected abstract void setConfigProperty( Config config, Object key, Object value );
523
524  protected abstract Config newConfig( Config defaultConfig );
525
526  protected void initFromProperties( Map<Object, Object> properties )
527    {
528    stopJobsOnExit = getStopJobsOnExit( properties );
529    }
530
531  public FlowSession getFlowSession()
532    {
533    return new FlowSession( getCascadingServices() );
534    }
535
536  @Override
537  public FlowStats getFlowStats()
538    {
539    return flowStats;
540    }
541
542  @Override
543  public Map<String, String> getFlowDescriptor()
544    {
545    if( flowDescriptor == null )
546      return Collections.emptyMap();
547
548    return Collections.unmodifiableMap( flowDescriptor );
549    }
550
551  @Override
552  public FlowStats getStats()
553    {
554    return getFlowStats();
555    }
556
557  void addListeners( Collection listeners )
558    {
559    for( Object listener : listeners )
560      {
561      if( listener instanceof FlowListener )
562        addListener( (FlowListener) listener );
563      }
564    }
565
566  List<SafeFlowListener> getListeners()
567    {
568    if( listeners == null )
569      listeners = new LinkedList<SafeFlowListener>();
570
571    return listeners;
572    }
573
574  @Override
575  public boolean hasListeners()
576    {
577    return listeners != null && !listeners.isEmpty();
578    }
579
580  @Override
581  public void addListener( FlowListener flowListener )
582    {
583    getListeners().add( new SafeFlowListener( flowListener ) );
584    }
585
586  @Override
587  public boolean removeListener( FlowListener flowListener )
588    {
589    return getListeners().remove( new SafeFlowListener( flowListener ) );
590    }
591
592  @Override
593  public boolean hasStepListeners()
594    {
595    boolean hasStepListeners = false;
596
597    for( FlowStep step : getFlowSteps() )
598      hasStepListeners |= step.hasListeners();
599
600    return hasStepListeners;
601    }
602
603  @Override
604  public void addStepListener( FlowStepListener flowStepListener )
605    {
606    for( FlowStep step : getFlowSteps() )
607      step.addListener( flowStepListener );
608    }
609
610  @Override
611  public boolean removeStepListener( FlowStepListener flowStepListener )
612    {
613    boolean listenerRemoved = true;
614
615    for( FlowStep step : getFlowSteps() )
616      listenerRemoved &= step.removeListener( flowStepListener );
617
618    return listenerRemoved;
619    }
620
621  @Override
622  public Map<String, Tap> getSources()
623    {
624    return Collections.unmodifiableMap( sources );
625    }
626
627  @Override
628  public List<String> getSourceNames()
629    {
630    return new ArrayList<String>( sources.keySet() );
631    }
632
633  @Override
634  public Tap getSource( String name )
635    {
636    return sources.get( name );
637    }
638
639  @Override
640  @DependencyIncoming
641  public Collection<Tap> getSourcesCollection()
642    {
643    return getSources().values();
644    }
645
646  @Override
647  public Map<String, Tap> getSinks()
648    {
649    return Collections.unmodifiableMap( sinks );
650    }
651
652  @Override
653  public List<String> getSinkNames()
654    {
655    return new ArrayList<String>( sinks.keySet() );
656    }
657
658  @Override
659  public Tap getSink( String name )
660    {
661    return sinks.get( name );
662    }
663
664  @Override
665  @DependencyOutgoing
666  public Collection<Tap> getSinksCollection()
667    {
668    return getSinks().values();
669    }
670
671  @Override
672  public Tap getSink()
673    {
674    return sinks.values().iterator().next();
675    }
676
677  @Override
678  public Map<String, Tap> getTraps()
679    {
680    return Collections.unmodifiableMap( traps );
681    }
682
683  @Override
684  public List<String> getTrapNames()
685    {
686    return new ArrayList<String>( traps.keySet() );
687    }
688
689  @Override
690  public Collection<Tap> getTrapsCollection()
691    {
692    return getTraps().values();
693    }
694
695  @Override
696  public Map<String, Tap> getCheckpoints()
697    {
698    return Collections.unmodifiableMap( checkpoints );
699    }
700
701  @Override
702  public List<String> getCheckpointNames()
703    {
704    return new ArrayList<String>( checkpoints.keySet() );
705    }
706
707  @Override
708  public Collection<Tap> getCheckpointsCollection()
709    {
710    return getCheckpoints().values();
711    }
712
713  @Override
714  public boolean isStopJobsOnExit()
715    {
716    return stopJobsOnExit;
717    }
718
719  @Override
720  public FlowSkipStrategy getFlowSkipStrategy()
721    {
722    return flowSkipStrategy;
723    }
724
725  @Override
726  public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy )
727    {
728    if( flowSkipStrategy == null )
729      throw new IllegalArgumentException( "flowSkipStrategy may not be null" );
730
731    try
732      {
733      return this.flowSkipStrategy;
734      }
735    finally
736      {
737      this.flowSkipStrategy = flowSkipStrategy;
738      }
739    }
740
741  @Override
742  public boolean isSkipFlow() throws IOException
743    {
744    return flowSkipStrategy.skipFlow( this );
745    }
746
747  @Override
748  public boolean areSinksStale() throws IOException
749    {
750    return areSourcesNewer( getSinkModified() );
751    }
752
753  @Override
754  public boolean areSourcesNewer( long sinkModified ) throws IOException
755    {
756    Config config = getConfig();
757    Iterator<Tap> values = sources.values().iterator();
758
759    long sourceModified = 0;
760
761    try
762      {
763      sourceModified = Util.getSourceModified( config, values, sinkModified );
764
765      if( sinkModified < sourceModified )
766        return true;
767
768      return false;
769      }
770    finally
771      {
772      if( LOG.isInfoEnabled() )
773        logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
774      }
775    }
776
777  @Override
778  public long getSinkModified() throws IOException
779    {
780    long sinkModified = Util.getSinkModified( getConfig(), sinks.values() );
781
782    if( LOG.isInfoEnabled() )
783      {
784      if( sinkModified == -1L )
785        logInfo( "at least one sink is marked for delete" );
786      if( sinkModified == 0L )
787        logInfo( "at least one sink does not exist" );
788      else
789        logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
790      }
791
792    return sinkModified;
793    }
794
795  @Override
796  public FlowStepStrategy getFlowStepStrategy()
797    {
798    return flowStepStrategy;
799    }
800
801  @Override
802  public void setFlowStepStrategy( FlowStepStrategy flowStepStrategy )
803    {
804    this.flowStepStrategy = flowStepStrategy;
805    }
806
807  @Override
808  public List<FlowStep<Config>> getFlowSteps()
809    {
810    if( steps != null )
811      return steps;
812
813    if( flowStepGraph == null )
814      return Collections.emptyList();
815
816    Iterator<FlowStep> topoIterator = flowStepGraph.getTopologicalIterator();
817
818    steps = new ArrayList<>();
819
820    while( topoIterator.hasNext() )
821      steps.add( topoIterator.next() );
822
823    return steps;
824    }
825
826  @Override
827  @ProcessPrepare
828  public void prepare()
829    {
830    try
831      {
832      deleteSinksIfNotUpdate();
833      deleteTrapsIfNotUpdate();
834      deleteCheckpointsIfNotUpdate();
835      }
836    catch( IOException exception )
837      {
838      throw new FlowException( "unable to prepare flow", exception );
839      }
840    }
841
842  @Override
843  @ProcessStart
844  public synchronized void start()
845    {
846    if( thread != null )
847      return;
848
849    if( stop )
850      return;
851
852    registerShutdownHook();
853
854    internalStart();
855
856    String threadName = ( "flow " + Util.toNull( getName() ) ).trim();
857
858    thread = createFlowThread( threadName );
859
860    thread.start();
861    }
862
863  protected Thread createFlowThread( String threadName )
864    {
865    return new Thread( new Runnable()
866    {
867    @Override
868    public void run()
869      {
870      BaseFlow.this.run();
871      }
872    }, threadName );
873    }
874
875  protected abstract void internalStart();
876
  /**
   * Stops the flow: fires the stopping notification, marks stats stopped if not finished, stops all jobs,
   * shuts down the executor and cleans up.
   * <p/>
   * Runs under both this object's monitor and the fair {@code stopLock}; complete() acquires the lock to
   * wait for an in-progress stop. Only the first call performs the stop sequence.
   * NOTE(review): the finally block calls flowStats.cleanup() on every call, including repeat calls that
   * return early — confirm cleanup() tolerates this.
   */
  @Override
  @ProcessStop
  public synchronized void stop()
    {
    stopLock.lock();

    try
      {
      if( stop )
        return;

      stop = true;

      fireOnStopping();

      // do not overwrite a terminal status that was already recorded
      if( !flowStats.isFinished() )
        flowStats.markStopped();

      internalStopAllJobs();

      handleExecutorShutdown();

      internalClean( true );
      }
    finally
      {
      flowStats.cleanup();
      stopLock.unlock();
      }
    }

  /**
   * Platform-specific cleanup.
   *
   * @param stop true when invoked from stop()
   */
  protected abstract void internalClean( boolean stop );
909
910  @Override
911  @ProcessComplete
912  public void complete()
913    {
914    start();
915
916    try
917      {
918      try
919        {
920        synchronized( this ) // prevent NPE on quick stop() & complete() after start()
921          {
922          while( thread == null && !stop )
923            Util.safeSleep( 10 );
924          }
925
926        if( thread != null )
927          thread.join();
928        }
929      catch( InterruptedException exception )
930        {
931        throw new FlowException( getName(), "thread interrupted", exception );
932        }
933
934      // if in #stop and stopping, lets wait till its done in this thread
935      try
936        {
937        stopLock.lock();
938        }
939      finally
940        {
941        stopLock.unlock();
942        }
943
944      if( throwable instanceof FlowException )
945        ( (FlowException) throwable ).setFlowName( getName() );
946
947      if( throwable instanceof CascadingException )
948        throw (CascadingException) throwable;
949
950      if( throwable instanceof OutOfMemoryError )
951        throw (OutOfMemoryError) throwable;
952
953      if( throwable != null )
954        throw new FlowException( getName(), "unhandled exception", throwable );
955
956      if( hasListeners() )
957        {
958        for( SafeFlowListener safeFlowListener : getListeners() )
959          {
960          if( safeFlowListener.throwable != null )
961            throw new FlowException( getName(), "unhandled listener exception", throwable );
962          }
963        }
964      }
965    finally
966      {
967      thread = null;
968      throwable = null;
969      }
970    }
971
972  private void commitTraps()
973    {
974    // commit all the traps, don't fail on an error
975    for( Tap tap : traps.values() )
976      {
977      try
978        {
979        if( !tap.commitResource( getConfig() ) )
980          logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ) );
981        }
982      catch( IOException exception )
983        {
984        logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), exception );
985        }
986      }
987    }
988
989  @Override
990  @ProcessCleanup
991  public void cleanup()
992    {
993    // do nothing
994    }
995
996  @Override
997  public TupleEntryIterator openSource() throws IOException
998    {
999    return sources.values().iterator().next().openForRead( getFlowProcess() );
1000    }
1001
1002  @Override
1003  public TupleEntryIterator openSource( String name ) throws IOException
1004    {
1005    if( !sources.containsKey( name ) )
1006      throw new IllegalArgumentException( "source does not exist: " + name );
1007
1008    return sources.get( name ).openForRead( getFlowProcess() );
1009    }
1010
1011  @Override
1012  public TupleEntryIterator openSink() throws IOException
1013    {
1014    return sinks.values().iterator().next().openForRead( getFlowProcess() );
1015    }
1016
1017  @Override
1018  public TupleEntryIterator openSink( String name ) throws IOException
1019    {
1020    if( !sinks.containsKey( name ) )
1021      throw new IllegalArgumentException( "sink does not exist: " + name );
1022
1023    return sinks.get( name ).openForRead( getFlowProcess() );
1024    }
1025
1026  @Override
1027  public TupleEntryIterator openTrap() throws IOException
1028    {
1029    return traps.values().iterator().next().openForRead( getFlowProcess() );
1030    }
1031
1032  @Override
1033  public TupleEntryIterator openTrap( String name ) throws IOException
1034    {
1035    if( !traps.containsKey( name ) )
1036      throw new IllegalArgumentException( "trap does not exist: " + name );
1037
1038    return traps.get( name ).openForRead( getFlowProcess() );
1039    }
1040
1041  /**
1042   * Method deleteSinks deletes all sinks, whether or not they are configured for {@link cascading.tap.SinkMode#UPDATE}.
1043   * <p/>
1044   * Use with caution.
1045   *
1046   * @throws IOException when
1047   * @see BaseFlow#deleteSinksIfNotUpdate()
1048   */
1049  public void deleteSinks() throws IOException
1050    {
1051    for( Tap tap : sinks.values() )
1052      deleteOrFail( tap );
1053    }
1054
1055  private void deleteOrFail( Tap tap ) throws IOException
1056    {
1057    if( !tap.resourceExists( getConfig() ) )
1058      return;
1059
1060    if( !tap.deleteResource( getConfig() ) )
1061      throw new FlowException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) );
1062    }
1063
1064  /**
1065   * Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the {@link cascading.tap.SinkMode#UPDATE} flag.
1066   * <p/>
1067   * Typically used by a {@link Cascade} before executing the flow if the sinks are stale.
1068   * <p/>
1069   * Use with caution.
1070   *
1071   * @throws IOException when
1072   */
1073  public void deleteSinksIfNotUpdate() throws IOException
1074    {
1075    for( Tap tap : sinks.values() )
1076      {
1077      if( !tap.isUpdate() )
1078        deleteOrFail( tap );
1079      }
1080    }
1081
1082  public void deleteSinksIfReplace() throws IOException
1083    {
1084    for( Tap tap : sinks.values() )
1085      {
1086      if( tap.isReplace() )
1087        deleteOrFail( tap );
1088      }
1089    }
1090
1091  public void deleteTrapsIfNotUpdate() throws IOException
1092    {
1093    for( Tap tap : traps.values() )
1094      {
1095      if( !tap.isUpdate() )
1096        deleteOrFail( tap );
1097      }
1098    }
1099
1100  public void deleteCheckpointsIfNotUpdate() throws IOException
1101    {
1102    for( Tap tap : checkpoints.values() )
1103      {
1104      if( !tap.isUpdate() )
1105        deleteOrFail( tap );
1106      }
1107    }
1108
1109  public void deleteTrapsIfReplace() throws IOException
1110    {
1111    for( Tap tap : traps.values() )
1112      {
1113      if( tap.isReplace() )
1114        deleteOrFail( tap );
1115      }
1116    }
1117
1118  public void deleteCheckpointsIfReplace() throws IOException
1119    {
1120    for( Tap tap : checkpoints.values() )
1121      {
1122      if( tap.isReplace() )
1123        deleteOrFail( tap );
1124      }
1125    }
1126
1127  @Override
1128  public boolean resourceExists( Tap tap ) throws IOException
1129    {
1130    return tap.resourceExists( getConfig() );
1131    }
1132
1133  @Override
1134  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
1135    {
1136    return tap.openForRead( getFlowProcess() );
1137    }
1138
1139  @Override
1140  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
1141    {
1142    return tap.openForWrite( getFlowProcess() );
1143    }
1144
1145  /** Method run implements the Runnable run method and should not be called by users. */
1146  private void run()
1147    {
1148    if( thread == null )
1149      throw new IllegalStateException( "to start a Flow call start() or complete(), not Runnable#run()" );
1150
1151    Version.printBanner();
1152    Update.checkForUpdate( getPlatformInfo() );
1153
1154    try
1155      {
1156      if( stop )
1157        return;
1158
1159      flowStats.markStarted();
1160
1161      fireOnStarting();
1162
1163      if( LOG.isInfoEnabled() )
1164        {
1165        logInfo( "starting" );
1166
1167        for( Tap source : getSourcesCollection() )
1168          logInfo( " source: " + source );
1169        for( Tap sink : getSinksCollection() )
1170          logInfo( " sink: " + sink );
1171        }
1172
1173      // if jobs are run local, then only use one thread to force execution serially
1174      //int numThreads = jobsAreLocal() ? 1 : getMaxConcurrentSteps( getJobConf() );
1175      int numThreads = getMaxNumParallelSteps();
1176
1177      if( numThreads == 0 )
1178        numThreads = jobsMap.size();
1179
1180      if( numThreads == 0 )
1181        throw new IllegalStateException( "no jobs rendered for flow: " + getName() );
1182
1183      if( LOG.isInfoEnabled() )
1184        {
1185        logInfo( " parallel execution of steps is enabled: " + ( getMaxNumParallelSteps() != 1 ) );
1186        logInfo( " executing total steps: " + jobsMap.size() );
1187        logInfo( " allocating management threads: " + numThreads );
1188        }
1189
1190      List<Future<Throwable>> futures = spawnJobs( numThreads );
1191
1192      for( Future<Throwable> future : futures )
1193        {
1194        throwable = future.get();
1195
1196        if( throwable != null )
1197          {
1198          if( !stop )
1199            internalStopAllJobs();
1200
1201          handleExecutorShutdown();
1202          break;
1203          }
1204        }
1205      }
1206    catch( Throwable throwable )
1207      {
1208      this.throwable = throwable;
1209      }
1210    finally
1211      {
1212      handleThrowableAndMarkFailed();
1213
1214      if( !stop && !flowStats.isFinished() )
1215        flowStats.markSuccessful();
1216
1217      internalClean( stop ); // cleaning temp taps may be determined by success/failure
1218
1219      commitTraps();
1220
1221      try
1222        {
1223        fireOnCompleted();
1224        }
1225      finally
1226        {
1227        if( LOG.isInfoEnabled() )
1228          {
1229          long totalSliceCPUSeconds = getTotalSliceCPUMilliSeconds();
1230
1231          if( totalSliceCPUSeconds == -1 )
1232            logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) );
1233          else
1234            logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) + ", using cpu time: " + formatDurationFromMillis( totalSliceCPUSeconds ) );
1235          }
1236
1237        flowStats.cleanup();
1238        internalShutdown();
1239        deregisterShutdownHook();
1240        }
1241      }
1242    }
1243
1244  protected long getTotalSliceCPUMilliSeconds()
1245    {
1246    return -1;
1247    }
1248
  /**
   * Returns the maximum number of step jobs that may be managed concurrently.
   * <p/>
   * As consumed by run(), {@code 0} means one management thread is allocated per step job. Any value other than
   * {@code 1} is logged as parallel step execution being enabled.
   *
   * @return the max number of parallel steps, or 0 to use one thread per step job
   */
  protected abstract int getMaxNumParallelSteps();
1250
  /**
   * Platform-specific shutdown hook. run() invokes it at the very end of a flow run, after flow stats are
   * cleaned up and before the JVM shutdown hook is deregistered.
   * NOTE(review): what is released is defined entirely by subclasses; confirm per platform.
   */
  protected abstract void internalShutdown();
1252
1253  private List<Future<Throwable>> spawnJobs( int numThreads ) throws InterruptedException
1254    {
1255    if( stop )
1256      return new ArrayList<Future<Throwable>>();
1257
1258    List<Callable<Throwable>> list = new ArrayList<Callable<Throwable>>();
1259
1260    for( FlowStepJob<Config> job : jobsMap.values() )
1261      list.add( job );
1262
1263    return spawnStrategy.start( this, numThreads, list );
1264    }
1265
1266  private void handleThrowableAndMarkFailed()
1267    {
1268    if( throwable != null && !stop )
1269      {
1270      flowStats.markFailed( throwable );
1271
1272      fireOnThrowable();
1273      }
1274    }
1275
1276  Map<String, FlowStepJob<Config>> getJobsMap()
1277    {
1278    return jobsMap;
1279    }
1280
1281  protected void initializeNewJobsMap()
1282    {
1283    jobsMap = new LinkedHashMap<>(); // keep topo order
1284    Iterator<FlowStep> topoIterator = flowStepGraph.getTopologicalIterator();
1285
1286    while( topoIterator.hasNext() )
1287      {
1288      BaseFlowStep<Config> step = (BaseFlowStep) topoIterator.next();
1289      FlowStepJob<Config> flowStepJob = step.getCreateFlowStepJob( getFlowProcess(), getConfig() );
1290
1291      jobsMap.put( step.getName(), flowStepJob );
1292
1293      List<FlowStepJob<Config>> predecessors = new ArrayList<FlowStepJob<Config>>();
1294
1295      for( Object flowStep : ProcessGraphs.predecessorListOf( flowStepGraph, step ) )
1296        predecessors.add( jobsMap.get( ( (FlowStep) flowStep ).getName() ) );
1297
1298      flowStepJob.setPredecessors( predecessors );
1299
1300      flowStats.addStepStats( flowStepJob.getStepStats() );
1301      }
1302    }
1303
1304  protected void internalStopAllJobs()
1305    {
1306    logInfo( "stopping all jobs" );
1307
1308    try
1309      {
1310      if( jobsMap == null )
1311        return;
1312
1313      List<FlowStepJob<Config>> jobs = new ArrayList<FlowStepJob<Config>>( jobsMap.values() );
1314
1315      Collections.reverse( jobs );
1316
1317      for( FlowStepJob<Config> job : jobs )
1318        job.stop();
1319      }
1320    finally
1321      {
1322      logInfo( "stopped all jobs" );
1323      }
1324    }
1325
1326  protected void handleExecutorShutdown()
1327    {
1328    if( spawnStrategy.isCompleted( this ) )
1329      return;
1330
1331    logInfo( "shutting down job executor" );
1332
1333    try
1334      {
1335      spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS );
1336      }
1337    catch( InterruptedException exception )
1338      {
1339      // ignore
1340      }
1341
1342    logInfo( "shutdown complete" );
1343    }
1344
1345  protected void fireOnCompleted()
1346    {
1347    if( hasListeners() )
1348      {
1349      if( LOG.isDebugEnabled() )
1350        logDebug( "firing onCompleted event: " + getListeners().size() );
1351
1352      for( FlowListener flowListener : getListeners() )
1353        flowListener.onCompleted( this );
1354      }
1355    }
1356
1357  protected void fireOnThrowable( Throwable throwable )
1358    {
1359    this.throwable = throwable;
1360    fireOnThrowable();
1361    }
1362
1363  protected void fireOnThrowable()
1364    {
1365    if( hasListeners() )
1366      {
1367      if( LOG.isDebugEnabled() )
1368        logDebug( "firing onThrowable event: " + getListeners().size() );
1369
1370      boolean isHandled = false;
1371
1372      for( FlowListener flowListener : getListeners() )
1373        isHandled = flowListener.onThrowable( this, throwable ) || isHandled;
1374
1375      if( isHandled )
1376        throwable = null;
1377      }
1378    }
1379
1380  protected void fireOnStopping()
1381    {
1382    if( hasListeners() )
1383      {
1384      if( LOG.isDebugEnabled() )
1385        logDebug( "firing onStopping event: " + getListeners().size() );
1386
1387      for( FlowListener flowListener : getListeners() )
1388        flowListener.onStopping( this );
1389      }
1390    }
1391
1392  protected void fireOnStarting()
1393    {
1394    if( hasListeners() )
1395      {
1396      if( LOG.isDebugEnabled() )
1397        logDebug( "firing onStarting event: " + getListeners().size() );
1398
1399      for( FlowListener flowListener : getListeners() )
1400        flowListener.onStarting( this );
1401      }
1402    }
1403
1404  @Override
1405  public String toString()
1406    {
1407    StringBuffer buffer = new StringBuffer();
1408
1409    if( getName() != null )
1410      buffer.append( getName() ).append( ": " );
1411
1412    for( FlowStep step : getFlowSteps() )
1413      buffer.append( step );
1414
1415    return buffer.toString();
1416    }
1417
1418  @Override
1419  public final boolean isInfoEnabled()
1420    {
1421    return LOG.isInfoEnabled();
1422    }
1423
1424  @Override
1425  public final boolean isDebugEnabled()
1426    {
1427    return LOG.isDebugEnabled();
1428    }
1429
1430  @Override
1431  public void logInfo( String message, Object... arguments )
1432    {
1433    LOG.info( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
1434    }
1435
1436  @Override
1437  public void logDebug( String message, Object... arguments )
1438    {
1439    LOG.debug( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
1440    }
1441
1442  @Override
1443  public void logWarn( String message )
1444    {
1445    LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message );
1446    }
1447
1448  @Override
1449  public void logWarn( String message, Throwable throwable )
1450    {
1451    LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable );
1452    }
1453
1454  @Override
1455  public void logWarn( String message, Object... arguments )
1456    {
1457    LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
1458    }
1459
1460  @Override
1461  public void logError( String message, Object... arguments )
1462    {
1463    LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
1464    }
1465
1466  @Override
1467  public void logError( String message, Throwable throwable )
1468    {
1469    LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable );
1470    }
1471
1472  @Override
1473  public void writeDOT( String filename )
1474    {
1475    if( flowElementGraph == null )
1476      throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" );
1477
1478    flowElementGraph.writeDOT( filename );
1479    }
1480
1481  @Override
1482  public void writeStepsDOT( String filename )
1483    {
1484    if( flowStepGraph == null )
1485      throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" );
1486
1487    flowStepGraph.writeDOT( filename );
1488    }
1489
1490  /**
1491   * Used to return a simple wrapper for use as an edge in a graph where there can only be
1492   * one instance of every edge.
1493   *
1494   * @return FlowHolder
1495   */
1496  public FlowHolder getHolder()
1497    {
1498    return new FlowHolder( this );
1499    }
1500
1501  public void setCascade( Cascade cascade )
1502    {
1503    setConfigProperty( getConfig(), "cascading.cascade.id", cascade.getID() );
1504    flowStats.recordInfo();
1505    }
1506
1507  @Override
1508  public String getCascadeID()
1509    {
1510    return getProperty( "cascading.cascade.id" );
1511    }
1512
1513  @Override
1514  public String getRunID()
1515    {
1516    return runID;
1517    }
1518
1519  public List<String> getClassPath()
1520    {
1521    return classPath;
1522    }
1523
1524  @Override
1525  public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy )
1526    {
1527    this.spawnStrategy = spawnStrategy;
1528    }
1529
1530  @Override
1531  public UnitOfWorkSpawnStrategy getSpawnStrategy()
1532    {
1533    return spawnStrategy;
1534    }
1535
1536  protected void registerShutdownHook()
1537    {
1538    if( !isStopJobsOnExit() )
1539      return;
1540
1541    shutdownHook = new ShutdownUtil.Hook()
1542    {
1543    @Override
1544    public Priority priority()
1545      {
1546      return Priority.WORK_CHILD;
1547      }
1548
1549    @Override
1550    public void execute()
1551      {
1552      logInfo( "shutdown hook calling stop on flow" );
1553
1554      BaseFlow.this.stop();
1555      }
1556    };
1557
1558    ShutdownUtil.addHook( shutdownHook );
1559    }
1560
1561  private void deregisterShutdownHook()
1562    {
1563    if( !isStopJobsOnExit() || stop )
1564      return;
1565
1566    ShutdownUtil.removeHook( shutdownHook );
1567    }
1568
1569  /** Class FlowHolder is a helper class for wrapping Flow instances. */
1570  public static class FlowHolder
1571    {
1572    /** Field flow */
1573    public Flow flow;
1574
1575    public FlowHolder()
1576      {
1577      }
1578
1579    public FlowHolder( Flow flow )
1580      {
1581      this.flow = flow;
1582      }
1583    }
1584
1585  /**
1586   * Class SafeFlowListener safely calls a wrapped FlowListener.
1587   * <p/>
1588   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1589   * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
1590   * which in turn is run in a new Thread.
1591   */
1592  private class SafeFlowListener implements FlowListener
1593    {
1594    /** Field flowListener */
1595    final FlowListener flowListener;
1596    /** Field throwable */
1597    Throwable throwable;
1598
1599    private SafeFlowListener( FlowListener flowListener )
1600      {
1601      this.flowListener = flowListener;
1602      }
1603
1604    public void onStarting( Flow flow )
1605      {
1606      try
1607        {
1608        flowListener.onStarting( flow );
1609        }
1610      catch( Throwable throwable )
1611        {
1612        handleThrowable( throwable );
1613        }
1614      }
1615
1616    public void onStopping( Flow flow )
1617      {
1618      try
1619        {
1620        flowListener.onStopping( flow );
1621        }
1622      catch( Throwable throwable )
1623        {
1624        handleThrowable( throwable );
1625        }
1626      }
1627
1628    public void onCompleted( Flow flow )
1629      {
1630      try
1631        {
1632        flowListener.onCompleted( flow );
1633        }
1634      catch( Throwable throwable )
1635        {
1636        handleThrowable( throwable );
1637        }
1638      }
1639
1640    public boolean onThrowable( Flow flow, Throwable flowThrowable )
1641      {
1642      try
1643        {
1644        return flowListener.onThrowable( flow, flowThrowable );
1645        }
1646      catch( Throwable throwable )
1647        {
1648        handleThrowable( throwable );
1649        }
1650
1651      return false;
1652      }
1653
1654    private void handleThrowable( Throwable throwable )
1655      {
1656      this.throwable = throwable;
1657
1658      logWarn( String.format( "flow listener %s threw throwable", flowListener ), throwable );
1659
1660      // stop this flow
1661      stop();
1662      }
1663
1664    public boolean equals( Object object )
1665      {
1666      if( object instanceof BaseFlow.SafeFlowListener )
1667        return flowListener.equals( ( (BaseFlow.SafeFlowListener) object ).flowListener );
1668
1669      return flowListener.equals( object );
1670      }
1671
1672    public int hashCode()
1673      {
1674      return flowListener.hashCode();
1675      }
1676    }
1677  }