001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.cascade;
022    
023    import java.io.FileWriter;
024    import java.io.IOException;
025    import java.io.Writer;
026    import java.util.ArrayList;
027    import java.util.Arrays;
028    import java.util.Collection;
029    import java.util.Collections;
030    import java.util.HashSet;
031    import java.util.LinkedHashMap;
032    import java.util.LinkedList;
033    import java.util.List;
034    import java.util.Map;
035    import java.util.Set;
036    import java.util.concurrent.Callable;
037    import java.util.concurrent.CountDownLatch;
038    import java.util.concurrent.ExecutorService;
039    import java.util.concurrent.Executors;
040    import java.util.concurrent.Future;
041    import java.util.concurrent.FutureTask;
042    import java.util.concurrent.TimeUnit;
043    
044    import cascading.CascadingException;
045    import cascading.cascade.planner.FlowGraph;
046    import cascading.cascade.planner.IdentifierGraph;
047    import cascading.cascade.planner.TapGraph;
048    import cascading.flow.BaseFlow;
049    import cascading.flow.Flow;
050    import cascading.flow.FlowException;
051    import cascading.flow.FlowSkipStrategy;
052    import cascading.management.CascadingServices;
053    import cascading.management.UnitOfWork;
054    import cascading.management.UnitOfWorkExecutorStrategy;
055    import cascading.management.UnitOfWorkSpawnStrategy;
056    import cascading.management.state.ClientState;
057    import cascading.stats.CascadeStats;
058    import cascading.tap.Tap;
059    import cascading.util.ShutdownUtil;
060    import cascading.util.Util;
061    import cascading.util.Version;
062    import org.jgrapht.Graphs;
063    import org.jgrapht.ext.EdgeNameProvider;
064    import org.jgrapht.ext.IntegerNameProvider;
065    import org.jgrapht.ext.VertexNameProvider;
066    import org.jgrapht.graph.SimpleDirectedGraph;
067    import org.jgrapht.traverse.TopologicalOrderIterator;
068    import org.slf4j.Logger;
069    import org.slf4j.LoggerFactory;
070    
071    import static cascading.property.PropertyUtil.getProperty;
072    
073    /**
074     * A Cascade is an assembly of {@link cascading.flow.Flow} instances that share or depend on equivalent {@link Tap} instances and are executed as
075     * a single group. The most common case is where one Flow instance depends on a Tap created by a second Flow instance. This
076     * dependency chain can continue as practical.
077     * <p/>
078     * Note Flow instances that have no shared dependencies will be executed in parallel.
079     * <p/>
080     * Additionally, a Cascade allows for incremental builds of complex data processing processes. If a given source {@link Tap} is newer than
081     * a subsequent sink {@link Tap} in the assembly, the connecting {@link cascading.flow.Flow}(s) will be executed
 * when the Cascade is executed. If all the targets (sinks) are up to date, the Cascade exits immediately and does nothing.
083     * <p/>
084     * The concept of 'stale' is pluggable, see the {@link cascading.flow.FlowSkipStrategy} class.
085     * <p/>
 * When a Cascade starts up, it first verifies which Flow instances have stale sinks; if the sinks are not stale, the
087     * method {@link cascading.flow.BaseFlow#deleteSinksIfNotUpdate()} is called. Before appends/updates were supported (logically)
088     * the Cascade deleted all the sinks in a Flow.
089     * <p/>
 * A consequence of this is that if the Cascade fails, but does complete a Flow that appended or updated data, re-running
091     * the Cascade (and the successful append/update Flow) will re-update data to the source. Some systems may be idempotent and
092     * may not have any side-effects. So plan accordingly.
093     * <p/>
094     * Use the {@link CascadeListener} to receive any events on the life-cycle of the Cascade as it executes. Any
095     * {@link Tap} instances owned by managed Flows also implementing CascadeListener will automatically be added to the
096     * set of listeners.
097     *
098     * @see CascadeListener
099     * @see cascading.flow.Flow
100     * @see cascading.flow.FlowSkipStrategy
101     */
102    public class Cascade implements UnitOfWork<CascadeStats>
103      {
104      /** Field LOG */
105      private static final Logger LOG = LoggerFactory.getLogger( Cascade.class );
106    
107      /** Field id */
108      private String id;
109      /** Field name */
110      private final String name;
111      /** Field tags */
112      private String tags;
113      /** Field properties */
114      private final Map<Object, Object> properties;
115      /** Fields listeners */
116      private List<SafeCascadeListener> listeners;
117      /** Field jobGraph */
118      private final FlowGraph flowGraph;
119      /** Field tapGraph */
120      private final IdentifierGraph identifierGraph;
121      /** Field cascadeStats */
122      private final CascadeStats cascadeStats;
123      /** Field cascadingServices */
124      private CascadingServices cascadingServices;
125      /** Field thread */
126      private Thread thread;
127      /** Field throwable */
128      private Throwable throwable;
129      private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy();
130      /** Field shutdownHook */
131      private ShutdownUtil.Hook shutdownHook;
132      /** Field jobsMap */
133      private final Map<String, Callable<Throwable>> jobsMap = new LinkedHashMap<String, Callable<Throwable>>();
134      /** Field stop */
135      private boolean stop;
136      /** Field flowSkipStrategy */
137      private FlowSkipStrategy flowSkipStrategy = null;
138      /** Field maxConcurrentFlows */
139      private int maxConcurrentFlows = 0;
140    
141      /** Field tapGraph * */
142      private transient TapGraph tapGraph;
143    
144      static int getMaxConcurrentFlows( Map<Object, Object> properties, int maxConcurrentFlows )
145        {
146        if( maxConcurrentFlows != -1 ) // CascadeDef is -1 by default
147          return maxConcurrentFlows;
148    
149        return Integer.parseInt( getProperty( properties, CascadeProps.MAX_CONCURRENT_FLOWS, "0" ) );
150        }
151    
152      /** for testing */
153      protected Cascade()
154        {
155        this.name = null;
156        this.tags = null;
157        this.properties = null;
158        this.flowGraph = null;
159        this.identifierGraph = null;
160        this.cascadeStats = null;
161        }
162    
  /**
   * Constructor Cascade creates a new Cascade over the given Flow and identifier graphs.
   * <p/>
   * Statement order matters here. The stats object is created through {@code getClientState()}, which reads
   * {@code properties} and the ID. The Taps used for listener registration come from {@code flowGraph}. Both
   * fields must therefore be assigned before those calls.
   *
   * @param cascadeDef      supplies the name, tags, and maximum number of concurrent flows
   * @param properties      the properties of this Cascade, used to create the CascadingServices and to
   *                        resolve the maximum number of concurrent flows
   * @param flowGraph       the graph of managed Flow instances
   * @param identifierGraph the graph of resource identifiers connected by Flow instances
   */
  Cascade( CascadeDef cascadeDef, Map<Object, Object> properties, FlowGraph flowGraph, IdentifierGraph identifierGraph )
    {
    this.name = cascadeDef.getName();
    this.tags = cascadeDef.getTags();
    this.properties = properties;
    this.flowGraph = flowGraph;
    this.identifierGraph = identifierGraph;
    this.cascadeStats = createPrepareCascadeStats();
    setIDOnFlow(); // points every managed Flow back at this Cascade
    this.maxConcurrentFlows = cascadeDef.getMaxConcurrentFlows();

    // any Tap that also implements CascadeListener is registered as a listener
    addListeners( getAllTaps() );
    }
176    
177      private CascadeStats createPrepareCascadeStats()
178        {
179        CascadeStats cascadeStats = new CascadeStats( this, getClientState() );
180    
181        cascadeStats.prepare();
182        cascadeStats.markPending();
183    
184        return cascadeStats;
185        }
186    
187      /**
188       * Method getName returns the name of this Cascade object.
189       *
190       * @return the name (type String) of this Cascade object.
191       */
192      @Override
193      public String getName()
194        {
195        return name;
196        }
197    
198      /**
199       * Method getID returns the ID of this Cascade object.
200       * <p/>
201       * The ID value is a long HEX String used to identify this instance globally. Subsequent Cascade
202       * instances created with identical parameters will not return the same ID.
203       *
204       * @return the ID (type String) of this Cascade object.
205       */
206      @Override
207      public String getID()
208        {
209        if( id == null )
210          id = Util.createUniqueID();
211    
212        return id;
213        }
214    
215      /**
216       * Method getTags returns the tags associated with this Cascade object.
217       *
218       * @return the tags (type String) of this Cascade object.
219       */
220      @Override
221      public String getTags()
222        {
223        return tags;
224        }
225    
226      void addListeners( Collection listeners )
227        {
228        for( Object listener : listeners )
229          {
230          if( listener instanceof CascadeListener )
231            addListener( (CascadeListener) listener );
232          }
233        }
234    
235      List<SafeCascadeListener> getListeners()
236        {
237        if( listeners == null )
238          listeners = new LinkedList<SafeCascadeListener>();
239    
240        return listeners;
241        }
242    
243      public boolean hasListeners()
244        {
245        return listeners != null && !listeners.isEmpty();
246        }
247    
248      public void addListener( CascadeListener flowListener )
249        {
250        getListeners().add( new SafeCascadeListener( flowListener ) );
251        }
252    
253      public boolean removeListener( CascadeListener flowListener )
254        {
255        return getListeners().remove( new SafeCascadeListener( flowListener ) );
256        }
257    
258      private void fireOnCompleted()
259        {
260        if( hasListeners() )
261          {
262          if( LOG.isDebugEnabled() )
263            logDebug( "firing onCompleted event: " + getListeners().size() );
264    
265          for( CascadeListener cascadeListener : getListeners() )
266            cascadeListener.onCompleted( this );
267          }
268        }
269    
270      private void fireOnThrowable()
271        {
272        if( hasListeners() )
273          {
274          if( LOG.isDebugEnabled() )
275            logDebug( "firing onThrowable event: " + getListeners().size() );
276    
277          boolean isHandled = false;
278    
279          for( CascadeListener cascadeListener : getListeners() )
280            isHandled = cascadeListener.onThrowable( this, throwable ) || isHandled;
281    
282          if( isHandled )
283            throwable = null;
284          }
285        }
286    
287      protected void fireOnStopping()
288        {
289        if( hasListeners() )
290          {
291          if( LOG.isDebugEnabled() )
292            logDebug( "firing onStopping event: " + getListeners().size() );
293    
294          for( CascadeListener cascadeListener : getListeners() )
295            cascadeListener.onStopping( this );
296          }
297        }
298    
299      protected void fireOnStarting()
300        {
301        if( hasListeners() )
302          {
303          if( LOG.isDebugEnabled() )
304            logDebug( "firing onStarting event: " + getListeners().size() );
305    
306          for( CascadeListener cascadeListener : getListeners() )
307            cascadeListener.onStarting( this );
308          }
309        }
310    
311      private CascadingServices getCascadingServices()
312        {
313        if( cascadingServices == null )
314          cascadingServices = new CascadingServices( properties );
315    
316        return cascadingServices;
317        }
318    
319      private ClientState getClientState()
320        {
321        return getCascadingServices().createClientState( getID() );
322        }
323    
324      /**
325       * Method getCascadeStats returns the cascadeStats of this Cascade object.
326       *
327       * @return the cascadeStats (type CascadeStats) of this Cascade object.
328       */
329      public CascadeStats getCascadeStats()
330        {
331        return cascadeStats;
332        }
333    
334      @Override
335      public CascadeStats getStats()
336        {
337        return getCascadeStats();
338        }
339    
340      private void setIDOnFlow()
341        {
342        for( Flow<?> flow : getFlows() )
343          ( (BaseFlow<?>) flow ).setCascade( this );
344        }
345    
346      protected FlowGraph getFlowGraph()
347        {
348        return flowGraph;
349        }
350    
351      protected IdentifierGraph getIdentifierGraph()
352        {
353        return identifierGraph;
354        }
355    
356      /**
357       * Method getFlows returns the flows managed by this Cascade object. The returned {@link cascading.flow.Flow} instances
358       * will be in topological order.
359       *
360       * @return the flows (type Collection<Flow>) of this Cascade object.
361       */
362      public List<Flow> getFlows()
363        {
364        List<Flow> flows = new LinkedList<Flow>();
365        TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator();
366    
367        while( topoIterator.hasNext() )
368          flows.add( topoIterator.next() );
369    
370        return flows;
371        }
372    
373      /**
374       * Method findFlows returns a List of flows whose names match the given regex pattern.
375       *
376       * @param regex of type String
377       * @return List<Flow>
378       */
379      public List<Flow> findFlows( String regex )
380        {
381        List<Flow> flows = new ArrayList<Flow>();
382    
383        for( Flow flow : getFlows() )
384          {
385          if( flow.getName().matches( regex ) )
386            flows.add( flow );
387          }
388    
389        return flows;
390        }
391    
392      /**
393       * Method getHeadFlows returns all Flow instances that are at the "head" of the flow graph.
394       * <p/>
395       * That is, they are the first to execute and have no Tap source dependencies with Flow instances in the this Cascade
396       * instance.
397       *
398       * @return Collection<Flow>
399       */
400      public Collection<Flow> getHeadFlows()
401        {
402        Set<Flow> flows = new HashSet<Flow>();
403    
404        for( Flow flow : flowGraph.vertexSet() )
405          {
406          if( flowGraph.inDegreeOf( flow ) == 0 )
407            flows.add( flow );
408          }
409    
410        return flows;
411        }
412    
413      /**
414       * Method getTailFlows returns all Flow instances that are at the "tail" of the flow graph.
415       * <p/>
416       * That is, they are the last to execute and have no Tap sink dependencies with Flow instances in the this Cascade
417       * instance.
418       *
419       * @return Collection<Flow>
420       */
421      public Collection<Flow> getTailFlows()
422        {
423        Set<Flow> flows = new HashSet<Flow>();
424    
425        for( Flow flow : flowGraph.vertexSet() )
426          {
427          if( flowGraph.outDegreeOf( flow ) == 0 )
428            flows.add( flow );
429          }
430    
431        return flows;
432        }
433    
434      /**
435       * Method getIntermediateFlows returns all Flow instances that are neither at the "tail" or "tail" of the flow graph.
436       *
437       * @return Collection<Flow>
438       */
439      public Collection<Flow> getIntermediateFlows()
440        {
441        Set<Flow> flows = new HashSet<Flow>( flowGraph.vertexSet() );
442    
443        flows.removeAll( getHeadFlows() );
444        flows.removeAll( getTailFlows() );
445    
446        return flows;
447        }
448    
449      protected TapGraph getTapGraph()
450        {
451        if( tapGraph == null )
452          tapGraph = new TapGraph( flowGraph.vertexSet() );
453    
454        return tapGraph;
455        }
456    
457      /**
458       * Method getSourceTaps returns all source Tap instances in this Cascade instance.
459       * <p/>
460       * That is, none of returned Tap instances are the sinks of other Flow instances in this Cascade.
461       * <p/>
462       * All {@link cascading.tap.CompositeTap} instances are unwound if addressed directly by a managed Flow instance.
463       *
464       * @return Collection<Tap>
465       */
466      public Collection<Tap> getSourceTaps()
467        {
468        TapGraph tapGraph = getTapGraph();
469        Set<Tap> taps = new HashSet<Tap>();
470    
471        for( Tap tap : tapGraph.vertexSet() )
472          {
473          if( tapGraph.inDegreeOf( tap ) == 0 )
474            taps.add( tap );
475          }
476    
477        return taps;
478        }
479    
480      /**
481       * Method getSinkTaps returns all sink Tap instances in this Cascade instance.
482       * <p/>
483       * That is, none of returned Tap instances are the sources of other Flow instances in this Cascade.
484       * <p/>
485       * All {@link cascading.tap.CompositeTap} instances are unwound if addressed directly by a managed Flow instance.
486       * <p/>
487       * This method will return checkpoint Taps managed by Flow instances if not used as a source by other Flow instances.
488       *
489       * @return Collection<Tap>
490       */
491      public Collection<Tap> getSinkTaps()
492        {
493        TapGraph tapGraph = getTapGraph();
494        Set<Tap> taps = new HashSet<Tap>();
495    
496        for( Tap tap : tapGraph.vertexSet() )
497          {
498          if( tapGraph.outDegreeOf( tap ) == 0 )
499            taps.add( tap );
500          }
501    
502        return taps;
503        }
504    
505      /**
506       * Method getCheckpointTaps returns all checkpoint Tap instances from all the Flow instances in this Cascade instance.
507       *
508       * @return Collection<Tap>
509       */
510      public Collection<Tap> getCheckpointsTaps()
511        {
512        Set<Tap> taps = new HashSet<Tap>();
513    
514        for( Flow flow : getFlows() )
515          taps.addAll( flow.getCheckpointsCollection() );
516    
517        return taps;
518        }
519    
520      /**
521       * Method getIntermediateTaps returns all Tap instances that are neither at the source or sink of the flow graph.
522       * <p/>
523       * This method does consider checkpoint Taps managed by Flow instances in this Cascade instance.
524       *
525       * @return Collection<Flow>
526       */
527      public Collection<Tap> getIntermediateTaps()
528        {
529        TapGraph tapGraph = getTapGraph();
530        Set<Tap> taps = new HashSet<Tap>( tapGraph.vertexSet() );
531    
532        taps.removeAll( getSourceTaps() );
533        taps.removeAll( getSinkTaps() );
534    
535        return taps;
536        }
537    
538      /**
539       * Method getAllTaps returns all source, sink, and checkpoint Tap instances associated with the managed
540       * Flow instances in this Cascade instance.
541       *
542       * @return Collection<Tap>
543       */
544      public Collection<Tap> getAllTaps()
545        {
546        return new HashSet<Tap>( getTapGraph().vertexSet() );
547        }
548    
549      /**
550       * Method getSuccessorFlows returns a Collection of all the Flow instances that will be
551       * executed after the given Flow instance.
552       *
553       * @param flow of type Flow
554       * @return Collection<Flow>
555       */
556      public Collection<Flow> getSuccessorFlows( Flow flow )
557        {
558        return Graphs.successorListOf( flowGraph, flow );
559        }
560    
561      /**
562       * Method getPredecessorFlows returns a Collection of all the Flow instances that will be
563       * executed before the given Flow instance.
564       *
565       * @param flow of type Flow
566       * @return Collection<Flow>
567       */
568      public Collection<Flow> getPredecessorFlows( Flow flow )
569        {
570        return Graphs.predecessorListOf( flowGraph, flow );
571        }
572    
573      /**
574       * Method findFlowsSourcingFrom returns all Flow instances that reads from a source with the given identifier.
575       *
576       * @param identifier of type String
577       * @return Collection<Flow>
578       */
579      public Collection<Flow> findFlowsSourcingFrom( String identifier )
580        {
581        try
582          {
583          return unwrapFlows( identifierGraph.outgoingEdgesOf( identifier ) );
584          }
585        catch( Exception exception )
586          {
587          return Collections.emptySet();
588          }
589        }
590    
591      /**
592       * Method findFlowsSinkingTo returns all Flow instances that writes to a sink with the given identifier.
593       *
594       * @param identifier of type String
595       * @return Collection<Flow>
596       */
597      public Collection<Flow> findFlowsSinkingTo( String identifier )
598        {
599        try
600          {
601          return unwrapFlows( identifierGraph.incomingEdgesOf( identifier ) );
602          }
603        catch( Exception exception )
604          {
605          return Collections.emptySet();
606          }
607        }
608    
609      private Collection<Flow> unwrapFlows( Set<BaseFlow.FlowHolder> flowHolders )
610        {
611        Set<Flow> flows = new HashSet<Flow>();
612    
613        for( BaseFlow.FlowHolder flowHolder : flowHolders )
614          flows.add( flowHolder.flow );
615    
616        return flows;
617        }
618    
619      /**
620       * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow.
621       *
622       * @return FlowSkipStrategy
623       */
624      public FlowSkipStrategy getFlowSkipStrategy()
625        {
626        return flowSkipStrategy;
627        }
628    
629      /**
630       * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy, if any, is returned.
631       * If a strategy is given, it will be used as the strategy for all {@link cascading.flow.BaseFlow} instances managed by this Cascade instance.
632       * To revert back to consulting the strategies associated with each Flow instance, re-set this value to {@code null}, its
633       * default value.
634       * <p/>
635       * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link cascading.flow.FlowSkipIfSinkNotStale}
636       * and is inherited from the Flow instance in question. An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}.
637       * <p/>
638       * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()}
639       *
640       * @param flowSkipStrategy of type FlowSkipStrategy
641       * @return FlowSkipStrategy
642       */
643      public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy )
644        {
645        try
646          {
647          return this.flowSkipStrategy;
648          }
649        finally
650          {
651          this.flowSkipStrategy = flowSkipStrategy;
652          }
653        }
654    
  /** Does nothing; this Cascade performs no work in the prepare phase. */
  @Override
  public void prepare()
    {
    }
