001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.cascade;
022    
023    import java.io.FileWriter;
024    import java.io.IOException;
025    import java.io.Writer;
026    import java.util.ArrayList;
027    import java.util.Collection;
028    import java.util.Collections;
029    import java.util.HashSet;
030    import java.util.LinkedHashMap;
031    import java.util.LinkedList;
032    import java.util.List;
033    import java.util.Map;
034    import java.util.Set;
035    import java.util.concurrent.Callable;
036    import java.util.concurrent.CountDownLatch;
037    import java.util.concurrent.Future;
038    import java.util.concurrent.TimeUnit;
039    
040    import cascading.CascadingException;
041    import cascading.cascade.planner.FlowGraph;
042    import cascading.cascade.planner.IdentifierGraph;
043    import cascading.cascade.planner.TapGraph;
044    import cascading.flow.BaseFlow;
045    import cascading.flow.Flow;
046    import cascading.flow.FlowException;
047    import cascading.flow.FlowSkipStrategy;
048    import cascading.management.CascadingServices;
049    import cascading.management.UnitOfWork;
050    import cascading.management.UnitOfWorkExecutorStrategy;
051    import cascading.management.UnitOfWorkSpawnStrategy;
052    import cascading.management.state.ClientState;
053    import cascading.stats.CascadeStats;
054    import cascading.tap.Tap;
055    import cascading.util.ShutdownUtil;
056    import cascading.util.Util;
057    import cascading.util.Version;
058    import org.jgrapht.Graphs;
059    import org.jgrapht.ext.EdgeNameProvider;
060    import org.jgrapht.ext.IntegerNameProvider;
061    import org.jgrapht.ext.VertexNameProvider;
062    import org.jgrapht.graph.SimpleDirectedGraph;
063    import org.jgrapht.traverse.TopologicalOrderIterator;
064    import org.slf4j.Logger;
065    import org.slf4j.LoggerFactory;
066    
067    import static cascading.property.PropertyUtil.getProperty;
068    
069    /**
070     * A Cascade is an assembly of {@link cascading.flow.Flow} instances that share or depend on equivalent {@link Tap} instances and are executed as
071     * a single group. The most common case is where one Flow instance depends on a Tap created by a second Flow instance. This
072     * dependency chain can continue as practical.
073     * <p/>
074     * Note Flow instances that have no shared dependencies will be executed in parallel.
075     * <p/>
076     * Additionally, a Cascade allows for incremental builds of complex data processing processes. If a given source {@link Tap} is newer than
077     * a subsequent sink {@link Tap} in the assembly, the connecting {@link cascading.flow.Flow}(s) will be executed
 * when the Cascade is executed. If all the targets (sinks) are up to date, the Cascade exits immediately and does nothing.
079     * <p/>
080     * The concept of 'stale' is pluggable, see the {@link cascading.flow.FlowSkipStrategy} class.
081     * <p/>
 * When a Cascade starts up, it first verifies which Flow instances have stale sinks; if the sinks are not stale, the
083     * method {@link cascading.flow.BaseFlow#deleteSinksIfNotUpdate()} is called. Before appends/updates were supported (logically)
084     * the Cascade deleted all the sinks in a Flow.
085     * <p/>
086     * The new consequence of this is if the Cascade fails, but does complete a Flow that appended or updated data, re-running
087     * the Cascade (and the successful append/update Flow) will re-update data to the source. Some systems may be idempotent and
088     * may not have any side-effects. So plan accordingly.
089     * <p/>
090     * Use the {@link CascadeListener} to receive any events on the life-cycle of the Cascade as it executes. Any
091     * {@link Tap} instances owned by managed Flows also implementing CascadeListener will automatically be added to the
092     * set of listeners.
093     *
094     * @see CascadeListener
095     * @see cascading.flow.Flow
096     * @see cascading.flow.FlowSkipStrategy
097     */
098    public class Cascade implements UnitOfWork<CascadeStats>
099      {
100      /** Field LOG */
101      private static final Logger LOG = LoggerFactory.getLogger( Cascade.class );
102    
103      /** Field id */
104      private String id;
105      /** Field name */
106      private final String name;
107      /** Field tags */
108      private String tags;
109      /** Field properties */
110      private final Map<Object, Object> properties;
111      /** Fields listeners */
112      private List<SafeCascadeListener> listeners;
113      /** Field jobGraph */
114      private final FlowGraph flowGraph;
115      /** Field tapGraph */
116      private final IdentifierGraph identifierGraph;
117      /** Field cascadeStats */
118      private final CascadeStats cascadeStats;
119      /** Field cascadingServices */
120      private CascadingServices cascadingServices;
121      /** Field thread */
122      private Thread thread;
123      /** Field throwable */
124      private Throwable throwable;
125      private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy();
126      /** Field shutdownHook */
127      private ShutdownUtil.Hook shutdownHook;
128      /** Field jobsMap */
129      private final Map<String, Callable<Throwable>> jobsMap = new LinkedHashMap<String, Callable<Throwable>>();
130      /** Field stop */
131      private boolean stop;
132      /** Field flowSkipStrategy */
133      private FlowSkipStrategy flowSkipStrategy = null;
134      /** Field maxConcurrentFlows */
135      private int maxConcurrentFlows = 0;
136    
137      /** Field tapGraph * */
138      private transient TapGraph tapGraph;
139    
140      static int getMaxConcurrentFlows( Map<Object, Object> properties, int maxConcurrentFlows )
141        {
142        if( maxConcurrentFlows != -1 ) // CascadeDef is -1 by default
143          return maxConcurrentFlows;
144    
145        return Integer.parseInt( getProperty( properties, CascadeProps.MAX_CONCURRENT_FLOWS, "0" ) );
146        }
147    
148      /** for testing */
149      protected Cascade()
150        {
151        this.name = null;
152        this.tags = null;
153        this.properties = null;
154        this.flowGraph = null;
155        this.identifierGraph = null;
156        this.cascadeStats = null;
157        }
158    
159      Cascade( CascadeDef cascadeDef, Map<Object, Object> properties, FlowGraph flowGraph, IdentifierGraph identifierGraph )
160        {
161        this.name = cascadeDef.getName();
162        this.tags = cascadeDef.getTags();
163        this.properties = properties;
164        this.flowGraph = flowGraph;
165        this.identifierGraph = identifierGraph;
166        this.cascadeStats = createPrepareCascadeStats();
167        setIDOnFlow();
168        this.maxConcurrentFlows = cascadeDef.getMaxConcurrentFlows();
169    
170        addListeners( getAllTaps() );
171        }
172    
173      private CascadeStats createPrepareCascadeStats()
174        {
175        CascadeStats cascadeStats = new CascadeStats( this, getClientState() );
176    
177        cascadeStats.prepare();
178        cascadeStats.markPending();
179    
180        return cascadeStats;
181        }
182    
183      /**
184       * Method getName returns the name of this Cascade object.
185       *
186       * @return the name (type String) of this Cascade object.
187       */
188      @Override
189      public String getName()
190        {
191        return name;
192        }
193    
194      /**
195       * Method getID returns the ID of this Cascade object.
196       * <p/>
197       * The ID value is a long HEX String used to identify this instance globally. Subsequent Cascade
198       * instances created with identical parameters will not return the same ID.
199       *
200       * @return the ID (type String) of this Cascade object.
201       */
202      @Override
203      public String getID()
204        {
205        if( id == null )
206          id = Util.createUniqueID();
207    
208        return id;
209        }
210    
211      /**
212       * Method getTags returns the tags associated with this Cascade object.
213       *
214       * @return the tags (type String) of this Cascade object.
215       */
216      @Override
217      public String getTags()
218        {
219        return tags;
220        }
221    
222      void addListeners( Collection listeners )
223        {
224        for( Object listener : listeners )
225          {
226          if( listener instanceof CascadeListener )
227            addListener( (CascadeListener) listener );
228          }
229        }
230    
231      List<SafeCascadeListener> getListeners()
232        {
233        if( listeners == null )
234          listeners = new LinkedList<SafeCascadeListener>();
235    
236        return listeners;
237        }
238    
239      public boolean hasListeners()
240        {
241        return listeners != null && !listeners.isEmpty();
242        }
243    
244      public void addListener( CascadeListener flowListener )
245        {
246        getListeners().add( new SafeCascadeListener( flowListener ) );
247        }
248    
249      public boolean removeListener( CascadeListener flowListener )
250        {
251        return getListeners().remove( new SafeCascadeListener( flowListener ) );
252        }
253    
254      private void fireOnCompleted()
255        {
256        if( hasListeners() )
257          {
258          if( LOG.isDebugEnabled() )
259            logDebug( "firing onCompleted event: " + getListeners().size() );
260    
261          for( CascadeListener cascadeListener : getListeners() )
262            cascadeListener.onCompleted( this );
263          }
264        }
265    
266      private void fireOnThrowable()
267        {
268        if( hasListeners() )
269          {
270          if( LOG.isDebugEnabled() )
271            logDebug( "firing onThrowable event: " + getListeners().size() );
272    
273          boolean isHandled = false;
274    
275          for( CascadeListener cascadeListener : getListeners() )
276            isHandled = cascadeListener.onThrowable( this, throwable ) || isHandled;
277    
278          if( isHandled )
279            throwable = null;
280          }
281        }
282    
283      protected void fireOnStopping()
284        {
285        if( hasListeners() )
286          {
287          if( LOG.isDebugEnabled() )
288            logDebug( "firing onStopping event: " + getListeners().size() );
289    
290          for( CascadeListener cascadeListener : getListeners() )
291            cascadeListener.onStopping( this );
292          }
293        }
294    
295      protected void fireOnStarting()
296        {
297        if( hasListeners() )
298          {
299          if( LOG.isDebugEnabled() )
300            logDebug( "firing onStarting event: " + getListeners().size() );
301    
302          for( CascadeListener cascadeListener : getListeners() )
303            cascadeListener.onStarting( this );
304          }
305        }
306    
307      private CascadingServices getCascadingServices()
308        {
309        if( cascadingServices == null )
310          cascadingServices = new CascadingServices( properties );
311    
312        return cascadingServices;
313        }
314    
315      private ClientState getClientState()
316        {
317        return getCascadingServices().createClientState( getID() );
318        }
319    
320      /**
321       * Method getCascadeStats returns the cascadeStats of this Cascade object.
322       *
323       * @return the cascadeStats (type CascadeStats) of this Cascade object.
324       */
325      public CascadeStats getCascadeStats()
326        {
327        return cascadeStats;
328        }
329    
330      @Override
331      public CascadeStats getStats()
332        {
333        return getCascadeStats();
334        }
335    
336      private void setIDOnFlow()
337        {
338        for( Flow<?> flow : getFlows() )
339          ( (BaseFlow<?>) flow ).setCascade( this );
340        }
341    
342      protected FlowGraph getFlowGraph()
343        {
344        return flowGraph;
345        }
346    
347      protected IdentifierGraph getIdentifierGraph()
348        {
349        return identifierGraph;
350        }
351    
352      /**
353       * Method getFlows returns the flows managed by this Cascade object. The returned {@link cascading.flow.Flow} instances
354       * will be in topological order.
355       *
356       * @return the flows (type Collection<Flow>) of this Cascade object.
357       */
358      public List<Flow> getFlows()
359        {
360        List<Flow> flows = new LinkedList<Flow>();
361        TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator();
362    
363        while( topoIterator.hasNext() )
364          flows.add( topoIterator.next() );
365    
366        return flows;
367        }
368    
369      /**
370       * Method findFlows returns a List of flows whose names match the given regex pattern.
371       *
372       * @param regex of type String
373       * @return List<Flow>
374       */
375      public List<Flow> findFlows( String regex )
376        {
377        List<Flow> flows = new ArrayList<Flow>();
378    
379        for( Flow flow : getFlows() )
380          {
381          if( flow.getName().matches( regex ) )
382            flows.add( flow );
383          }
384    
385        return flows;
386        }
387    
388      /**
389       * Method getHeadFlows returns all Flow instances that are at the "head" of the flow graph.
390       * <p/>
391       * That is, they are the first to execute and have no Tap source dependencies with Flow instances in the this Cascade
392       * instance.
393       *
394       * @return Collection<Flow>
395       */
396      public Collection<Flow> getHeadFlows()
397        {
398        Set<Flow> flows = new HashSet<Flow>();
399    
400        for( Flow flow : flowGraph.vertexSet() )
401          {
402          if( flowGraph.inDegreeOf( flow ) == 0 )
403            flows.add( flow );
404          }
405    
406        return flows;
407        }
408    
409      /**
410       * Method getTailFlows returns all Flow instances that are at the "tail" of the flow graph.
411       * <p/>
412       * That is, they are the last to execute and have no Tap sink dependencies with Flow instances in the this Cascade
413       * instance.
414       *
415       * @return Collection<Flow>
416       */
417      public Collection<Flow> getTailFlows()
418        {
419        Set<Flow> flows = new HashSet<Flow>();
420    
421        for( Flow flow : flowGraph.vertexSet() )
422          {
423          if( flowGraph.outDegreeOf( flow ) == 0 )
424            flows.add( flow );
425          }
426    
427        return flows;
428        }
429    
430      /**
431       * Method getIntermediateFlows returns all Flow instances that are neither at the "tail" or "tail" of the flow graph.
432       *
433       * @return Collection<Flow>
434       */
435      public Collection<Flow> getIntermediateFlows()
436        {
437        Set<Flow> flows = new HashSet<Flow>( flowGraph.vertexSet() );
438    
439        flows.removeAll( getHeadFlows() );
440        flows.removeAll( getTailFlows() );
441    
442        return flows;
443        }
444    
445      protected TapGraph getTapGraph()
446        {
447        if( tapGraph == null )
448          tapGraph = new TapGraph( flowGraph.vertexSet() );
449    
450        return tapGraph;
451        }
452    
453      /**
454       * Method getSourceTaps returns all source Tap instances in this Cascade instance.
455       * <p/>
456       * That is, none of returned Tap instances are the sinks of other Flow instances in this Cascade.
457       * <p/>
458       * All {@link cascading.tap.CompositeTap} instances are unwound if addressed directly by a managed Flow instance.
459       *
460       * @return Collection<Tap>
461       */
462      public Collection<Tap> getSourceTaps()
463        {
464        TapGraph tapGraph = getTapGraph();
465        Set<Tap> taps = new HashSet<Tap>();
466    
467        for( Tap tap : tapGraph.vertexSet() )
468          {
469          if( tapGraph.inDegreeOf( tap ) == 0 )
470            taps.add( tap );
471          }
472    
473        return taps;
474        }
475    
476      /**
477       * Method getSinkTaps returns all sink Tap instances in this Cascade instance.
478       * <p/>
479       * That is, none of returned Tap instances are the sources of other Flow instances in this Cascade.
480       * <p/>
481       * All {@link cascading.tap.CompositeTap} instances are unwound if addressed directly by a managed Flow instance.
482       * <p/>
483       * This method will return checkpoint Taps managed by Flow instances if not used as a source by other Flow instances.
484       *
485       * @return Collection<Tap>
486       */
487      public Collection<Tap> getSinkTaps()
488        {
489        TapGraph tapGraph = getTapGraph();
490        Set<Tap> taps = new HashSet<Tap>();
491    
492        for( Tap tap : tapGraph.vertexSet() )
493          {
494          if( tapGraph.outDegreeOf( tap ) == 0 )
495            taps.add( tap );
496          }
497    
498        return taps;
499        }
500    
501      /**
502       * Method getCheckpointTaps returns all checkpoint Tap instances from all the Flow instances in this Cascade instance.
503       *
504       * @return Collection<Tap>
505       */
506      public Collection<Tap> getCheckpointsTaps()
507        {
508        Set<Tap> taps = new HashSet<Tap>();
509    
510        for( Flow flow : getFlows() )
511          taps.addAll( flow.getCheckpointsCollection() );
512    
513        return taps;
514        }
515    
516      /**
517       * Method getIntermediateTaps returns all Tap instances that are neither at the source or sink of the flow graph.
518       * <p/>
519       * This method does consider checkpoint Taps managed by Flow instances in this Cascade instance.
520       *
521       * @return Collection<Flow>
522       */
523      public Collection<Tap> getIntermediateTaps()
524        {
525        TapGraph tapGraph = getTapGraph();
526        Set<Tap> taps = new HashSet<Tap>( tapGraph.vertexSet() );
527    
528        taps.removeAll( getSourceTaps() );
529        taps.removeAll( getSinkTaps() );
530    
531        return taps;
532        }
533    
534      /**
535       * Method getAllTaps returns all source, sink, and checkpoint Tap instances associated with the managed
536       * Flow instances in this Cascade instance.
537       *
538       * @return Collection<Tap>
539       */
540      public Collection<Tap> getAllTaps()
541        {
542        return new HashSet<Tap>( getTapGraph().vertexSet() );
543        }
544    
545      /**
546       * Method getSuccessorFlows returns a Collection of all the Flow instances that will be
547       * executed after the given Flow instance.
548       *
549       * @param flow of type Flow
550       * @return Collection<Flow>
551       */
552      public Collection<Flow> getSuccessorFlows( Flow flow )
553        {
554        return Graphs.successorListOf( flowGraph, flow );
555        }
556    
557      /**
558       * Method getPredecessorFlows returns a Collection of all the Flow instances that will be
559       * executed before the given Flow instance.
560       *
561       * @param flow of type Flow
562       * @return Collection<Flow>
563       */
564      public Collection<Flow> getPredecessorFlows( Flow flow )
565        {
566        return Graphs.predecessorListOf( flowGraph, flow );
567        }
568    
569      /**
570       * Method findFlowsSourcingFrom returns all Flow instances that reads from a source with the given identifier.
571       *
572       * @param identifier of type String
573       * @return Collection<Flow>
574       */
575      public Collection<Flow> findFlowsSourcingFrom( String identifier )
576        {
577        try
578          {
579          return unwrapFlows( identifierGraph.outgoingEdgesOf( identifier ) );
580          }
581        catch( Exception exception )
582          {
583          return Collections.emptySet();
584          }
585        }
586    
587      /**
588       * Method findFlowsSinkingTo returns all Flow instances that writes to a sink with the given identifier.
589       *
590       * @param identifier of type String
591       * @return Collection<Flow>
592       */
593      public Collection<Flow> findFlowsSinkingTo( String identifier )
594        {
595        try
596          {
597          return unwrapFlows( identifierGraph.incomingEdgesOf( identifier ) );
598          }
599        catch( Exception exception )
600          {
601          return Collections.emptySet();
602          }
603        }
604    
605      private Collection<Flow> unwrapFlows( Set<BaseFlow.FlowHolder> flowHolders )
606        {
607        Set<Flow> flows = new HashSet<Flow>();
608    
609        for( BaseFlow.FlowHolder flowHolder : flowHolders )
610          flows.add( flowHolder.flow );
611    
612        return flows;
613        }
614    
615      /**
616       * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow.
617       *
618       * @return FlowSkipStrategy
619       */
620      public FlowSkipStrategy getFlowSkipStrategy()
621        {
622        return flowSkipStrategy;
623        }
624    
625      /**
626       * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy, if any, is returned.
627       * If a strategy is given, it will be used as the strategy for all {@link cascading.flow.BaseFlow} instances managed by this Cascade instance.
628       * To revert back to consulting the strategies associated with each Flow instance, re-set this value to {@code null}, its
629       * default value.
630       * <p/>
631       * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link cascading.flow.FlowSkipIfSinkNotStale}
632       * and is inherited from the Flow instance in question. An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}.
633       * <p/>
634       * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()}
635       *
636       * @param flowSkipStrategy of type FlowSkipStrategy
637       * @return FlowSkipStrategy
638       */
639      public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy )
640        {
641        try
642          {
643          return this.flowSkipStrategy;
644          }
645        finally
646          {
647          this.flowSkipStrategy = flowSkipStrategy;
648          }
649        }
650    
651      @Override
652      public void prepare()
653        {
654        }
655    
656      /**
657       * Method start begins the current Cascade process. It returns immediately. See method {@link #complete()} to block
658       * until the Cascade completes.
659       */
660      public void start()
661        {
662        if( thread != null )
663          return;
664    
665        thread = new Thread( new Runnable()
666        {
667        @Override
668        public void run()
669          {
670          Cascade.this.run();
671          }
672        }, ( "cascade " + Util.toNull( getName() ) ).trim() );
673    
674        thread.start();
675        }
676    
677      /**
678       * Method complete begins the current Cascade process if method {@link #start()} was not previously called. This method
679       * blocks until the process completes.
680       *
681       * @throws RuntimeException wrapping any exception thrown internally.
682       */
683      public void complete()
684        {
685        start();
686    
687        try
688          {
689          try
690            {
691            thread.join();
692            }
693          catch( InterruptedException exception )
694            {
695            throw new FlowException( "thread interrupted", exception );
696            }
697    
698          if( throwable instanceof CascadingException )
699            throw (CascadingException) throwable;
700    
701          if( throwable != null )
702            throw new CascadeException( "unhandled exception", throwable );
703          }
704        finally
705          {
706          thread = null;
707          throwable = null;
708          shutdownHook = null;
709          cascadeStats.cleanup();
710          }
711        }
712    
713      public synchronized void stop()
714        {
715        if( stop )
716          return;
717    
718        stop = true;
719    
720        fireOnStopping();
721    
722        if( !cascadeStats.isFinished() )
723          cascadeStats.markStopped();
724    
725        internalStopAllFlows();
726        handleExecutorShutdown();
727    
728        cascadeStats.cleanup();
729        }
730    
731      @Override
732      public void cleanup()
733        {
734        }
735    
736      /** Method run implements the Runnable run method. */
737      private void run()
738        {
739        Version.printBanner();
740    
741        if( LOG.isInfoEnabled() )
742          logInfo( "starting" );
743    
744        registerShutdownHook();
745    
746        try
747          {
748          if( stop )
749            return;
750    
751          // mark started, not submitted
752          cascadeStats.markStartedThenRunning();
753    
754          fireOnStarting();
755    
756          initializeNewJobsMap();
757    
758          int numThreads = getMaxConcurrentFlows( properties, maxConcurrentFlows );
759    
760          if( numThreads == 0 )
761            numThreads = jobsMap.size();
762    
763          int numLocalFlows = numLocalFlows();
764    
765          boolean runFlowsLocal = numLocalFlows > 1;
766    
767          if( runFlowsLocal )
768            numThreads = 1;
769    
770          if( LOG.isInfoEnabled() )
771            {
772            logInfo( " parallel execution is enabled: " + !runFlowsLocal );
773            logInfo( " starting flows: " + jobsMap.size() );
774            logInfo( " allocating threads: " + numThreads );
775            }
776    
777          List<Future<Throwable>> futures = spawnStrategy.start( this, numThreads, jobsMap.values() );
778    
779          for( Future<Throwable> future : futures )
780            {
781            throwable = future.get();
782    
783            if( throwable != null )
784              {
785              if( !stop )
786                {
787                cascadeStats.markFailed( throwable );
788                internalStopAllFlows();
789                fireOnThrowable();
790                }
791    
792              handleExecutorShutdown();
793              break;
794              }
795            }
796          }
797        catch( Throwable throwable )
798          {
799          this.throwable = throwable;
800          }
801        finally
802          {
803          if( !cascadeStats.isFinished() )
804            cascadeStats.markSuccessful();
805    
806          try
807            {
808            fireOnCompleted();
809            }
810          finally
811            {
812            deregisterShutdownHook();
813            }
814          }
815        }
816    
817      private void registerShutdownHook()
818        {
819        if( !isStopJobsOnExit() )
820          return;
821    
822        shutdownHook = new ShutdownUtil.Hook()
823        {
824        @Override
825        public Priority priority()
826          {
827          return Priority.WORK_PARENT;
828          }
829    
830        @Override
831        public void execute()
832          {
833          logInfo( "shutdown hook calling stop on cascade" );
834    
835          Cascade.this.stop();
836          }
837        };
838    
839        ShutdownUtil.addHook( shutdownHook );
840        }
841    
842      private void deregisterShutdownHook()
843        {
844        if( !isStopJobsOnExit() || stop )
845          return;
846    
847        ShutdownUtil.removeHook( shutdownHook );
848        }
849    
850      private boolean isStopJobsOnExit()
851        {
852        if( getFlows().isEmpty() )
853          return false; // don't bother registering hook
854    
855        return getFlows().get( 0 ).isStopJobsOnExit();
856        }
857    
858      /**
859       * If the number of flows that are local is greater than one, force the Cascade to run without parallelization.
860       *
861       * @return of type int
862       */
863      private int numLocalFlows()
864        {
865        int countLocalJobs = 0;
866    
867        for( Flow flow : getFlows() )
868          {
869          if( flow.stepsAreLocal() )
870            countLocalJobs++;
871          }
872    
873        return countLocalJobs;
874        }
875    
876      private void initializeNewJobsMap()
877        {
878        synchronized( jobsMap )
879          {
880          // keep topo order
881          TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator();
882    
883          while( topoIterator.hasNext() )
884            {
885            Flow flow = topoIterator.next();
886    
887            cascadeStats.addFlowStats( flow.getFlowStats() );
888    
889            CascadeJob job = new CascadeJob( flow );
890    
891            jobsMap.put( flow.getName(), job );
892    
893            List<CascadeJob> predecessors = new ArrayList<CascadeJob>();
894    
895            for( Flow predecessor : Graphs.predecessorListOf( flowGraph, flow ) )
896              predecessors.add( (CascadeJob) jobsMap.get( predecessor.getName() ) );
897    
898            job.init( predecessors );
899            }
900          }
901        }
902    
903      private void handleExecutorShutdown()
904        {
905        if( spawnStrategy.isCompleted( this ) )
906          return;
907    
908        logInfo( "shutting down flow executor" );
909    
910        try
911          {
912          spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS );
913          }
914        catch( InterruptedException exception )
915          {
916          // ignore
917          }
918    
919        logInfo( "shutdown complete" );
920        }
921    
922      private void internalStopAllFlows()
923        {
924        logInfo( "stopping all flows" );
925    
926        synchronized( jobsMap )
927          {
928          List<Callable<Throwable>> jobs = new ArrayList<Callable<Throwable>>( jobsMap.values() );
929    
930          Collections.reverse( jobs );
931    
932          for( Callable<Throwable> callable : jobs )
933            ( (CascadeJob) callable ).stop();
934          }
935    
936        logInfo( "stopped all flows" );
937        }
938    
939      /**
940       * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
941       *
942       * @param filename of type String
943       */
944      public void writeDOT( String filename )
945        {
946        printElementGraph( filename, identifierGraph );
947        }
948    
949      protected void printElementGraph( String filename, SimpleDirectedGraph<String, BaseFlow.FlowHolder> graph )
950        {
951        try
952          {
953          Writer writer = new FileWriter( filename );
954    
955          Util.writeDOT( writer, graph, new IntegerNameProvider<String>(), new VertexNameProvider<String>()
956            {
957            public String getVertexName( String object )
958              {
959              return object.toString().replaceAll( "\"", "\'" );
960              }
961            }, new EdgeNameProvider<BaseFlow.FlowHolder>()
962            {
963            public String getEdgeName( BaseFlow.FlowHolder object )
964              {
965              return object.flow.getName().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
966              }
967            }
968          );
969    
970          writer.close();
971          }
972        catch( IOException exception )
973          {
974          LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
975          }
976        }
977    
978      @Override
979      public String toString()
980        {
981        return getName();
982        }
983    
984      private void logDebug( String message )
985        {
986        LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message );
987        }
988    
989      private void logInfo( String message )
990        {
991        LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message );
992        }
993    
994      private void logWarn( String message )
995        {
996        logWarn( message, null );
997        }
998    
999      private void logWarn( String message, Throwable throwable )
1000        {
1001        LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable );
1002        }
1003    
1004      /** Class CascadeJob manages Flow execution in the current Cascade instance. */
1005      protected class CascadeJob implements Callable<Throwable>
1006        {
1007        /** Field flow */
1008        final Flow flow;
1009        /** Field predecessors */
1010        private List<CascadeJob> predecessors;
1011        /** Field latch */
1012        private final CountDownLatch latch = new CountDownLatch( 1 );
1013        /** Field stop */
1014        private boolean stop = false;
1015        /** Field failed */
1016        private boolean failed = false;
1017    
1018        public CascadeJob( Flow flow )
1019          {
1020          this.flow = flow;
1021          }
1022    
1023        public String getName()
1024          {
1025          return flow.getName();
1026          }
1027    
1028        public Throwable call()
1029          {
1030          try
1031            {
1032            for( CascadeJob predecessor : predecessors )
1033              {
1034              if( !predecessor.isSuccessful() )
1035                return null;
1036              }
1037    
1038            if( stop )
1039              return null;
1040    
1041            try
1042              {
1043              if( LOG.isInfoEnabled() )
1044                logInfo( "starting flow: " + flow.getName() );
1045    
1046              if( flowSkipStrategy == null ? flow.isSkipFlow() : flowSkipStrategy.skipFlow( flow ) )
1047                {
1048                if( LOG.isInfoEnabled() )
1049                  logInfo( "skipping flow: " + flow.getName() );
1050    
1051                flow.getFlowStats().markSkipped();
1052    
1053                return null;
1054                }
1055    
1056              flow.prepare(); // do not delete append/update mode taps
1057              flow.complete();
1058    
1059              if( LOG.isInfoEnabled() )
1060                logInfo( "completed flow: " + flow.getName() );
1061              }
1062            catch( Throwable exception )
1063              {
1064              logWarn( "flow failed: " + flow.getName(), exception );
1065              failed = true;
1066              return new CascadeException( "flow failed: " + flow.getName(), exception );
1067              }
1068            finally
1069              {
1070              flow.cleanup();
1071              }
1072            }
1073          catch( Throwable throwable )
1074            {
1075            failed = true;
1076            return throwable;
1077            }
1078          finally
1079            {
1080            latch.countDown();
1081            }
1082    
1083          return null;
1084          }
1085    
1086        public void init( List<CascadeJob> predecessors )
1087          {
1088          this.predecessors = predecessors;
1089          }
1090    
1091        public void stop()
1092          {
1093          if( LOG.isInfoEnabled() )
1094            logInfo( "stopping flow: " + flow.getName() );
1095    
1096          stop = true;
1097    
1098          if( flow != null )
1099            flow.stop();
1100          }
1101    
1102        public boolean isSuccessful()
1103          {
1104          try
1105            {
1106            latch.await();
1107    
1108            return flow != null && !failed;
1109            }
1110          catch( InterruptedException exception )
1111            {
1112            logWarn( "latch interrupted", exception );
1113            }
1114    
1115          return false;
1116          }
1117        }
1118    
1119      @Override
1120      public UnitOfWorkSpawnStrategy getSpawnStrategy()
1121        {
1122        return spawnStrategy;
1123        }
1124    
1125      @Override
1126      public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy )
1127        {
1128        this.spawnStrategy = spawnStrategy;
1129        }
1130    
1131      /**
1132       * Class SafeCascadeListener safely calls a wrapped CascadeListener.
1133       * <p/>
1134       * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1135       * can be caught by the calling Thread. Since Cascade is asynchronous, much of the work is done in the run() method
1136       * which in turn is run in a new Thread.
1137       */
1138      private class SafeCascadeListener implements CascadeListener
1139        {
1140        /** Field flowListener */
1141        final CascadeListener cascadeListener;
1142        /** Field throwable */
1143        Throwable throwable;
1144    
1145        private SafeCascadeListener( CascadeListener cascadeListener )
1146          {
1147          this.cascadeListener = cascadeListener;
1148          }
1149    
1150        public void onStarting( Cascade cascade )
1151          {
1152          try
1153            {
1154            cascadeListener.onStarting( cascade );
1155            }
1156          catch( Throwable throwable )
1157            {
1158            handleThrowable( throwable );
1159            }
1160          }
1161    
1162        public void onStopping( Cascade cascade )
1163          {
1164          try
1165            {
1166            cascadeListener.onStopping( cascade );
1167            }
1168          catch( Throwable throwable )
1169            {
1170            handleThrowable( throwable );
1171            }
1172          }
1173    
1174        public void onCompleted( Cascade cascade )
1175          {
1176          try
1177            {
1178            cascadeListener.onCompleted( cascade );
1179            }
1180          catch( Throwable throwable )
1181            {
1182            handleThrowable( throwable );
1183            }
1184          }
1185    
1186        public boolean onThrowable( Cascade cascade, Throwable flowThrowable )
1187          {
1188          try
1189            {
1190            return cascadeListener.onThrowable( cascade, flowThrowable );
1191            }
1192          catch( Throwable throwable )
1193            {
1194            handleThrowable( throwable );
1195            }
1196    
1197          return false;
1198          }
1199    
1200        private void handleThrowable( Throwable throwable )
1201          {
1202          this.throwable = throwable;
1203    
1204          logWarn( String.format( "cascade listener %s threw throwable", cascadeListener ), throwable );
1205    
1206          // stop this flow
1207          stop();
1208          }
1209    
1210        public boolean equals( Object object )
1211          {
1212          if( object instanceof SafeCascadeListener )
1213            return cascadeListener.equals( ( (SafeCascadeListener) object ).cascadeListener );
1214    
1215          return cascadeListener.equals( object );
1216          }
1217    
1218        public int hashCode()
1219          {
1220          return cascadeListener.hashCode();
1221          }
1222        }
1223      }