001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.io.IOException;
024    import java.util.ArrayList;
025    import java.util.Collection;
026    import java.util.Collections;
027    import java.util.Date;
028    import java.util.HashMap;
029    import java.util.HashSet;
030    import java.util.Iterator;
031    import java.util.LinkedHashMap;
032    import java.util.LinkedList;
033    import java.util.List;
034    import java.util.Map;
035    import java.util.Properties;
036    import java.util.Set;
037    import java.util.concurrent.Callable;
038    import java.util.concurrent.Future;
039    import java.util.concurrent.TimeUnit;
040    import java.util.concurrent.locks.ReentrantLock;
041    
042    import cascading.CascadingException;
043    import cascading.cascade.Cascade;
044    import cascading.flow.planner.BaseFlowStep;
045    import cascading.flow.planner.ElementGraph;
046    import cascading.flow.planner.FlowStepGraph;
047    import cascading.flow.planner.FlowStepJob;
048    import cascading.flow.planner.PlatformInfo;
049    import cascading.management.CascadingServices;
050    import cascading.management.UnitOfWorkExecutorStrategy;
051    import cascading.management.UnitOfWorkSpawnStrategy;
052    import cascading.management.state.ClientState;
053    import cascading.property.AppProps;
054    import cascading.property.PropertyUtil;
055    import cascading.stats.FlowStats;
056    import cascading.tap.Tap;
057    import cascading.tuple.Fields;
058    import cascading.tuple.TupleEntryCollector;
059    import cascading.tuple.TupleEntryIterator;
060    import cascading.util.ShutdownUtil;
061    import cascading.util.Update;
062    import cascading.util.Util;
063    import cascading.util.Version;
064    import org.jgrapht.traverse.TopologicalOrderIterator;
065    import org.slf4j.Logger;
066    import org.slf4j.LoggerFactory;
067    import riffle.process.DependencyIncoming;
068    import riffle.process.DependencyOutgoing;
069    import riffle.process.ProcessCleanup;
070    import riffle.process.ProcessComplete;
071    import riffle.process.ProcessPrepare;
072    import riffle.process.ProcessStart;
073    import riffle.process.ProcessStop;
074    
075    import static org.jgrapht.Graphs.predecessorListOf;
076    
077    @riffle.process.Process
078    public abstract class BaseFlow<Config> implements Flow<Config>
079      {
080      /** Field LOG */
081      private static final Logger LOG = LoggerFactory.getLogger( Flow.class );
082    
083      private PlatformInfo platformInfo;
084    
085      /** Field id */
086      private String id;
087      /** Field name */
088      private String name;
089      /** Field runID */
090      private String runID;
091      /** Field classPath */
092      private List<String> classPath; // may remain null
093      /** Field tags */
094      private String tags;
095      /** Field listeners */
096      private List<SafeFlowListener> listeners;
097      /** Field flowSkipStrategy */
098      private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale();
099      /** Field flowStats */
100      protected FlowStats flowStats; // don't use a listener to set values
101      /** Field sources */
102      protected Map<String, Tap> sources = Collections.EMPTY_MAP;
103      /** Field sinks */
104      protected Map<String, Tap> sinks = Collections.EMPTY_MAP;
105      /** Field traps */
106      private Map<String, Tap> traps = Collections.EMPTY_MAP;
107      /** Field checkpoints */
108      private Map<String, Tap> checkpoints = Collections.EMPTY_MAP;
109      /** Field stopJobsOnExit */
110      protected boolean stopJobsOnExit = true;
111      /** Field submitPriority */
112      private int submitPriority = 5;
113    
114      /** Field flowStepGraph */
115      private FlowStepGraph<Config> flowStepGraph;
116      /** Field thread */
117      protected transient Thread thread;
118      /** Field throwable */
119      private Throwable throwable;
120      /** Field stop */
121      protected boolean stop;
122    
123      /** Field pipeGraph */
124      private ElementGraph pipeGraph; // only used for documentation purposes
125    
126      private transient CascadingServices cascadingServices;
127    
128      private FlowStepStrategy<Config> flowStepStrategy = null;
129      /** Field steps */
130      private transient List<FlowStep<Config>> steps;
131      /** Field jobsMap */
132      private transient Map<String, FlowStepJob<Config>> jobsMap;
133      private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy();
134    
135      private transient ReentrantLock stopLock = new ReentrantLock( true );
136      protected ShutdownUtil.Hook shutdownHook;
137    
138      private HashMap<String, String> flowDescriptor;
139    
140      /**
141       * Returns property stopJobsOnExit.
142       *
143       * @param properties of type Map
144       * @return a boolean
145       */
146      static boolean getStopJobsOnExit( Map<Object, Object> properties )
147        {
148        return Boolean.parseBoolean( PropertyUtil.getProperty( properties, FlowProps.STOP_JOBS_ON_EXIT, "true" ) );
149        }
150    
151      /** Used for testing. */
152      protected BaseFlow()
153        {
154        this.name = "NA";
155        this.flowStats = createPrepareFlowStats();
156        }
157    
  /**
   * Creates a named flow, delegating to the five argument constructor with a new, empty flow descriptor.
   *
   * @param platformInfo  of type PlatformInfo
   * @param properties    of type Map
   * @param defaultConfig of type Config
   * @param name          of type String
   */
  protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name )
    {
    this( platformInfo, properties, defaultConfig, name, new LinkedHashMap<String, String>() );
    }
