001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.io.IOException;
024    import java.util.ArrayList;
025    import java.util.Collection;
026    import java.util.Collections;
027    import java.util.Date;
028    import java.util.HashSet;
029    import java.util.Iterator;
030    import java.util.LinkedHashMap;
031    import java.util.LinkedList;
032    import java.util.List;
033    import java.util.Map;
034    import java.util.Properties;
035    import java.util.Set;
036    import java.util.concurrent.Callable;
037    import java.util.concurrent.Future;
038    import java.util.concurrent.TimeUnit;
039    import java.util.concurrent.locks.ReentrantLock;
040    
041    import cascading.CascadingException;
042    import cascading.cascade.Cascade;
043    import cascading.flow.planner.BaseFlowStep;
044    import cascading.flow.planner.ElementGraph;
045    import cascading.flow.planner.FlowStepGraph;
046    import cascading.flow.planner.FlowStepJob;
047    import cascading.flow.planner.PlatformInfo;
048    import cascading.management.CascadingServices;
049    import cascading.management.UnitOfWorkExecutorStrategy;
050    import cascading.management.UnitOfWorkSpawnStrategy;
051    import cascading.management.state.ClientState;
052    import cascading.property.AppProps;
053    import cascading.property.PropertyUtil;
054    import cascading.stats.FlowStats;
055    import cascading.tap.Tap;
056    import cascading.tuple.Fields;
057    import cascading.tuple.TupleEntryCollector;
058    import cascading.tuple.TupleEntryIterator;
059    import cascading.util.ShutdownUtil;
060    import cascading.util.Update;
061    import cascading.util.Util;
062    import cascading.util.Version;
063    import org.jgrapht.traverse.TopologicalOrderIterator;
064    import org.slf4j.Logger;
065    import org.slf4j.LoggerFactory;
066    import riffle.process.DependencyIncoming;
067    import riffle.process.DependencyOutgoing;
068    import riffle.process.ProcessCleanup;
069    import riffle.process.ProcessComplete;
070    import riffle.process.ProcessPrepare;
071    import riffle.process.ProcessStart;
072    import riffle.process.ProcessStop;
073    
074    import static org.jgrapht.Graphs.predecessorListOf;
075    
076    @riffle.process.Process
077    public abstract class BaseFlow<Config> implements Flow<Config>
078      {
079      /** Field LOG */
080      private static final Logger LOG = LoggerFactory.getLogger( Flow.class );
081    
082      private PlatformInfo platformInfo;
083    
084      /** Field id */
085      private String id;
086      /** Field name */
087      private String name;
088      /** Fields runID */
089      private String runID;
090      /** Fields classpath */
091      private List<String> classPath; // may remain null
092      /** Field tags */
093      private String tags;
094      /** Field listeners */
095      private List<SafeFlowListener> listeners;
096      /** Field skipStrategy */
097      private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale();
098      /** Field flowStats */
099      protected FlowStats flowStats; // don't use a listener to set values
100      /** Field sources */
101      protected Map<String, Tap> sources = Collections.EMPTY_MAP;
102      /** Field sinks */
103      protected Map<String, Tap> sinks = Collections.EMPTY_MAP;
104      /** Field traps */
105      private Map<String, Tap> traps = Collections.EMPTY_MAP;
106      /** Field checkpoints */
107      private Map<String, Tap> checkpoints = Collections.EMPTY_MAP;
108      /** Field stopJobsOnExit */
109      protected boolean stopJobsOnExit = true;
110      /** Field submitPriority */
111      private int submitPriority = 5;
112    
113      /** Field stepGraph */
114      private FlowStepGraph<Config> flowStepGraph;
115      /** Field thread */
116      protected transient Thread thread;
117      /** Field throwable */
118      private Throwable throwable;
119      /** Field stop */
120      protected boolean stop;
121    
122      /** Field pipeGraph */
123      private ElementGraph pipeGraph; // only used for documentation purposes
124    
125      private transient CascadingServices cascadingServices;
126    
127      private FlowStepStrategy<Config> flowStepStrategy = null;
128      /** Field steps */
129      private transient List<FlowStep<Config>> steps;
130      /** Field jobsMap */
131      private transient Map<String, FlowStepJob<Config>> jobsMap;
132      private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy();
133    
134      private transient ReentrantLock stopLock = new ReentrantLock( true );
135      protected ShutdownUtil.Hook shutdownHook;
136    
137      /**
138       * Returns property stopJobsOnExit.
139       *
140       * @param properties of type Map
141       * @return a boolean
142       */
143      static boolean getStopJobsOnExit( Map<Object, Object> properties )
144        {
145        return Boolean.parseBoolean( PropertyUtil.getProperty( properties, FlowProps.STOP_JOBS_ON_EXIT, "true" ) );
146        }
147    
148      /** Used for testing. */
149      protected BaseFlow()
150        {
151        this.name = "NA";
152        this.flowStats = createPrepareFlowStats();
153        }
154    
155      protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name )
156        {
157        this.platformInfo = platformInfo;
158        this.name = name;
159        addSessionProperties( properties );
160        initConfig( properties, defaultConfig );
161    
162        this.flowStats = createPrepareFlowStats(); // must be last
163        }
164    
165      protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef )
166        {
167        this.platformInfo = platformInfo;
168        this.name = flowDef.getName();
169        this.tags = flowDef.getTags();
170        this.runID = flowDef.getRunID();
171        this.classPath = flowDef.getClassPath();
172    
173        addSessionProperties( properties );
174        initConfig( properties, defaultConfig );
175        setSources( flowDef.getSourcesCopy() );
176        setSinks( flowDef.getSinksCopy() );
177        setTraps( flowDef.getTrapsCopy() );
178        setCheckpoints( flowDef.getCheckpointsCopy() );
179        initFromTaps();
180    
181        retrieveSourceFields();
182        retrieveSinkFields();
183        }
184    
185      public PlatformInfo getPlatformInfo()
186        {
187        return platformInfo;
188        }
189    
190      public void initialize( ElementGraph pipeGraph, FlowStepGraph<Config> flowStepGraph )
191        {
192        this.pipeGraph = pipeGraph;
193        this.flowStepGraph = flowStepGraph;
194    
195        initSteps();
196    
197        this.flowStats = createPrepareFlowStats(); // must be last
198    
199        initializeNewJobsMap();
200        }
201    
202      public ElementGraph updateSchemes( ElementGraph pipeGraph )
203        {
204        presentSourceFields( pipeGraph );
205    
206        presentSinkFields( pipeGraph );
207    
208        return new ElementGraph( pipeGraph );
209        }
210    
211      /** Force a Scheme to fetch any fields from a meta-data store */
212      protected void retrieveSourceFields()
213        {
214        for( Tap tap : sources.values() )
215          tap.retrieveSourceFields( getFlowProcess() );
216        }
217    
218      /**
219       * Present the current resolved fields for the Tap
220       *
221       * @param pipeGraph
222       */
223      protected void presentSourceFields( ElementGraph pipeGraph )
224        {
225        for( Tap tap : sources.values() )
226          {
227          if( pipeGraph.containsVertex( tap ) )
228            tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
229          }
230    
231        for( Tap tap : checkpoints.values() )
232          {
233          if( pipeGraph.containsVertex( tap ) )
234            tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
235          }
236        }
237    
238      /** Force a Scheme to fetch any fields from a meta-data store */
239      protected void retrieveSinkFields()
240        {
241        for( Tap tap : sinks.values() )
242          tap.retrieveSinkFields( getFlowProcess() );
243        }
244    
245      /**
246       * Present the current resolved fields for the Tap
247       *
248       * @param pipeGraph
249       */
250      protected void presentSinkFields( ElementGraph pipeGraph )
251        {
252        for( Tap tap : sinks.values() )
253          {
254          if( pipeGraph.containsVertex( tap ) )
255            tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
256          }
257    
258        for( Tap tap : checkpoints.values() )
259          {
260          if( pipeGraph.containsVertex( tap ) )
261            tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
262          }
263        }
264    
265      protected Fields getFieldsFor( ElementGraph pipeGraph, Tap tap )
266        {
267        return pipeGraph.outgoingEdgesOf( tap ).iterator().next().getOutValuesFields();
268        }
269    
270      private void addSessionProperties( Map<Object, Object> properties )
271        {
272        if( properties == null )
273          return;
274    
275        PropertyUtil.setProperty( properties, CASCADING_FLOW_ID, getID() );
276        PropertyUtil.setProperty( properties, "cascading.flow.tags", getTags() );
277        AppProps.setApplicationID( properties );
278        PropertyUtil.setProperty( properties, "cascading.app.name", makeAppName( properties ) );
279        PropertyUtil.setProperty( properties, "cascading.app.version", makeAppVersion( properties ) );
280        }
281    
282      private String makeAppName( Map<Object, Object> properties )
283        {
284        if( properties == null )
285          return null;
286    
287        String name = AppProps.getApplicationName( properties );
288    
289        if( name != null )
290          return name;
291    
292        return Util.findName( AppProps.getApplicationJarPath( properties ) );
293        }
294    
295      private String makeAppVersion( Map<Object, Object> properties )
296        {
297        if( properties == null )
298          return null;
299    
300        String name = AppProps.getApplicationVersion( properties );
301    
302        if( name != null )
303          return name;
304    
305        return Util.findVersion( AppProps.getApplicationJarPath( properties ) );
306        }
307    
308      private FlowStats createPrepareFlowStats()
309        {
310        FlowStats flowStats = new FlowStats( this, getClientState() );
311    
312        flowStats.prepare();
313        flowStats.markPending();
314    
315        return flowStats;
316        }
317    
318      public CascadingServices getCascadingServices()
319        {
320        if( cascadingServices == null )
321          cascadingServices = new CascadingServices( getConfigAsProperties() );
322    
323        return cascadingServices;
324        }
325    
326      private ClientState getClientState()
327        {
328        return getFlowSession().getCascadingServices().createClientState( getID() );
329        }
330    
331      protected void initSteps()
332        {
333        if( flowStepGraph == null )
334          return;
335    
336        for( Object flowStep : flowStepGraph.vertexSet() )
337          ( (BaseFlowStep<Config>) flowStep ).setFlow( this );
338        }
339    
340      private void initFromTaps()
341        {
342        initFromTaps( sources );
343        initFromTaps( sinks );
344        initFromTaps( traps );
345        }
346    
347      private void initFromTaps( Map<String, Tap> taps )
348        {
349        for( Tap tap : taps.values() )
350          tap.flowConfInit( this );
351        }
352    
353      @Override
354      public String getName()
355        {
356        return name;
357        }
358    
359      protected void setName( String name )
360        {
361        this.name = name;
362        }
363    
364      @Override
365      public String getID()
366        {
367        if( id == null )
368          id = Util.createUniqueID();
369    
370        return id;
371        }
372    
373      @Override
374      public String getTags()
375        {
376        return tags;
377        }
378    
379      @Override
380      public int getSubmitPriority()
381        {
382        return submitPriority;
383        }
384    
385      @Override
386      public void setSubmitPriority( int submitPriority )
387        {
388        if( submitPriority < 1 || submitPriority > 10 )
389          throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );
390    
391        this.submitPriority = submitPriority;
392        }
393    
394      ElementGraph getPipeGraph()
395        {
396        return pipeGraph;
397        }
398    
399      FlowStepGraph getFlowStepGraph()
400        {
401        return flowStepGraph;
402        }
403    
404      protected void setSources( Map<String, Tap> sources )
405        {
406        addListeners( sources.values() );
407        this.sources = sources;
408        }
409    
410      protected void setSinks( Map<String, Tap> sinks )
411        {
412        addListeners( sinks.values() );
413        this.sinks = sinks;
414        }
415    
416      protected void setTraps( Map<String, Tap> traps )
417        {
418        addListeners( traps.values() );
419        this.traps = traps;
420        }
421    
422      protected void setCheckpoints( Map<String, Tap> checkpoints )
423        {
424        addListeners( checkpoints.values() );
425        this.checkpoints = checkpoints;
426        }
427    
428      protected void setFlowStepGraph( FlowStepGraph flowStepGraph )
429        {
430        this.flowStepGraph = flowStepGraph;
431        }
432    
  /**
   * This method creates a new internal Config with the parentConfig as defaults using the properties to override
   * the defaults.
   * <p/>
   * Called by the constructors right after the session properties have been added to the given
   * properties, which may be null.
   *
   * @param properties   of type Map
   * @param parentConfig of type Config
   */
  protected abstract void initConfig( Map<Object, Object> properties, Config parentConfig );
