001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.Date; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.Iterator; 031import java.util.LinkedHashMap; 032import java.util.LinkedList; 033import java.util.List; 034import java.util.Map; 035import java.util.Properties; 036import java.util.Set; 037import java.util.concurrent.Callable; 038import java.util.concurrent.Future; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.locks.ReentrantLock; 041 042import cascading.CascadingException; 043import cascading.cascade.Cascade; 044import cascading.flow.planner.BaseFlowNode; 045import cascading.flow.planner.BaseFlowStep; 046import cascading.flow.planner.FlowStepJob; 047import cascading.flow.planner.PlannerInfo; 048import cascading.flow.planner.PlatformInfo; 049import cascading.flow.planner.graph.FlowElementGraph; 050import cascading.flow.planner.process.FlowStepGraph; 051import cascading.flow.planner.process.ProcessGraphs; 052import cascading.management.CascadingServices; 053import cascading.management.UnitOfWorkExecutorStrategy; 054import cascading.management.UnitOfWorkSpawnStrategy; 055import cascading.management.state.ClientState; 056import cascading.property.AppProps; 057import cascading.property.PropertyUtil; 058import cascading.stats.FlowStats; 059import cascading.tap.Tap; 060import cascading.tuple.Fields; 061import cascading.tuple.TupleEntryCollector; 062import cascading.tuple.TupleEntryIterator; 063import cascading.util.ProcessLogger; 064import cascading.util.ShutdownUtil; 065import cascading.util.Update; 066import cascading.util.Util; 067import cascading.util.Version; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070import riffle.process.DependencyIncoming; 071import riffle.process.DependencyOutgoing; 072import riffle.process.ProcessCleanup; 073import riffle.process.ProcessComplete; 074import riffle.process.ProcessPrepare; 075import riffle.process.ProcessStart; 076import riffle.process.ProcessStop; 077 078import static cascading.util.Util.formatDurationFromMillis; 079 080@riffle.process.Process 081public abstract class BaseFlow<Config> implements Flow<Config>, ProcessLogger 082 { 083 /** Field LOG */ 084 private static final Logger LOG = LoggerFactory.getLogger( Flow.class ); // wrapped by ProcessLogger interface methods 085 private static final int LOG_FLOW_NAME_MAX = 25; 086 087 private PlannerInfo plannerInfo; 088 private PlatformInfo platformInfo; 089 090 /** Field id */ 091 private String id; 092 /** Field name */ 093 private String name; 094 /** Fields runID */ 095 private String runID; 096 /** Fields classpath */ 097 private List<String> classPath; // may remain null 098 /** Field tags */ 099 private String tags; 100 /** Field listeners */ 101 private List<SafeFlowListener> listeners; 102 /** Field skipStrategy */ 103 private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale(); 104 /** Field flowStats */ 105 protected FlowStats flowStats; // don't use a listener to set values 106 /** Field sources */ 107 protected Map<String, Tap> sources = Collections.emptyMap(); 108 /** Field sinks */ 109 protected Map<String, Tap> sinks = Collections.emptyMap(); 110 /** Field traps */ 111 private Map<String, Tap> traps = Collections.emptyMap(); 112 /** Field checkpoints */ 113 private Map<String, Tap> checkpoints = Collections.emptyMap(); 114 /** Field stopJobsOnExit */ 115 protected boolean stopJobsOnExit = true; 116 /** Field submitPriority */ 117 private int submitPriority = 5; 118 119 /** Field stepGraph */ 120 private FlowStepGraph flowStepGraph; 121 /** Field thread */ 122 protected transient Thread thread; 123 /** Field throwable */ 124 private Throwable throwable; 125 /** Field stop */ 126 protected boolean stop; 127 128 /** Field pipeGraph */ 129 private FlowElementGraph flowElementGraph; // only used for documentation purposes 130 131 private transient CascadingServices cascadingServices; 132 133 private FlowStepStrategy<Config> flowStepStrategy = null; 134 /** Field steps */ 135 private transient List<FlowStep<Config>> steps; 136 /** Field jobsMap */ 137 private transient Map<String, FlowStepJob<Config>> jobsMap; 138 private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy(); 139 140 private transient ReentrantLock stopLock = new ReentrantLock( true ); 141 protected ShutdownUtil.Hook shutdownHook; 142 143 private HashMap<String, String> flowDescriptor; 144 145 /** 146 * Returns property stopJobsOnExit. 147 * 148 * @param properties of type Map 149 * @return a boolean 150 */ 151 static boolean getStopJobsOnExit( Map<Object, Object> properties ) 152 { 153 return Boolean.parseBoolean( PropertyUtil.getProperty( properties, FlowProps.STOP_JOBS_ON_EXIT, "true" ) ); 154 } 155 156 /** Used for testing. */ 157 protected BaseFlow() 158 { 159 this.name = "NA"; 160 this.flowStats = createPrepareFlowStats(); 161 } 162 163 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name ) 164 { 165 this( platformInfo, properties, defaultConfig, name, new LinkedHashMap<String, String>() ); 166 } 167 168 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name, Map<String, String> flowDescriptor ) 169 { 170 this.platformInfo = platformInfo; 171 this.name = name; 172 173 if( flowDescriptor != null ) 174 this.flowDescriptor = new LinkedHashMap<>( flowDescriptor ); 175 176 addSessionProperties( properties ); 177 initConfig( properties, defaultConfig ); 178 179 this.flowStats = createPrepareFlowStats(); // must be last 180 } 181 182 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef ) 183 { 184 properties = PropertyUtil.asFlatMap( properties ); 185 186 this.platformInfo = platformInfo; 187 this.name = flowDef.getName(); 188 this.tags = flowDef.getTags(); 189 this.runID = flowDef.getRunID(); 190 this.classPath = flowDef.getClassPath(); 191 192 if( !flowDef.getFlowDescriptor().isEmpty() ) 193 this.flowDescriptor = new LinkedHashMap<String, String>( flowDef.getFlowDescriptor() ); 194 195 addSessionProperties( properties ); 196 initConfig( properties, defaultConfig ); 197 setSources( flowDef.getSourcesCopy() ); 198 setSinks( flowDef.getSinksCopy() ); 199 setTraps( flowDef.getTrapsCopy() ); 200 setCheckpoints( flowDef.getCheckpointsCopy() ); 201 initFromTaps(); 202 203 retrieveSourceFields(); 204 retrieveSinkFields(); 205 } 206 207 public void setPlannerInfo( PlannerInfo plannerInfo ) 208 { 209 this.plannerInfo = plannerInfo; 210 } 211 212 @Override 213 public PlannerInfo getPlannerInfo() 214 { 215 return plannerInfo; 216 } 217 218 @Override 219 public PlatformInfo getPlatformInfo() 220 { 221 return platformInfo; 222 } 223 224 public void initialize( FlowElementGraph flowElementGraph, FlowStepGraph flowStepGraph ) 225 { 226 addPlannerProperties(); 227 this.flowElementGraph = flowElementGraph; 228 this.flowStepGraph = flowStepGraph; 229 230 initSteps(); 231 232 this.flowStats = createPrepareFlowStats(); // must be last 233 234 initializeNewJobsMap(); 235 } 236 237 public FlowElementGraph updateSchemes( FlowElementGraph pipeGraph ) 238 { 239 presentSourceFields( pipeGraph ); 240 241 presentSinkFields( pipeGraph ); 242 243 return new FlowElementGraph( pipeGraph ); 244 } 245 246 /** Force a Scheme to fetch any fields from a meta-data store */ 247 protected void retrieveSourceFields() 248 { 249 for( Tap tap : sources.values() ) 250 tap.retrieveSourceFields( getFlowProcess() ); 251 } 252 253 /** 254 * Present the current resolved fields for the Tap 255 * 256 * @param pipeGraph 257 */ 258 protected void presentSourceFields( FlowElementGraph pipeGraph ) 259 { 260 for( Tap tap : sources.values() ) 261 { 262 if( pipeGraph.containsVertex( tap ) ) 263 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 264 } 265 266 for( Tap tap : checkpoints.values() ) 267 { 268 if( pipeGraph.containsVertex( tap ) ) 269 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 270 } 271 } 272 273 /** Force a Scheme to fetch any fields from a meta-data store */ 274 protected void retrieveSinkFields() 275 { 276 for( Tap tap : sinks.values() ) 277 tap.retrieveSinkFields( getFlowProcess() ); 278 } 279 280 /** 281 * Present the current resolved fields for the Tap 282 * 283 * @param pipeGraph 284 */ 285 protected void presentSinkFields( FlowElementGraph pipeGraph ) 286 { 287 for( Tap tap : sinks.values() ) 288 { 289 if( pipeGraph.containsVertex( tap ) ) 290 tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 291 } 292 293 for( Tap tap : checkpoints.values() ) 294 { 295 if( pipeGraph.containsVertex( tap ) ) 296 tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 297 } 298 } 299 300 protected Fields getFieldsFor( FlowElementGraph pipeGraph, Tap tap ) 301 { 302 return pipeGraph.outgoingEdgesOf( tap ).iterator().next().getOutValuesFields(); 303 } 304 305 private void addSessionProperties( Map<Object, Object> properties ) 306 { 307 if( properties == null ) 308 return; 309 310 PropertyUtil.setProperty( properties, CASCADING_FLOW_ID, getID() ); 311 PropertyUtil.setProperty( properties, "cascading.flow.tags", getTags() ); 312 AppProps.setApplicationID( properties ); 313 PropertyUtil.setProperty( properties, "cascading.app.name", makeAppName( properties ) ); 314 PropertyUtil.setProperty( properties, "cascading.app.version", makeAppVersion( properties ) ); 315 } 316 317 protected void addPlannerProperties() 318 { 319 setConfigProperty( getConfig(), "cascading.flow.planner", getPlannerInfo().name ); 320 setConfigProperty( getConfig(), "cascading.flow.platform", getPlannerInfo().platform ); 321 setConfigProperty( getConfig(), "cascading.flow.registry", getPlannerInfo().registry ); 322 } 323 324 private String makeAppName( Map<Object, Object> properties ) 325 { 326 if( properties == null ) 327 return null; 328 329 String name = AppProps.getApplicationName( properties ); 330 331 if( name != null ) 332 return name; 333 334 return Util.findName( AppProps.getApplicationJarPath( properties ) ); 335 } 336 337 private String makeAppVersion( Map<Object, Object> properties ) 338 { 339 if( properties == null ) 340 return null; 341 342 String name = AppProps.getApplicationVersion( properties ); 343 344 if( name != null ) 345 return name; 346 347 return Util.findVersion( AppProps.getApplicationJarPath( properties ) ); 348 } 349 350 private FlowStats createPrepareFlowStats() 351 { 352 FlowStats flowStats = new FlowStats( this, getClientState() ); 353 354 flowStats.prepare(); 355 flowStats.markPending(); 356 357 return flowStats; 358 } 359 360 public CascadingServices getCascadingServices() 361 { 362 if( cascadingServices == null ) 363 cascadingServices = new CascadingServices( getConfigAsProperties() ); 364 365 return cascadingServices; 366 } 367 368 private ClientState getClientState() 369 { 370 return getFlowSession().getCascadingServices().createClientState( getID() ); 371 } 372 373 protected void initSteps() 374 { 375 if( flowStepGraph == null ) 376 return; 377 378 Set<FlowStep> flowSteps = flowStepGraph.vertexSet(); 379 380 for( FlowStep flowStep : flowSteps ) 381 { 382 ( (BaseFlowStep) flowStep ).setFlow( this ); 383 384 Set<FlowNode> flowNodes = flowStep.getFlowNodeGraph().vertexSet(); 385 386 for( FlowNode flowNode : flowNodes ) 387 ( (BaseFlowNode) flowNode ).setFlowStep( flowStep ); 388 } 389 } 390 391 private void initFromTaps() 392 { 393 initFromTaps( sources ); 394 initFromTaps( sinks ); 395 initFromTaps( traps ); 396 } 397 398 private void initFromTaps( Map<String, Tap> taps ) 399 { 400 for( Tap tap : taps.values() ) 401 tap.flowConfInit( this ); 402 } 403 404 @Override 405 public String getName() 406 { 407 return name; 408 } 409 410 protected void setName( String name ) 411 { 412 this.name = name; 413 } 414 415 @Override 416 public String getID() 417 { 418 if( id == null ) 419 id = Util.createUniqueID(); 420 421 return id; 422 } 423 424 @Override 425 public String getTags() 426 { 427 return tags; 428 } 429 430 @Override 431 public int getSubmitPriority() 432 { 433 return submitPriority; 434 } 435 436 @Override 437 public void setSubmitPriority( int submitPriority ) 438 { 439 if( submitPriority < 1 || submitPriority > 10 ) 440 throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority ); 441 442 this.submitPriority = submitPriority; 443 } 444 445 FlowElementGraph getFlowElementGraph() 446 { 447 return flowElementGraph; 448 } 449 450 FlowStepGraph getFlowStepGraph() 451 { 452 return flowStepGraph; 453 } 454 455 protected void setSources( Map<String, Tap> sources ) 456 { 457 addListeners( sources.values() ); 458 this.sources = sources; 459 } 460 461 protected void setSinks( Map<String, Tap> sinks ) 462 { 463 addListeners( sinks.values() ); 464 this.sinks = sinks; 465 } 466 467 protected void setTraps( Map<String, Tap> traps ) 468 { 469 addListeners( traps.values() ); 470 this.traps = traps; 471 } 472 473 protected void setCheckpoints( Map<String, Tap> checkpoints ) 474 { 475 addListeners( checkpoints.values() ); 476 this.checkpoints = checkpoints; 477 } 478 479 protected void setFlowStepGraph( FlowStepGraph flowStepGraph ) 480 { 481 this.flowStepGraph = flowStepGraph; 482 } 483 484 /** 485 * This method creates a new internal Config with the parentConfig as defaults using the properties to override 486 * the defaults. 487 * 488 * @param properties of type Map 489 * @param parentConfig of type Config 490 */ 491 protected abstract void initConfig( Map<Object, Object> properties, Config parentConfig ); 492 493 public Config createConfig( Map<Object, Object> properties, Config defaultConfig ) 494 { 495 Config config = newConfig( defaultConfig ); 496 497 if( properties == null ) 498 return config; 499 500 Set<Object> keys = new HashSet<>( properties.keySet() ); 501 502 // keys will only be grabbed if both key/value are String, so keep orig keys 503 if( properties instanceof Properties ) 504 keys.addAll( ( (Properties) properties ).stringPropertyNames() ); 505 506 for( Object key : keys ) 507 { 508 Object value = properties.get( key ); 509 510 if( value == null && properties instanceof Properties && key instanceof String ) 511 value = ( (Properties) properties ).getProperty( (String) key ); 512 513 if( value == null ) // don't stuff null values 514 continue; 515 516 setConfigProperty( config, key, value ); 517 } 518 519 return config; 520 } 521 522 protected abstract void setConfigProperty( Config config, Object key, Object value ); 523 524 protected abstract Config newConfig( Config defaultConfig ); 525 526 protected void initFromProperties( Map<Object, Object> properties ) 527 { 528 stopJobsOnExit = getStopJobsOnExit( properties ); 529 } 530 531 public FlowSession getFlowSession() 532 { 533 return new FlowSession( getCascadingServices() ); 534 } 535 536 @Override 537 public FlowStats getFlowStats() 538 { 539 return flowStats; 540 } 541 542 @Override 543 public Map<String, String> getFlowDescriptor() 544 { 545 if( flowDescriptor == null ) 546 return Collections.emptyMap(); 547 548 return Collections.unmodifiableMap( flowDescriptor ); 549 } 550 551 @Override 552 public FlowStats getStats() 553 { 554 return getFlowStats(); 555 } 556 557 void addListeners( Collection listeners ) 558 { 559 for( Object listener : listeners ) 560 { 561 if( listener instanceof FlowListener ) 562 addListener( (FlowListener) listener ); 563 } 564 } 565 566 List<SafeFlowListener> getListeners() 567 { 568 if( listeners == null ) 569 listeners = new LinkedList<SafeFlowListener>(); 570 571 return listeners; 572 } 573 574 @Override 575 public boolean hasListeners() 576 { 577 return listeners != null && !listeners.isEmpty(); 578 } 579 580 @Override 581 public void addListener( FlowListener flowListener ) 582 { 583 getListeners().add( new SafeFlowListener( flowListener ) ); 584 } 585 586 @Override 587 public boolean removeListener( FlowListener flowListener ) 588 { 589 return getListeners().remove( new SafeFlowListener( flowListener ) ); 590 } 591 592 @Override 593 public boolean hasStepListeners() 594 { 595 boolean hasStepListeners = false; 596 597 for( FlowStep step : getFlowSteps() ) 598 hasStepListeners |= step.hasListeners(); 599 600 return hasStepListeners; 601 } 602 603 @Override 604 public void addStepListener( FlowStepListener flowStepListener ) 605 { 606 for( FlowStep step : getFlowSteps() ) 607 step.addListener( flowStepListener ); 608 } 609 610 @Override 611 public boolean removeStepListener( FlowStepListener flowStepListener ) 612 { 613 boolean listenerRemoved = true; 614 615 for( FlowStep step : getFlowSteps() ) 616 listenerRemoved &= step.removeListener( flowStepListener ); 617 618 return listenerRemoved; 619 } 620 621 @Override 622 public Map<String, Tap> getSources() 623 { 624 return Collections.unmodifiableMap( sources ); 625 } 626 627 @Override 628 public List<String> getSourceNames() 629 { 630 return new ArrayList<String>( sources.keySet() ); 631 } 632 633 @Override 634 public Tap getSource( String name ) 635 { 636 return sources.get( name ); 637 } 638 639 @Override 640 @DependencyIncoming 641 public Collection<Tap> getSourcesCollection() 642 { 643 return getSources().values(); 644 } 645 646 @Override 647 public Map<String, Tap> getSinks() 648 { 649 return Collections.unmodifiableMap( sinks ); 650 } 651 652 @Override 653 public List<String> getSinkNames() 654 { 655 return new ArrayList<String>( sinks.keySet() ); 656 } 657 658 @Override 659 public Tap getSink( String name ) 660 { 661 return sinks.get( name ); 662 } 663 664 @Override 665 @DependencyOutgoing 666 public Collection<Tap> getSinksCollection() 667 { 668 return getSinks().values(); 669 } 670 671 @Override 672 public Tap getSink() 673 { 674 return sinks.values().iterator().next(); 675 } 676 677 @Override 678 public Map<String, Tap> getTraps() 679 { 680 return Collections.unmodifiableMap( traps ); 681 } 682 683 @Override 684 public List<String> getTrapNames() 685 { 686 return new ArrayList<String>( traps.keySet() ); 687 } 688 689 @Override 690 public Collection<Tap> getTrapsCollection() 691 { 692 return getTraps().values(); 693 } 694 695 @Override 696 public Map<String, Tap> getCheckpoints() 697 { 698 return Collections.unmodifiableMap( checkpoints ); 699 } 700 701 @Override 702 public List<String> getCheckpointNames() 703 { 704 return new ArrayList<String>( checkpoints.keySet() ); 705 } 706 707 @Override 708 public Collection<Tap> getCheckpointsCollection() 709 { 710 return getCheckpoints().values(); 711 } 712 713 @Override 714 public boolean isStopJobsOnExit() 715 { 716 return stopJobsOnExit; 717 } 718 719 @Override 720 public FlowSkipStrategy getFlowSkipStrategy() 721 { 722 return flowSkipStrategy; 723 } 724 725 @Override 726 public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ) 727 { 728 if( flowSkipStrategy == null ) 729 throw new IllegalArgumentException( "flowSkipStrategy may not be null" ); 730 731 try 732 { 733 return this.flowSkipStrategy; 734 } 735 finally 736 { 737 this.flowSkipStrategy = flowSkipStrategy; 738 } 739 } 740 741 @Override 742 public boolean isSkipFlow() throws IOException 743 { 744 return flowSkipStrategy.skipFlow( this ); 745 } 746 747 @Override 748 public boolean areSinksStale() throws IOException 749 { 750 return areSourcesNewer( getSinkModified() ); 751 } 752 753 @Override 754 public boolean areSourcesNewer( long sinkModified ) throws IOException 755 { 756 Config config = getConfig(); 757 Iterator<Tap> values = sources.values().iterator(); 758 759 long sourceModified = 0; 760 761 try 762 { 763 sourceModified = Util.getSourceModified( config, values, sinkModified ); 764 765 if( sinkModified < sourceModified ) 766 return true; 767 768 return false; 769 } 770 finally 771 { 772 if( LOG.isInfoEnabled() ) 773 logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all 774 } 775 } 776 777 @Override 778 public long getSinkModified() throws IOException 779 { 780 long sinkModified = Util.getSinkModified( getConfig(), sinks.values() ); 781 782 if( LOG.isInfoEnabled() ) 783 { 784 if( sinkModified == -1L ) 785 logInfo( "at least one sink is marked for delete" ); 786 if( sinkModified == 0L ) 787 logInfo( "at least one sink does not exist" ); 788 else 789 logInfo( "sink oldest modified date: " + new Date( sinkModified ) ); 790 } 791 792 return sinkModified; 793 } 794 795 @Override 796 public FlowStepStrategy getFlowStepStrategy() 797 { 798 return flowStepStrategy; 799 } 800 801 @Override 802 public void setFlowStepStrategy( FlowStepStrategy flowStepStrategy ) 803 { 804 this.flowStepStrategy = flowStepStrategy; 805 } 806 807 @Override 808 public List<FlowStep<Config>> getFlowSteps() 809 { 810 if( steps != null ) 811 return steps; 812 813 if( flowStepGraph == null ) 814 return Collections.emptyList(); 815 816 Iterator<FlowStep> topoIterator = flowStepGraph.getTopologicalIterator(); 817 818 steps = new ArrayList<>(); 819 820 while( topoIterator.hasNext() ) 821 steps.add( topoIterator.next() ); 822 823 return steps; 824 } 825 826 @Override 827 @ProcessPrepare 828 public void prepare() 829 { 830 try 831 { 832 deleteSinksIfNotUpdate(); 833 deleteTrapsIfNotUpdate(); 834 deleteCheckpointsIfNotUpdate(); 835 } 836 catch( IOException exception ) 837 { 838 throw new FlowException( "unable to prepare flow", exception ); 839 } 840 } 841 842 @Override 843 @ProcessStart 844 public synchronized void start() 845 { 846 if( thread != null ) 847 return; 848 849 if( stop ) 850 return; 851 852 registerShutdownHook(); 853 854 internalStart(); 855 856 String threadName = ( "flow " + Util.toNull( getName() ) ).trim(); 857 858 thread = createFlowThread( threadName ); 859 860 thread.start(); 861 } 862 863 protected Thread createFlowThread( String threadName ) 864 { 865 return new Thread( new Runnable() 866 { 867 @Override 868 public void run() 869 { 870 BaseFlow.this.run(); 871 } 872 }, threadName ); 873 } 874 875 protected abstract void internalStart(); 876 877 @Override 878 @ProcessStop 879 public synchronized void stop() 880 { 881 stopLock.lock(); 882 883 try 884 { 885 if( stop ) 886 return; 887 888 stop = true; 889 890 fireOnStopping(); 891 892 if( !flowStats.isFinished() ) 893 flowStats.markStopped(); 894 895 internalStopAllJobs(); 896 897 handleExecutorShutdown(); 898 899 internalClean( true ); 900 } 901 finally 902 { 903 flowStats.cleanup(); 904 stopLock.unlock(); 905 } 906 } 907 908 protected abstract void internalClean( boolean stop ); 909 910 @Override 911 @ProcessComplete 912 public void complete() 913 { 914 start(); 915 916 try 917 { 918 try 919 { 920 synchronized( this ) // prevent NPE on quick stop() & complete() after start() 921 { 922 while( thread == null && !stop ) 923 Util.safeSleep( 10 ); 924 } 925 926 if( thread != null ) 927 thread.join(); 928 } 929 catch( InterruptedException exception ) 930 { 931 throw new FlowException( getName(), "thread interrupted", exception ); 932 } 933 934 // if in #stop and stopping, lets wait till its done in this thread 935 try 936 { 937 stopLock.lock(); 938 } 939 finally 940 { 941 stopLock.unlock(); 942 } 943 944 if( throwable instanceof FlowException ) 945 ( (FlowException) throwable ).setFlowName( getName() ); 946 947 if( throwable instanceof CascadingException ) 948 throw (CascadingException) throwable; 949 950 if( throwable instanceof OutOfMemoryError ) 951 throw (OutOfMemoryError) throwable; 952 953 if( throwable != null ) 954 throw new FlowException( getName(), "unhandled exception", throwable ); 955 956 if( hasListeners() ) 957 { 958 for( SafeFlowListener safeFlowListener : getListeners() ) 959 { 960 if( safeFlowListener.throwable != null ) 961 throw new FlowException( getName(), "unhandled listener exception", throwable ); 962 } 963 } 964 } 965 finally 966 { 967 thread = null; 968 throwable = null; 969 } 970 } 971 972 private void commitTraps() 973 { 974 // commit all the traps, don't fail on an error 975 for( Tap tap : traps.values() ) 976 { 977 try 978 { 979 if( !tap.commitResource( getConfig() ) ) 980 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ) ); 981 } 982 catch( IOException exception ) 983 { 984 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), exception ); 985 } 986 } 987 } 988 989 @Override 990 @ProcessCleanup 991 public void cleanup() 992 { 993 // do nothing 994 } 995 996 @Override 997 public TupleEntryIterator openSource() throws IOException 998 { 999 return sources.values().iterator().next().openForRead( getFlowProcess() ); 1000 } 1001 1002 @Override 1003 public TupleEntryIterator openSource( String name ) throws IOException 1004 { 1005 if( !sources.containsKey( name ) ) 1006 throw new IllegalArgumentException( "source does not exist: " + name ); 1007 1008 return sources.get( name ).openForRead( getFlowProcess() ); 1009 } 1010 1011 @Override 1012 public TupleEntryIterator openSink() throws IOException 1013 { 1014 return sinks.values().iterator().next().openForRead( getFlowProcess() ); 1015 } 1016 1017 @Override 1018 public TupleEntryIterator openSink( String name ) throws IOException 1019 { 1020 if( !sinks.containsKey( name ) ) 1021 throw new IllegalArgumentException( "sink does not exist: " + name ); 1022 1023 return sinks.get( name ).openForRead( getFlowProcess() ); 1024 } 1025 1026 @Override 1027 public TupleEntryIterator openTrap() throws IOException 1028 { 1029 return traps.values().iterator().next().openForRead( getFlowProcess() ); 1030 } 1031 1032 @Override 1033 public TupleEntryIterator openTrap( String name ) throws IOException 1034 { 1035 if( !traps.containsKey( name ) ) 1036 throw new IllegalArgumentException( "trap does not exist: " + name ); 1037 1038 return traps.get( name ).openForRead( getFlowProcess() ); 1039 } 1040 1041 /** 1042 * Method deleteSinks deletes all sinks, whether or not they are configured for {@link cascading.tap.SinkMode#UPDATE}. 1043 * <p/> 1044 * Use with caution. 1045 * 1046 * @throws IOException when 1047 * @see BaseFlow#deleteSinksIfNotUpdate() 1048 */ 1049 public void deleteSinks() throws IOException 1050 { 1051 for( Tap tap : sinks.values() ) 1052 deleteOrFail( tap ); 1053 } 1054 1055 private void deleteOrFail( Tap tap ) throws IOException 1056 { 1057 if( !tap.resourceExists( getConfig() ) ) 1058 return; 1059 1060 if( !tap.deleteResource( getConfig() ) ) 1061 throw new FlowException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) ); 1062 } 1063 1064 /** 1065 * Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the {@link cascading.tap.SinkMode#UPDATE} flag. 1066 * <p/> 1067 * Typically used by a {@link Cascade} before executing the flow if the sinks are stale. 1068 * <p/> 1069 * Use with caution. 1070 * 1071 * @throws IOException when 1072 */ 1073 public void deleteSinksIfNotUpdate() throws IOException 1074 { 1075 for( Tap tap : sinks.values() ) 1076 { 1077 if( !tap.isUpdate() ) 1078 deleteOrFail( tap ); 1079 } 1080 } 1081 1082 public void deleteSinksIfReplace() throws IOException 1083 { 1084 for( Tap tap : sinks.values() ) 1085 { 1086 if( tap.isReplace() ) 1087 deleteOrFail( tap ); 1088 } 1089 } 1090 1091 public void deleteTrapsIfNotUpdate() throws IOException 1092 { 1093 for( Tap tap : traps.values() ) 1094 { 1095 if( !tap.isUpdate() ) 1096 deleteOrFail( tap ); 1097 } 1098 } 1099 1100 public void deleteCheckpointsIfNotUpdate() throws IOException 1101 { 1102 for( Tap tap : checkpoints.values() ) 1103 { 1104 if( !tap.isUpdate() ) 1105 deleteOrFail( tap ); 1106 } 1107 } 1108 1109 public void deleteTrapsIfReplace() throws IOException 1110 { 1111 for( Tap tap : traps.values() ) 1112 { 1113 if( tap.isReplace() ) 1114 deleteOrFail( tap ); 1115 } 1116 } 1117 1118 public void deleteCheckpointsIfReplace() throws IOException 1119 { 1120 for( Tap tap : checkpoints.values() ) 1121 { 1122 if( tap.isReplace() ) 1123 deleteOrFail( tap ); 1124 } 1125 } 1126 1127 @Override 1128 public boolean resourceExists( Tap tap ) throws IOException 1129 { 1130 return tap.resourceExists( getConfig() ); 1131 } 1132 1133 @Override 1134 public TupleEntryIterator openTapForRead( Tap tap ) throws IOException 1135 { 1136 return tap.openForRead( getFlowProcess() ); 1137 } 1138 1139 @Override 1140 public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException 1141 { 1142 return tap.openForWrite( getFlowProcess() ); 1143 } 1144 1145 /** Method run implements the Runnable run method and should not be called by users. */ 1146 private void run() 1147 { 1148 if( thread == null ) 1149 throw new IllegalStateException( "to start a Flow call start() or complete(), not Runnable#run()" ); 1150 1151 Version.printBanner(); 1152 Update.checkForUpdate( getPlatformInfo() ); 1153 1154 try 1155 { 1156 if( stop ) 1157 return; 1158 1159 flowStats.markStarted(); 1160 1161 fireOnStarting(); 1162 1163 if( LOG.isInfoEnabled() ) 1164 { 1165 logInfo( "starting" ); 1166 1167 for( Tap source : getSourcesCollection() ) 1168 logInfo( " source: " + source ); 1169 for( Tap sink : getSinksCollection() ) 1170 logInfo( " sink: " + sink ); 1171 } 1172 1173 // if jobs are run local, then only use one thread to force execution serially 1174 //int numThreads = jobsAreLocal() ? 1 : getMaxConcurrentSteps( getJobConf() ); 1175 int numThreads = getMaxNumParallelSteps(); 1176 1177 if( numThreads == 0 ) 1178 numThreads = jobsMap.size(); 1179 1180 if( numThreads == 0 ) 1181 throw new IllegalStateException( "no jobs rendered for flow: " + getName() ); 1182 1183 if( LOG.isInfoEnabled() ) 1184 { 1185 logInfo( " parallel execution of steps is enabled: " + ( getMaxNumParallelSteps() != 1 ) ); 1186 logInfo( " executing total steps: " + jobsMap.size() ); 1187 logInfo( " allocating management threads: " + numThreads ); 1188 } 1189 1190 List<Future<Throwable>> futures = spawnJobs( numThreads ); 1191 1192 for( Future<Throwable> future : futures ) 1193 { 1194 throwable = future.get(); 1195 1196 if( throwable != null ) 1197 { 1198 if( !stop ) 1199 internalStopAllJobs(); 1200 1201 handleExecutorShutdown(); 1202 break; 1203 } 1204 } 1205 } 1206 catch( Throwable throwable ) 1207 { 1208 this.throwable = throwable; 1209 } 1210 finally 1211 { 1212 handleThrowableAndMarkFailed(); 1213 1214 if( !stop && !flowStats.isFinished() ) 1215 flowStats.markSuccessful(); 1216 1217 internalClean( stop ); // cleaning temp taps may be determined by success/failure 1218 1219 commitTraps(); 1220 1221 try 1222 { 1223 fireOnCompleted(); 1224 } 1225 finally 1226 { 1227 if( LOG.isInfoEnabled() ) 1228 { 1229 long totalSliceCPUSeconds = getTotalSliceCPUMilliSeconds(); 1230 1231 if( totalSliceCPUSeconds == -1 ) 1232 logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) ); 1233 else 1234 logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) + ", using cpu time: " + formatDurationFromMillis( totalSliceCPUSeconds ) ); 1235 } 1236 1237 flowStats.cleanup(); 1238 internalShutdown(); 1239 deregisterShutdownHook(); 1240 } 1241 } 1242 } 1243 1244 protected long getTotalSliceCPUMilliSeconds() 1245 { 1246 return -1; 1247 } 1248 1249 protected abstract int getMaxNumParallelSteps(); 1250 1251 protected abstract void internalShutdown(); 1252 1253 private List<Future<Throwable>> spawnJobs( int numThreads ) throws InterruptedException 1254 { 1255 if( stop ) 1256 return new ArrayList<Future<Throwable>>(); 1257 1258 List<Callable<Throwable>> list = new ArrayList<Callable<Throwable>>(); 1259 1260 for( FlowStepJob<Config> job : jobsMap.values() ) 1261 list.add( job ); 1262 1263 return spawnStrategy.start( this, numThreads, list ); 1264 } 1265 1266 private void handleThrowableAndMarkFailed() 1267 { 1268 if( throwable != null && !stop ) 1269 { 1270 flowStats.markFailed( throwable ); 1271 1272 fireOnThrowable(); 1273 } 1274 } 1275 1276 Map<String, FlowStepJob<Config>> getJobsMap() 1277 { 1278 return jobsMap; 1279 } 1280 1281 protected void initializeNewJobsMap() 1282 { 1283 jobsMap = new LinkedHashMap<>(); // keep topo order 1284 Iterator<FlowStep> topoIterator = flowStepGraph.getTopologicalIterator(); 1285 1286 while( topoIterator.hasNext() ) 1287 { 1288 BaseFlowStep<Config> step = (BaseFlowStep) topoIterator.next(); 1289 FlowStepJob<Config> flowStepJob = step.getCreateFlowStepJob( getFlowProcess(), getConfig() ); 1290 1291 jobsMap.put( step.getName(), flowStepJob ); 1292 1293 List<FlowStepJob<Config>> predecessors = new ArrayList<FlowStepJob<Config>>(); 1294 1295 for( Object flowStep : ProcessGraphs.predecessorListOf( flowStepGraph, step ) ) 1296 predecessors.add( jobsMap.get( ( (FlowStep) flowStep ).getName() ) ); 1297 1298 flowStepJob.setPredecessors( predecessors ); 1299 1300 flowStats.addStepStats( flowStepJob.getStepStats() ); 1301 } 1302 } 1303 1304 protected void internalStopAllJobs() 1305 { 1306 logInfo( "stopping all jobs" ); 1307 1308 try 1309 { 1310 if( jobsMap == null ) 1311 return; 1312 1313 List<FlowStepJob<Config>> jobs = new ArrayList<FlowStepJob<Config>>( jobsMap.values() ); 1314 1315 Collections.reverse( jobs ); 1316 1317 for( FlowStepJob<Config> job : jobs ) 1318 job.stop(); 1319 } 1320 finally 1321 { 1322 logInfo( "stopped all jobs" ); 1323 } 1324 } 1325 1326 protected void handleExecutorShutdown() 1327 { 1328 if( spawnStrategy.isCompleted( this ) ) 1329 return; 1330 1331 logInfo( "shutting down job executor" ); 1332 1333 try 1334 { 1335 spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS ); 1336 } 1337 catch( InterruptedException exception ) 1338 { 1339 // ignore 1340 } 1341 1342 logInfo( "shutdown complete" ); 1343 } 1344 1345 protected void fireOnCompleted() 1346 { 1347 if( hasListeners() ) 1348 { 1349 if( LOG.isDebugEnabled() ) 1350 logDebug( "firing onCompleted event: " + getListeners().size() ); 1351 1352 for( FlowListener flowListener : getListeners() ) 1353 flowListener.onCompleted( this ); 1354 } 1355 } 1356 1357 protected void fireOnThrowable( Throwable throwable ) 1358 { 1359 this.throwable = throwable; 1360 fireOnThrowable(); 1361 } 1362 1363 protected void fireOnThrowable() 1364 { 1365 if( hasListeners() ) 1366 { 1367 if( LOG.isDebugEnabled() ) 1368 logDebug( "firing onThrowable event: " + getListeners().size() ); 1369 1370 boolean isHandled = false; 1371 1372 for( FlowListener flowListener : getListeners() ) 1373 isHandled = flowListener.onThrowable( this, throwable ) || isHandled; 1374 1375 if( isHandled ) 1376 throwable = null; 1377 } 1378 } 1379 1380 protected void fireOnStopping() 1381 { 1382 if( hasListeners() ) 1383 { 1384 if( LOG.isDebugEnabled() ) 1385 logDebug( "firing onStopping event: " + getListeners().size() ); 1386 1387 for( FlowListener flowListener : getListeners() ) 1388 flowListener.onStopping( this ); 1389 } 1390 } 1391 1392 protected void fireOnStarting() 1393 { 1394 if( hasListeners() ) 1395 { 1396 if( LOG.isDebugEnabled() ) 1397 logDebug( "firing onStarting event: " + getListeners().size() ); 1398 1399 for( FlowListener flowListener : getListeners() ) 1400 flowListener.onStarting( this ); 1401 } 1402 } 1403 1404 @Override 1405 public String toString() 1406 { 1407 StringBuffer buffer = new StringBuffer(); 1408 1409 if( getName() != null ) 1410 buffer.append( getName() ).append( ": " ); 1411 1412 for( FlowStep step : getFlowSteps() ) 1413 buffer.append( step ); 1414 1415 return buffer.toString(); 1416 } 1417 1418 @Override 1419 public final boolean isInfoEnabled() 1420 { 1421 return LOG.isInfoEnabled(); 1422 } 1423 1424 @Override 1425 public final boolean isDebugEnabled() 1426 { 1427 return LOG.isDebugEnabled(); 1428 } 1429 1430 @Override 1431 public void logInfo( String message, Object... arguments ) 1432 { 1433 LOG.info( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1434 } 1435 1436 @Override 1437 public void logDebug( String message, Object... arguments ) 1438 { 1439 LOG.debug( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1440 } 1441 1442 @Override 1443 public void logWarn( String message ) 1444 { 1445 LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message ); 1446 } 1447 1448 @Override 1449 public void logWarn( String message, Throwable throwable ) 1450 { 1451 LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable ); 1452 } 1453 1454 @Override 1455 public void logWarn( String message, Object... arguments ) 1456 { 1457 LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1458 } 1459 1460 @Override 1461 public void logError( String message, Object... arguments ) 1462 { 1463 LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1464 } 1465 1466 @Override 1467 public void logError( String message, Throwable throwable ) 1468 { 1469 LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable ); 1470 } 1471 1472 @Override 1473 public void writeDOT( String filename ) 1474 { 1475 if( flowElementGraph == null ) 1476 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1477 1478 flowElementGraph.writeDOT( filename ); 1479 } 1480 1481 @Override 1482 public void writeStepsDOT( String filename ) 1483 { 1484 if( flowStepGraph == null ) 1485 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1486 1487 flowStepGraph.writeDOT( filename ); 1488 } 1489 1490 /** 1491 * Used to return a simple wrapper for use as an edge in a graph where there can only be 1492 * one instance of every edge. 1493 * 1494 * @return FlowHolder 1495 */ 1496 public FlowHolder getHolder() 1497 { 1498 return new FlowHolder( this ); 1499 } 1500 1501 public void setCascade( Cascade cascade ) 1502 { 1503 setConfigProperty( getConfig(), "cascading.cascade.id", cascade.getID() ); 1504 flowStats.recordInfo(); 1505 } 1506 1507 @Override 1508 public String getCascadeID() 1509 { 1510 return getProperty( "cascading.cascade.id" ); 1511 } 1512 1513 @Override 1514 public String getRunID() 1515 { 1516 return runID; 1517 } 1518 1519 public List<String> getClassPath() 1520 { 1521 return classPath; 1522 } 1523 1524 @Override 1525 public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy ) 1526 { 1527 this.spawnStrategy = spawnStrategy; 1528 } 1529 1530 @Override 1531 public UnitOfWorkSpawnStrategy getSpawnStrategy() 1532 { 1533 return spawnStrategy; 1534 } 1535 1536 protected void registerShutdownHook() 1537 { 1538 if( !isStopJobsOnExit() ) 1539 return; 1540 1541 shutdownHook = new ShutdownUtil.Hook() 1542 { 1543 @Override 1544 public Priority priority() 1545 { 1546 return Priority.WORK_CHILD; 1547 } 1548 1549 @Override 1550 public void execute() 1551 { 1552 logInfo( "shutdown hook calling stop on flow" ); 1553 1554 BaseFlow.this.stop(); 1555 } 1556 }; 1557 1558 ShutdownUtil.addHook( shutdownHook ); 1559 } 1560 1561 private void deregisterShutdownHook() 1562 { 1563 if( !isStopJobsOnExit() || stop ) 1564 return; 1565 1566 ShutdownUtil.removeHook( shutdownHook ); 1567 } 1568 1569 /** Class FlowHolder is a helper class for wrapping Flow instances. */ 1570 public static class FlowHolder 1571 { 1572 /** Field flow */ 1573 public Flow flow; 1574 1575 public FlowHolder() 1576 { 1577 } 1578 1579 public FlowHolder( Flow flow ) 1580 { 1581 this.flow = flow; 1582 } 1583 } 1584 1585 /** 1586 * Class SafeFlowListener safely calls a wrapped FlowListener. 1587 * <p/> 1588 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1589 * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method 1590 * which in turn is run in a new Thread. 1591 */ 1592 private class SafeFlowListener implements FlowListener 1593 { 1594 /** Field flowListener */ 1595 final FlowListener flowListener; 1596 /** Field throwable */ 1597 Throwable throwable; 1598 1599 private SafeFlowListener( FlowListener flowListener ) 1600 { 1601 this.flowListener = flowListener; 1602 } 1603 1604 public void onStarting( Flow flow ) 1605 { 1606 try 1607 { 1608 flowListener.onStarting( flow ); 1609 } 1610 catch( Throwable throwable ) 1611 { 1612 handleThrowable( throwable ); 1613 } 1614 } 1615 1616 public void onStopping( Flow flow ) 1617 { 1618 try 1619 { 1620 flowListener.onStopping( flow ); 1621 } 1622 catch( Throwable throwable ) 1623 { 1624 handleThrowable( throwable ); 1625 } 1626 } 1627 1628 public void onCompleted( Flow flow ) 1629 { 1630 try 1631 { 1632 flowListener.onCompleted( flow ); 1633 } 1634 catch( Throwable throwable ) 1635 { 1636 handleThrowable( throwable ); 1637 } 1638 } 1639 1640 public boolean onThrowable( Flow flow, Throwable flowThrowable ) 1641 { 1642 try 1643 { 1644 return flowListener.onThrowable( flow, flowThrowable ); 1645 } 1646 catch( Throwable throwable ) 1647 { 1648 handleThrowable( throwable ); 1649 } 1650 1651 return false; 1652 } 1653 1654 private void handleThrowable( Throwable throwable ) 1655 { 1656 this.throwable = throwable; 1657 1658 logWarn( String.format( "flow listener %s threw throwable", flowListener ), throwable ); 1659 1660 // stop this flow 1661 stop(); 1662 } 1663 1664 public boolean equals( Object object ) 1665 { 1666 if( object instanceof BaseFlow.SafeFlowListener ) 1667 return flowListener.equals( ( (BaseFlow.SafeFlowListener) object ).flowListener ); 1668 1669 return flowListener.equals( object ); 1670 } 1671 1672 public int hashCode() 1673 { 1674 return flowListener.hashCode(); 1675 } 1676 } 1677 }