659    
660      /**
661       * Method start begins the current Cascade process. It returns immediately. See method {@link #complete()} to block
662       * until the Cascade completes.
663       */
664      public void start()
665        {
666        if( thread != null )
667          return;
668    
669        thread = new Thread( new Runnable()
670        {
671        @Override
672        public void run()
673          {
674          Cascade.this.run();
675          }
676        }, ( "cascade " + Util.toNull( getName() ) ).trim() );
677    
678        thread.start();
679        }
680    
681      /**
682       * Method complete begins the current Cascade process if method {@link #start()} was not previously called. This method
683       * blocks until the process completes.
684       *
685       * @throws RuntimeException wrapping any exception thrown internally.
686       */
687      public void complete()
688        {
689        start();
690    
691        try
692          {
693          try
694            {
695            thread.join();
696            }
697          catch( InterruptedException exception )
698            {
699            throw new FlowException( "thread interrupted", exception );
700            }
701    
702          if( throwable instanceof CascadingException )
703            throw (CascadingException) throwable;
704    
705          if( throwable != null )
706            throw new CascadeException( "unhandled exception", throwable );
707          }
708        finally
709          {
710          thread = null;
711          throwable = null;
712          shutdownHook = null;
713          cascadeStats.cleanup();
714          }
715        }
716    
717      public synchronized void stop()
718        {
719        if( stop )
720          return;
721    
722        stop = true;
723    
724        fireOnStopping();
725    
726        if( !cascadeStats.isFinished() )
727          cascadeStats.markStopped();
728    
729        internalStopAllFlows();
730        handleExecutorShutdown();
731    
732        cascadeStats.cleanup();
733        }
734    
  /** Does nothing; stats cleanup is performed by {@link #complete()} and {@link #stop()} instead. */
  @Override
  public void cleanup()
    {
    }