441    
442      public Config createConfig( Map<Object, Object> properties, Config defaultConfig )
443        {
444        Config config = newConfig( defaultConfig );
445    
446        if( properties == null )
447          return config;
448    
449        Set<Object> keys = new HashSet<Object>( properties.keySet() );
450    
451        // keys will only be grabbed if both key/value are String, so keep orig keys
452        if( properties instanceof Properties )
453          keys.addAll( ( (Properties) properties ).stringPropertyNames() );
454    
455        for( Object key : keys )
456          {
457          Object value = properties.get( key );
458    
459          if( value == null && properties instanceof Properties && key instanceof String )
460            value = ( (Properties) properties ).getProperty( (String) key );
461    
462          if( value == null ) // don't stuff null values
463            continue;
464    
465          setConfigProperty( config, key, value );
466          }
467    
468        return config;
469        }
470    
  /**
   * Stores a single key/value pair in the given config.
   * <p/>
   * {@link #createConfig(Map, Object)} calls this once per property and never passes a null value.
   *
   * @param config of type Config
   * @param key    of type Object
   * @param value  of type Object
   */
  protected abstract void setConfigProperty( Config config, Object key, Object value );
472    
  /**
   * Creates the Config instance that {@link #createConfig(Map, Object)} populates and returns.
   *
   * @param defaultConfig of type Config
   * @return a Config instance
   */
  protected abstract Config newConfig( Config defaultConfig );
