001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.io.IOException;
024    import java.io.Serializable;
025    import java.util.ArrayList;
026    import java.util.Collection;
027    import java.util.Collections;
028    import java.util.Date;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.Iterator;
032    import java.util.LinkedHashMap;
033    import java.util.LinkedList;
034    import java.util.List;
035    import java.util.ListIterator;
036    import java.util.Map;
037    import java.util.Set;
038    
039    import cascading.flow.Flow;
040    import cascading.flow.FlowElement;
041    import cascading.flow.FlowException;
042    import cascading.flow.FlowProcess;
043    import cascading.flow.FlowStep;
044    import cascading.flow.FlowStepListener;
045    import cascading.management.CascadingServices;
046    import cascading.management.state.ClientState;
047    import cascading.operation.Operation;
048    import cascading.pipe.Group;
049    import cascading.pipe.HashJoin;
050    import cascading.pipe.Merge;
051    import cascading.pipe.Operator;
052    import cascading.pipe.Pipe;
053    import cascading.property.ConfigDef;
054    import cascading.stats.FlowStepStats;
055    import cascading.tap.Tap;
056    import cascading.util.Util;
057    import org.jgrapht.GraphPath;
058    import org.jgrapht.Graphs;
059    import org.jgrapht.alg.KShortestPaths;
060    import org.jgrapht.graph.SimpleDirectedGraph;
061    import org.jgrapht.traverse.TopologicalOrderIterator;
062    import org.slf4j.Logger;
063    import org.slf4j.LoggerFactory;
064    
065    /**
066     * Class FlowStep is an internal representation of a given Job to be executed on a remote cluster. During
067     * planning, pipe assemblies are broken down into "steps" and encapsulated in this class.
068     * <p/>
069     * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all
070     * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which
071     * all steps will be submitted for execution. The default submit priority is 5.
072     * <p/>
073     * This class is for internal use, there are no stable public methods.
074     */
075    public abstract class BaseFlowStep<Config> implements Serializable, FlowStep<Config>
076      {
077      /** Field LOG */
078      private static final Logger LOG = LoggerFactory.getLogger( FlowStep.class );
079    
080      /** Field flow */
081      private transient Flow<Config> flow;
082      /** Field flowName */
083      private String flowName;
084      /** Field flowID */
085      private String flowID;
086    
087      private transient Config conf;
088    
089      /** Field submitPriority */
090      private int submitPriority = 5;
091    
092      /** Field name */
093      String name;
094      /** Field id */
095      private String id;
096      private final int stepNum;
097    
098      /** Field step listeners */
099      private List<SafeFlowStepListener> listeners;
100    
101      /** Field graph */
102      private final SimpleDirectedGraph<FlowElement, Scope> graph = new SimpleDirectedGraph<FlowElement, Scope>( Scope.class );
103    
104      /** Field sources */
105      protected final Map<Tap, Set<String>> sources = new HashMap<Tap, Set<String>>(); // all sources
106      /** Field sink */
107      protected final Map<Tap, Set<String>> sinks = new HashMap<Tap, Set<String>>(); // all sinks
108    
109      /** Field tempSink */
110      protected Tap tempSink; // used if we need to bypass the filesystem
111    
112      /** Field groups */
113      private final List<Group> groups = new ArrayList<Group>();
114    
115      // sources streamed into join - not necessarily all sources
116      protected final Map<HashJoin, Tap> streamedSourceByJoin = new LinkedHashMap<HashJoin, Tap>();
117      // sources accumulated by join
118      protected final Map<HashJoin, Set<Tap>> accumulatedSourcesByJoin = new LinkedHashMap<HashJoin, Set<Tap>>();
119    
120      private transient FlowStepJob<Config> flowStepJob;
121    
122      protected BaseFlowStep( String name, int stepNum )
123        {
124        setName( name );
125        this.stepNum = stepNum;
126        }
127    
128      @Override
129      public String getID()
130        {
131        if( id == null )
132          id = Util.createUniqueID();
133    
134        return id;
135        }
136    
137      @Override
138      public int getStepNum()
139        {
140        return stepNum;
141        }
142    
143      @Override
144      public String getName()
145        {
146        return name;
147        }
148    
149      void setName( String name )
150        {
151        if( name == null || name.isEmpty() )
152          throw new IllegalArgumentException( "step name may not be null or empty" );
153    
154        this.name = name;
155        }
156    
157      public void setFlow( Flow<Config> flow )
158        {
159        this.flow = flow;
160        this.flowID = flow.getID();
161        this.flowName = flow.getName();
162        }
163    
164      @Override
165      public Flow<Config> getFlow()
166        {
167        return flow;
168        }
169    
170      @Override
171      public String getFlowID()
172        {
173        return flowID;
174        }
175    
176      @Override
177      public String getFlowName()
178        {
179        return flowName;
180        }
181    
182      protected void setFlowName( String flowName )
183        {
184        this.flowName = flowName;
185        }
186    
187      @Override
188      public Config getConfig()
189        {
190        return conf;
191        }
192    
193      protected void setConf( Config conf )
194        {
195        this.conf = conf;
196        }
197    
198      @Override
199      public String getStepDisplayName()
200        {
201        return getStepDisplayName( Util.ID_LENGTH );
202        }
203    
204      protected String getStepDisplayName( int idLength )
205        {
206        if( idLength > Util.ID_LENGTH )
207          idLength = Util.ID_LENGTH;
208    
209        String flowID = getFlowID().substring( 0, idLength );
210        String stepID = getID().substring( 0, idLength );
211    
212        return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() );
213        }
214    
215      @Override
216      public int getSubmitPriority()
217        {
218        return submitPriority;
219        }
220    
221      @Override
222      public void setSubmitPriority( int submitPriority )
223        {
224        if( submitPriority < 1 || submitPriority > 10 )
225          throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );
226    
227        this.submitPriority = submitPriority;
228        }
229    
230      @Override
231      public FlowStepStats getFlowStepStats()
232        {
233        return flowStepJob.getStepStats();
234        }
235    
236      public SimpleDirectedGraph<FlowElement, Scope> getGraph()
237        {
238        return graph;
239        }
240    
241      @Override
242      public Group getGroup()
243        {
244        if( groups.isEmpty() )
245          return null;
246    
247        if( groups.size() > 1 )
248          throw new IllegalStateException( "more than one group" );
249    
250        return groups.get( 0 );
251        }
252    
253      @Override
254      public List<Group> getGroups()
255        {
256        return groups;
257        }
258    
259      public void addGroup( Group group )
260        {
261        if( !groups.contains( group ) )
262          groups.add( group );
263        }
264    
265      @Override
266      public Map<HashJoin, Tap> getStreamedSourceByJoin()
267        {
268        return streamedSourceByJoin;
269        }
270    
271      public void addStreamedSourceFor( HashJoin join, Tap streamedSource )
272        {
273        streamedSourceByJoin.put( join, streamedSource );
274        }
275    
276      @Override
277      public Set<Tap> getAllAccumulatedSources()
278        {
279        HashSet<Tap> set = new HashSet<Tap>();
280    
281        for( Set<Tap> taps : accumulatedSourcesByJoin.values() )
282          set.addAll( taps );
283    
284        return set;
285        }
286    
287      public void addAccumulatedSourceFor( HashJoin join, Tap accumulatedSource )
288        {
289        if( !accumulatedSourcesByJoin.containsKey( join ) )
290          accumulatedSourcesByJoin.put( join, new HashSet<Tap>() );
291    
292        accumulatedSourcesByJoin.get( join ).add( accumulatedSource );
293        }
294    
295      public void addSource( String name, Tap source )
296        {
297        if( !sources.containsKey( source ) )
298          sources.put( source, new HashSet<String>() );
299    
300        sources.get( source ).add( name );
301        }
302    
303      public void addSink( String name, Tap sink )
304        {
305        if( !sinks.containsKey( sink ) )
306          sinks.put( sink, new HashSet<String>() );
307    
308        sinks.get( sink ).add( name );
309        }
310    
311      @Override
312      public Set<Tap> getSources()
313        {
314        return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) );
315        }
316    
317      @Override
318      public Set<Tap> getSinks()
319        {
320        return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) );
321        }
322    
323      @Override
324      public Tap getSink()
325        {
326        if( sinks.size() != 1 )
327          throw new IllegalStateException( "more than one sink" );
328    
329        return sinks.keySet().iterator().next();
330        }
331    
332      @Override
333      public Set<String> getSourceName( Tap source )
334        {
335        return Collections.unmodifiableSet( sources.get( source ) );
336        }
337    
338      @Override
339      public Set<String> getSinkName( Tap sink )
340        {
341        return Collections.unmodifiableSet( sinks.get( sink ) );
342        }
343    
344      @Override
345      public Tap getSourceWith( String identifier )
346        {
347        for( Tap tap : sources.keySet() )
348          {
349          if( tap.getIdentifier().equalsIgnoreCase( identifier ) )
350            return tap;
351          }
352    
353        return null;
354        }
355    
356      @Override
357      public Tap getSinkWith( String identifier )
358        {
359        for( Tap tap : sinks.keySet() )
360          {
361          if( tap.getIdentifier().equalsIgnoreCase( identifier ) )
362            return tap;
363          }
364    
365        return null;
366        }
367    
368      boolean allSourcesExist() throws IOException
369        {
370        for( Tap tap : sources.keySet() )
371          {
372          if( !tap.resourceExists( getConfig() ) )
373            return false;
374          }
375    
376        return true;
377        }
378    
379      boolean areSourcesNewer( long sinkModified ) throws IOException
380        {
381        Config config = getConfig();
382        Iterator<Tap> values = sources.keySet().iterator();
383    
384        long sourceModified = 0;
385    
386        try
387          {
388          sourceModified = Util.getSourceModified( config, values, sinkModified );
389    
390          if( sinkModified < sourceModified )
391            return true;
392    
393          return false;
394          }
395        finally
396          {
397          if( LOG.isInfoEnabled() )
398            logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
399          }
400        }
401    
402      long getSinkModified() throws IOException
403        {
404        long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() );
405    
406        if( LOG.isInfoEnabled() )
407          {
408          if( sinkModified == -1L )
409            logInfo( "at least one sink is marked for delete" );
410          if( sinkModified == 0L )
411            logInfo( "at least one sink does not exist" );
412          else
413            logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
414          }
415    
416        return sinkModified;
417        }
418    
419      protected Throwable commitSinks()
420        {
421        Throwable throwable = null;
422    
423        for( Tap tap : sinks.keySet() )
424          {
425          if( throwable != null )
426            rollbackResource( tap );
427          else
428            throwable = commitResource( tap );
429          }
430    
431        return throwable;
432        }
433    
434      private Throwable commitResource( Tap tap )
435        {
436        Throwable throwable = null;
437    
438        try
439          {
440          if( !tap.commitResource( getConfig() ) )
441            {
442            String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
443    
444            logError( message, null );
445    
446            throwable = new FlowException( message );
447            }
448          }
449        catch( Throwable exception )
450          {
451          String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
452    
453          logError( message, exception );
454    
455          throwable = new FlowException( message, exception );
456          }
457    
458        return throwable;
459        }
460    
461      private Throwable rollbackResource( Tap tap )
462        {
463        Throwable throwable = null;
464    
465        try
466          {
467          if( !tap.rollbackResource( getConfig() ) )
468            {
469            String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
470    
471            logError( message, null );
472    
473            throwable = new FlowException( message );
474            }
475          }
476        catch( Throwable exception )
477          {
478          String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
479    
480          logError( message, exception );
481    
482          throwable = new FlowException( message, exception );
483          }
484    
485        return throwable;
486        }
487    
488      protected Throwable rollbackSinks()
489        {
490        Throwable throwable = null;
491    
492        for( Tap tap : sinks.keySet() )
493          {
494          if( throwable != null )
495            rollbackResource( tap );
496          else
497            throwable = rollbackResource( tap );
498          }
499    
500        return throwable;
501        }
502    
503      protected abstract Config getInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig );
504    
505      /**
506       * Method getPreviousScopes returns the previous Scope instances. If the flowElement is a Group (specifically a CoGroup),
507       * there will be more than one instance.
508       *
509       * @param flowElement of type FlowElement
510       * @return Set<Scope>
511       */
512      public Set<Scope> getPreviousScopes( FlowElement flowElement )
513        {
514        return getGraph().incomingEdgesOf( flowElement );
515        }
516    
517      /**
518       * Method getNextScope returns the next Scope instance in the graph. There will always only be one next.
519       *
520       * @param flowElement of type FlowElement
521       * @return Scope
522       */
523      public Scope getNextScope( FlowElement flowElement )
524        {
525        Set<Scope> set = getGraph().outgoingEdgesOf( flowElement );
526    
527        if( set.size() != 1 )
528          throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() );
529    
530        return set.iterator().next();
531        }
532    
533      public Scope getScopeFor( FlowElement sourceElement, FlowElement targetElement )
534        {
535        return getGraph().getEdge( sourceElement, targetElement );
536        }
537    
538      public Set<Scope> getNextScopes( FlowElement flowElement )
539        {
540        return getGraph().outgoingEdgesOf( flowElement );
541        }
542    
543      public FlowElement getNextFlowElement( Scope scope )
544        {
545        return getGraph().getEdgeTarget( scope );
546        }
547    
548      public TopologicalOrderIterator<FlowElement, Scope> getTopologicalOrderIterator()
549        {
550        return new TopologicalOrderIterator<FlowElement, Scope>( graph );
551        }
552    
553      public List<FlowElement> getSuccessors( FlowElement element )
554        {
555        return Graphs.successorListOf( graph, element );
556        }
557    
558      public Set<Tap> getJoinTributariesBetween( FlowElement from, FlowElement to )
559        {
560        Set<HashJoin> joins = new HashSet<HashJoin>();
561        Set<Merge> merges = new HashSet<Merge>();
562    
563        List<GraphPath<FlowElement, Scope>> paths = getPathsBetween( from, to );
564    
565        for( GraphPath<FlowElement, Scope> path : paths )
566          {
567          for( FlowElement flowElement : Graphs.getPathVertexList( path ) )
568            {
569            if( flowElement instanceof HashJoin )
570              joins.add( (HashJoin) flowElement );
571    
572            if( flowElement instanceof Merge )
573              merges.add( (Merge) flowElement );
574            }
575          }
576    
577        Set<Tap> tributaries = new HashSet<Tap>();
578    
579        for( HashJoin join : joins )
580          {
581          for( Tap source : sources.keySet() )
582            {
583            List<GraphPath<FlowElement, Scope>> joinPaths = new LinkedList( getPathsBetween( source, join ) );
584    
585            ListIterator<GraphPath<FlowElement, Scope>> iterator = joinPaths.listIterator();
586    
587            while( iterator.hasNext() )
588              {
589              GraphPath<FlowElement, Scope> joinPath = iterator.next();
590    
591              if( !Collections.disjoint( Graphs.getPathVertexList( joinPath ), merges ) )
592                iterator.remove();
593              }
594    
595            if( !joinPaths.isEmpty() )
596              tributaries.add( source );
597            }
598          }
599    
600        return tributaries;
601        }
602    
603      private List<GraphPath<FlowElement, Scope>> getPathsBetween( FlowElement from, FlowElement to )
604        {
605        KShortestPaths<FlowElement, Scope> paths = new KShortestPaths<FlowElement, Scope>( graph, from, Integer.MAX_VALUE );
606        List<GraphPath<FlowElement, Scope>> results = paths.getPaths( to );
607    
608        if( results == null )
609          return Collections.EMPTY_LIST;
610    
611        return results;
612        }
613    
614      public Collection<Operation> getAllOperations()
615        {
616        Set<FlowElement> vertices = getGraph().vertexSet();
617        List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same
618    
619        for( FlowElement vertex : vertices )
620          {
621          if( vertex instanceof Operator )
622            operations.add( ( (Operator) vertex ).getOperation() );
623          }
624    
625        return operations;
626        }
627    
628      @Override
629      public boolean containsPipeNamed( String pipeName )
630        {
631        Set<FlowElement> vertices = getGraph().vertexSet();
632    
633        for( FlowElement vertex : vertices )
634          {
635          if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) )
636            return true;
637          }
638    
639        return false;
640        }
641    
642      public void clean()
643        {
644        // use step config by default
645        clean( getConfig() );
646        }
647    
648      public abstract void clean( Config config );
649    
650      List<SafeFlowStepListener> getListeners()
651        {
652        if( listeners == null )
653          listeners = new LinkedList<SafeFlowStepListener>();
654    
655        return listeners;
656        }
657    
658      @Override
659      public boolean hasListeners()
660        {
661        return listeners != null && !listeners.isEmpty();
662        }
663    
664      @Override
665      public void addListener( FlowStepListener flowStepListener )
666        {
667        getListeners().add( new SafeFlowStepListener( flowStepListener ) );
668        }
669    
670      @Override
671      public boolean removeListener( FlowStepListener flowStepListener )
672        {
673        return getListeners().remove( new SafeFlowStepListener( flowStepListener ) );
674        }
675    
676      protected void fireOnCompleted()
677        {
678    
679        if( hasListeners() )
680          {
681          if( LOG.isDebugEnabled() )
682            logDebug( "firing onCompleted event: " + getListeners().size() );
683    
684          for( Object flowStepListener : getListeners() )
685            ( (FlowStepListener) flowStepListener ).onStepCompleted( this );
686          }
687        }
688    
689      protected void fireOnThrowable( Throwable throwable )
690        {
691        if( hasListeners() )
692          {
693          if( LOG.isDebugEnabled() )
694            logDebug( "firing onThrowable event: " + getListeners().size() );
695    
696    
697          for( Object flowStepListener : getListeners() )
698            ( (FlowStepListener) flowStepListener ).onStepThrowable( this, throwable );
699          }
700        }
701    
702      protected void fireOnStopping()
703        {
704        if( hasListeners() )
705          {
706          if( LOG.isDebugEnabled() )
707            logDebug( "firing onStopping event: " + getListeners() );
708    
709          for( Object flowStepListener : getListeners() )
710            ( (FlowStepListener) flowStepListener ).onStepStopping( this );
711          }
712        }
713    
714      protected void fireOnStarting()
715        {
716        if( hasListeners() )
717          {
718          if( LOG.isDebugEnabled() )
719            logDebug( "firing onStarting event: " + getListeners().size() );
720    
721          for( Object flowStepListener : getListeners() )
722            ( (FlowStepListener) flowStepListener ).onStepStarting( this );
723          }
724        }
725    
726      protected void fireOnRunning()
727        {
728        if( hasListeners() )
729          {
730          if( LOG.isDebugEnabled() )
731            logDebug( "firing onRunning event: " + getListeners().size() );
732    
733          for( Object flowStepListener : getListeners() )
734            ( (FlowStepListener) flowStepListener ).onStepRunning( this );
735          }
736        }
737    
738      @Override
739      public boolean equals( Object object )
740        {
741        if( this == object )
742          return true;
743        if( object == null || getClass() != object.getClass() )
744          return false;
745    
746        BaseFlowStep flowStep = (BaseFlowStep) object;
747    
748        if( name != null ? !name.equals( flowStep.name ) : flowStep.name != null )
749          return false;
750    
751        return true;
752        }
753    
754      protected ClientState createClientState( FlowProcess flowProcess )
755        {
756        CascadingServices services = flowProcess.getCurrentSession().getCascadingServices();
757        return services.createClientState( getID() );
758        }
759    
760      public FlowStepJob<Config> getFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig )
761        {
762        if( flowStepJob != null )
763          return flowStepJob;
764    
765        if( flowProcess == null )
766          return null;
767    
768        flowStepJob = createFlowStepJob( flowProcess, parentConfig );
769    
770        return flowStepJob;
771        }
772    
773      protected abstract FlowStepJob createFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig );
774    
775      protected void initConfFromProcessConfigDef( ConfigDef.Setter setter )
776        {
777        // applies each mode in order, topologically
778        for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
779          {
780          TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalOrderIterator();
781    
782          while( iterator.hasNext() )
783            {
784            FlowElement element = iterator.next();
785    
786            while( element != null )
787              {
788              if( element.hasStepConfigDef() )
789                element.getStepConfigDef().apply( mode, setter );
790    
791              if( element instanceof Pipe )
792                element = ( (Pipe) element ).getParent();
793              else
794                element = null;
795              }
796            }
797          }
798        }
799    
800      @Override
801      public int hashCode()
802        {
803        return name != null ? name.hashCode() : 0;
804        }
805    
806      @Override
807      public String toString()
808        {
809        StringBuffer buffer = new StringBuffer();
810    
811        buffer.append( getClass().getSimpleName() );
812        buffer.append( "[name: " ).append( getName() ).append( "]" );
813    
814        return buffer.toString();
815        }
816    
817      public final boolean isInfoEnabled()
818        {
819        return LOG.isInfoEnabled();
820        }
821    
822      public final boolean isDebugEnabled()
823        {
824        return LOG.isDebugEnabled();
825        }
826    
827      public void logDebug( String message )
828        {
829        LOG.debug( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message );
830        }
831    
832      public void logInfo( String message )
833        {
834        LOG.info( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message );
835        }
836    
837      public void logWarn( String message )
838        {
839        LOG.warn( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message );
840        }
841    
842      public void logWarn( String message, Throwable throwable )
843        {
844        LOG.warn( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message, throwable );
845        }
846    
847      public void logError( String message, Throwable throwable )
848        {
849        LOG.error( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message, throwable );
850        }
851    
852      /**
853       * Class SafeFlowStepListener safely calls a wrapped FlowStepListener.
854       * <p/>
855       * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
856       * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
857       * which in turn is run in a new Thread.
858       */
859      private class SafeFlowStepListener implements FlowStepListener
860        {
861        /** Field flowListener */
862        final FlowStepListener flowStepListener;
863        /** Field throwable */
864        Throwable throwable;
865    
866        private SafeFlowStepListener( FlowStepListener flowStepListener )
867          {
868          this.flowStepListener = flowStepListener;
869          }
870    
871        public void onStepStarting( FlowStep flowStep )
872          {
873          try
874            {
875            flowStepListener.onStepStarting( flowStep );
876            }
877          catch( Throwable throwable )
878            {
879            handleThrowable( throwable );
880            }
881          }
882    
883        public void onStepStopping( FlowStep flowStep )
884          {
885          try
886            {
887            flowStepListener.onStepStopping( flowStep );
888            }
889          catch( Throwable throwable )
890            {
891            handleThrowable( throwable );
892            }
893          }
894    
895        public void onStepCompleted( FlowStep flowStep )
896          {
897          try
898            {
899            flowStepListener.onStepCompleted( flowStep );
900            }
901          catch( Throwable throwable )
902            {
903            handleThrowable( throwable );
904            }
905          }
906    
907        public void onStepRunning( FlowStep flowStep )
908          {
909          try
910            {
911            flowStepListener.onStepRunning( flowStep );
912            }
913          catch( Throwable throwable )
914            {
915            handleThrowable( throwable );
916            }
917          }
918    
919        public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable )
920          {
921          try
922            {
923            return flowStepListener.onStepThrowable( flowStep, flowStepThrowable );
924            }
925          catch( Throwable throwable )
926            {
927            handleThrowable( throwable );
928            }
929    
930          return false;
931          }
932    
933        private void handleThrowable( Throwable throwable )
934          {
935          this.throwable = throwable;
936    
937          logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable );
938          }
939    
940        public boolean equals( Object object )
941          {
942          if( object instanceof BaseFlowStep.SafeFlowStepListener )
943            return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener );
944    
945          return flowStepListener.equals( object );
946          }
947    
948        public int hashCode()
949          {
950          return flowStepListener.hashCode();
951          }
952        }
953      }