162    
163      protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name, Map<String, String> flowDescriptor )
164        {
165        this.platformInfo = platformInfo;
166        this.name = name;
167    
168        if( flowDescriptor != null )
169          this.flowDescriptor = new LinkedHashMap<String, String>( flowDescriptor );
170    
171        addSessionProperties( properties );
172        initConfig( properties, defaultConfig );
173    
174        this.flowStats = createPrepareFlowStats(); // must be last
175        }
176    
177      protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef )
178        {
179        this.platformInfo = platformInfo;
180        this.name = flowDef.getName();
181        this.tags = flowDef.getTags();
182        this.runID = flowDef.getRunID();
183        this.classPath = flowDef.getClassPath();
184    
185        if( !flowDef.getFlowDescriptor().isEmpty() )
186          this.flowDescriptor = new LinkedHashMap<String, String>( flowDef.getFlowDescriptor() );
187    
188        addSessionProperties( properties );
189        initConfig( properties, defaultConfig );
190        setSources( flowDef.getSourcesCopy() );
191        setSinks( flowDef.getSinksCopy() );
192        setTraps( flowDef.getTrapsCopy() );
193        setCheckpoints( flowDef.getCheckpointsCopy() );
194        initFromTaps();
195    
196        retrieveSourceFields();
197        retrieveSinkFields();
198        }
199    
200      public PlatformInfo getPlatformInfo()
201        {
202        return platformInfo;
203        }
204    
205      public void initialize( ElementGraph pipeGraph, FlowStepGraph<Config> flowStepGraph )
206        {
207        this.pipeGraph = pipeGraph;
208        this.flowStepGraph = flowStepGraph;
209    
210        initSteps();
211    
212        this.flowStats = createPrepareFlowStats(); // must be last
213    
214        initializeNewJobsMap();
215        }
216    
217      public ElementGraph updateSchemes( ElementGraph pipeGraph )
218        {
219        presentSourceFields( pipeGraph );
220    
221        presentSinkFields( pipeGraph );
222    
223        return new ElementGraph( pipeGraph );
224        }
225    
226      /** Force a Scheme to fetch any fields from a meta-data store */
227      protected void retrieveSourceFields()
228        {
229        for( Tap tap : sources.values() )
230          tap.retrieveSourceFields( getFlowProcess() );
231        }
232    
233      /**
234       * Present the current resolved fields for the Tap
235       *
236       * @param pipeGraph
237       */
238      protected void presentSourceFields( ElementGraph pipeGraph )
239        {
240        for( Tap tap : sources.values() )
241          {
242          if( pipeGraph.containsVertex( tap ) )
243            tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
244          }
245    
246        for( Tap tap : checkpoints.values() )
247          {
248          if( pipeGraph.containsVertex( tap ) )
249            tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
250          }
251        }
252    
253      /** Force a Scheme to fetch any fields from a meta-data store */
254      protected void retrieveSinkFields()
255        {
256        for( Tap tap : sinks.values() )
257          tap.retrieveSinkFields( getFlowProcess() );
258        }
259    
260      /**
261       * Present the current resolved fields for the Tap
262       *
263       * @param pipeGraph
264       */
265      protected void presentSinkFields( ElementGraph pipeGraph )
266        {
267        for( Tap tap : sinks.values() )
268          {
269          if( pipeGraph.containsVertex( tap ) )
270            tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
271          }
272    
273        for( Tap tap : checkpoints.values() )
274          {
275          if( pipeGraph.containsVertex( tap ) )
276            tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
277          }
278        }
279    
280      protected Fields getFieldsFor( ElementGraph pipeGraph, Tap tap )
281        {
282        return pipeGraph.outgoingEdgesOf( tap ).iterator().next().getOutValuesFields();
283        }
284    
285      private void addSessionProperties( Map<Object, Object> properties )
286        {
287        if( properties == null )
288          return;
289    
290        PropertyUtil.setProperty( properties, CASCADING_FLOW_ID, getID() );
291        PropertyUtil.setProperty( properties, "cascading.flow.tags", getTags() );
292        AppProps.setApplicationID( properties );
293        PropertyUtil.setProperty( properties, "cascading.app.name", makeAppName( properties ) );
294        PropertyUtil.setProperty( properties, "cascading.app.version", makeAppVersion( properties ) );
295        }
296    
297      private String makeAppName( Map<Object, Object> properties )
298        {
299        if( properties == null )
300          return null;
301    
302        String name = AppProps.getApplicationName( properties );
303    
304        if( name != null )
305          return name;
306    
307        return Util.findName( AppProps.getApplicationJarPath( properties ) );
308        }
309    
310      private String makeAppVersion( Map<Object, Object> properties )
311        {
312        if( properties == null )
313          return null;
314    
315        String name = AppProps.getApplicationVersion( properties );
316    
317        if( name != null )
318          return name;
319    
320        return Util.findVersion( AppProps.getApplicationJarPath( properties ) );
321        }
322    
323      private FlowStats createPrepareFlowStats()
324        {
325        FlowStats flowStats = new FlowStats( this, getClientState() );
326    
327        flowStats.prepare();
328        flowStats.markPending();
329    
330        return flowStats;
331        }
332    
333      public CascadingServices getCascadingServices()
334        {
335        if( cascadingServices == null )
336          cascadingServices = new CascadingServices( getConfigAsProperties() );
337    
338        return cascadingServices;
339        }
340    
341      private ClientState getClientState()
342        {
343        return getFlowSession().getCascadingServices().createClientState( getID() );
344        }
345    
346      protected void initSteps()
347        {
348        if( flowStepGraph == null )
349          return;
350    
351        for( Object flowStep : flowStepGraph.vertexSet() )
352          ( (BaseFlowStep<Config>) flowStep ).setFlow( this );
353        }
354    
355      private void initFromTaps()
356        {
357        initFromTaps( sources );
358        initFromTaps( sinks );
359        initFromTaps( traps );
360        }
361    
362      private void initFromTaps( Map<String, Tap> taps )
363        {
364        for( Tap tap : taps.values() )
365          tap.flowConfInit( this );
366        }
367    
368      @Override
369      public String getName()
370        {
371        return name;
372        }
373    
374      protected void setName( String name )
375        {
376        this.name = name;
377        }
378    
379      @Override
380      public String getID()
381        {
382        if( id == null )
383          id = Util.createUniqueID();
384    
385        return id;
386        }
387    
388      @Override
389      public String getTags()
390        {
391        return tags;
392        }
393    
394      @Override
395      public int getSubmitPriority()
396        {
397        return submitPriority;
398        }
399    
400      @Override
401      public void setSubmitPriority( int submitPriority )
402        {
403        if( submitPriority < 1 || submitPriority > 10 )
404          throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );
405    
406        this.submitPriority = submitPriority;
407        }
408    
409      ElementGraph getPipeGraph()
410        {
411        return pipeGraph;
412        }
413    
414      FlowStepGraph getFlowStepGraph()
415        {
416        return flowStepGraph;
417        }
418    
419      protected void setSources( Map<String, Tap> sources )
420        {
421        addListeners( sources.values() );
422        this.sources = sources;
423        }
424    
425      protected void setSinks( Map<String, Tap> sinks )
426        {
427        addListeners( sinks.values() );
428        this.sinks = sinks;
429        }
430    
431      protected void setTraps( Map<String, Tap> traps )
432        {
433        addListeners( traps.values() );
434        this.traps = traps;
435        }
436    
437      protected void setCheckpoints( Map<String, Tap> checkpoints )
438        {
439        addListeners( checkpoints.values() );
440        this.checkpoints = checkpoints;
441        }
442    
443      protected void setFlowStepGraph( FlowStepGraph flowStepGraph )
444        {
445        this.flowStepGraph = flowStepGraph;
446        }
447    
  /**
   * This method creates a new internal Config with the parentConfig as defaults using the properties to override
   * the defaults.
   * <p/>
   * Called by the constructors of this class directly after the session properties have been added to the
   * given properties.
   *
   * @param properties   of type Map
   * @param parentConfig of type Config
   */
  protected abstract void initConfig( Map<Object, Object> properties, Config parentConfig );