474    
475      protected void initFromProperties( Map<Object, Object> properties )
476        {
477        stopJobsOnExit = getStopJobsOnExit( properties );
478        }
479    
480      public FlowSession getFlowSession()
481        {
482        return new FlowSession( getCascadingServices() );
483        }
484    
485      @Override
486      public FlowStats getFlowStats()
487        {
488        return flowStats;
489        }
490    
491      @Override
492      public FlowStats getStats()
493        {
494        return getFlowStats();
495        }
496    
497      void addListeners( Collection listeners )
498        {
499        for( Object listener : listeners )
500          {
501          if( listener instanceof FlowListener )
502            addListener( (FlowListener) listener );
503          }
504        }
505    
506      List<SafeFlowListener> getListeners()
507        {
508        if( listeners == null )
509          listeners = new LinkedList<SafeFlowListener>();
510    
511        return listeners;
512        }
513    
514      @Override
515      public boolean hasListeners()
516        {
517        return listeners != null && !listeners.isEmpty();
518        }
519    
520      @Override
521      public void addListener( FlowListener flowListener )
522        {
523        getListeners().add( new SafeFlowListener( flowListener ) );
524        }
525    
526      @Override
527      public boolean removeListener( FlowListener flowListener )
528        {
529        return getListeners().remove( new SafeFlowListener( flowListener ) );
530        }
531    
532      @Override
533      public boolean hasStepListeners()
534        {
535        boolean hasStepListeners = false;
536    
537        for( FlowStep step : getFlowSteps() )
538          hasStepListeners |= step.hasListeners();
539    
540        return hasStepListeners;
541        }
542    
543      @Override
544      public void addStepListener( FlowStepListener flowStepListener )
545        {
546        for( FlowStep step : getFlowSteps() )
547          step.addListener( flowStepListener );
548        }
549    
550      @Override
551      public boolean removeStepListener( FlowStepListener flowStepListener )
552        {
553        boolean listenerRemoved = true;
554    
555        for( FlowStep step : getFlowSteps() )
556          listenerRemoved &= step.removeListener( flowStepListener );
557    
558        return listenerRemoved;
559        }
560    
561      @Override
562      public Map<String, Tap> getSources()
563        {
564        return Collections.unmodifiableMap( sources );
565        }
566    
567      @Override
568      public List<String> getSourceNames()
569        {
570        return new ArrayList<String>( sources.keySet() );
571        }
572    
573      @Override
574      public Tap getSource( String name )
575        {
576        return sources.get( name );
577        }
578    
579      @Override
580      @DependencyIncoming
581      public Collection<Tap> getSourcesCollection()
582        {
583        return getSources().values();
584        }
585    
586      @Override
587      public Map<String, Tap> getSinks()
588        {
589        return Collections.unmodifiableMap( sinks );
590        }
591    
592      @Override
593      public List<String> getSinkNames()
594        {
595        return new ArrayList<String>( sinks.keySet() );
596        }
597    
598      @Override
599      public Tap getSink( String name )
600        {
601        return sinks.get( name );
602        }
603    
604      @Override
605      @DependencyOutgoing
606      public Collection<Tap> getSinksCollection()
607        {
608        return getSinks().values();
609        }
610    
611      @Override
612      public Tap getSink()
613        {
614        return sinks.values().iterator().next();
615        }
616    
617      @Override
618      public Map<String, Tap> getTraps()
619        {
620        return Collections.unmodifiableMap( traps );
621        }
622    
623      @Override
624      public List<String> getTrapNames()
625        {
626        return new ArrayList<String>( traps.keySet() );
627        }
628    
629      @Override
630      public Collection<Tap> getTrapsCollection()
631        {
632        return getTraps().values();
633        }
634    
635      @Override
636      public Map<String, Tap> getCheckpoints()
637        {
638        return Collections.unmodifiableMap( checkpoints );
639        }
640    
641      @Override
642      public List<String> getCheckpointNames()
643        {
644        return new ArrayList<String>( checkpoints.keySet() );
645        }
646    
647      @Override
648      public Collection<Tap> getCheckpointsCollection()
649        {
650        return getCheckpoints().values();
651        }
652    
653      @Override
654      public boolean isStopJobsOnExit()
655        {
656        return stopJobsOnExit;
657        }
658    
659      @Override
660      public FlowSkipStrategy getFlowSkipStrategy()
661        {
662        return flowSkipStrategy;
663        }
664    
665      @Override
666      public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy )
667        {
668        if( flowSkipStrategy == null )
669          throw new IllegalArgumentException( "flowSkipStrategy may not be null" );
670    
671        try
672          {
673          return this.flowSkipStrategy;
674          }
675        finally
676          {
677          this.flowSkipStrategy = flowSkipStrategy;
678          }
679        }
680    
681      @Override
682      public boolean isSkipFlow() throws IOException
683        {
684        return flowSkipStrategy.skipFlow( this );
685        }
686    
687      @Override
688      public boolean areSinksStale() throws IOException
689        {
690        return areSourcesNewer( getSinkModified() );
691        }
692    
693      @Override
694      public boolean areSourcesNewer( long sinkModified ) throws IOException
695        {
696        Config confCopy = getConfigCopy();
697        Iterator<Tap> values = sources.values().iterator();
698    
699        long sourceModified = 0;
700    
701        try
702          {
703          sourceModified = Util.getSourceModified( confCopy, values, sinkModified );
704    
705          if( sinkModified < sourceModified )
706            return true;
707    
708          return false;
709          }
710        finally
711          {
712          if( LOG.isInfoEnabled() )
713            logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
714          }
715        }
716    
717      @Override
718      public long getSinkModified() throws IOException
719        {
720        long sinkModified = Util.getSinkModified( getConfigCopy(), sinks.values() );
721    
722        if( LOG.isInfoEnabled() )
723          {
724          if( sinkModified == -1L )
725            logInfo( "at least one sink is marked for delete" );
726          if( sinkModified == 0L )
727            logInfo( "at least one sink does not exist" );
728          else
729            logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
730          }
731    
732        return sinkModified;
733        }
734    
735      @Override
736      public FlowStepStrategy getFlowStepStrategy()
737        {
738        return flowStepStrategy;
739        }
740    
741      @Override
742      public void setFlowStepStrategy( FlowStepStrategy flowStepStrategy )
743        {
744        this.flowStepStrategy = flowStepStrategy;
745        }
746    
747      @Override
748      public List<FlowStep<Config>> getFlowSteps()
749        {
750        if( steps != null )
751          return steps;
752    
753        if( flowStepGraph == null )
754          return Collections.EMPTY_LIST;
755    
756        TopologicalOrderIterator<FlowStep<Config>, Integer> topoIterator = flowStepGraph.getTopologicalIterator();
757    
758        steps = new ArrayList<FlowStep<Config>>();
759    
760        while( topoIterator.hasNext() )
761          steps.add( topoIterator.next() );
762    
763        return steps;
764        }
765    
766      @Override
767      @ProcessPrepare
768      public void prepare()
769        {
770        try
771          {
772          deleteSinksIfNotUpdate();
773          deleteTrapsIfNotUpdate();
774          deleteCheckpointsIfNotUpdate();
775          }
776        catch( IOException exception )
777          {
778          throw new FlowException( "unable to prepare flow", exception );
779          }
780        }
781    
782      @Override
783      @ProcessStart
784      public synchronized void start()
785        {
786        if( thread != null )
787          return;
788    
789        if( stop )
790          return;
791    
792        registerShutdownHook();
793    
794        internalStart();
795    
796        String threadName = ( "flow " + Util.toNull( getName() ) ).trim();
797    
798        thread = createFlowThread( threadName );
799    
800        thread.start();
801        }
802    
803      protected Thread createFlowThread( String threadName )
804        {
805        return new Thread( new Runnable()
806        {
807        @Override
808        public void run()
809          {
810          BaseFlow.this.run();
811          }
812        }, threadName );
813        }
814    
  /**
   * Subclass hook called by {@link #start()} after the shutdown hook is registered and before the
   * flow thread is created.
   */
  protected abstract void internalStart();
