/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;

import cascading.flow.Flow;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.FlowStepListener;
import cascading.management.CascadingServices;
import cascading.management.state.ClientState;
import cascading.operation.Operation;
import cascading.pipe.Group;
import cascading.pipe.HashJoin;
import cascading.pipe.Merge;
import cascading.pipe.Operator;
import cascading.pipe.Pipe;
import cascading.property.ConfigDef;
import cascading.stats.FlowStepStats;
import cascading.tap.Tap;
import cascading.util.Util;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.alg.KShortestPaths;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class FlowStep is an internal representation of a given Job to be executed on a remote cluster. During
 * planning, pipe assemblies are broken down into "steps" and encapsulated in this class.
 * <p/>
 * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all
 * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which
 * all steps will be submitted for execution. The default submit priority is 5.
 * <p/>
 * This class is for internal use, there are no stable public methods.
 */
public abstract class BaseFlowStep<Config> implements Serializable, FlowStep<Config>
  {
  /** Field LOG - note the logger is registered under {@link FlowStep}, not this class */
  private static final Logger LOG = LoggerFactory.getLogger( FlowStep.class );

  /** Field flow - transient, only {@link #flowID} and {@link #flowName} are retained on serialization */
  private transient Flow<Config> flow;
  /** Field flowName */
  private String flowName;
  /** Field flowID */
  private String flowID;

  /** Field conf - the step configuration, transient */
  private transient Config conf;

  /** Field submitPriority */
  private int submitPriority = 5;

  /** Field name */
  String name;
  /** Field id - lazily created by {@link #getID()} */
  private String id;
  /** Field stepNum */
  private final int stepNum;

  /**
   * Field step listeners - lazily created by {@link #getListeners()}
   * <p/>
   * NOTE(review): this field is not transient and SafeFlowStepListener does not itself declare Serializable,
   * so serializing a step with registered listeners presumably fails -- confirm against FlowStepListener.
   */
  private List<SafeFlowStepListener> listeners;

  /** Field graph */
  private final SimpleDirectedGraph<FlowElement, Scope> graph = new SimpleDirectedGraph<FlowElement, Scope>( Scope.class );

  /** Field sources - all sources, mapped to the names they were registered under */
  protected final Map<Tap, Set<String>> sources = new HashMap<Tap, Set<String>>();
  /** Field sinks - all sinks, mapped to the names they were registered under */
  protected final Map<Tap, Set<String>> sinks = new HashMap<Tap, Set<String>>();

  /** Field tempSink */
  protected Tap tempSink; // used if we need to bypass the filesystem

  /** Field groups */
  private final List<Group> groups = new ArrayList<Group>();

  // sources streamed into join - not necessarily all sources
  protected final Map<HashJoin, Tap> streamedSourceByJoin = new LinkedHashMap<HashJoin, Tap>();
  // sources accumulated by join
  protected final Map<HashJoin, Set<Tap>> accumulatedSourcesByJoin = new LinkedHashMap<HashJoin, Set<Tap>>();

  /** Field flowStepJob - lazily created by {@link #getFlowStepJob(FlowProcess, Object)}, transient */
  private transient FlowStepJob<Config> flowStepJob;

  /**
   * Constructor BaseFlowStep creates a new step with the given name and ordinal.
   *
   * @param name    of type String, may not be null or empty
   * @param stepNum of type int
   * @throws IllegalArgumentException if name is null or empty
   */
  protected BaseFlowStep( String name, int stepNum )
    {
    setName( name );
    this.stepNum = stepNum;
    }

  /**
   * Method getID returns the ID of this step, creating it via {@link Util#createUniqueID()} on first call.
   *
   * @return String
   */
  @Override
  public String getID()
    {
    if( id == null )
      id = Util.createUniqueID();

    return id;
    }

  @Override
  public int getStepNum()
    {
    return stepNum;
    }

  @Override
  public String getName()
    {
    return name;
    }

  /**
   * Method setName sets the name of this step. The name is the only field used by
   * {@link #equals(Object)} and {@link #hashCode()}.
   *
   * @param name of type String
   * @throws IllegalArgumentException if name is null or empty
   */
  void setName( String name )
    {
    if( name == null || name.isEmpty() )
      throw new IllegalArgumentException( "step name may not be null or empty" );

    this.name = name;
    }

  /**
   * Method setFlow binds this step to its parent Flow, capturing the flow ID and name.
   *
   * @param flow of type Flow, must not be null
   */
  public void setFlow( Flow<Config> flow )
    {
    this.flow = flow;
    this.flowID = flow.getID();
    this.flowName = flow.getName();
    }

  @Override
  public Flow<Config> getFlow()
    {
    return flow;
    }

  @Override
  public String getFlowID()
    {
    return flowID;
    }

  @Override
  public String getFlowName()
    {
    return flowName;
    }

  protected void setFlowName( String flowName )
    {
    this.flowName = flowName;
    }

  @Override
  public Config getConfig()
    {
    return conf;
    }

  protected void setConf( Config conf )
    {
    this.conf = conf;
    }

  @Override
  public String getStepDisplayName()
    {
    return getStepDisplayName( Util.ID_LENGTH );
    }

  /**
   * Method getStepDisplayName returns a display name of the form
   * {@code [flowID/stepID] flowName/stepName}, where both IDs are cut to at most idLength characters.
   *
   * @param idLength of type int, values larger than {@link Util#ID_LENGTH} are clamped to it
   * @return String
   */
  protected String getStepDisplayName( int idLength )
    {
    if( idLength > Util.ID_LENGTH )
      idLength = Util.ID_LENGTH;

    String flowID = getFlowID().substring( 0, idLength );
    String stepID = getID().substring( 0, idLength );

    return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() );
    }

  @Override
  public int getSubmitPriority()
    {
    return submitPriority;
    }

  /**
   * Method setSubmitPriority sets the submit priority of this step.
   *
   * @param submitPriority of type int, between 1 and 10 inclusive
   * @throws IllegalArgumentException if the value is out of range
   */
  @Override
  public void setSubmitPriority( int submitPriority )
    {
    if( submitPriority < 1 || submitPriority > 10 )
      throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );

    this.submitPriority = submitPriority;
    }

  /**
   * Method getFlowStepStats returns the stats of the current step job.
   * <p/>
   * The step job must already have been created via {@link #getFlowStepJob(FlowProcess, Object)},
   * no null check is performed here.
   *
   * @return FlowStepStats
   */
  @Override
  public FlowStepStats getFlowStepStats()
    {
    return flowStepJob.getStepStats();
    }

  public SimpleDirectedGraph<FlowElement, Scope> getGraph()
    {
    return graph;
    }

  /**
   * Method getGroup returns the single Group of this step.
   *
   * @return Group, or null if this step has no groups
   * @throws IllegalStateException if this step has more than one group
   */
  @Override
  public Group getGroup()
    {
    if( groups.isEmpty() )
      return null;

    if( groups.size() > 1 )
      throw new IllegalStateException( "more than one group" );

    return groups.get( 0 );
    }

  /**
   * Method getGroups returns the live, internal list of groups.
   *
   * @return List<Group>
   */
  @Override
  public List<Group> getGroups()
    {
    return groups;
    }

  /**
   * Method addGroup registers the given group, ignoring groups already registered.
   *
   * @param group of type Group
   */
  public void addGroup( Group group )
    {
    if( !groups.contains( group ) )
      groups.add( group );
    }

  @Override
  public Map<HashJoin, Tap> getStreamedSourceByJoin()
    {
    return streamedSourceByJoin;
    }

  public void addStreamedSourceFor( HashJoin join, Tap streamedSource )
    {
    streamedSourceByJoin.put( join, streamedSource );
    }

  /**
   * Method getAllAccumulatedSources returns a new set holding the union of the accumulated sources of all joins.
   *
   * @return Set<Tap>
   */
  @Override
  public Set<Tap> getAllAccumulatedSources()
    {
    Set<Tap> set = new HashSet<Tap>();

    for( Set<Tap> taps : accumulatedSourcesByJoin.values() )
      set.addAll( taps );

    return set;
    }

  public void addAccumulatedSourceFor( HashJoin join, Tap accumulatedSource )
    {
    Set<Tap> taps = accumulatedSourcesByJoin.get( join );

    if( taps == null )
      {
      taps = new HashSet<Tap>();
      accumulatedSourcesByJoin.put( join, taps );
      }

    taps.add( accumulatedSource );
    }

  /**
   * Method addSource registers the given Tap as a source under the given name. A Tap may be
   * registered under many names.
   *
   * @param name   of type String
   * @param source of type Tap
   */
  public void addSource( String name, Tap source )
    {
    Set<String> names = sources.get( source );

    if( names == null )
      {
      names = new HashSet<String>();
      sources.put( source, names );
      }

    names.add( name );
    }

  /**
   * Method addSink registers the given Tap as a sink under the given name. A Tap may be
   * registered under many names.
   *
   * @param name of type String
   * @param sink of type Tap
   */
  public void addSink( String name, Tap sink )
    {
    Set<String> names = sinks.get( sink );

    if( names == null )
      {
      names = new HashSet<String>();
      sinks.put( sink, names );
      }

    names.add( name );
    }

  /**
   * Method getSources returns an unmodifiable snapshot of all source Taps.
   *
   * @return Set<Tap>
   */
  @Override
  public Set<Tap> getSources()
    {
    return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) );
    }

  /**
   * Method getSinks returns an unmodifiable snapshot of all sink Taps.
   *
   * @return Set<Tap>
   */
  @Override
  public Set<Tap> getSinks()
    {
    return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) );
    }

  /**
   * Method getSink returns the single sink of this step.
   *
   * @return Tap
   * @throws IllegalStateException if this step does not have exactly one sink
   */
  @Override
  public Tap getSink()
    {
    // previously an empty sink map was also reported as "more than one sink"
    if( sinks.isEmpty() )
      throw new IllegalStateException( "no sinks available" );

    if( sinks.size() != 1 )
      throw new IllegalStateException( "more than one sink" );

    return sinks.keySet().iterator().next();
    }

  /**
   * Method getSourceName returns an unmodifiable view of the names the given source was registered under.
   *
   * @param source of type Tap, must have been registered via {@link #addSource(String, Tap)}
   * @return Set<String>
   */
  @Override
  public Set<String> getSourceName( Tap source )
    {
    return Collections.unmodifiableSet( sources.get( source ) );
    }

  /**
   * Method getSinkName returns an unmodifiable view of the names the given sink was registered under.
   *
   * @param sink of type Tap, must have been registered via {@link #addSink(String, Tap)}
   * @return Set<String>
   */
  @Override
  public Set<String> getSinkName( Tap sink )
    {
    return Collections.unmodifiableSet( sinks.get( sink ) );
    }

  /**
   * Method getSourceWith returns the first source whose identifier matches the given one, ignoring case.
   *
   * @param identifier of type String
   * @return Tap, or null if no source matches
   */
  @Override
  public Tap getSourceWith( String identifier )
    {
    for( Tap tap : sources.keySet() )
      {
      if( tap.getIdentifier().equalsIgnoreCase( identifier ) )
        return tap;
      }

    return null;
    }

  /**
   * Method getSinkWith returns the first sink whose identifier matches the given one, ignoring case.
   *
   * @param identifier of type String
   * @return Tap, or null if no sink matches
   */
  @Override
  public Tap getSinkWith( String identifier )
    {
    for( Tap tap : sinks.keySet() )
      {
      if( tap.getIdentifier().equalsIgnoreCase( identifier ) )
        return tap;
      }

    return null;
    }

  /**
   * Method allSourcesExist returns true if every source reports that its resource exists
   * for the current step config.
   *
   * @return boolean
   * @throws IOException when a source cannot be checked
   */
  boolean allSourcesExist() throws IOException
    {
    for( Tap tap : sources.keySet() )
      {
      if( !tap.resourceExists( getConfig() ) )
        return false;
      }

    return true;
    }

  /**
   * Method areSourcesNewer returns true if the source modification time reported by
   * {@link Util#getSourceModified} is greater than the given sink modification time.
   *
   * @param sinkModified of type long
   * @return boolean
   * @throws IOException when a source cannot be checked
   */
  boolean areSourcesNewer( long sinkModified ) throws IOException
    {
    Config config = getConfig();
    Iterator<Tap> values = sources.keySet().iterator();

    long sourceModified = 0;

    try
      {
      sourceModified = Util.getSourceModified( config, values, sinkModified );

      return sinkModified < sourceModified;
      }
    finally
      {
      // logged even if the lookup above throws, in which case sourceModified is still 0
      if( LOG.isInfoEnabled() )
        logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didn't check them all
      }
    }

  /**
   * Method getSinkModified returns the value of {@link Util#getSinkModified} for all sinks of this step.
   * <p/>
   * Going by the messages logged here, -1 signals a sink marked for delete and 0 signals a missing sink.
   *
   * @return long
   * @throws IOException when a sink cannot be checked
   */
  long getSinkModified() throws IOException
    {
    long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() );

    if( LOG.isInfoEnabled() )
      {
      // the three cases are mutually exclusive, -1 must not fall through to the date message
      if( sinkModified == -1L )
        logInfo( "at least one sink is marked for delete" );
      else if( sinkModified == 0L )
        logInfo( "at least one sink does not exist" );
      else
        logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
      }

    return sinkModified;
    }

  /**
   * Method commitSinks attempts to commit every sink. Once a commit fails, all remaining sinks
   * are rolled back instead of committed. The sinks are held in a HashMap, so the order is unspecified.
   *
   * @return the first commit failure, or null if all sinks committed
   */
  protected Throwable commitSinks()
    {
    Throwable throwable = null;

    for( Tap tap : sinks.keySet() )
      {
      if( throwable != null )
        rollbackResource( tap ); // failures here are logged by rollbackResource, the first failure is retained
      else
        throwable = commitResource( tap );
      }

    return throwable;
    }

  /**
   * Method commitResource commits the given sink, logging and wrapping any failure.
   *
   * @param tap of type Tap
   * @return a FlowException if the commit returned false or threw, null otherwise
   */
  private Throwable commitResource( Tap tap )
    {
    Throwable throwable = null;

    try
      {
      if( !tap.commitResource( getConfig() ) )
        {
        String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );

        logError( message, null );

        throwable = new FlowException( message );
        }
      }
    catch( Throwable exception )
      {
      String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );

      logError( message, exception );

      throwable = new FlowException( message, exception );
      }

    return throwable;
    }

  /**
   * Method rollbackResource rolls back the given sink, logging and wrapping any failure.
   *
   * @param tap of type Tap
   * @return a FlowException if the rollback returned false or threw, null otherwise
   */
  private Throwable rollbackResource( Tap tap )
    {
    Throwable throwable = null;

    try
      {
      if( !tap.rollbackResource( getConfig() ) )
        {
        String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );

        logError( message, null );

        throwable = new FlowException( message );
        }
      }
    catch( Throwable exception )
      {
      String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );

      logError( message, exception );

      throwable = new FlowException( message, exception );
      }

    return throwable;
    }

  /**
   * Method rollbackSinks attempts to roll back every sink, continuing past failures.
   *
   * @return the first rollback failure, or null if all sinks rolled back
   */
  protected Throwable rollbackSinks()
    {
    Throwable throwable = null;

    for( Tap tap : sinks.keySet() )
      {
      Throwable current = rollbackResource( tap );

      if( throwable == null )
        throwable = current;
      }

    return throwable;
    }

  protected abstract Config getInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig );

  /**
   * Method getPreviousScopes returns the previous Scope instances. If the flowElement is a Group (specifically a CoGroup),
   * there will be more than one instance.
   *
   * @param flowElement of type FlowElement
   * @return Set<Scope>
   */
  public Set<Scope> getPreviousScopes( FlowElement flowElement )
    {
    return getGraph().incomingEdgesOf( flowElement );
    }

  /**
   * Method getNextScope returns the next Scope instance in the graph. There will always only be one next.
   *
   * @param flowElement of type FlowElement
   * @return Scope
   * @throws IllegalStateException if the element does not have exactly one outgoing Scope
   */
  public Scope getNextScope( FlowElement flowElement )
    {
    Set<Scope> set = getGraph().outgoingEdgesOf( flowElement );

    if( set.size() != 1 )
      throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() );

    return set.iterator().next();
    }

  public Scope getScopeFor( FlowElement sourceElement, FlowElement targetElement )
    {
    return getGraph().getEdge( sourceElement, targetElement );
    }

  public Set<Scope> getNextScopes( FlowElement flowElement )
    {
    return getGraph().outgoingEdgesOf( flowElement );
    }

  public FlowElement getNextFlowElement( Scope scope )
    {
    return getGraph().getEdgeTarget( scope );
    }

  public TopologicalOrderIterator<FlowElement, Scope> getTopologicalOrderIterator()
    {
    return new TopologicalOrderIterator<FlowElement, Scope>( graph );
    }

  public List<FlowElement> getSuccessors( FlowElement element )
    {
    return Graphs.successorListOf( graph, element );
    }

  /**
   * Method getJoinTributariesBetween returns every source Tap that reaches a HashJoin lying on a
   * path between the given elements without passing through a Merge lying on such a path.
   * <p/>
   * First all HashJoin and Merge elements on any path from {@code from} to {@code to} are collected. Then,
   * for each collected join and each source, the paths from the source to the join are computed and those
   * containing one of the collected merges are discarded. A source with a remaining path is a tributary.
   *
   * @param from of type FlowElement
   * @param to   of type FlowElement
   * @return Set<Tap>
   */
  public Set<Tap> getJoinTributariesBetween( FlowElement from, FlowElement to )
    {
    Set<HashJoin> joins = new HashSet<HashJoin>();
    Set<Merge> merges = new HashSet<Merge>();

    List<GraphPath<FlowElement, Scope>> paths = getPathsBetween( from, to );

    for( GraphPath<FlowElement, Scope> path : paths )
      {
      for( FlowElement flowElement : Graphs.getPathVertexList( path ) )
        {
        if( flowElement instanceof HashJoin )
          joins.add( (HashJoin) flowElement );

        if( flowElement instanceof Merge )
          merges.add( (Merge) flowElement );
        }
      }

    Set<Tap> tributaries = new HashSet<Tap>();

    for( HashJoin join : joins )
      {
      for( Tap source : sources.keySet() )
        {
        // copy so paths can be removed without touching the list returned by the path finder
        List<GraphPath<FlowElement, Scope>> joinPaths = new LinkedList<GraphPath<FlowElement, Scope>>( getPathsBetween( source, join ) );

        ListIterator<GraphPath<FlowElement, Scope>> iterator = joinPaths.listIterator();

        while( iterator.hasNext() )
          {
          GraphPath<FlowElement, Scope> joinPath = iterator.next();

          // drop any path that crosses one of the collected merges
          if( !Collections.disjoint( Graphs.getPathVertexList( joinPath ), merges ) )
            iterator.remove();
          }

        if( !joinPaths.isEmpty() )
          tributaries.add( source );
        }
      }

    return tributaries;
    }

  /**
   * Method getPathsBetween returns the paths found by {@link KShortestPaths} between the given elements,
   * constructed with a limit of {@link Integer#MAX_VALUE}.
   *
   * @param from of type FlowElement
   * @param to   of type FlowElement
   * @return the paths, or an empty list if the path finder returned null
   */
  private List<GraphPath<FlowElement, Scope>> getPathsBetween( FlowElement from, FlowElement to )
    {
    KShortestPaths<FlowElement, Scope> paths = new KShortestPaths<FlowElement, Scope>( graph, from, Integer.MAX_VALUE );
    List<GraphPath<FlowElement, Scope>> results = paths.getPaths( to );

    if( results == null )
      return Collections.emptyList();

    return results;
    }

  /**
   * Method getAllOperations returns the Operation of every Operator in the step graph.
   *
   * @return Collection<Operation>, may contain equal instances
   */
  public Collection<Operation> getAllOperations()
    {
    Set<FlowElement> vertices = getGraph().vertexSet();
    List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same

    for( FlowElement vertex : vertices )
      {
      if( vertex instanceof Operator )
        operations.add( ( (Operator) vertex ).getOperation() );
      }

    return operations;
    }

  /**
   * Method containsPipeNamed returns true if any Pipe in the step graph has exactly the given name.
   *
   * @param pipeName of type String
   * @return boolean
   */
  @Override
  public boolean containsPipeNamed( String pipeName )
    {
    Set<FlowElement> vertices = getGraph().vertexSet();

    for( FlowElement vertex : vertices )
      {
      if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) )
        return true;
      }

    return false;
    }

  public void clean()
    {
    // use step config by default
    clean( getConfig() );
    }

  public abstract void clean( Config config );

  /**
   * Method getListeners returns the internal listener list, creating it on first call.
   *
   * @return List<SafeFlowStepListener>, never null
   */
  List<SafeFlowStepListener> getListeners()
    {
    if( listeners == null )
      listeners = new LinkedList<SafeFlowStepListener>();

    return listeners;
    }

  @Override
  public boolean hasListeners()
    {
    return listeners != null && !listeners.isEmpty();
    }

  /**
   * Method addListener registers the given listener, wrapped so anything it throws is caught and logged.
   *
   * @param flowStepListener of type FlowStepListener
   */
  @Override
  public void addListener( FlowStepListener flowStepListener )
    {
    getListeners().add( new SafeFlowStepListener( flowStepListener ) );
    }

  /**
   * Method removeListener removes the given listener. A temporary wrapper is created so the
   * lookup matches on the wrapped listener's equality.
   *
   * @param flowStepListener of type FlowStepListener
   * @return true if a listener was removed
   */
  @Override
  public boolean removeListener( FlowStepListener flowStepListener )
    {
    return getListeners().remove( new SafeFlowStepListener( flowStepListener ) );
    }

  /** Method fireOnCompleted notifies all listeners that this step has completed. */
  protected void fireOnCompleted()
    {
    if( hasListeners() )
      {
      if( LOG.isDebugEnabled() )
        logDebug( "firing onCompleted event: " + getListeners().size() );

      for( FlowStepListener flowStepListener : getListeners() )
        flowStepListener.onStepCompleted( this );
      }
    }

  /**
   * Method fireOnThrowable notifies all listeners of the given throwable. The values returned
   * by the listeners are ignored.
   *
   * @param throwable of type Throwable
   */
  protected void fireOnThrowable( Throwable throwable )
    {
    if( hasListeners() )
      {
      if( LOG.isDebugEnabled() )
        logDebug( "firing onThrowable event: " + getListeners().size() );

      for( FlowStepListener flowStepListener : getListeners() )
        flowStepListener.onStepThrowable( this, throwable );
      }
    }

  /** Method fireOnStopping notifies all listeners that this step is stopping. */
  protected void fireOnStopping()
    {
    if( hasListeners() )
      {
      // log the listener count, consistent with the other fireOn* methods
      if( LOG.isDebugEnabled() )
        logDebug( "firing onStopping event: " + getListeners().size() );

      for( FlowStepListener flowStepListener : getListeners() )
        flowStepListener.onStepStopping( this );
      }
    }

  /** Method fireOnStarting notifies all listeners that this step is starting. */
  protected void fireOnStarting()
    {
    if( hasListeners() )
      {
      if( LOG.isDebugEnabled() )
        logDebug( "firing onStarting event: " + getListeners().size() );

      for( FlowStepListener flowStepListener : getListeners() )
        flowStepListener.onStepStarting( this );
      }
    }

  /** Method fireOnRunning notifies all listeners that this step is running. */
  protected void fireOnRunning()
    {
    if( hasListeners() )
      {
      if( LOG.isDebugEnabled() )
        logDebug( "firing onRunning event: " + getListeners().size() );

      for( FlowStepListener flowStepListener : getListeners() )
        flowStepListener.onStepRunning( this );
      }
    }

  /**
   * Two steps are equal if they are of the same class and have the same name.
   */
  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( object == null || getClass() != object.getClass() )
      return false;

    BaseFlowStep<?> flowStep = (BaseFlowStep<?>) object;

    if( name != null ? !name.equals( flowStep.name ) : flowStep.name != null )
      return false;

    return true;
    }

  /**
   * Method createClientState creates a ClientState for this step's ID from the CascadingServices
   * of the given process' current session.
   *
   * @param flowProcess of type FlowProcess
   * @return ClientState
   */
  protected ClientState createClientState( FlowProcess flowProcess )
    {
    CascadingServices services = flowProcess.getCurrentSession().getCascadingServices();
    return services.createClientState( getID() );
    }

  /**
   * Method getFlowStepJob returns the job for this step, creating and caching it on first call.
   *
   * @param flowProcess  of type FlowProcess
   * @param parentConfig of type Config
   * @return the cached or newly created job, or null if no job exists yet and flowProcess is null
   */
  public FlowStepJob<Config> getFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig )
    {
    if( flowStepJob != null )
      return flowStepJob;

    if( flowProcess == null )
      return null;

    flowStepJob = createFlowStepJob( flowProcess, parentConfig );

    return flowStepJob;
    }

  protected abstract FlowStepJob createFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig );

  /**
   * Method initConfFromProcessConfigDef applies the step ConfigDef of every element in the step graph,
   * and of each Pipe's chain of parents, to the given setter.
   * <p/>
   * The whole graph is walked once per {@link ConfigDef.Mode}, in the order the modes are declared.
   *
   * @param setter of type ConfigDef.Setter
   */
  protected void initConfFromProcessConfigDef( ConfigDef.Setter setter )
    {
    // applies each mode in order, topologically
    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
      {
      TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalOrderIterator();

      while( iterator.hasNext() )
        {
        FlowElement element = iterator.next();

        // walk up the parent chain, only Pipe instances have parents
        while( element != null )
          {
          if( element.hasStepConfigDef() )
            element.getStepConfigDef().apply( mode, setter );

          if( element instanceof Pipe )
            element = ( (Pipe) element ).getParent();
          else
            element = null;
          }
        }
      }
    }

  @Override
  public int hashCode()
    {
    return name != null ? name.hashCode() : 0;
    }

  @Override
  public String toString()
    {
    StringBuilder buffer = new StringBuilder();

    buffer.append( getClass().getSimpleName() );
    buffer.append( "[name: " ).append( getName() ).append( "]" );

    return buffer.toString();
    }

  public final boolean isInfoEnabled()
    {
    return LOG.isInfoEnabled();
    }

  public final boolean isDebugEnabled()
    {
    return LOG.isDebugEnabled();
    }

  /**
   * Method logDebug logs the message at debug level, prefixed with the flow name
   * passed through {@link Util#truncate} with a limit of 25.
   *
   * @param message of type String
   */
  public void logDebug( String message )
    {
    LOG.debug( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message );
    }

  /**
   * Method logInfo logs the message at info level, with the same prefix as {@link #logDebug(String)}.
   *
   * @param message of type String
   */
  public void logInfo( String message )
    {
    LOG.info( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message );
    }

  /**
   * Method logWarn logs the message at warn level, with the same prefix as {@link #logDebug(String)}.
   *
   * @param message of type String
   */
  public void logWarn( String message )
    {
    LOG.warn( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message );
    }

  /**
   * Method logWarn logs the message and throwable at warn level, with the same prefix as {@link #logDebug(String)}.
   *
   * @param message   of type String
   * @param throwable of type Throwable
   */
  public void logWarn( String message, Throwable throwable )
    {
    LOG.warn( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message, throwable );
    }

  /**
   * Method logError logs the message and throwable at error level, with the same prefix as {@link #logDebug(String)}.
   *
   * @param message   of type String
   * @param throwable of type Throwable, may be null
   */
  public void logError( String message, Throwable throwable )
    {
    LOG.error( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message, throwable );
    }

  /**
   * Class SafeFlowStepListener safely calls a wrapped FlowStepListener.
   * <p/>
   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
   * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
   * which in turn is run in a new Thread.
   * <p/>
   * Anything thrown by the wrapped listener is retained in {@link #throwable} and logged as a warning
   * through the enclosing step, it is never rethrown.
   */
  private class SafeFlowStepListener implements FlowStepListener
    {
    /** Field flowStepListener - the wrapped listener */
    final FlowStepListener flowStepListener;
    /** Field throwable - the most recent throwable thrown by the wrapped listener, if any */
    Throwable throwable;

    private SafeFlowStepListener( FlowStepListener flowStepListener )
      {
      this.flowStepListener = flowStepListener;
      }

    @Override
    public void onStepStarting( FlowStep flowStep )
      {
      try
        {
        flowStepListener.onStepStarting( flowStep );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }
      }

    @Override
    public void onStepStopping( FlowStep flowStep )
      {
      try
        {
        flowStepListener.onStepStopping( flowStep );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }
      }

    @Override
    public void onStepCompleted( FlowStep flowStep )
      {
      try
        {
        flowStepListener.onStepCompleted( flowStep );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }
      }

    @Override
    public void onStepRunning( FlowStep flowStep )
      {
      try
        {
        flowStepListener.onStepRunning( flowStep );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }
      }

    /**
     * Delegates to the wrapped listener, returning false if the wrapped listener itself throws.
     */
    @Override
    public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable )
      {
      try
        {
        return flowStepListener.onStepThrowable( flowStep, flowStepThrowable );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }

      return false;
      }

    /** Retains the given throwable and logs it as a warning via the enclosing step. */
    private void handleThrowable( Throwable throwable )
      {
      this.throwable = throwable;

      logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable );
      }

    /**
     * Compares the wrapped listeners. When given another wrapper its wrapped listener is used,
     * otherwise the given object is compared directly against the wrapped listener.
     */
    @Override
    public boolean equals( Object object )
      {
      // the raw outer type is required, the parameterized inner type is not reifiable
      if( object instanceof BaseFlowStep.SafeFlowStepListener )
        return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener );

      return flowStepListener.equals( object );
      }

    @Override
    public int hashCode()
      {
      return flowStepListener.hashCode();
      }
    }
  }