456    
457      public Config createConfig( Map<Object, Object> properties, Config defaultConfig )
458        {
459        Config config = newConfig( defaultConfig );
460    
461        if( properties == null )
462          return config;
463    
464        Set<Object> keys = new HashSet<Object>( properties.keySet() );
465    
466        // keys will only be grabbed if both key/value are String, so keep orig keys
467        if( properties instanceof Properties )
468          keys.addAll( ( (Properties) properties ).stringPropertyNames() );
469    
470        for( Object key : keys )
471          {
472          Object value = properties.get( key );
473    
474          if( value == null && properties instanceof Properties && key instanceof String )
475            value = ( (Properties) properties ).getProperty( (String) key );
476    
477          if( value == null ) // don't stuff null values
478            continue;
479    
480          setConfigProperty( config, key, value );
481          }
482    
483        return config;
484        }
485    
  /**
   * Stores the given key and value in the given config. Called by
   * {@link #createConfig(Map, Object)} once for every property having a non-null value.
   *
   * @param config of type Config
   * @param key    of type Object
   * @param value  of type Object, never null when called from createConfig
   */
  protected abstract void setConfigProperty( Config config, Object key, Object value );
487    
  /**
   * Creates a new Config instance from the given default config. Called by
   * {@link #createConfig(Map, Object)} before any properties are applied.
   *
   * @param defaultConfig of type Config
   * @return a new Config instance
   */
  protected abstract Config newConfig( Config defaultConfig );