816    
817      @Override
818      @ProcessStop
819      public synchronized void stop()
820        {
821        stopLock.lock();
822    
823        try
824          {
825          if( stop )
826            return;
827    
828          stop = true;
829    
830          fireOnStopping();
831    
832          if( !flowStats.isFinished() )
833            flowStats.markStopped();
834    
835          internalStopAllJobs();
836    
837          handleExecutorShutdown();
838    
839          internalClean( true );
840          }
841        finally
842          {
843          flowStats.cleanup();
844          stopLock.unlock();
845          }
846        }
847    
  /**
   * Subclass hook for cleaning up after the flow.
   * <p/>
   * {@link #stop()} calls this with true after all jobs are stopped and the executor is shut down.
   *
   * @param stop true when called from {@link #stop()}
   */
  protected abstract void internalClean( boolean stop );
849    
850      @Override
851      @ProcessComplete
852      public void complete()
853        {
854        start();
855    
856        try
857          {
858          try
859            {
860            synchronized( this ) // prevent NPE on quick stop() & complete() after start()
861              {
862              while( thread == null && !stop )
863                Util.safeSleep( 10 );
864              }
865    
866            if( thread != null )
867              thread.join();
868            }
869          catch( InterruptedException exception )
870            {
871            throw new FlowException( getName(), "thread interrupted", exception );
872            }
873    
874          // if in #stop and stopping, lets wait till its done in this thread
875          try
876            {
877            stopLock.lock();
878            }
879          finally
880            {
881            stopLock.unlock();
882            }
883    
884          if( throwable instanceof FlowException )
885            ( (FlowException) throwable ).setFlowName( getName() );
886    
887          if( throwable instanceof CascadingException )
888            throw (CascadingException) throwable;
889    
890          if( throwable instanceof OutOfMemoryError )
891            throw (OutOfMemoryError) throwable;
892    
893          if( throwable != null )
894            throw new FlowException( getName(), "unhandled exception", throwable );
895    
896          if( hasListeners() )
897            {
898            for( SafeFlowListener safeFlowListener : getListeners() )
899              {
900              if( safeFlowListener.throwable != null )
901                throw new FlowException( getName(), "unhandled listener exception", throwable );
902              }
903            }
904          }
905        finally
906          {
907          thread = null;
908          throwable = null;
909    
910          try
911            {
912            commitTraps();
913    
914            if( hasListeners() )
915              {
916              for( SafeFlowListener safeFlowListener : getListeners() )
917                safeFlowListener.throwable = null;
918              }
919            }
920          finally
921            {
922            flowStats.cleanup();
923            }
924          }
925        }
926    
927      private void commitTraps()
928        {
929        // commit all the traps, don't fail on an error
930    
931        for( Tap tap : traps.values() )
932          {
933          try
934            {
935            if( !tap.commitResource( getConfig() ) )
936              logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), null );
937            }
938          catch( IOException exception )
939            {
940            logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), exception );
941            }
942          }
943        }
944    
945      @Override
946      @ProcessCleanup
947      public void cleanup()
948        {
949        // do nothing
950        }
951    
952      @Override
953      public TupleEntryIterator openSource() throws IOException
954        {
955        return sources.values().iterator().next().openForRead( getFlowProcess() );
956        }
957    
958      @Override
959      public TupleEntryIterator openSource( String name ) throws IOException
960        {
961        if( !sources.containsKey( name ) )
962          throw new IllegalArgumentException( "source does not exist: " + name );
963    
964        return sources.get( name ).openForRead( getFlowProcess() );
965        }
966    
967      @Override
968      public TupleEntryIterator openSink() throws IOException
969        {
970        return sinks.values().iterator().next().openForRead( getFlowProcess() );
971        }
972    
973      @Override
974      public TupleEntryIterator openSink( String name ) throws IOException
975        {
976        if( !sinks.containsKey( name ) )
977          throw new IllegalArgumentException( "sink does not exist: " + name );
978    
979        return sinks.get( name ).openForRead( getFlowProcess() );
980        }
981    
982      @Override
983      public TupleEntryIterator openTrap() throws IOException
984        {
985        return traps.values().iterator().next().openForRead( getFlowProcess() );
986        }
987    
988      @Override
989      public TupleEntryIterator openTrap( String name ) throws IOException
990        {
991        if( !traps.containsKey( name ) )
992          throw new IllegalArgumentException( "trap does not exist: " + name );
993    
994        return traps.get( name ).openForRead( getFlowProcess() );
995        }
996    
997      /**
998       * Method deleteSinks deletes all sinks, whether or not they are configured for {@link cascading.tap.SinkMode#UPDATE}.
999       * <p/>
1000       * Use with caution.
1001       *
1002       * @throws IOException when
1003       * @see BaseFlow#deleteSinksIfNotUpdate()
1004       */
1005      public void deleteSinks() throws IOException
1006        {
1007        for( Tap tap : sinks.values() )
1008          deleteOrFail( tap );
1009        }
1010    
1011      private void deleteOrFail( Tap tap ) throws IOException
1012        {
1013        if( !tap.resourceExists( getConfig() ) )
1014          return;
1015    
1016        if( !tap.deleteResource( getConfig() ) )
1017          throw new FlowException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) );
1018        }
1019    
1020      /**
1021       * Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the {@link cascading.tap.SinkMode#UPDATE} flag.
1022       * <p/>
1023       * Typically used by a {@link Cascade} before executing the flow if the sinks are stale.
1024       * <p/>
1025       * Use with caution.
1026       *
1027       * @throws IOException when
1028       */
1029      public void deleteSinksIfNotUpdate() throws IOException
1030        {
1031        for( Tap tap : sinks.values() )
1032          {
1033          if( !tap.isUpdate() )
1034            deleteOrFail( tap );
1035          }
1036        }
1037    
1038      public void deleteSinksIfReplace() throws IOException
1039        {
1040        for( Tap tap : sinks.values() )
1041          {
1042          if( tap.isReplace() )
1043            deleteOrFail( tap );
1044          }
1045        }
1046    
1047      public void deleteTrapsIfNotUpdate() throws IOException
1048        {
1049        for( Tap tap : traps.values() )
1050          {
1051          if( !tap.isUpdate() )
1052            deleteOrFail( tap );
1053          }
1054        }
1055    
1056      public void deleteCheckpointsIfNotUpdate() throws IOException
1057        {
1058        for( Tap tap : checkpoints.values() )
1059          {
1060          if( !tap.isUpdate() )
1061            deleteOrFail( tap );
1062          }
1063        }
1064    
1065      public void deleteTrapsIfReplace() throws IOException
1066        {
1067        for( Tap tap : traps.values() )
1068          {
1069          if( tap.isReplace() )
1070            deleteOrFail( tap );
1071          }
1072        }
1073    
1074      public void deleteCheckpointsIfReplace() throws IOException
1075        {
1076        for( Tap tap : checkpoints.values() )
1077          {
1078          if( tap.isReplace() )
1079            deleteOrFail( tap );
1080          }
1081        }
1082    
1083      @Override
1084      public boolean resourceExists( Tap tap ) throws IOException
1085        {
1086        return tap.resourceExists( getConfig() );
1087        }
1088    
1089      @Override
1090      public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
1091        {
1092        return tap.openForRead( getFlowProcess() );
1093        }
1094    
1095      @Override
1096      public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
1097        {
1098        return tap.openForWrite( getFlowProcess() );
1099        }
1100    
  /**
   * Method run executes this flow: it fires the starting event, spawns all jobs, waits for them, and then
   * performs cleanup and fires the completed event.
   * <p/>
   * It is private and refuses to run while the {@code thread} field is null; as the guard's message states,
   * a Flow is started through start() or complete().
   * <p/>
   * Any Throwable raised while running is stored in the {@code throwable} field rather than propagated.
   */
  private void run()
    {
    // guard against being invoked before a thread has been assigned to this flow
    if( thread == null )
      throw new IllegalStateException( "to start a Flow call start() or complete(), not Runnable#run()" );

    Version.printBanner();
    Update.checkForUpdate( getPlatformInfo() );

    try
      {
      // already stopped: skip execution, the finally block below still performs cleanup
      if( stop )
        return;

      flowStats.markStarted();

      fireOnStarting();

      if( LOG.isInfoEnabled() )
        {
        logInfo( "starting" );

        for( Tap source : getSourcesCollection() )
          logInfo( " source: " + source );
        for( Tap sink : getSinksCollection() )
          logInfo( " sink: " + sink );
        }

      // a value of 0 from getMaxNumParallelSteps() means: allocate one thread per job
      int numThreads = getMaxNumParallelSteps();

      if( numThreads == 0 )
        numThreads = jobsMap.size();

      // still 0 means the jobs map is empty, there is nothing to execute
      if( numThreads == 0 )
        throw new IllegalStateException( "no jobs rendered for flow: " + getName() );

      if( LOG.isInfoEnabled() )
        {
        logInfo( " parallel execution is enabled: " + ( getMaxNumParallelSteps() != 1 ) );
        logInfo( " starting jobs: " + jobsMap.size() );
        logInfo( " allocating threads: " + numThreads );
        }

      List<Future<Throwable>> futures = spawnJobs( numThreads );

      // wait on each job in turn; a job reports failure by returning a non-null Throwable
      for( Future<Throwable> future : futures )
        {
        throwable = future.get();

        if( throwable != null )
          {
          // first failure: stop the remaining jobs (unless a stop is already in progress) and stop waiting
          if( !stop )
            internalStopAllJobs();

          handleExecutorShutdown();
          break;
          }
        }
      }
    catch( Throwable throwable )
      {
      // the catch parameter shadows the field, so the field is assigned explicitly
      this.throwable = throwable;
      }
    finally
      {
      // marks the flow failed and notifies listeners if a throwable was recorded and the flow was not stopped
      handleThrowableAndMarkFailed();

      if( !stop && !flowStats.isFinished() )
        flowStats.markSuccessful();

      internalClean( stop ); // cleaning temp taps may be determined by success/failure

      try
        {
        fireOnCompleted();
        }
      finally
        {
        // runs even if firing the completed event throws
        flowStats.cleanup();
        internalShutdown();
        deregisterShutdownHook();
        }
      }
    }