739    
740      /** Method run implements the Runnable run method. */
741      private void run()
742        {
743        Version.printBanner();
744    
745        if( LOG.isInfoEnabled() )
746          logInfo( "starting" );
747    
748        registerShutdownHook();
749    
750        try
751          {
752          if( stop )
753            return;
754    
755          // mark started, not submitted
756          cascadeStats.markStartedThenRunning();
757    
758          fireOnStarting();
759    
760          initializeNewJobsMap();
761    
762          int numThreads = getMaxConcurrentFlows( properties, maxConcurrentFlows );
763    
764          if( numThreads == 0 )
765            numThreads = jobsMap.size();
766    
767          int numLocalFlows = numLocalFlows();
768    
769          boolean runFlowsLocal = numLocalFlows > 1;
770    
771          if( runFlowsLocal )
772            numThreads = 1;
773    
774          if( LOG.isInfoEnabled() )
775            {
776            logInfo( " parallel execution is enabled: " + !runFlowsLocal );
777            logInfo( " starting flows: " + jobsMap.size() );
778            logInfo( " allocating threads: " + numThreads );
779            }
780    
781          List<Future<Throwable>> futures = spawnStrategy.start( this, numThreads, jobsMap.values() );
782    
783          for( Future<Throwable> future : futures )
784            {
785            throwable = future.get();
786    
787            if( throwable != null )
788              {
789              if( !stop )
790                {
791                if( !cascadeStats.isFinished() )
792                  cascadeStats.markFailed( throwable );
793                internalStopAllFlows();
794                fireOnThrowable();
795                }
796    
797              handleExecutorShutdown();
798              break;
799              }
800            }
801          }
802        catch( Throwable throwable )
803          {
804          this.throwable = throwable;
805          }
806        finally
807          {
808          if( !cascadeStats.isFinished() )
809            cascadeStats.markSuccessful();
810    
811          try
812            {
813            fireOnCompleted();
814            }
815          finally
816            {
817            deregisterShutdownHook();
818            }
819          }
820        }
821    
822      private void registerShutdownHook()
823        {
824        if( !isStopJobsOnExit() )
825          return;
826    
827        shutdownHook = new ShutdownUtil.Hook()
828        {
829        @Override
830        public Priority priority()
831          {
832          return Priority.WORK_PARENT;
833          }
834    
835        @Override
836        public void execute()
837          {
838          logInfo( "shutdown hook calling stop on cascade" );
839    
840          Cascade.this.stop();
841          }
842        };
843    
844        ShutdownUtil.addHook( shutdownHook );
845        }
846    
847      private void deregisterShutdownHook()
848        {
849        if( !isStopJobsOnExit() || stop )
850          return;
851    
852        ShutdownUtil.removeHook( shutdownHook );
853        }
854    
855      private boolean isStopJobsOnExit()
856        {
857        if( getFlows().isEmpty() )
858          return false; // don't bother registering hook
859    
860        return getFlows().get( 0 ).isStopJobsOnExit();
861        }
862    
863      /**
864       * If the number of flows that are local is greater than one, force the Cascade to run without parallelization.
865       *
866       * @return of type int
867       */
868      private int numLocalFlows()
869        {
870        int countLocalJobs = 0;
871    
872        for( Flow flow : getFlows() )
873          {
874          if( flow.stepsAreLocal() )
875            countLocalJobs++;
876          }
877    
878        return countLocalJobs;
879        }
880    
881      private void initializeNewJobsMap()
882        {
883        synchronized( jobsMap )
884          {
885          // keep topo order
886          TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator();
887    
888          while( topoIterator.hasNext() )
889            {
890            Flow flow = topoIterator.next();
891    
892            cascadeStats.addFlowStats( flow.getFlowStats() );
893    
894            CascadeJob job = new CascadeJob( flow );
895    
896            jobsMap.put( flow.getName(), job );
897    
898            List<CascadeJob> predecessors = new ArrayList<CascadeJob>();
899    
900            for( Flow predecessor : Graphs.predecessorListOf( flowGraph, flow ) )
901              predecessors.add( (CascadeJob) jobsMap.get( predecessor.getName() ) );
902    
903            job.init( predecessors );
904            }
905          }
906        }
907    
908      private void handleExecutorShutdown()
909        {
910        if( spawnStrategy.isCompleted( this ) )
911          return;
912    
913        logInfo( "shutting down flow executor" );
914    
915        try
916          {
917          spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS );
918          }
919        catch( InterruptedException exception )
920          {
921          // ignore
922          }
923    
924        logInfo( "shutdown complete" );
925        }
926    
927      private void internalStopAllFlows()
928        {
929        logInfo( "stopping all flows" );
930    
931        synchronized( jobsMap )
932          {
933          List<Callable<Throwable>> jobs = new ArrayList<Callable<Throwable>>( jobsMap.values() );
934    
935          Collections.reverse( jobs );
936    
937          for( Callable<Throwable> callable : jobs )
938            ( (CascadeJob) callable ).stop();
939          }
940    
941        logInfo( "stopped all flows" );
942        }
943    
944      /**
945       * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
946       *
947       * @param filename of type String
948       */
949      public void writeDOT( String filename )
950        {
951        printElementGraph( filename, identifierGraph );
952        }
953    
954      protected void printElementGraph( String filename, SimpleDirectedGraph<String, BaseFlow.FlowHolder> graph )
955        {
956        try
957          {
958          Writer writer = new FileWriter( filename );
959    
960          Util.writeDOT( writer, graph, new IntegerNameProvider<String>(), new VertexNameProvider<String>()
961            {
962            public String getVertexName( String object )
963              {
964              return object.toString().replaceAll( "\"", "\'" );
965              }
966            }, new EdgeNameProvider<BaseFlow.FlowHolder>()
967            {
968            public String getEdgeName( BaseFlow.FlowHolder object )
969              {
970              return object.flow.getName().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
971              }
972            }
973          );
974    
975          writer.close();
976          }
977        catch( IOException exception )
978          {
979          LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
980          }
981        }
982    
983      @Override
984      public String toString()
985        {
986        return getName();
987        }
988    
989      private void logDebug( String message )
990        {
991        LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message );
992        }
993    
994      private void logInfo( String message )
995        {
996        LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message );
997        }
998    
999      private void logWarn( String message )
1000        {
1001        logWarn( message, null );
1002        }
1003    
1004      private void logWarn( String message, Throwable throwable )
1005        {
1006        LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable );
1007        }
1008    
1009      /** Class CascadeJob manages Flow execution in the current Cascade instance. */
1010      protected class CascadeJob implements Callable<Throwable>
1011        {
1012        /** Field flow */
1013        final Flow flow;
1014        /** Field predecessors */
1015        private List<CascadeJob> predecessors;
1016        /** Field latch */
1017        private final CountDownLatch latch = new CountDownLatch( 1 );
1018        /** Field stop */
1019        private boolean stop = false;
1020        /** Field failed */
1021        private boolean failed = false;
1022    
1023        public CascadeJob( Flow flow )
1024          {
1025          this.flow = flow;
1026          }
1027    
1028        public String getName()
1029          {
1030          return flow.getName();
1031          }
1032    
1033        public Throwable call()
1034          {
1035          try
1036            {
1037            for( CascadeJob predecessor : predecessors )
1038              {
1039              if( !predecessor.isSuccessful() )
1040                return null;
1041              }
1042    
1043            if( stop || cascadeStats.isFinished() )
1044              return null;
1045    
1046            try
1047              {
1048              if( LOG.isInfoEnabled() )
1049                logInfo( "starting flow: " + flow.getName() );
1050    
1051              if( flowSkipStrategy == null ? flow.isSkipFlow() : flowSkipStrategy.skipFlow( flow ) )
1052                {
1053                if( LOG.isInfoEnabled() )
1054                  logInfo( "skipping flow: " + flow.getName() );
1055    
1056                flow.getFlowStats().markSkipped();
1057    
1058                return null;
1059                }
1060    
1061              flow.prepare(); // do not delete append/update mode taps
1062              flow.complete();
1063    
1064              if( LOG.isInfoEnabled() )
1065                logInfo( "completed flow: " + flow.getName() );
1066              }
1067            catch( Throwable exception )
1068              {
1069              failed = true;
1070              logWarn( "flow failed: " + flow.getName(), exception );
1071    
1072              CascadeException cascadeException = new CascadeException( "flow failed: " + flow.getName(), exception );
1073    
1074              if( !cascadeStats.isFinished() )
1075                cascadeStats.markFailed( cascadeException );
1076    
1077              return cascadeException;
1078              }
1079            finally
1080              {
1081              flow.cleanup();
1082              }
1083            }
1084          catch( Throwable throwable )
1085            {
1086            failed = true;
1087            return throwable;
1088            }
1089          finally
1090            {
1091            latch.countDown();
1092            }
1093    
1094          return null;
1095          }
1096    
1097        public void init( List<CascadeJob> predecessors )
1098          {
1099          this.predecessors = predecessors;
1100          }
1101    
1102        public void stop()
1103          {
1104          if( LOG.isInfoEnabled() )
1105            logInfo( "stopping flow: " + flow.getName() );
1106    
1107          stop = true;
1108    
1109          if( flow != null )
1110            flow.stop();
1111          }
1112    
1113        public boolean isSuccessful()
1114          {
1115          try
1116            {
1117            latch.await();
1118    
1119            return flow != null && !failed && !stop;
1120            }
1121          catch( InterruptedException exception )
1122            {
1123            logWarn( "latch interrupted", exception );
1124            }
1125    
1126          return false;
1127          }
1128        }
1129    
1130      @Override
1131      public UnitOfWorkSpawnStrategy getSpawnStrategy()
1132        {
1133        return spawnStrategy;
1134        }
1135    
1136      @Override
1137      public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy )
1138        {
1139        this.spawnStrategy = spawnStrategy;
1140        }
1141    
1142      /**
1143       * Class SafeCascadeListener safely calls a wrapped CascadeListener.
1144       * <p/>
1145       * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1146       * can be caught by the calling Thread. Since Cascade is asynchronous, much of the work is done in the run() method
1147       * which in turn is run in a new Thread.
1148       */
1149      private class SafeCascadeListener implements CascadeListener
1150        {
1151        /** Field flowListener */
1152        final CascadeListener cascadeListener;
1153        /** Field throwable */
1154        Throwable throwable;
1155    
1156        private SafeCascadeListener( CascadeListener cascadeListener )
1157          {
1158          this.cascadeListener = cascadeListener;
1159          }
1160    
1161        public void onStarting( Cascade cascade )
1162          {
1163          try
1164            {
1165            cascadeListener.onStarting( cascade );
1166            }
1167          catch( Throwable throwable )
1168            {
1169            handleThrowable( throwable );
1170            }
1171          }
1172    
1173        public void onStopping( Cascade cascade )
1174          {
1175          try
1176            {
1177            cascadeListener.onStopping( cascade );
1178            }
1179          catch( Throwable throwable )
1180            {
1181            handleThrowable( throwable );
1182            }
1183          }
1184    
1185        public void onCompleted( Cascade cascade )
1186          {
1187          try
1188            {
1189            cascadeListener.onCompleted( cascade );
1190            }
1191          catch( Throwable throwable )
1192            {
1193            handleThrowable( throwable );
1194            }
1195          }
1196    
1197        public boolean onThrowable( Cascade cascade, Throwable flowThrowable )
1198          {
1199          try
1200            {
1201            return cascadeListener.onThrowable( cascade, flowThrowable );
1202            }
1203          catch( Throwable throwable )
1204            {
1205            handleThrowable( throwable );
1206            }
1207    
1208          return false;
1209          }
1210    
1211        private void handleThrowable( Throwable throwable )
1212          {
1213          this.throwable = throwable;
1214    
1215          logWarn( String.format( "cascade listener %s threw throwable", cascadeListener ), throwable );
1216    
1217          // stop this flow
1218          stop();
1219          }
1220    
1221        public boolean equals( Object object )
1222          {
1223          if( object instanceof SafeCascadeListener )
1224            return cascadeListener.equals( ( (SafeCascadeListener) object ).cascadeListener );
1225    
1226          return cascadeListener.equals( object );
1227          }
1228    
1229        public int hashCode()
1230          {
1231          return cascadeListener.hashCode();
1232          }
1233        }
1234      }