489    
490      protected void initFromProperties( Map<Object, Object> properties )
491        {
492        stopJobsOnExit = getStopJobsOnExit( properties );
493        }
494    
495      public FlowSession getFlowSession()
496        {
497        return new FlowSession( getCascadingServices() );
498        }
499    
500      @Override
501      public FlowStats getFlowStats()
502        {
503        return flowStats;
504        }
505    
506      @Override
507      public Map<String, String> getFlowDescriptor()
508        {
509        if( flowDescriptor == null )
510          return Collections.emptyMap();
511    
512        return Collections.unmodifiableMap( flowDescriptor );
513        }
514    
515      @Override
516      public FlowStats getStats()
517        {
518        return getFlowStats();
519        }
520    
521      void addListeners( Collection listeners )
522        {
523        for( Object listener : listeners )
524          {
525          if( listener instanceof FlowListener )
526            addListener( (FlowListener) listener );
527          }
528        }
529    
530      List<SafeFlowListener> getListeners()
531        {
532        if( listeners == null )
533          listeners = new LinkedList<SafeFlowListener>();
534    
535        return listeners;
536        }
537    
538      @Override
539      public boolean hasListeners()
540        {
541        return listeners != null && !listeners.isEmpty();
542        }
543    
544      @Override
545      public void addListener( FlowListener flowListener )
546        {
547        getListeners().add( new SafeFlowListener( flowListener ) );
548        }
549    
550      @Override
551      public boolean removeListener( FlowListener flowListener )
552        {
553        return getListeners().remove( new SafeFlowListener( flowListener ) );
554        }
555    
556      @Override
557      public boolean hasStepListeners()
558        {
559        boolean hasStepListeners = false;
560    
561        for( FlowStep step : getFlowSteps() )
562          hasStepListeners |= step.hasListeners();
563    
564        return hasStepListeners;
565        }
566    
567      @Override
568      public void addStepListener( FlowStepListener flowStepListener )
569        {
570        for( FlowStep step : getFlowSteps() )
571          step.addListener( flowStepListener );
572        }
573    
574      @Override
575      public boolean removeStepListener( FlowStepListener flowStepListener )
576        {
577        boolean listenerRemoved = true;
578    
579        for( FlowStep step : getFlowSteps() )
580          listenerRemoved &= step.removeListener( flowStepListener );
581    
582        return listenerRemoved;
583        }
584    
585      @Override
586      public Map<String, Tap> getSources()
587        {
588        return Collections.unmodifiableMap( sources );
589        }
590    
591      @Override
592      public List<String> getSourceNames()
593        {
594        return new ArrayList<String>( sources.keySet() );
595        }
596    
597      @Override
598      public Tap getSource( String name )
599        {
600        return sources.get( name );
601        }
602    
603      @Override
604      @DependencyIncoming
605      public Collection<Tap> getSourcesCollection()
606        {
607        return getSources().values();
608        }
609    
610      @Override
611      public Map<String, Tap> getSinks()
612        {
613        return Collections.unmodifiableMap( sinks );
614        }
615    
616      @Override
617      public List<String> getSinkNames()
618        {
619        return new ArrayList<String>( sinks.keySet() );
620        }
621    
622      @Override
623      public Tap getSink( String name )
624        {
625        return sinks.get( name );
626        }
627    
628      @Override
629      @DependencyOutgoing
630      public Collection<Tap> getSinksCollection()
631        {
632        return getSinks().values();
633        }
634    
635      @Override
636      public Tap getSink()
637        {
638        return sinks.values().iterator().next();
639        }
640    
641      @Override
642      public Map<String, Tap> getTraps()
643        {
644        return Collections.unmodifiableMap( traps );
645        }
646    
647      @Override
648      public List<String> getTrapNames()
649        {
650        return new ArrayList<String>( traps.keySet() );
651        }
652    
653      @Override
654      public Collection<Tap> getTrapsCollection()
655        {
656        return getTraps().values();
657        }
658    
659      @Override
660      public Map<String, Tap> getCheckpoints()
661        {
662        return Collections.unmodifiableMap( checkpoints );
663        }
664    
665      @Override
666      public List<String> getCheckpointNames()
667        {
668        return new ArrayList<String>( checkpoints.keySet() );
669        }
670    
671      @Override
672      public Collection<Tap> getCheckpointsCollection()
673        {
674        return getCheckpoints().values();
675        }
676    
677      @Override
678      public boolean isStopJobsOnExit()
679        {
680        return stopJobsOnExit;
681        }
682    
683      @Override
684      public FlowSkipStrategy getFlowSkipStrategy()
685        {
686        return flowSkipStrategy;
687        }
688    
689      @Override
690      public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy )
691        {
692        if( flowSkipStrategy == null )
693          throw new IllegalArgumentException( "flowSkipStrategy may not be null" );
694    
695        try
696          {
697          return this.flowSkipStrategy;
698          }
699        finally
700          {
701          this.flowSkipStrategy = flowSkipStrategy;
702          }
703        }
704    
705      @Override
706      public boolean isSkipFlow() throws IOException
707        {
708        return flowSkipStrategy.skipFlow( this );
709        }
710    
711      @Override
712      public boolean areSinksStale() throws IOException
713        {
714        return areSourcesNewer( getSinkModified() );
715        }
716    
717      @Override
718      public boolean areSourcesNewer( long sinkModified ) throws IOException
719        {
720        Config confCopy = getConfigCopy();
721        Iterator<Tap> values = sources.values().iterator();
722    
723        long sourceModified = 0;
724    
725        try
726          {
727          sourceModified = Util.getSourceModified( confCopy, values, sinkModified );
728    
729          if( sinkModified < sourceModified )
730            return true;
731    
732          return false;
733          }
734        finally
735          {
736          if( LOG.isInfoEnabled() )
737            logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
738          }
739        }
740    
741      @Override
742      public long getSinkModified() throws IOException
743        {
744        long sinkModified = Util.getSinkModified( getConfigCopy(), sinks.values() );
745    
746        if( LOG.isInfoEnabled() )
747          {
748          if( sinkModified == -1L )
749            logInfo( "at least one sink is marked for delete" );
750          if( sinkModified == 0L )
751            logInfo( "at least one sink does not exist" );
752          else
753            logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
754          }
755    
756        return sinkModified;
757        }
758    
759      @Override
760      public FlowStepStrategy getFlowStepStrategy()
761        {
762        return flowStepStrategy;
763        }
764    
765      @Override
766      public void setFlowStepStrategy( FlowStepStrategy flowStepStrategy )
767        {
768        this.flowStepStrategy = flowStepStrategy;
769        }
770    
771      @Override
772      public List<FlowStep<Config>> getFlowSteps()
773        {
774        if( steps != null )
775          return steps;
776    
777        if( flowStepGraph == null )
778          return Collections.EMPTY_LIST;
779    
780        TopologicalOrderIterator<FlowStep<Config>, Integer> topoIterator = flowStepGraph.getTopologicalIterator();
781    
782        steps = new ArrayList<FlowStep<Config>>();
783    
784        while( topoIterator.hasNext() )
785          steps.add( topoIterator.next() );
786    
787        return steps;
788        }
789    
790      @Override
791      @ProcessPrepare
792      public void prepare()
793        {
794        try
795          {
796          deleteSinksIfNotUpdate();
797          deleteTrapsIfNotUpdate();
798          deleteCheckpointsIfNotUpdate();
799          }
800        catch( IOException exception )
801          {
802          throw new FlowException( "unable to prepare flow", exception );
803          }
804        }
805    
806      @Override
807      @ProcessStart
808      public synchronized void start()
809        {
810        if( thread != null )
811          return;
812    
813        if( stop )
814          return;
815    
816        registerShutdownHook();
817    
818        internalStart();
819    
820        String threadName = ( "flow " + Util.toNull( getName() ) ).trim();
821    
822        thread = createFlowThread( threadName );
823    
824        thread.start();
825        }
826    
827      protected Thread createFlowThread( String threadName )
828        {
829        return new Thread( new Runnable()
830        {
831        @Override
832        public void run()
833          {
834          BaseFlow.this.run();
835          }
836        }, threadName );
837        }
838    
  /**
   * Hook called by {@link #start()} after the shutdown hook is registered and before the flow thread
   * is created and started.
   */
  protected abstract void internalStart();