1187    
  /**
   * Method getMaxNumParallelSteps returns the number of threads {@code run()} allocates for executing jobs.
   * <p/>
   * As consumed by {@code run()}: a value of 0 is replaced by the number of entries in the jobs map, and any
   * value other than 1 is logged as parallel execution being enabled.
   *
   * @return int
   */
  protected abstract int getMaxNumParallelSteps();
1189    
  /**
   * Method internalShutdown is implemented by subclasses. Within {@code run()} it is invoked during final
   * cleanup, after {@code flowStats.cleanup()} and before the shutdown hook is deregistered.
   */
  protected abstract void internalShutdown();
1191    
1192      private List<Future<Throwable>> spawnJobs( int numThreads ) throws InterruptedException
1193        {
1194        if( stop )
1195          return new ArrayList<Future<Throwable>>();
1196    
1197        List<Callable<Throwable>> list = new ArrayList<Callable<Throwable>>();
1198    
1199        for( FlowStepJob<Config> job : jobsMap.values() )
1200          list.add( job );
1201    
1202        return spawnStrategy.start( this, numThreads, list );
1203        }
1204    
1205      private void handleThrowableAndMarkFailed()
1206        {
1207        if( throwable != null && !stop )
1208          {
1209          flowStats.markFailed( throwable );
1210    
1211          fireOnThrowable();
1212          }
1213        }
1214    
1215      Map<String, FlowStepJob<Config>> getJobsMap()
1216        {
1217        return jobsMap;
1218        }
1219    
1220      protected void initializeNewJobsMap()
1221        {
1222        // keep topo order
1223        jobsMap = new LinkedHashMap<String, FlowStepJob<Config>>();
1224        TopologicalOrderIterator<FlowStep<Config>, Integer> topoIterator = flowStepGraph.getTopologicalIterator();
1225    
1226        while( topoIterator.hasNext() )
1227          {
1228          BaseFlowStep<Config> step = (BaseFlowStep<Config>) topoIterator.next();
1229          FlowStepJob<Config> flowStepJob = step.getFlowStepJob( getFlowProcess(), getConfig() );
1230    
1231          jobsMap.put( step.getName(), flowStepJob );
1232    
1233          List<FlowStepJob<Config>> predecessors = new ArrayList<FlowStepJob<Config>>();
1234    
1235          for( Object flowStep : predecessorListOf( flowStepGraph, step ) )
1236            predecessors.add( jobsMap.get( ( (FlowStep<Config>) flowStep ).getName() ) );
1237    
1238          flowStepJob.setPredecessors( predecessors );
1239    
1240          flowStats.addStepStats( flowStepJob.getStepStats() );
1241          }
1242        }
1243    
1244      protected void internalStopAllJobs()
1245        {
1246        logInfo( "stopping all jobs" );
1247    
1248        try
1249          {
1250          if( jobsMap == null )
1251            return;
1252    
1253          List<FlowStepJob<Config>> jobs = new ArrayList<FlowStepJob<Config>>( jobsMap.values() );
1254    
1255          Collections.reverse( jobs );
1256    
1257          for( FlowStepJob<Config> job : jobs )
1258            job.stop();
1259          }
1260        finally
1261          {
1262          logInfo( "stopped all jobs" );
1263          }
1264        }
1265    
1266      protected void handleExecutorShutdown()
1267        {
1268        if( spawnStrategy.isCompleted( this ) )
1269          return;
1270    
1271        logInfo( "shutting down job executor" );
1272    
1273        try
1274          {
1275          spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS );
1276          }
1277        catch( InterruptedException exception )
1278          {
1279          // ignore
1280          }
1281    
1282        logInfo( "shutdown complete" );
1283        }
1284    
1285      protected void fireOnCompleted()
1286        {
1287        if( hasListeners() )
1288          {
1289          if( LOG.isDebugEnabled() )
1290            logDebug( "firing onCompleted event: " + getListeners().size() );
1291    
1292          for( FlowListener flowListener : getListeners() )
1293            flowListener.onCompleted( this );
1294          }
1295        }
1296    
1297      protected void fireOnThrowable()
1298        {
1299        if( hasListeners() )
1300          {
1301          if( LOG.isDebugEnabled() )
1302            logDebug( "firing onThrowable event: " + getListeners().size() );
1303    
1304          boolean isHandled = false;
1305    
1306          for( FlowListener flowListener : getListeners() )
1307            isHandled = flowListener.onThrowable( this, throwable ) || isHandled;
1308    
1309          if( isHandled )
1310            throwable = null;
1311          }
1312        }
1313    
1314      protected void fireOnStopping()
1315        {
1316        if( hasListeners() )
1317          {
1318          if( LOG.isDebugEnabled() )
1319            logDebug( "firing onStopping event: " + getListeners().size() );
1320    
1321          for( FlowListener flowListener : getListeners() )
1322            flowListener.onStopping( this );
1323          }
1324        }
1325    
1326      protected void fireOnStarting()
1327        {
1328        if( hasListeners() )
1329          {
1330          if( LOG.isDebugEnabled() )
1331            logDebug( "firing onStarting event: " + getListeners().size() );
1332    
1333          for( FlowListener flowListener : getListeners() )
1334            flowListener.onStarting( this );
1335          }
1336        }
1337    
1338      @Override
1339      public String toString()
1340        {
1341        StringBuffer buffer = new StringBuffer();
1342    
1343        if( getName() != null )
1344          buffer.append( getName() ).append( ": " );
1345    
1346        for( FlowStep step : getFlowSteps() )
1347          buffer.append( step );
1348    
1349        return buffer.toString();
1350        }
1351    
1352      protected void logInfo( String message )
1353        {
1354        LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message );
1355        }
1356    
1357      private void logDebug( String message )
1358        {
1359        LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message );
1360        }
1361    
1362      private void logWarn( String message, Throwable throwable )
1363        {
1364        LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable );
1365        }
1366    
1367      private void logError( String message, Throwable throwable )
1368        {
1369        LOG.error( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable );
1370        }
1371    
1372      @Override
1373      public void writeDOT( String filename )
1374        {
1375        if( pipeGraph == null )
1376          throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" );
1377    
1378        pipeGraph.writeDOT( filename );
1379        }
1380    
1381      @Override
1382      public void writeStepsDOT( String filename )
1383        {
1384        if( flowStepGraph == null )
1385          throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" );
1386    
1387        flowStepGraph.writeDOT( filename );
1388        }
1389    
1390      /**
1391       * Used to return a simple wrapper for use as an edge in a graph where there can only be
1392       * one instance of every edge.
1393       *
1394       * @return FlowHolder
1395       */
1396      public FlowHolder getHolder()
1397        {
1398        return new FlowHolder( this );
1399        }
1400    
1401      public void setCascade( Cascade cascade )
1402        {
1403        setConfigProperty( getConfig(), "cascading.cascade.id", cascade.getID() );
1404        flowStats.recordInfo();
1405        }
1406    
1407      @Override
1408      public String getCascadeID()
1409        {
1410        return getProperty( "cascading.cascade.id" );
1411        }
1412    
1413      @Override
1414      public String getRunID()
1415        {
1416        return runID;
1417        }
1418    
1419      protected List<String> getClassPath()
1420        {
1421        return classPath;
1422        }
1423    
  @Override
  public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy )
    {
    // replaces the strategy that spawnJobs() and handleExecutorShutdown() use to start and complete jobs
    this.spawnStrategy = spawnStrategy;
    }
