001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.Date; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.Iterator; 031import java.util.LinkedHashMap; 032import java.util.LinkedList; 033import java.util.List; 034import java.util.Map; 035import java.util.Properties; 036import java.util.Set; 037import java.util.concurrent.Callable; 038import java.util.concurrent.Future; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.locks.ReentrantLock; 041 042import cascading.CascadingException; 043import cascading.cascade.Cascade; 044import cascading.flow.planner.BaseFlowNode; 045import cascading.flow.planner.BaseFlowStep; 046import cascading.flow.planner.FlowStepJob; 047import cascading.flow.planner.PlannerInfo; 048import cascading.flow.planner.PlatformInfo; 049import cascading.flow.planner.graph.ElementGraphs; 050import cascading.flow.planner.graph.FlowElementGraph; 051import cascading.flow.planner.process.FlowStepGraph; 052import cascading.flow.planner.process.ProcessGraphs; 053import 
cascading.management.CascadingServices; 054import cascading.management.UnitOfWorkExecutorStrategy; 055import cascading.management.UnitOfWorkSpawnStrategy; 056import cascading.management.state.ClientState; 057import cascading.property.AppProps; 058import cascading.property.PropertyUtil; 059import cascading.stats.FlowStats; 060import cascading.tap.Tap; 061import cascading.tuple.Fields; 062import cascading.tuple.TupleEntryCollector; 063import cascading.tuple.TupleEntryIterator; 064import cascading.util.ProcessLogger; 065import cascading.util.ShutdownUtil; 066import cascading.util.Update; 067import cascading.util.Util; 068import cascading.util.Version; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071import riffle.process.DependencyIncoming; 072import riffle.process.DependencyOutgoing; 073import riffle.process.ProcessCleanup; 074import riffle.process.ProcessComplete; 075import riffle.process.ProcessPrepare; 076import riffle.process.ProcessStart; 077import riffle.process.ProcessStop; 078 079import static cascading.util.Util.formatDurationFromMillis; 080 081@riffle.process.Process 082public abstract class BaseFlow<Config> implements Flow<Config>, ProcessLogger 083 { 084 /** Field LOG */ 085 private static final Logger LOG = LoggerFactory.getLogger( Flow.class ); // wrapped by ProcessLogger interface methods 086 private static final int LOG_FLOW_NAME_MAX = 25; 087 088 private PlannerInfo plannerInfo = PlannerInfo.NULL; 089 protected PlatformInfo platformInfo = PlatformInfo.NULL; 090 091 /** Field id */ 092 private String id; 093 /** Field name */ 094 private String name; 095 /** Fields runID */ 096 private String runID; 097 /** Fields classpath */ 098 private List<String> classPath; // may remain null 099 /** Field tags */ 100 private String tags; 101 /** Field listeners */ 102 private List<SafeFlowListener> listeners; 103 /** Field skipStrategy */ 104 private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale(); 105 /** Field flowStats */ 106 
protected FlowStats flowStats; // don't use a listener to set values 107 /** Field sources */ 108 protected Map<String, Tap> sources = Collections.emptyMap(); 109 /** Field sinks */ 110 protected Map<String, Tap> sinks = Collections.emptyMap(); 111 /** Field traps */ 112 private Map<String, Tap> traps = Collections.emptyMap(); 113 /** Field checkpoints */ 114 private Map<String, Tap> checkpoints = Collections.emptyMap(); 115 /** Field stopJobsOnExit */ 116 protected boolean stopJobsOnExit = true; 117 /** Field submitPriority */ 118 private int submitPriority = 5; 119 120 /** Field stepGraph */ 121 protected FlowStepGraph flowStepGraph; 122 /** Field thread */ 123 protected transient Thread thread; 124 /** Field throwable */ 125 private Throwable throwable; 126 /** Field stop */ 127 protected boolean stop; 128 129 /** Field flowCanonicalHash */ 130 protected String flowCanonicalHash; 131 /** Field flowElementGraph */ 132 protected FlowElementGraph flowElementGraph; // only used for documentation purposes 133 134 private transient CascadingServices cascadingServices; 135 136 private FlowStepStrategy<Config> flowStepStrategy = null; 137 /** Field steps */ 138 protected transient List<FlowStep<Config>> steps; 139 /** Field jobsMap */ 140 private transient Map<String, FlowStepJob<Config>> jobsMap; 141 private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy(); 142 143 private transient ReentrantLock stopLock = new ReentrantLock( true ); 144 protected ShutdownUtil.Hook shutdownHook; 145 146 protected HashMap<String, String> flowDescriptor; 147 148 /** 149 * Returns property stopJobsOnExit. 150 * 151 * @param properties of type Map 152 * @return a boolean 153 */ 154 static boolean getStopJobsOnExit( Map<Object, Object> properties ) 155 { 156 return Boolean.parseBoolean( PropertyUtil.getProperty( properties, FlowProps.STOP_JOBS_ON_EXIT, "true" ) ); 157 } 158 159 /** Used for testing. 
*/ 160 protected BaseFlow() 161 { 162 this.name = "NA"; 163 this.flowStats = createPrepareFlowStats(); 164 } 165 166 /** 167 * Does not initialize stats 168 * 169 * @param name 170 */ 171 protected BaseFlow( PlatformInfo platformInfo, String name ) 172 { 173 if( platformInfo != null ) 174 this.platformInfo = platformInfo; 175 176 this.name = name; 177 } 178 179 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name, Map<String, String> flowDescriptor ) 180 { 181 if( platformInfo != null ) 182 this.platformInfo = platformInfo; 183 184 this.name = name; 185 186 if( flowDescriptor != null ) 187 this.flowDescriptor = new LinkedHashMap<>( flowDescriptor ); 188 189 addSessionProperties( properties ); 190 initConfig( properties, defaultConfig ); 191 } 192 193 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef ) 194 { 195 properties = PropertyUtil.asFlatMap( properties ); 196 197 if( platformInfo != null ) 198 this.platformInfo = platformInfo; 199 200 this.name = flowDef.getName(); 201 this.tags = flowDef.getTags(); 202 this.runID = flowDef.getRunID(); 203 this.classPath = flowDef.getClassPath(); 204 205 if( !flowDef.getFlowDescriptor().isEmpty() ) 206 this.flowDescriptor = new LinkedHashMap<>( flowDef.getFlowDescriptor() ); 207 208 addSessionProperties( properties ); 209 initConfig( properties, defaultConfig ); 210 setSources( flowDef.getSourcesCopy() ); 211 setSinks( flowDef.getSinksCopy() ); 212 setTraps( flowDef.getTrapsCopy() ); 213 setCheckpoints( flowDef.getCheckpointsCopy() ); 214 initFromTaps(); 215 216 retrieveSourceFields(); 217 retrieveSinkFields(); 218 } 219 220 public void setPlannerInfo( PlannerInfo plannerInfo ) 221 { 222 this.plannerInfo = plannerInfo; 223 } 224 225 @Override 226 public PlannerInfo getPlannerInfo() 227 { 228 return plannerInfo; 229 } 230 231 @Override 232 public PlatformInfo getPlatformInfo() 233 { 234 return 
platformInfo; 235 } 236 237 public void initialize( FlowElementGraph flowElementGraph, FlowStepGraph flowStepGraph ) 238 { 239 addPlannerProperties(); 240 this.flowElementGraph = flowElementGraph; 241 this.flowStepGraph = flowStepGraph; 242 243 initSteps(); 244 245 this.flowStats = createPrepareFlowStats(); 246 247 initializeNewJobsMap(); 248 249 initializeChildStats(); 250 } 251 252 public FlowElementGraph updateSchemes( FlowElementGraph pipeGraph ) 253 { 254 presentSourceFields( pipeGraph ); 255 256 presentSinkFields( pipeGraph ); 257 258 return new FlowElementGraph( pipeGraph ); 259 } 260 261 /** Force a Scheme to fetch any fields from a meta-data store */ 262 protected void retrieveSourceFields() 263 { 264 for( Tap tap : sources.values() ) 265 tap.retrieveSourceFields( getFlowProcess() ); 266 } 267 268 /** 269 * Present the current resolved fields for the Tap 270 * 271 * @param pipeGraph 272 */ 273 protected void presentSourceFields( FlowElementGraph pipeGraph ) 274 { 275 for( Tap tap : sources.values() ) 276 { 277 if( pipeGraph.containsVertex( tap ) ) 278 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 279 } 280 281 for( Tap tap : checkpoints.values() ) 282 { 283 if( pipeGraph.containsVertex( tap ) ) 284 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 285 } 286 } 287 288 /** Force a Scheme to fetch any fields from a meta-data store */ 289 protected void retrieveSinkFields() 290 { 291 for( Tap tap : sinks.values() ) 292 tap.retrieveSinkFields( getFlowProcess() ); 293 } 294 295 /** 296 * Present the current resolved fields for the Tap 297 * 298 * @param pipeGraph 299 */ 300 protected void presentSinkFields( FlowElementGraph pipeGraph ) 301 { 302 for( Tap tap : sinks.values() ) 303 { 304 if( pipeGraph.containsVertex( tap ) ) 305 tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 306 } 307 308 for( Tap tap : checkpoints.values() ) 309 { 310 if( pipeGraph.containsVertex( tap ) ) 311 
tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 312 } 313 } 314 315 protected Fields getFieldsFor( FlowElementGraph pipeGraph, Tap tap ) 316 { 317 return pipeGraph.outgoingEdgesOf( tap ).iterator().next().getOutValuesFields(); 318 } 319 320 protected void addSessionProperties( Map<Object, Object> properties ) 321 { 322 if( properties == null ) 323 return; 324 325 PropertyUtil.setProperty( properties, CASCADING_FLOW_ID, getID() ); 326 PropertyUtil.setProperty( properties, "cascading.flow.tags", getTags() ); 327 AppProps.setApplicationID( properties ); 328 PropertyUtil.setProperty( properties, "cascading.app.name", makeAppName( properties ) ); 329 PropertyUtil.setProperty( properties, "cascading.app.version", makeAppVersion( properties ) ); 330 } 331 332 protected void addPlannerProperties() 333 { 334 setConfigProperty( getConfig(), "cascading.flow.planner", getPlannerInfo().name ); 335 setConfigProperty( getConfig(), "cascading.flow.platform", getPlannerInfo().platform ); 336 setConfigProperty( getConfig(), "cascading.flow.registry", getPlannerInfo().registry ); 337 } 338 339 private String makeAppName( Map<Object, Object> properties ) 340 { 341 if( properties == null ) 342 return null; 343 344 String name = AppProps.getApplicationName( properties ); 345 346 if( name != null ) 347 return name; 348 349 return Util.findName( AppProps.getApplicationJarPath( properties ) ); 350 } 351 352 private String makeAppVersion( Map<Object, Object> properties ) 353 { 354 if( properties == null ) 355 return null; 356 357 String name = AppProps.getApplicationVersion( properties ); 358 359 if( name != null ) 360 return name; 361 362 return Util.findVersion( AppProps.getApplicationJarPath( properties ) ); 363 } 364 365 protected FlowStats createPrepareFlowStats() 366 { 367 FlowStats flowStats = createFlowStats(); 368 369 flowStats.prepare(); 370 flowStats.markPending(); 371 372 return flowStats; 373 } 374 375 protected FlowStats createFlowStats() 376 { 377 
return new FlowStats( this, getClientState() ); 378 } 379 380 public CascadingServices getCascadingServices() 381 { 382 if( cascadingServices == null ) 383 cascadingServices = new CascadingServices( getConfigAsProperties() ); 384 385 return cascadingServices; 386 } 387 388 protected ClientState getClientState() 389 { 390 return getFlowSession().getCascadingServices().createClientState( getID() ); 391 } 392 393 protected void initSteps() 394 { 395 if( flowStepGraph == null ) 396 return; 397 398 Set<FlowStep> flowSteps = flowStepGraph.vertexSet(); 399 400 for( FlowStep flowStep : flowSteps ) 401 { 402 ( (BaseFlowStep) flowStep ).setFlow( this ); 403 404 Set<FlowNode> flowNodes = flowStep.getFlowNodeGraph().vertexSet(); 405 406 for( FlowNode flowNode : flowNodes ) 407 ( (BaseFlowNode) flowNode ).setFlowStep( flowStep ); 408 } 409 } 410 411 private void initFromTaps() 412 { 413 initFromTaps( sources ); 414 initFromTaps( sinks ); 415 initFromTaps( traps ); 416 } 417 418 private void initFromTaps( Map<String, Tap> taps ) 419 { 420 for( Tap tap : taps.values() ) 421 tap.flowConfInit( this ); 422 } 423 424 @Override 425 public String getName() 426 { 427 return name; 428 } 429 430 protected void setName( String name ) 431 { 432 this.name = name; 433 } 434 435 @Override 436 public String getID() 437 { 438 if( id == null ) 439 id = Util.createUniqueID(); 440 441 return id; 442 } 443 444 @Override 445 public String getTags() 446 { 447 return tags; 448 } 449 450 @Override 451 public int getSubmitPriority() 452 { 453 return submitPriority; 454 } 455 456 @Override 457 public void setSubmitPriority( int submitPriority ) 458 { 459 if( submitPriority < 1 || submitPriority > 10 ) 460 throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority ); 461 462 this.submitPriority = submitPriority; 463 } 464 465 /** 466 * The hash value can be used to determine if two unique Flow instances performed the same work. 
467 * <p/> 468 * The source and sink taps are not relevant to the hash. 469 * 470 * @return a String value 471 */ 472 public String getFlowCanonicalHash() 473 { 474 if( flowCanonicalHash != null || flowElementGraph == null ) 475 return flowCanonicalHash; 476 477 // synchronize on flowElementGraph to prevent duplicate hash creation - can be high overhead 478 // and to prevent deadlocks if the DocumentService calls back into the flow when transitioning 479 // into a running state 480 synchronized( flowElementGraph ) 481 { 482 flowCanonicalHash = createFlowCanonicalHash( flowElementGraph ); 483 } 484 485 return flowCanonicalHash; 486 } 487 488 // allow for sub-class overrides 489 protected String createFlowCanonicalHash( FlowElementGraph flowElementGraph ) 490 { 491 if( flowElementGraph == null ) 492 return null; 493 494 return ElementGraphs.canonicalHash( flowElementGraph ); 495 } 496 497 FlowElementGraph getFlowElementGraph() 498 { 499 return flowElementGraph; 500 } 501 502 FlowStepGraph getFlowStepGraph() 503 { 504 return flowStepGraph; 505 } 506 507 protected void setSources( Map<String, Tap> sources ) 508 { 509 if( sources == null ) 510 return; 511 512 addListeners( sources.values() ); 513 this.sources = sources; 514 } 515 516 protected void setSinks( Map<String, Tap> sinks ) 517 { 518 if( sinks == null ) 519 return; 520 521 addListeners( sinks.values() ); 522 this.sinks = sinks; 523 } 524 525 protected void setTraps( Map<String, Tap> traps ) 526 { 527 addListeners( traps.values() ); 528 this.traps = traps; 529 } 530 531 protected void setCheckpoints( Map<String, Tap> checkpoints ) 532 { 533 addListeners( checkpoints.values() ); 534 this.checkpoints = checkpoints; 535 } 536 537 protected void setFlowStepGraph( FlowStepGraph flowStepGraph ) 538 { 539 this.flowStepGraph = flowStepGraph; 540 } 541 542 /** 543 * This method creates a new internal Config with the parentConfig as defaults using the properties to override 544 * the defaults. 
545 * 546 * @param properties of type Map 547 * @param parentConfig of type Config 548 */ 549 protected abstract void initConfig( Map<Object, Object> properties, Config parentConfig ); 550 551 public Config createConfig( Map<Object, Object> properties, Config defaultConfig ) 552 { 553 Config config = newConfig( defaultConfig ); 554 555 if( properties == null ) 556 return config; 557 558 Set<Object> keys = new HashSet<>( properties.keySet() ); 559 560 // keys will only be grabbed if both key/value are String, so keep orig keys 561 if( properties instanceof Properties ) 562 keys.addAll( ( (Properties) properties ).stringPropertyNames() ); 563 564 for( Object key : keys ) 565 { 566 Object value = properties.get( key ); 567 568 if( value == null && properties instanceof Properties && key instanceof String ) 569 value = ( (Properties) properties ).getProperty( (String) key ); 570 571 if( value == null ) // don't stuff null values 572 continue; 573 574 setConfigProperty( config, key, value ); 575 } 576 577 return config; 578 } 579 580 protected abstract void setConfigProperty( Config config, Object key, Object value ); 581 582 protected abstract Config newConfig( Config defaultConfig ); 583 584 protected void initFromProperties( Map<Object, Object> properties ) 585 { 586 stopJobsOnExit = getStopJobsOnExit( properties ); 587 } 588 589 public FlowSession getFlowSession() 590 { 591 return new FlowSession( getCascadingServices() ); 592 } 593 594 @Override 595 public FlowStats getFlowStats() 596 { 597 return flowStats; 598 } 599 600 @Override 601 public Map<String, String> getFlowDescriptor() 602 { 603 if( flowDescriptor == null ) 604 return Collections.emptyMap(); 605 606 return Collections.unmodifiableMap( flowDescriptor ); 607 } 608 609 @Override 610 public FlowStats getStats() 611 { 612 return getFlowStats(); 613 } 614 615 void addListeners( Collection listeners ) 616 { 617 for( Object listener : listeners ) 618 { 619 if( listener instanceof FlowListener ) 620 
addListener( (FlowListener) listener ); 621 } 622 } 623 624 List<SafeFlowListener> getListeners() 625 { 626 if( listeners == null ) 627 listeners = new LinkedList<SafeFlowListener>(); 628 629 return listeners; 630 } 631 632 @Override 633 public boolean hasListeners() 634 { 635 return listeners != null && !listeners.isEmpty(); 636 } 637 638 @Override 639 public void addListener( FlowListener flowListener ) 640 { 641 getListeners().add( new SafeFlowListener( flowListener ) ); 642 } 643 644 @Override 645 public boolean removeListener( FlowListener flowListener ) 646 { 647 return getListeners().remove( new SafeFlowListener( flowListener ) ); 648 } 649 650 @Override 651 public boolean hasStepListeners() 652 { 653 boolean hasStepListeners = false; 654 655 for( FlowStep step : getFlowSteps() ) 656 hasStepListeners |= step.hasListeners(); 657 658 return hasStepListeners; 659 } 660 661 @Override 662 public void addStepListener( FlowStepListener flowStepListener ) 663 { 664 for( FlowStep step : getFlowSteps() ) 665 step.addListener( flowStepListener ); 666 } 667 668 @Override 669 public boolean removeStepListener( FlowStepListener flowStepListener ) 670 { 671 boolean listenerRemoved = true; 672 673 for( FlowStep step : getFlowSteps() ) 674 listenerRemoved &= step.removeListener( flowStepListener ); 675 676 return listenerRemoved; 677 } 678 679 @Override 680 public Map<String, Tap> getSources() 681 { 682 return Collections.unmodifiableMap( sources ); 683 } 684 685 @Override 686 public List<String> getSourceNames() 687 { 688 return new ArrayList<String>( sources.keySet() ); 689 } 690 691 @Override 692 public Tap getSource( String name ) 693 { 694 return sources.get( name ); 695 } 696 697 @Override 698 @DependencyIncoming 699 public Collection<Tap> getSourcesCollection() 700 { 701 return getSources().values(); 702 } 703 704 @Override 705 public Map<String, Tap> getSinks() 706 { 707 return Collections.unmodifiableMap( sinks ); 708 } 709 710 @Override 711 public List<String> 
getSinkNames() 712 { 713 return new ArrayList<String>( sinks.keySet() ); 714 } 715 716 @Override 717 public Tap getSink( String name ) 718 { 719 return sinks.get( name ); 720 } 721 722 @Override 723 @DependencyOutgoing 724 public Collection<Tap> getSinksCollection() 725 { 726 return getSinks().values(); 727 } 728 729 @Override 730 public Tap getSink() 731 { 732 return sinks.values().iterator().next(); 733 } 734 735 @Override 736 public Map<String, Tap> getTraps() 737 { 738 return Collections.unmodifiableMap( traps ); 739 } 740 741 @Override 742 public List<String> getTrapNames() 743 { 744 return new ArrayList<String>( traps.keySet() ); 745 } 746 747 @Override 748 public Collection<Tap> getTrapsCollection() 749 { 750 return getTraps().values(); 751 } 752 753 @Override 754 public Map<String, Tap> getCheckpoints() 755 { 756 return Collections.unmodifiableMap( checkpoints ); 757 } 758 759 @Override 760 public List<String> getCheckpointNames() 761 { 762 return new ArrayList<String>( checkpoints.keySet() ); 763 } 764 765 @Override 766 public Collection<Tap> getCheckpointsCollection() 767 { 768 return getCheckpoints().values(); 769 } 770 771 @Override 772 public boolean isStopJobsOnExit() 773 { 774 return stopJobsOnExit; 775 } 776 777 @Override 778 public FlowSkipStrategy getFlowSkipStrategy() 779 { 780 return flowSkipStrategy; 781 } 782 783 @Override 784 public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ) 785 { 786 if( flowSkipStrategy == null ) 787 throw new IllegalArgumentException( "flowSkipStrategy may not be null" ); 788 789 try 790 { 791 return this.flowSkipStrategy; 792 } 793 finally 794 { 795 this.flowSkipStrategy = flowSkipStrategy; 796 } 797 } 798 799 @Override 800 public boolean isSkipFlow() throws IOException 801 { 802 return flowSkipStrategy.skipFlow( this ); 803 } 804 805 @Override 806 public boolean areSinksStale() throws IOException 807 { 808 return areSourcesNewer( getSinkModified() ); 809 } 810 811 @Override 812 public 
boolean areSourcesNewer( long sinkModified ) throws IOException 813 { 814 Config config = getConfig(); 815 Iterator<Tap> values = sources.values().iterator(); 816 817 long sourceModified = 0; 818 819 try 820 { 821 sourceModified = Util.getSourceModified( config, values, sinkModified ); 822 823 if( sinkModified < sourceModified ) 824 return true; 825 826 return false; 827 } 828 finally 829 { 830 if( LOG.isInfoEnabled() ) 831 logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all 832 } 833 } 834 835 @Override 836 public long getSinkModified() throws IOException 837 { 838 long sinkModified = Util.getSinkModified( getConfig(), sinks.values() ); 839 840 if( LOG.isInfoEnabled() ) 841 { 842 if( sinkModified == -1L ) 843 logInfo( "at least one sink is marked for delete" ); 844 if( sinkModified == 0L ) 845 logInfo( "at least one sink does not exist" ); 846 else 847 logInfo( "sink oldest modified date: " + new Date( sinkModified ) ); 848 } 849 850 return sinkModified; 851 } 852 853 @Override 854 public FlowStepStrategy getFlowStepStrategy() 855 { 856 return flowStepStrategy; 857 } 858 859 @Override 860 public void setFlowStepStrategy( FlowStepStrategy flowStepStrategy ) 861 { 862 this.flowStepStrategy = flowStepStrategy; 863 } 864 865 @Override 866 public List<FlowStep<Config>> getFlowSteps() 867 { 868 if( steps != null ) 869 return steps; 870 871 if( flowStepGraph == null ) 872 return Collections.emptyList(); 873 874 Iterator<FlowStep> topoIterator = flowStepGraph.getTopologicalIterator(); 875 876 steps = new ArrayList<>(); 877 878 while( topoIterator.hasNext() ) 879 steps.add( topoIterator.next() ); 880 881 return steps; 882 } 883 884 @Override 885 @ProcessPrepare 886 public void prepare() 887 { 888 try 889 { 890 deleteSinksIfNotUpdate(); 891 deleteTrapsIfNotUpdate(); 892 deleteCheckpointsIfNotUpdate(); 893 } 894 catch( IOException exception ) 895 { 896 throw new FlowException( "unable to prepare flow", exception ); 
897 } 898 } 899 900 @Override 901 @ProcessStart 902 public synchronized void start() 903 { 904 if( thread != null ) 905 return; 906 907 if( stop ) 908 return; 909 910 registerShutdownHook(); 911 912 internalStart(); 913 914 String threadName = ( "flow " + Util.toNull( getName() ) ).trim(); 915 916 thread = createFlowThread( threadName ); 917 918 thread.start(); 919 } 920 921 protected Thread createFlowThread( String threadName ) 922 { 923 return new Thread( new Runnable() 924 { 925 @Override 926 public void run() 927 { 928 BaseFlow.this.run(); 929 } 930 }, threadName ); 931 } 932 933 protected abstract void internalStart(); 934 935 @Override 936 @ProcessStop 937 public synchronized void stop() 938 { 939 stopLock.lock(); 940 941 try 942 { 943 if( stop ) 944 return; 945 946 stop = true; 947 948 fireOnStopping(); 949 950 if( !flowStats.isFinished() ) 951 flowStats.markStopped(); 952 953 internalStopAllJobs(); 954 955 handleExecutorShutdown(); 956 957 internalClean( true ); 958 } 959 finally 960 { 961 flowStats.cleanup(); 962 stopLock.unlock(); 963 } 964 } 965 966 protected abstract void internalClean( boolean stop ); 967 968 @Override 969 @ProcessComplete 970 public void complete() 971 { 972 start(); 973 974 try 975 { 976 try 977 { 978 synchronized( this ) // prevent NPE on quick stop() & complete() after start() 979 { 980 while( thread == null && !stop ) 981 Util.safeSleep( 10 ); 982 } 983 984 if( thread != null ) 985 thread.join(); 986 } 987 catch( InterruptedException exception ) 988 { 989 throw new FlowException( getName(), "thread interrupted", exception ); 990 } 991 992 // if in #stop and stopping, lets wait till its done in this thread 993 try 994 { 995 stopLock.lock(); 996 } 997 finally 998 { 999 stopLock.unlock(); 1000 } 1001 1002 if( throwable instanceof FlowException ) 1003 ( (FlowException) throwable ).setFlowName( getName() ); 1004 1005 if( throwable instanceof CascadingException ) 1006 throw (CascadingException) throwable; 1007 1008 if( throwable 
instanceof OutOfMemoryError ) 1009 throw (OutOfMemoryError) throwable; 1010 1011 if( throwable != null ) 1012 throw new FlowException( getName(), "unhandled exception", throwable ); 1013 1014 if( hasListeners() ) 1015 { 1016 for( SafeFlowListener safeFlowListener : getListeners() ) 1017 { 1018 if( safeFlowListener.throwable != null ) 1019 throw new FlowException( getName(), "unhandled listener exception", throwable ); 1020 } 1021 } 1022 } 1023 finally 1024 { 1025 thread = null; 1026 throwable = null; 1027 } 1028 } 1029 1030 private void commitTraps() 1031 { 1032 // commit all the traps, don't fail on an error 1033 for( Tap tap : traps.values() ) 1034 { 1035 try 1036 { 1037 if( !tap.commitResource( getConfig() ) ) 1038 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ) ); 1039 } 1040 catch( IOException exception ) 1041 { 1042 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), exception ); 1043 } 1044 } 1045 } 1046 1047 @Override 1048 @ProcessCleanup 1049 public void cleanup() 1050 { 1051 // do nothing 1052 } 1053 1054 @Override 1055 public TupleEntryIterator openSource() throws IOException 1056 { 1057 return sources.values().iterator().next().openForRead( getFlowProcess() ); 1058 } 1059 1060 @Override 1061 public TupleEntryIterator openSource( String name ) throws IOException 1062 { 1063 if( !sources.containsKey( name ) ) 1064 throw new IllegalArgumentException( "source does not exist: " + name ); 1065 1066 return sources.get( name ).openForRead( getFlowProcess() ); 1067 } 1068 1069 @Override 1070 public TupleEntryIterator openSink() throws IOException 1071 { 1072 return sinks.values().iterator().next().openForRead( getFlowProcess() ); 1073 } 1074 1075 @Override 1076 public TupleEntryIterator openSink( String name ) throws IOException 1077 { 1078 if( !sinks.containsKey( name ) ) 1079 throw new IllegalArgumentException( "sink does not exist: " + name ); 1080 1081 return sinks.get( name ).openForRead( getFlowProcess() 
); 1082 } 1083 1084 @Override 1085 public TupleEntryIterator openTrap() throws IOException 1086 { 1087 return traps.values().iterator().next().openForRead( getFlowProcess() ); 1088 } 1089 1090 @Override 1091 public TupleEntryIterator openTrap( String name ) throws IOException 1092 { 1093 if( !traps.containsKey( name ) ) 1094 throw new IllegalArgumentException( "trap does not exist: " + name ); 1095 1096 return traps.get( name ).openForRead( getFlowProcess() ); 1097 } 1098 1099 /** 1100 * Method deleteSinks deletes all sinks, whether or not they are configured for {@link cascading.tap.SinkMode#UPDATE}. 1101 * <p/> 1102 * Use with caution. 1103 * 1104 * @throws IOException when 1105 * @see BaseFlow#deleteSinksIfNotUpdate() 1106 */ 1107 public void deleteSinks() throws IOException 1108 { 1109 for( Tap tap : sinks.values() ) 1110 deleteOrFail( tap ); 1111 } 1112 1113 private void deleteOrFail( Tap tap ) throws IOException 1114 { 1115 if( !tap.resourceExists( getConfig() ) ) 1116 return; 1117 1118 if( !tap.deleteResource( getConfig() ) ) 1119 throw new FlowException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) ); 1120 } 1121 1122 /** 1123 * Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the {@link cascading.tap.SinkMode#UPDATE} flag. 1124 * <p/> 1125 * Typically used by a {@link Cascade} before executing the flow if the sinks are stale. 1126 * <p/> 1127 * Use with caution. 
1128 * 1129 * @throws IOException when 1130 */ 1131 public void deleteSinksIfNotUpdate() throws IOException 1132 { 1133 for( Tap tap : sinks.values() ) 1134 { 1135 if( !tap.isUpdate() ) 1136 deleteOrFail( tap ); 1137 } 1138 } 1139 1140 public void deleteSinksIfReplace() throws IOException 1141 { 1142 for( Tap tap : sinks.values() ) 1143 { 1144 if( tap.isReplace() ) 1145 deleteOrFail( tap ); 1146 } 1147 } 1148 1149 public void deleteTrapsIfNotUpdate() throws IOException 1150 { 1151 for( Tap tap : traps.values() ) 1152 { 1153 if( !tap.isUpdate() ) 1154 deleteOrFail( tap ); 1155 } 1156 } 1157 1158 public void deleteCheckpointsIfNotUpdate() throws IOException 1159 { 1160 for( Tap tap : checkpoints.values() ) 1161 { 1162 if( !tap.isUpdate() ) 1163 deleteOrFail( tap ); 1164 } 1165 } 1166 1167 public void deleteTrapsIfReplace() throws IOException 1168 { 1169 for( Tap tap : traps.values() ) 1170 { 1171 if( tap.isReplace() ) 1172 deleteOrFail( tap ); 1173 } 1174 } 1175 1176 public void deleteCheckpointsIfReplace() throws IOException 1177 { 1178 for( Tap tap : checkpoints.values() ) 1179 { 1180 if( tap.isReplace() ) 1181 deleteOrFail( tap ); 1182 } 1183 } 1184 1185 @Override 1186 public boolean resourceExists( Tap tap ) throws IOException 1187 { 1188 return tap.resourceExists( getConfig() ); 1189 } 1190 1191 @Override 1192 public TupleEntryIterator openTapForRead( Tap tap ) throws IOException 1193 { 1194 return tap.openForRead( getFlowProcess() ); 1195 } 1196 1197 @Override 1198 public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException 1199 { 1200 return tap.openForWrite( getFlowProcess() ); 1201 } 1202 1203 /** Method run implements the Runnable run method and should not be called by users. 
*/
  private void run()
  {
    if( thread == null )
      throw new IllegalStateException( "to start a Flow call start() or complete(), not Runnable#run()" );

    Version.printBanner();
    Update.checkForUpdate( getPlatformInfo() );

    try
    {
      if( stop )
        return;

      flowStats.markStarted();

      fireOnStarting();

      if( LOG.isInfoEnabled() )
      {
        logInfo( "starting" );

        for( Tap source : getSourcesCollection() )
          logInfo( " source: " + source );
        for( Tap sink : getSinksCollection() )
          logInfo( " sink: " + sink );
      }

      int threadCount = getMaxNumParallelSteps();

      // a configured value of 0 falls back to one thread per job
      if( threadCount == 0 )
        threadCount = jobsMap.size();

      if( threadCount == 0 )
        throw new IllegalStateException( "no jobs rendered for flow: " + getName() );

      if( LOG.isInfoEnabled() )
      {
        logInfo( " parallel execution of steps is enabled: " + ( getMaxNumParallelSteps() != 1 ) );
        logInfo( " executing total steps: " + jobsMap.size() );
        logInfo( " allocating management threads: " + threadCount );
      }

      List<Future<Throwable>> results = spawnJobs( threadCount );

      for( Future<Throwable> result : results )
      {
        throwable = result.get();

        if( throwable == null )
          continue;

        // first failure: stop everything else unless a stop is already in progress
        if( !stop )
          internalStopAllJobs();

        handleExecutorShutdown();
        break;
      }
    }
    catch( Throwable failure )
    {
      this.throwable = failure;
    }
    finally
    {
      handleThrowableAndMarkFailed();

      if( !stop && !flowStats.isFinished() )
        flowStats.markSuccessful();

      internalClean( stop ); // cleaning temp taps may be determined by success/failure

      commitTraps();

      try
      {
        fireOnCompleted();
      }
      finally
1284 { 1285 if( LOG.isInfoEnabled() ) 1286 { 1287 long totalSliceCPUSeconds = getTotalSliceCPUMilliSeconds(); 1288 1289 if( totalSliceCPUSeconds == -1 ) 1290 logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) ); 1291 else 1292 logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) + ", using cpu time: " + formatDurationFromMillis( totalSliceCPUSeconds ) ); 1293 } 1294 1295 flowStats.cleanup(); 1296 internalShutdown(); 1297 deregisterShutdownHook(); 1298 } 1299 } 1300 } 1301 1302 protected long getTotalSliceCPUMilliSeconds() 1303 { 1304 return -1; 1305 } 1306 1307 protected abstract int getMaxNumParallelSteps(); 1308 1309 protected abstract void internalShutdown(); 1310 1311 private List<Future<Throwable>> spawnJobs( int numThreads ) throws InterruptedException 1312 { 1313 if( spawnStrategy == null ) 1314 { 1315 logError( "no spawnStrategy set" ); 1316 return new ArrayList<>(); 1317 } 1318 1319 if( stop ) 1320 return new ArrayList<>(); 1321 1322 List<Callable<Throwable>> list = new ArrayList<>(); 1323 1324 for( FlowStepJob<Config> job : jobsMap.values() ) 1325 list.add( job ); 1326 1327 return spawnStrategy.start( this, numThreads, list ); 1328 } 1329 1330 private void handleThrowableAndMarkFailed() 1331 { 1332 if( throwable != null && !stop ) 1333 { 1334 flowStats.markFailed( throwable ); 1335 1336 fireOnThrowable(); 1337 } 1338 } 1339 1340 Map<String, FlowStepJob<Config>> getJobsMap() 1341 { 1342 return jobsMap; 1343 } 1344 1345 protected void initializeNewJobsMap() 1346 { 1347 jobsMap = new LinkedHashMap<>(); // keep topo order 1348 Iterator<FlowStep> topoIterator = flowStepGraph.getTopologicalIterator(); 1349 1350 while( topoIterator.hasNext() ) 1351 { 1352 BaseFlowStep<Config> step = (BaseFlowStep) topoIterator.next(); 1353 FlowStepJob<Config> flowStepJob = step.getCreateFlowStepJob( getFlowProcess(), getConfig() ); 1354 1355 jobsMap.put( step.getName(), flowStepJob ); 1356 1357 List<FlowStepJob<Config>> 
predecessors = new ArrayList<FlowStepJob<Config>>(); 1358 1359 for( Object flowStep : ProcessGraphs.predecessorListOf( flowStepGraph, step ) ) 1360 predecessors.add( jobsMap.get( ( (FlowStep) flowStep ).getName() ) ); 1361 1362 flowStepJob.setPredecessors( predecessors ); 1363 } 1364 } 1365 1366 protected void initializeChildStats() 1367 { 1368 for( FlowStepJob<Config> flowStepJob : jobsMap.values() ) 1369 flowStats.addStepStats( flowStepJob.getStepStats() ); 1370 } 1371 1372 protected void internalStopAllJobs() 1373 { 1374 logInfo( "stopping all jobs" ); 1375 1376 try 1377 { 1378 if( jobsMap == null ) 1379 return; 1380 1381 List<FlowStepJob<Config>> jobs = new ArrayList<FlowStepJob<Config>>( jobsMap.values() ); 1382 1383 Collections.reverse( jobs ); 1384 1385 for( FlowStepJob<Config> job : jobs ) 1386 job.stop(); 1387 } 1388 finally 1389 { 1390 logInfo( "stopped all jobs" ); 1391 } 1392 } 1393 1394 protected void handleExecutorShutdown() 1395 { 1396 if( spawnStrategy == null ) 1397 return; 1398 1399 if( spawnStrategy.isCompleted( this ) ) 1400 return; 1401 1402 logDebug( "shutting down job executor" ); 1403 1404 try 1405 { 1406 spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS ); 1407 } 1408 catch( InterruptedException exception ) 1409 { 1410 // ignore 1411 } 1412 1413 logDebug( "shutdown of job executor complete" ); 1414 } 1415 1416 protected void fireOnCompleted() 1417 { 1418 if( hasListeners() ) 1419 { 1420 if( isDebugEnabled() ) 1421 logDebug( "firing onCompleted event: " + getListeners().size() ); 1422 1423 for( FlowListener flowListener : getListeners() ) 1424 flowListener.onCompleted( this ); 1425 } 1426 } 1427 1428 protected void fireOnThrowable( Throwable throwable ) 1429 { 1430 this.throwable = throwable; 1431 fireOnThrowable(); 1432 } 1433 1434 protected void fireOnThrowable() 1435 { 1436 if( hasListeners() ) 1437 { 1438 if( isDebugEnabled() ) 1439 logDebug( "firing onThrowable event: " + getListeners().size() ); 1440 1441 boolean isHandled = 
false; 1442 1443 for( FlowListener flowListener : getListeners() ) 1444 isHandled = flowListener.onThrowable( this, throwable ) || isHandled; 1445 1446 if( isHandled ) 1447 throwable = null; 1448 } 1449 } 1450 1451 protected void fireOnStopping() 1452 { 1453 if( hasListeners() ) 1454 { 1455 if( isDebugEnabled() ) 1456 logDebug( "firing onStopping event: " + getListeners().size() ); 1457 1458 for( FlowListener flowListener : getListeners() ) 1459 flowListener.onStopping( this ); 1460 } 1461 } 1462 1463 protected void fireOnStarting() 1464 { 1465 if( hasListeners() ) 1466 { 1467 if( isDebugEnabled() ) 1468 logDebug( "firing onStarting event: " + getListeners().size() ); 1469 1470 for( FlowListener flowListener : getListeners() ) 1471 flowListener.onStarting( this ); 1472 } 1473 } 1474 1475 @Override 1476 public String toString() 1477 { 1478 StringBuffer buffer = new StringBuffer(); 1479 1480 if( getName() != null ) 1481 buffer.append( getName() ).append( ": " ); 1482 1483 for( FlowStep step : getFlowSteps() ) 1484 buffer.append( step ); 1485 1486 return buffer.toString(); 1487 } 1488 1489 @Override 1490 public final boolean isInfoEnabled() 1491 { 1492 return LOG.isInfoEnabled(); 1493 } 1494 1495 @Override 1496 public final boolean isDebugEnabled() 1497 { 1498 return LOG.isDebugEnabled(); 1499 } 1500 1501 @Override 1502 public void logInfo( String message, Object... arguments ) 1503 { 1504 LOG.info( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1505 } 1506 1507 @Override 1508 public void logDebug( String message, Object... 
arguments ) 1509 { 1510 LOG.debug( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1511 } 1512 1513 @Override 1514 public void logWarn( String message ) 1515 { 1516 LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message ); 1517 } 1518 1519 @Override 1520 public void logWarn( String message, Throwable throwable ) 1521 { 1522 LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable ); 1523 } 1524 1525 @Override 1526 public void logWarn( String message, Object... arguments ) 1527 { 1528 LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1529 } 1530 1531 @Override 1532 public void logError( String message, Object... arguments ) 1533 { 1534 LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1535 } 1536 1537 @Override 1538 public void logError( String message, Throwable throwable ) 1539 { 1540 LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable ); 1541 } 1542 1543 @Override 1544 public void writeDOT( String filename ) 1545 { 1546 if( flowElementGraph == null ) 1547 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1548 1549 flowElementGraph.writeDOT( filename ); 1550 } 1551 1552 @Override 1553 public void writeStepsDOT( String filename ) 1554 { 1555 if( flowStepGraph == null ) 1556 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1557 1558 flowStepGraph.writeDOT( filename ); 1559 } 1560 1561 /** 1562 * Used to return a simple wrapper for use as an edge in a graph where there can only be 1563 * one instance of every edge. 
1564 * 1565 * @return FlowHolder 1566 */ 1567 public FlowHolder getHolder() 1568 { 1569 return new FlowHolder( this ); 1570 } 1571 1572 public void setCascade( Cascade cascade ) 1573 { 1574 setConfigProperty( getConfig(), "cascading.cascade.id", cascade.getID() ); 1575 flowStats.recordInfo(); 1576 } 1577 1578 @Override 1579 public String getCascadeID() 1580 { 1581 return getProperty( "cascading.cascade.id" ); 1582 } 1583 1584 @Override 1585 public String getRunID() 1586 { 1587 return runID; 1588 } 1589 1590 public List<String> getClassPath() 1591 { 1592 return classPath; 1593 } 1594 1595 @Override 1596 public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy ) 1597 { 1598 this.spawnStrategy = spawnStrategy; 1599 } 1600 1601 @Override 1602 public UnitOfWorkSpawnStrategy getSpawnStrategy() 1603 { 1604 return spawnStrategy; 1605 } 1606 1607 protected void registerShutdownHook() 1608 { 1609 if( !isStopJobsOnExit() ) 1610 return; 1611 1612 shutdownHook = new ShutdownUtil.Hook() 1613 { 1614 @Override 1615 public Priority priority() 1616 { 1617 return Priority.WORK_CHILD; 1618 } 1619 1620 @Override 1621 public void execute() 1622 { 1623 logInfo( "shutdown hook calling stop on flow" ); 1624 1625 BaseFlow.this.stop(); 1626 } 1627 }; 1628 1629 ShutdownUtil.addHook( shutdownHook ); 1630 } 1631 1632 private void deregisterShutdownHook() 1633 { 1634 if( !isStopJobsOnExit() || stop ) 1635 return; 1636 1637 ShutdownUtil.removeHook( shutdownHook ); 1638 } 1639 1640 /** Class FlowHolder is a helper class for wrapping Flow instances. */ 1641 public static class FlowHolder 1642 { 1643 /** Field flow */ 1644 public Flow flow; 1645 1646 public FlowHolder() 1647 { 1648 } 1649 1650 public FlowHolder( Flow flow ) 1651 { 1652 this.flow = flow; 1653 } 1654 } 1655 1656 /** 1657 * Class SafeFlowListener safely calls a wrapped FlowListener. 
1658 * <p/> 1659 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1660 * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method 1661 * which in turn is run in a new Thread. 1662 */ 1663 private class SafeFlowListener implements FlowListener 1664 { 1665 /** Field flowListener */ 1666 final FlowListener flowListener; 1667 /** Field throwable */ 1668 Throwable throwable; 1669 1670 private SafeFlowListener( FlowListener flowListener ) 1671 { 1672 this.flowListener = flowListener; 1673 } 1674 1675 public void onStarting( Flow flow ) 1676 { 1677 try 1678 { 1679 flowListener.onStarting( flow ); 1680 } 1681 catch( Throwable throwable ) 1682 { 1683 handleThrowable( throwable ); 1684 } 1685 } 1686 1687 public void onStopping( Flow flow ) 1688 { 1689 try 1690 { 1691 flowListener.onStopping( flow ); 1692 } 1693 catch( Throwable throwable ) 1694 { 1695 handleThrowable( throwable ); 1696 } 1697 } 1698 1699 public void onCompleted( Flow flow ) 1700 { 1701 try 1702 { 1703 flowListener.onCompleted( flow ); 1704 } 1705 catch( Throwable throwable ) 1706 { 1707 handleThrowable( throwable ); 1708 } 1709 } 1710 1711 public boolean onThrowable( Flow flow, Throwable flowThrowable ) 1712 { 1713 try 1714 { 1715 return flowListener.onThrowable( flow, flowThrowable ); 1716 } 1717 catch( Throwable throwable ) 1718 { 1719 handleThrowable( throwable ); 1720 } 1721 1722 return false; 1723 } 1724 1725 private void handleThrowable( Throwable throwable ) 1726 { 1727 this.throwable = throwable; 1728 1729 logWarn( String.format( "flow listener %s threw throwable", flowListener ), throwable ); 1730 1731 // stop this flow 1732 stop(); 1733 } 1734 1735 public boolean equals( Object object ) 1736 { 1737 if( object instanceof BaseFlow.SafeFlowListener ) 1738 return flowListener.equals( ( (BaseFlow.SafeFlowListener) object ).flowListener ); 1739 1740 return flowListener.equals( object ); 1741 } 1742 1743 
public int hashCode() 1744 { 1745 return flowListener.hashCode(); 1746 } 1747 } 1748 }