840    
841      @Override
842      @ProcessStop
843      public synchronized void stop()
844        {
845        stopLock.lock();
846    
847        try
848          {
849          if( stop )
850            return;
851    
852          stop = true;
853    
854          fireOnStopping();
855    
856          if( !flowStats.isFinished() )
857            flowStats.markStopped();
858    
859          internalStopAllJobs();
860    
861          handleExecutorShutdown();
862    
863          internalClean( true );
864          }
865        finally
866          {
867          flowStats.cleanup();
868          stopLock.unlock();
869          }
870        }
871    
  /**
   * Hook called by {@link #stop()} with the value true, after internalStopAllJobs() and
   * handleExecutorShutdown() have been called.
   *
   * @param stop true when called from stop()
   */
  protected abstract void internalClean( boolean stop );
873    
874      @Override
875      @ProcessComplete
876      public void complete()
877        {
878        start();
879    
880        try
881          {
882          try
883            {
884            synchronized( this ) // prevent NPE on quick stop() & complete() after start()
885              {
886              while( thread == null && !stop )
887                Util.safeSleep( 10 );
888              }
889    
890            if( thread != null )
891              thread.join();
892            }
893          catch( InterruptedException exception )
894            {
895            throw new FlowException( getName(), "thread interrupted", exception );
896            }
897    
898          // if in #stop and stopping, lets wait till its done in this thread
899          try
900            {
901            stopLock.lock();
902            }
903          finally
904            {
905            stopLock.unlock();
906            }
907    
908          if( throwable instanceof FlowException )
909            ( (FlowException) throwable ).setFlowName( getName() );
910    
911          if( throwable instanceof CascadingException )
912            throw (CascadingException) throwable;
913    
914          if( throwable instanceof OutOfMemoryError )
915            throw (OutOfMemoryError) throwable;
916    
917          if( throwable != null )
918            throw new FlowException( getName(), "unhandled exception", throwable );
919    
920          if( hasListeners() )
921            {
922            for( SafeFlowListener safeFlowListener : getListeners() )
923              {
924              if( safeFlowListener.throwable != null )
925                throw new FlowException( getName(), "unhandled listener exception", throwable );
926              }
927            }
928          }
929        finally
930          {
931          thread = null;
932          throwable = null;
933    
934          try
935            {
936            commitTraps();
937    
938            if( hasListeners() )
939              {
940              for( SafeFlowListener safeFlowListener : getListeners() )
941                safeFlowListener.throwable = null;
942              }
943            }
944          finally
945            {
946            flowStats.cleanup();
947            }
948          }
949        }
950    
951      private void commitTraps()
952        {
953        // commit all the traps, don't fail on an error
954    
955        for( Tap tap : traps.values() )
956          {
957          try
958            {
959            if( !tap.commitResource( getConfig() ) )
960              logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), null );
961            }
962          catch( IOException exception )
963            {
964            logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), exception );
965            }
966          }
967        }
968    
969      @Override
970      @ProcessCleanup
971      public void cleanup()
972        {
973        // do nothing
974        }
975    
976      @Override
977      public TupleEntryIterator openSource() throws IOException
978        {
979        return sources.values().iterator().next().openForRead( getFlowProcess() );
980        }
981    
982      @Override
983      public TupleEntryIterator openSource( String name ) throws IOException
984        {
985        if( !sources.containsKey( name ) )
986          throw new IllegalArgumentException( "source does not exist: " + name );
987    
988        return sources.get( name ).openForRead( getFlowProcess() );
989        }
990    
991      @Override
992      public TupleEntryIterator openSink() throws IOException
993        {
994        return sinks.values().iterator().next().openForRead( getFlowProcess() );
995        }
996    
997      @Override
998      public TupleEntryIterator openSink( String name ) throws IOException
999        {
1000        if( !sinks.containsKey( name ) )
1001          throw new IllegalArgumentException( "sink does not exist: " + name );
1002    
1003        return sinks.get( name ).openForRead( getFlowProcess() );
1004        }
1005    
1006      @Override
1007      public TupleEntryIterator openTrap() throws IOException
1008        {
1009        return traps.values().iterator().next().openForRead( getFlowProcess() );
1010        }
1011    
1012      @Override
1013      public TupleEntryIterator openTrap( String name ) throws IOException
1014        {
1015        if( !traps.containsKey( name ) )
1016          throw new IllegalArgumentException( "trap does not exist: " + name );
1017    
1018        return traps.get( name ).openForRead( getFlowProcess() );
1019        }
1020    
1021      /**
1022       * Method deleteSinks deletes all sinks, whether or not they are configured for {@link cascading.tap.SinkMode#UPDATE}.
1023       * <p/>
1024       * Use with caution.
1025       *
1026       * @throws IOException when
1027       * @see BaseFlow#deleteSinksIfNotUpdate()
1028       */
1029      public void deleteSinks() throws IOException
1030        {
1031        for( Tap tap : sinks.values() )
1032          deleteOrFail( tap );
1033        }
1034    
1035      private void deleteOrFail( Tap tap ) throws IOException
1036        {
1037        if( !tap.resourceExists( getConfig() ) )
1038          return;
1039    
1040        if( !tap.deleteResource( getConfig() ) )
1041          throw new FlowException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) );
1042        }
1043    
1044      /**
1045       * Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the {@link cascading.tap.SinkMode#UPDATE} flag.
1046       * <p/>
1047       * Typically used by a {@link Cascade} before executing the flow if the sinks are stale.
1048       * <p/>
1049       * Use with caution.
1050       *
1051       * @throws IOException when
1052       */
1053      public void deleteSinksIfNotUpdate() throws IOException
1054        {
1055        for( Tap tap : sinks.values() )
1056          {
1057          if( !tap.isUpdate() )
1058            deleteOrFail( tap );
1059          }
1060        }
1061    
1062      public void deleteSinksIfReplace() throws IOException
1063        {
1064        for( Tap tap : sinks.values() )
1065          {
1066          if( tap.isReplace() )
1067            deleteOrFail( tap );
1068          }
1069        }
1070    
1071      public void deleteTrapsIfNotUpdate() throws IOException
1072        {
1073        for( Tap tap : traps.values() )
1074          {
1075          if( !tap.isUpdate() )
1076            deleteOrFail( tap );
1077          }
1078        }
1079    
1080      public void deleteCheckpointsIfNotUpdate() throws IOException
1081        {
1082        for( Tap tap : checkpoints.values() )
1083          {
1084          if( !tap.isUpdate() )
1085            deleteOrFail( tap );
1086          }
1087        }
1088    
1089      public void deleteTrapsIfReplace() throws IOException
1090        {
1091        for( Tap tap : traps.values() )
1092          {
1093          if( tap.isReplace() )
1094            deleteOrFail( tap );
1095          }
1096        }
1097    
1098      public void deleteCheckpointsIfReplace() throws IOException
1099        {
1100        for( Tap tap : checkpoints.values() )
1101          {
1102          if( tap.isReplace() )
1103            deleteOrFail( tap );
1104          }
1105        }
1106    
1107      @Override
1108      public boolean resourceExists( Tap tap ) throws IOException
1109        {
1110        return tap.resourceExists( getConfig() );
1111        }
1112    
1113      @Override
1114      public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
1115        {
1116        return tap.openForRead( getFlowProcess() );
1117        }
1118    
1119      @Override
1120      public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
1121        {
1122        return tap.openForWrite( getFlowProcess() );
1123        }
1124    
  /**
   * Method run drives the execution of this flow: it marks the flow started, notifies listeners, spawns the
   * step jobs, waits on each job's result, and finally records the outcome and releases resources.
   * <p/>
   * The method is private and refuses to proceed when the {@code thread} field is null; users are expected
   * to call start() or complete() instead.
   * NOTE(review): presumably invoked from the Thread created by start() - confirm against the caller.
   * <p/>
   * Any Throwable raised while running is stored in the {@code throwable} field rather than propagated.
   *
   * @throws IllegalStateException when the {@code thread} field has not been set
   */
  private void run()
    {
    if( thread == null )
      throw new IllegalStateException( "to start a Flow call start() or complete(), not Runnable#run()" );

    Version.printBanner();
    Update.checkForUpdate( getPlatformInfo() );

    try
      {
      // stop was requested before any work started; the finally block below still runs
      if( stop )
        return;

      flowStats.markStarted();

      fireOnStarting();

      if( LOG.isInfoEnabled() )
        {
        logInfo( "starting" );

        for( Tap source : getSourcesCollection() )
          logInfo( " source: " + source );
        for( Tap sink : getSinksCollection() )
          logInfo( " sink: " + sink );
        }

      // if jobs are run local, then only use one thread to force execution serially
      //int numThreads = jobsAreLocal() ? 1 : getMaxConcurrentSteps( getJobConf() );
      int numThreads = getMaxNumParallelSteps();

      // zero means "no explicit limit": fall back to one thread per job
      if( numThreads == 0 )
        numThreads = jobsMap.size();

      // still zero means the jobs map is empty, there is nothing to execute
      if( numThreads == 0 )
        throw new IllegalStateException( "no jobs rendered for flow: " + getName() );

      if( LOG.isInfoEnabled() )
        {
        logInfo( " parallel execution is enabled: " + ( getMaxNumParallelSteps() != 1 ) );
        logInfo( " starting jobs: " + jobsMap.size() );
        logInfo( " allocating threads: " + numThreads );
        }

      List<Future<Throwable>> futures = spawnJobs( numThreads );

      // each future yields null on success, or the Throwable that failed the job
      for( Future<Throwable> future : futures )
        {
        throwable = future.get();

        if( throwable != null )
          {
          // first failure: stop the remaining jobs (unless a stop is already in progress) and quit waiting
          if( !stop )
            internalStopAllJobs();

          handleExecutorShutdown();
          break;
          }
        }
      }
    catch( Throwable throwable )
      {
      // the parameter shadows the field; keep the failure for handleThrowableAndMarkFailed() below
      this.throwable = throwable;
      }
    finally
      {
      handleThrowableAndMarkFailed();

      // only mark successful when no stop was requested and no other final state was recorded
      if( !stop && !flowStats.isFinished() )
        flowStats.markSuccessful();

      internalClean( stop ); // cleaning temp taps may be determined by success/failure

      try
        {
        fireOnCompleted();
        }
      finally
        {
        // runs even if firing the completed event throws
        flowStats.cleanup();
        internalShutdown();
        deregisterShutdownHook();
        }
      }
    }