1429    
1430      @Override
1431      public UnitOfWorkSpawnStrategy getSpawnStrategy()
1432        {
1433        return spawnStrategy;
1434        }
1435    
1436      protected void registerShutdownHook()
1437        {
1438        if( !isStopJobsOnExit() )
1439          return;
1440    
1441        shutdownHook = new ShutdownUtil.Hook()
1442        {
1443        @Override
1444        public Priority priority()
1445          {
1446          return Priority.WORK_CHILD;
1447          }
1448    
1449        @Override
1450        public void execute()
1451          {
1452          logInfo( "shutdown hook calling stop on flow" );
1453    
1454          BaseFlow.this.stop();
1455          }
1456        };
1457    
1458        ShutdownUtil.addHook( shutdownHook );
1459        }
1460    
1461      private void deregisterShutdownHook()
1462        {
1463        if( !isStopJobsOnExit() || stop )
1464          return;
1465    
1466        ShutdownUtil.removeHook( shutdownHook );
1467        }
1468    
1469      /** Class FlowHolder is a helper class for wrapping Flow instances. */
1470      public static class FlowHolder
1471        {
1472        /** Field flow */
1473        public Flow flow;
1474    
1475        public FlowHolder()
1476          {
1477          }
1478    
1479        public FlowHolder( Flow flow )
1480          {
1481          this.flow = flow;
1482          }
1483        }
1484    
1485      /**
1486       * Class SafeFlowListener safely calls a wrapped FlowListener.
1487       * <p/>
1488       * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1489       * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
1490       * which in turn is run in a new Thread.
1491       */
1492      private class SafeFlowListener implements FlowListener
1493        {
1494        /** Field flowListener */
1495        final FlowListener flowListener;
1496        /** Field throwable */
1497        Throwable throwable;
1498    
1499        private SafeFlowListener( FlowListener flowListener )
1500          {
1501          this.flowListener = flowListener;
1502          }
1503    
1504        public void onStarting( Flow flow )
1505          {
1506          try
1507            {
1508            flowListener.onStarting( flow );
1509            }
1510          catch( Throwable throwable )
1511            {
1512            handleThrowable( throwable );
1513            }
1514          }
1515    
1516        public void onStopping( Flow flow )
1517          {
1518          try
1519            {
1520            flowListener.onStopping( flow );
1521            }
1522          catch( Throwable throwable )
1523            {
1524            handleThrowable( throwable );
1525            }
1526          }
1527    
1528        public void onCompleted( Flow flow )
1529          {
1530          try
1531            {
1532            flowListener.onCompleted( flow );
1533            }
1534          catch( Throwable throwable )
1535            {
1536            handleThrowable( throwable );
1537            }
1538          }
1539    
1540        public boolean onThrowable( Flow flow, Throwable flowThrowable )
1541          {
1542          try
1543            {
1544            return flowListener.onThrowable( flow, flowThrowable );
1545            }
1546          catch( Throwable throwable )
1547            {
1548            handleThrowable( throwable );
1549            }
1550    
1551          return false;
1552          }
1553    
1554        private void handleThrowable( Throwable throwable )
1555          {
1556          this.throwable = throwable;
1557    
1558          logWarn( String.format( "flow listener %s threw throwable", flowListener ), throwable );
1559    
1560          // stop this flow
1561          stop();
1562          }
1563    
1564        public boolean equals( Object object )
1565          {
1566          if( object instanceof BaseFlow.SafeFlowListener )
1567            return flowListener.equals( ( (BaseFlow.SafeFlowListener) object ).flowListener );
1568    
1569          return flowListener.equals( object );
1570          }
1571    
1572        public int hashCode()
1573          {
1574          return flowListener.hashCode();
1575          }
1576        }
1577      }