1211    
  /**
   * Method getMaxNumParallelSteps returns the number of threads run() requests when spawning the step jobs.
   * <p/>
   * As consumed by run(): a value of 0 is replaced by the number of entries in the jobs map, and any value
   * other than 1 is logged as parallel execution being enabled.
   *
   * @return the number of threads to allocate, or 0 to allocate one per job
   */
  protected abstract int getMaxNumParallelSteps();
1213    
  /**
   * Method internalShutdown is a hook for subclasses, called from the innermost finally block of run(),
   * after flowStats.cleanup() and before the shutdown hook is deregistered.
   * NOTE(review): what should be released here is implementation-specific - confirm against subclasses.
   */
  protected abstract void internalShutdown();
1215    
1216      private List<Future<Throwable>> spawnJobs( int numThreads ) throws InterruptedException
1217        {
1218        if( stop )
1219          return new ArrayList<Future<Throwable>>();
1220    
1221        List<Callable<Throwable>> list = new ArrayList<Callable<Throwable>>();
1222    
1223        for( FlowStepJob<Config> job : jobsMap.values() )
1224          list.add( job );
1225    
1226        return spawnStrategy.start( this, numThreads, list );
1227        }
1228    
1229      private void handleThrowableAndMarkFailed()
1230        {
1231        if( throwable != null && !stop )
1232          {
1233          flowStats.markFailed( throwable );
1234    
1235          fireOnThrowable();
1236          }
1237        }
1238    
1239      Map<String, FlowStepJob<Config>> getJobsMap()
1240        {
1241        return jobsMap;
1242        }
1243    
1244      protected void initializeNewJobsMap()
1245        {
1246        // keep topo order
1247        jobsMap = new LinkedHashMap<String, FlowStepJob<Config>>();
1248        TopologicalOrderIterator<FlowStep<Config>, Integer> topoIterator = flowStepGraph.getTopologicalIterator();
1249    
1250        while( topoIterator.hasNext() )
1251          {
1252          BaseFlowStep<Config> step = (BaseFlowStep<Config>) topoIterator.next();
1253          FlowStepJob<Config> flowStepJob = step.getFlowStepJob( getFlowProcess(), getConfig() );
1254    
1255          jobsMap.put( step.getName(), flowStepJob );
1256    
1257          List<FlowStepJob<Config>> predecessors = new ArrayList<FlowStepJob<Config>>();
1258    
1259          for( Object flowStep : predecessorListOf( flowStepGraph, step ) )
1260            predecessors.add( jobsMap.get( ( (FlowStep<Config>) flowStep ).getName() ) );
1261    
1262          flowStepJob.setPredecessors( predecessors );
1263    
1264          flowStats.addStepStats( flowStepJob.getStepStats() );
1265          }
1266        }
1267    
1268      protected void internalStopAllJobs()
1269        {
1270        logInfo( "stopping all jobs" );
1271    
1272        try
1273          {
1274          if( jobsMap == null )
1275            return;
1276    
1277          List<FlowStepJob<Config>> jobs = new ArrayList<FlowStepJob<Config>>( jobsMap.values() );
1278    
1279          Collections.reverse( jobs );
1280    
1281          for( FlowStepJob<Config> job : jobs )
1282            job.stop();
1283          }
1284        finally
1285          {
1286          logInfo( "stopped all jobs" );
1287          }
1288        }
1289    
1290      protected void handleExecutorShutdown()
1291        {
1292        if( spawnStrategy.isCompleted( this ) )
1293          return;
1294    
1295        logInfo( "shutting down job executor" );
1296    
1297        try
1298          {
1299          spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS );
1300          }
1301        catch( InterruptedException exception )
1302          {
1303          // ignore
1304          }
1305    
1306        logInfo( "shutdown complete" );
1307        }
1308    
1309      protected void fireOnCompleted()
1310        {
1311        if( hasListeners() )
1312          {
1313          if( LOG.isDebugEnabled() )
1314            logDebug( "firing onCompleted event: " + getListeners().size() );
1315    
1316          for( FlowListener flowListener : getListeners() )
1317            flowListener.onCompleted( this );
1318          }
1319        }
1320    
1321      protected void fireOnThrowable()
1322        {
1323        if( hasListeners() )
1324          {
1325          if( LOG.isDebugEnabled() )
1326            logDebug( "firing onThrowable event: " + getListeners().size() );
1327    
1328          boolean isHandled = false;
1329    
1330          for( FlowListener flowListener : getListeners() )
1331            isHandled = flowListener.onThrowable( this, throwable ) || isHandled;
1332    
1333          if( isHandled )
1334            throwable = null;
1335          }
1336        }
1337    
1338      protected void fireOnStopping()
1339        {
1340        if( hasListeners() )
1341          {
1342          if( LOG.isDebugEnabled() )
1343            logDebug( "firing onStopping event: " + getListeners().size() );
1344    
1345          for( FlowListener flowListener : getListeners() )
1346            flowListener.onStopping( this );
1347          }
1348        }
1349    
1350      protected void fireOnStarting()
1351        {
1352        if( hasListeners() )
1353          {
1354          if( LOG.isDebugEnabled() )
1355            logDebug( "firing onStarting event: " + getListeners().size() );
1356    
1357          for( FlowListener flowListener : getListeners() )
1358            flowListener.onStarting( this );
1359          }
1360        }
1361    
1362      @Override
1363      public String toString()
1364        {
1365        StringBuffer buffer = new StringBuffer();
1366    
1367        if( getName() != null )
1368          buffer.append( getName() ).append( ": " );
1369    
1370        for( FlowStep step : getFlowSteps() )
1371          buffer.append( step );
1372    
1373        return buffer.toString();
1374        }
1375    
1376      protected void logInfo( String message )
1377        {
1378        LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message );
1379        }
1380    
1381      private void logDebug( String message )
1382        {
1383        LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message );
1384        }
1385    
1386      private void logWarn( String message, Throwable throwable )
1387        {
1388        LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable );
1389        }
1390    
1391      private void logError( String message, Throwable throwable )
1392        {
1393        LOG.error( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable );
1394        }
1395    
1396      @Override
1397      public void writeDOT( String filename )
1398        {
1399        if( pipeGraph == null )
1400          throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" );
1401    
1402        pipeGraph.writeDOT( filename );
1403        }
1404    
1405      @Override
1406      public void writeStepsDOT( String filename )
1407        {
1408        if( flowStepGraph == null )
1409          throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" );
1410    
1411        flowStepGraph.writeDOT( filename );
1412        }
1413    
1414      /**
1415       * Used to return a simple wrapper for use as an edge in a graph where there can only be
1416       * one instance of every edge.
1417       *
1418       * @return FlowHolder
1419       */
1420      public FlowHolder getHolder()
1421        {
1422        return new FlowHolder( this );
1423        }
1424    
1425      public void setCascade( Cascade cascade )
1426        {
1427        setConfigProperty( getConfig(), "cascading.cascade.id", cascade.getID() );
1428        flowStats.recordInfo();
1429        }
1430    
1431      @Override
1432      public String getCascadeID()
1433        {
1434        return getProperty( "cascading.cascade.id" );
1435        }
1436    
1437      @Override
1438      public String getRunID()
1439        {
1440        return runID;
1441        }
1442    
1443      protected List<String> getClassPath()
1444        {
1445        return classPath;
1446        }
1447    
1448      @Override
1449      public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy )
1450        {
1451        this.spawnStrategy = spawnStrategy;
1452        }
1453    
1454      @Override
1455      public UnitOfWorkSpawnStrategy getSpawnStrategy()
1456        {
1457        return spawnStrategy;
1458        }
1459    
1460      protected void registerShutdownHook()
1461        {
1462        if( !isStopJobsOnExit() )
1463          return;
1464    
1465        shutdownHook = new ShutdownUtil.Hook()
1466        {
1467        @Override
1468        public Priority priority()
1469          {
1470          return Priority.WORK_CHILD;
1471          }
1472    
1473        @Override
1474        public void execute()
1475          {
1476          logInfo( "shutdown hook calling stop on flow" );
1477    
1478          BaseFlow.this.stop();
1479          }
1480        };
1481    
1482        ShutdownUtil.addHook( shutdownHook );
1483        }
1484    
1485      private void deregisterShutdownHook()
1486        {
1487        if( !isStopJobsOnExit() || stop )
1488          return;
1489    
1490        ShutdownUtil.removeHook( shutdownHook );
1491        }
1492    
1493      /** Class FlowHolder is a helper class for wrapping Flow instances. */
1494      public static class FlowHolder
1495        {
1496        /** Field flow */
1497        public Flow flow;
1498    
1499        public FlowHolder()
1500          {
1501          }
1502    
1503        public FlowHolder( Flow flow )
1504          {
1505          this.flow = flow;
1506          }
1507        }
1508    
1509      /**
1510       * Class SafeFlowListener safely calls a wrapped FlowListener.
1511       * <p/>
1512       * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1513       * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
1514       * which in turn is run in a new Thread.
1515       */
1516      private class SafeFlowListener implements FlowListener
1517        {
1518        /** Field flowListener */
1519        final FlowListener flowListener;
1520        /** Field throwable */
1521        Throwable throwable;
1522    
1523        private SafeFlowListener( FlowListener flowListener )
1524          {
1525          this.flowListener = flowListener;
1526          }
1527    
1528        public void onStarting( Flow flow )
1529          {
1530          try
1531            {
1532            flowListener.onStarting( flow );
1533            }
1534          catch( Throwable throwable )
1535            {
1536            handleThrowable( throwable );
1537            }
1538          }
1539    
1540        public void onStopping( Flow flow )
1541          {
1542          try
1543            {
1544            flowListener.onStopping( flow );
1545            }
1546          catch( Throwable throwable )
1547            {
1548            handleThrowable( throwable );
1549            }
1550          }
1551    
1552        public void onCompleted( Flow flow )
1553          {
1554          try
1555            {
1556            flowListener.onCompleted( flow );
1557            }
1558          catch( Throwable throwable )
1559            {
1560            handleThrowable( throwable );
1561            }
1562          }
1563    
1564        public boolean onThrowable( Flow flow, Throwable flowThrowable )
1565          {
1566          try
1567            {
1568            return flowListener.onThrowable( flow, flowThrowable );
1569            }
1570          catch( Throwable throwable )
1571            {
1572            handleThrowable( throwable );
1573            }
1574    
1575          return false;
1576          }
1577    
1578        private void handleThrowable( Throwable throwable )
1579          {
1580          this.throwable = throwable;
1581    
1582          logWarn( String.format( "flow listener %s threw throwable", flowListener ), throwable );
1583    
1584          // stop this flow
1585          stop();
1586          }
1587    
1588        public boolean equals( Object object )
1589          {
1590          if( object instanceof BaseFlow.SafeFlowListener )
1591            return flowListener.equals( ( (BaseFlow.SafeFlowListener) object ).flowListener );
1592    
1593          return flowListener.equals( object );
1594          }
1595    
1596        public int hashCode()
1597          {
1598          return flowListener.hashCode();
1599          }
1600        }
1601      }