001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow; 022 023 import java.io.IOException; 024 import java.util.ArrayList; 025 import java.util.Collection; 026 import java.util.Collections; 027 import java.util.Date; 028 import java.util.HashSet; 029 import java.util.Iterator; 030 import java.util.LinkedHashMap; 031 import java.util.LinkedList; 032 import java.util.List; 033 import java.util.Map; 034 import java.util.Properties; 035 import java.util.Set; 036 import java.util.concurrent.Callable; 037 import java.util.concurrent.Future; 038 import java.util.concurrent.TimeUnit; 039 import java.util.concurrent.locks.ReentrantLock; 040 041 import cascading.CascadingException; 042 import cascading.cascade.Cascade; 043 import cascading.flow.planner.BaseFlowStep; 044 import cascading.flow.planner.ElementGraph; 045 import cascading.flow.planner.FlowStepGraph; 046 import cascading.flow.planner.FlowStepJob; 047 import cascading.flow.planner.PlatformInfo; 048 import cascading.management.CascadingServices; 049 import cascading.management.UnitOfWorkExecutorStrategy; 050 import cascading.management.UnitOfWorkSpawnStrategy; 051 import cascading.management.state.ClientState; 052 import cascading.property.AppProps; 053 import cascading.property.PropertyUtil; 054 import cascading.stats.FlowStats; 055 import cascading.tap.Tap; 056 import cascading.tuple.Fields; 057 import cascading.tuple.TupleEntryCollector; 058 import cascading.tuple.TupleEntryIterator; 059 import cascading.util.ShutdownUtil; 060 import cascading.util.Update; 061 import cascading.util.Util; 062 import cascading.util.Version; 063 import org.jgrapht.traverse.TopologicalOrderIterator; 064 import org.slf4j.Logger; 065 import org.slf4j.LoggerFactory; 066 import riffle.process.DependencyIncoming; 067 import riffle.process.DependencyOutgoing; 068 import riffle.process.ProcessCleanup; 069 import riffle.process.ProcessComplete; 070 import riffle.process.ProcessPrepare; 071 import riffle.process.ProcessStart; 072 import riffle.process.ProcessStop; 073 074 import static org.jgrapht.Graphs.predecessorListOf; 075 076 @riffle.process.Process 077 public abstract class BaseFlow<Config> implements Flow<Config> 078 { 079 /** Field LOG */ 080 private static final Logger LOG = LoggerFactory.getLogger( Flow.class ); 081 082 private PlatformInfo platformInfo; 083 084 /** Field id */ 085 private String id; 086 /** Field name */ 087 private String name; 088 /** Fields runID */ 089 private String runID; 090 /** Fields classpath */ 091 private List<String> classPath; // may remain null 092 /** Field tags */ 093 private String tags; 094 /** Field listeners */ 095 private List<SafeFlowListener> listeners; 096 /** Field skipStrategy */ 097 private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale(); 098 /** Field flowStats */ 099 protected FlowStats flowStats; // don't use a listener to set values 100 /** Field sources */ 101 protected Map<String, Tap> sources = Collections.EMPTY_MAP; 102 /** Field sinks */ 103 protected Map<String, Tap> sinks = Collections.EMPTY_MAP; 104 /** Field traps */ 105 private Map<String, Tap> traps = Collections.EMPTY_MAP; 106 /** Field checkpoints */ 107 private Map<String, Tap> checkpoints = Collections.EMPTY_MAP; 108 /** Field stopJobsOnExit */ 109 protected boolean stopJobsOnExit = true; 110 /** Field submitPriority */ 111 private int submitPriority = 5; 112 113 /** Field stepGraph */ 114 private FlowStepGraph<Config> flowStepGraph; 115 /** Field thread */ 116 protected transient Thread thread; 117 /** Field throwable */ 118 private Throwable throwable; 119 /** Field stop */ 120 protected boolean stop; 121 122 /** Field pipeGraph */ 123 private ElementGraph pipeGraph; // only used for documentation purposes 124 125 private transient CascadingServices cascadingServices; 126 127 private FlowStepStrategy<Config> flowStepStrategy = null; 128 /** Field steps */ 129 private transient List<FlowStep<Config>> steps; 130 /** Field jobsMap */ 131 private transient Map<String, FlowStepJob<Config>> jobsMap; 132 private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy(); 133 134 private transient ReentrantLock stopLock = new ReentrantLock( true ); 135 protected ShutdownUtil.Hook shutdownHook; 136 137 /** 138 * Returns property stopJobsOnExit. 139 * 140 * @param properties of type Map 141 * @return a boolean 142 */ 143 static boolean getStopJobsOnExit( Map<Object, Object> properties ) 144 { 145 return Boolean.parseBoolean( PropertyUtil.getProperty( properties, FlowProps.STOP_JOBS_ON_EXIT, "true" ) ); 146 } 147 148 /** Used for testing. */ 149 protected BaseFlow() 150 { 151 this.name = "NA"; 152 this.flowStats = createPrepareFlowStats(); 153 } 154 155 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name ) 156 { 157 this.platformInfo = platformInfo; 158 this.name = name; 159 addSessionProperties( properties ); 160 initConfig( properties, defaultConfig ); 161 162 this.flowStats = createPrepareFlowStats(); // must be last 163 } 164 165 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef ) 166 { 167 this.platformInfo = platformInfo; 168 this.name = flowDef.getName(); 169 this.tags = flowDef.getTags(); 170 this.runID = flowDef.getRunID(); 171 this.classPath = flowDef.getClassPath(); 172 173 addSessionProperties( properties ); 174 initConfig( properties, defaultConfig ); 175 setSources( flowDef.getSourcesCopy() ); 176 setSinks( flowDef.getSinksCopy() ); 177 setTraps( flowDef.getTrapsCopy() ); 178 setCheckpoints( flowDef.getCheckpointsCopy() ); 179 initFromTaps(); 180 181 retrieveSourceFields(); 182 retrieveSinkFields(); 183 } 184 185 public PlatformInfo getPlatformInfo() 186 { 187 return platformInfo; 188 } 189 190 public void initialize( ElementGraph pipeGraph, FlowStepGraph<Config> flowStepGraph ) 191 { 192 this.pipeGraph = pipeGraph; 193 this.flowStepGraph = flowStepGraph; 194 195 initSteps(); 196 197 this.flowStats = createPrepareFlowStats(); // must be last 198 199 initializeNewJobsMap(); 200 } 201 202 public ElementGraph updateSchemes( ElementGraph pipeGraph ) 203 { 204 presentSourceFields( pipeGraph ); 205 206 presentSinkFields( pipeGraph ); 207 208 return new ElementGraph( pipeGraph ); 209 } 210 211 /** Force a Scheme to fetch any fields from a meta-data store */ 212 protected void retrieveSourceFields() 213 { 214 for( Tap tap : sources.values() ) 215 tap.retrieveSourceFields( getFlowProcess() ); 216 } 217 218 /** 219 * Present the current resolved fields for the Tap 220 * 221 * @param pipeGraph 222 */ 223 protected void presentSourceFields( ElementGraph pipeGraph ) 224 { 225 for( Tap tap : sources.values() ) 226 { 227 if( pipeGraph.containsVertex( tap ) ) 228 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 229 } 230 231 for( Tap tap : checkpoints.values() ) 232 { 233 if( pipeGraph.containsVertex( tap ) ) 234 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 235 } 236 } 237 238 /** Force a Scheme to fetch any fields from a meta-data store */ 239 protected void retrieveSinkFields() 240 { 241 for( Tap tap : sinks.values() ) 242 tap.retrieveSinkFields( getFlowProcess() ); 243 } 244 245 /** 246 * Present the current resolved fields for the Tap 247 * 248 * @param pipeGraph 249 */ 250 protected void presentSinkFields( ElementGraph pipeGraph ) 251 { 252 for( Tap tap : sinks.values() ) 253 { 254 if( pipeGraph.containsVertex( tap ) ) 255 tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 256 } 257 258 for( Tap tap : checkpoints.values() ) 259 { 260 if( pipeGraph.containsVertex( tap ) ) 261 tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 262 } 263 } 264 265 protected Fields getFieldsFor( ElementGraph pipeGraph, Tap tap ) 266 { 267 return pipeGraph.outgoingEdgesOf( tap ).iterator().next().getOutValuesFields(); 268 } 269 270 private void addSessionProperties( Map<Object, Object> properties ) 271 { 272 if( properties == null ) 273 return; 274 275 PropertyUtil.setProperty( properties, CASCADING_FLOW_ID, getID() ); 276 PropertyUtil.setProperty( properties, "cascading.flow.tags", getTags() ); 277 AppProps.setApplicationID( properties ); 278 PropertyUtil.setProperty( properties, "cascading.app.name", makeAppName( properties ) ); 279 PropertyUtil.setProperty( properties, "cascading.app.version", makeAppVersion( properties ) ); 280 } 281 282 private String makeAppName( Map<Object, Object> properties ) 283 { 284 if( properties == null ) 285 return null; 286 287 String name = AppProps.getApplicationName( properties ); 288 289 if( name != null ) 290 return name; 291 292 return Util.findName( AppProps.getApplicationJarPath( properties ) ); 293 } 294 295 private String makeAppVersion( Map<Object, Object> properties ) 296 { 297 if( properties == null ) 298 return null; 299 300 String name = AppProps.getApplicationVersion( properties ); 301 302 if( name != null ) 303 return name; 304 305 return Util.findVersion( AppProps.getApplicationJarPath( properties ) ); 306 } 307 308 private FlowStats createPrepareFlowStats() 309 { 310 FlowStats flowStats = new FlowStats( this, getClientState() ); 311 312 flowStats.prepare(); 313 flowStats.markPending(); 314 315 return flowStats; 316 } 317 318 public CascadingServices getCascadingServices() 319 { 320 if( cascadingServices == null ) 321 cascadingServices = new CascadingServices( getConfigAsProperties() ); 322 323 return cascadingServices; 324 } 325 326 private ClientState getClientState() 327 { 328 return getFlowSession().getCascadingServices().createClientState( getID() ); 329 } 330 331 protected void initSteps() 332 { 333 if( flowStepGraph == null ) 334 return; 335 336 for( Object flowStep : flowStepGraph.vertexSet() ) 337 ( (BaseFlowStep<Config>) flowStep ).setFlow( this ); 338 } 339 340 private void initFromTaps() 341 { 342 initFromTaps( sources ); 343 initFromTaps( sinks ); 344 initFromTaps( traps ); 345 } 346 347 private void initFromTaps( Map<String, Tap> taps ) 348 { 349 for( Tap tap : taps.values() ) 350 tap.flowConfInit( this ); 351 } 352 353 @Override 354 public String getName() 355 { 356 return name; 357 } 358 359 protected void setName( String name ) 360 { 361 this.name = name; 362 } 363 364 @Override 365 public String getID() 366 { 367 if( id == null ) 368 id = Util.createUniqueID(); 369 370 return id; 371 } 372 373 @Override 374 public String getTags() 375 { 376 return tags; 377 } 378 379 @Override 380 public int getSubmitPriority() 381 { 382 return submitPriority; 383 } 384 385 @Override 386 public void setSubmitPriority( int submitPriority ) 387 { 388 if( submitPriority < 1 || submitPriority > 10 ) 389 throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority ); 390 391 this.submitPriority = submitPriority; 392 } 393 394 ElementGraph getPipeGraph() 395 { 396 return pipeGraph; 397 } 398 399 FlowStepGraph getFlowStepGraph() 400 { 401 return flowStepGraph; 402 } 403 404 protected void setSources( Map<String, Tap> sources ) 405 { 406 addListeners( sources.values() ); 407 this.sources = sources; 408 } 409 410 protected void setSinks( Map<String, Tap> sinks ) 411 { 412 addListeners( sinks.values() ); 413 this.sinks = sinks; 414 } 415 416 protected void setTraps( Map<String, Tap> traps ) 417 { 418 addListeners( traps.values() ); 419 this.traps = traps; 420 } 421 422 protected void setCheckpoints( Map<String, Tap> checkpoints ) 423 { 424 addListeners( checkpoints.values() ); 425 this.checkpoints = checkpoints; 426 } 427 428 protected void setFlowStepGraph( FlowStepGraph flowStepGraph ) 429 { 430 this.flowStepGraph = flowStepGraph; 431 } 432 433 /** 434 * This method creates a new internal Config with the parentConfig as defaults using the properties to override 435 * the defaults. 436 * 437 * @param properties of type Map 438 * @param parentConfig of type Config 439 */ 440 protected abstract void initConfig( Map<Object, Object> properties, Config parentConfig ); 441 442 public Config createConfig( Map<Object, Object> properties, Config defaultConfig ) 443 { 444 Config config = newConfig( defaultConfig ); 445 446 if( properties == null ) 447 return config; 448 449 Set<Object> keys = new HashSet<Object>( properties.keySet() ); 450 451 // keys will only be grabbed if both key/value are String, so keep orig keys 452 if( properties instanceof Properties ) 453 keys.addAll( ( (Properties) properties ).stringPropertyNames() ); 454 455 for( Object key : keys ) 456 { 457 Object value = properties.get( key ); 458 459 if( value == null && properties instanceof Properties && key instanceof String ) 460 value = ( (Properties) properties ).getProperty( (String) key ); 461 462 if( value == null ) // don't stuff null values 463 continue; 464 465 setConfigProperty( config, key, value ); 466 } 467 468 return config; 469 } 470 471 protected abstract void setConfigProperty( Config config, Object key, Object value ); 472 473 protected abstract Config newConfig( Config defaultConfig ); 474 475 protected void initFromProperties( Map<Object, Object> properties ) 476 { 477 stopJobsOnExit = getStopJobsOnExit( properties ); 478 } 479 480 public FlowSession getFlowSession() 481 { 482 return new FlowSession( getCascadingServices() ); 483 } 484 485 @Override 486 public FlowStats getFlowStats() 487 { 488 return flowStats; 489 } 490 491 @Override 492 public FlowStats getStats() 493 { 494 return getFlowStats(); 495 } 496 497 void addListeners( Collection listeners ) 498 { 499 for( Object listener : listeners ) 500 { 501 if( listener instanceof FlowListener ) 502 addListener( (FlowListener) listener ); 503 } 504 } 505 506 List<SafeFlowListener> getListeners() 507 { 508 if( listeners == null ) 509 listeners = new LinkedList<SafeFlowListener>(); 510 511 return listeners; 512 } 513 514 @Override 515 public boolean hasListeners() 516 { 517 return listeners != null && !listeners.isEmpty(); 518 } 519 520 @Override 521 public void addListener( FlowListener flowListener ) 522 { 523 getListeners().add( new SafeFlowListener( flowListener ) ); 524 } 525 526 @Override 527 public boolean removeListener( FlowListener flowListener ) 528 { 529 return getListeners().remove( new SafeFlowListener( flowListener ) ); 530 } 531 532 @Override 533 public boolean hasStepListeners() 534 { 535 boolean hasStepListeners = false; 536 537 for( FlowStep step : getFlowSteps() ) 538 hasStepListeners |= step.hasListeners(); 539 540 return hasStepListeners; 541 } 542 543 @Override 544 public void addStepListener( FlowStepListener flowStepListener ) 545 { 546 for( FlowStep step : getFlowSteps() ) 547 step.addListener( flowStepListener ); 548 } 549 550 @Override 551 public boolean removeStepListener( FlowStepListener flowStepListener ) 552 { 553 boolean listenerRemoved = true; 554 555 for( FlowStep step : getFlowSteps() ) 556 listenerRemoved &= step.removeListener( flowStepListener ); 557 558 return listenerRemoved; 559 } 560 561 @Override 562 public Map<String, Tap> getSources() 563 { 564 return Collections.unmodifiableMap( sources ); 565 } 566 567 @Override 568 public List<String> getSourceNames() 569 { 570 return new ArrayList<String>( sources.keySet() ); 571 } 572 573 @Override 574 public Tap getSource( String name ) 575 { 576 return sources.get( name ); 577 } 578 579 @Override 580 @DependencyIncoming 581 public Collection<Tap> getSourcesCollection() 582 { 583 return getSources().values(); 584 } 585 586 @Override 587 public Map<String, Tap> getSinks() 588 { 589 return Collections.unmodifiableMap( sinks ); 590 } 591 592 @Override 593 public List<String> getSinkNames() 594 { 595 return new ArrayList<String>( sinks.keySet() ); 596 } 597 598 @Override 599 public Tap getSink( String name ) 600 { 601 return sinks.get( name ); 602 } 603 604 @Override 605 @DependencyOutgoing 606 public Collection<Tap> getSinksCollection() 607 { 608 return getSinks().values(); 609 } 610 611 @Override 612 public Tap getSink() 613 { 614 return sinks.values().iterator().next(); 615 } 616 617 @Override 618 public Map<String, Tap> getTraps() 619 { 620 return Collections.unmodifiableMap( traps ); 621 } 622 623 @Override 624 public List<String> getTrapNames() 625 { 626 return new ArrayList<String>( traps.keySet() ); 627 } 628 629 @Override 630 public Collection<Tap> getTrapsCollection() 631 { 632 return getTraps().values(); 633 } 634 635 @Override 636 public Map<String, Tap> getCheckpoints() 637 { 638 return Collections.unmodifiableMap( checkpoints ); 639 } 640 641 @Override 642 public List<String> getCheckpointNames() 643 { 644 return new ArrayList<String>( checkpoints.keySet() ); 645 } 646 647 @Override 648 public Collection<Tap> getCheckpointsCollection() 649 { 650 return getCheckpoints().values(); 651 } 652 653 @Override 654 public boolean isStopJobsOnExit() 655 { 656 return stopJobsOnExit; 657 } 658 659 @Override 660 public FlowSkipStrategy getFlowSkipStrategy() 661 { 662 return flowSkipStrategy; 663 } 664 665 @Override 666 public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ) 667 { 668 if( flowSkipStrategy == null ) 669 throw new IllegalArgumentException( "flowSkipStrategy may not be null" ); 670 671 try 672 { 673 return this.flowSkipStrategy; 674 } 675 finally 676 { 677 this.flowSkipStrategy = flowSkipStrategy; 678 } 679 } 680 681 @Override 682 public boolean isSkipFlow() throws IOException 683 { 684 return flowSkipStrategy.skipFlow( this ); 685 } 686 687 @Override 688 public boolean areSinksStale() throws IOException 689 { 690 return areSourcesNewer( getSinkModified() ); 691 } 692 693 @Override 694 public boolean areSourcesNewer( long sinkModified ) throws IOException 695 { 696 Config confCopy = getConfigCopy(); 697 Iterator<Tap> values = sources.values().iterator(); 698 699 long sourceModified = 0; 700 701 try 702 { 703 sourceModified = Util.getSourceModified( confCopy, values, sinkModified ); 704 705 if( sinkModified < sourceModified ) 706 return true; 707 708 return false; 709 } 710 finally 711 { 712 if( LOG.isInfoEnabled() ) 713 logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all 714 } 715 } 716 717 @Override 718 public long getSinkModified() throws IOException 719 { 720 long sinkModified = Util.getSinkModified( getConfigCopy(), sinks.values() ); 721 722 if( LOG.isInfoEnabled() ) 723 { 724 if( sinkModified == -1L ) 725 logInfo( "at least one sink is marked for delete" ); 726 if( sinkModified == 0L ) 727 logInfo( "at least one sink does not exist" ); 728 else 729 logInfo( "sink oldest modified date: " + new Date( sinkModified ) ); 730 } 731 732 return sinkModified; 733 } 734 735 @Override 736 public FlowStepStrategy getFlowStepStrategy() 737 { 738 return flowStepStrategy; 739 } 740 741 @Override 742 public void setFlowStepStrategy( FlowStepStrategy flowStepStrategy ) 743 { 744 this.flowStepStrategy = flowStepStrategy; 745 } 746 747 @Override 748 public List<FlowStep<Config>> getFlowSteps() 749 { 750 if( steps != null ) 751 return steps; 752 753 if( flowStepGraph == null ) 754 return Collections.EMPTY_LIST; 755 756 TopologicalOrderIterator<FlowStep<Config>, Integer> topoIterator = flowStepGraph.getTopologicalIterator(); 757 758 steps = new ArrayList<FlowStep<Config>>(); 759 760 while( topoIterator.hasNext() ) 761 steps.add( topoIterator.next() ); 762 763 return steps; 764 } 765 766 @Override 767 @ProcessPrepare 768 public void prepare() 769 { 770 try 771 { 772 deleteSinksIfNotUpdate(); 773 deleteTrapsIfNotUpdate(); 774 deleteCheckpointsIfNotUpdate(); 775 } 776 catch( IOException exception ) 777 { 778 throw new FlowException( "unable to prepare flow", exception ); 779 } 780 } 781 782 @Override 783 @ProcessStart 784 public synchronized void start() 785 { 786 if( thread != null ) 787 return; 788 789 if( stop ) 790 return; 791 792 registerShutdownHook(); 793 794 internalStart(); 795 796 String threadName = ( "flow " + Util.toNull( getName() ) ).trim(); 797 798 thread = createFlowThread( threadName ); 799 800 thread.start(); 801 } 802 803 protected Thread createFlowThread( String threadName ) 804 { 805 return new Thread( new Runnable() 806 { 807 @Override 808 public void run() 809 { 810 BaseFlow.this.run(); 811 } 812 }, threadName ); 813 } 814 815 protected abstract void internalStart(); 816 817 @Override 818 @ProcessStop 819 public synchronized void stop() 820 { 821 stopLock.lock(); 822 823 try 824 { 825 if( stop ) 826 return; 827 828 stop = true; 829 830 fireOnStopping(); 831 832 if( !flowStats.isFinished() ) 833 flowStats.markStopped(); 834 835 internalStopAllJobs(); 836 837 handleExecutorShutdown(); 838 839 internalClean( true ); 840 } 841 finally 842 { 843 flowStats.cleanup(); 844 stopLock.unlock(); 845 } 846 } 847 848 protected abstract void internalClean( boolean stop ); 849 850 @Override 851 @ProcessComplete 852 public void complete() 853 { 854 start(); 855 856 try 857 { 858 try 859 { 860 synchronized( this ) // prevent NPE on quick stop() & complete() after start() 861 { 862 while( thread == null && !stop ) 863 Util.safeSleep( 10 ); 864 } 865 866 if( thread != null ) 867 thread.join(); 868 } 869 catch( InterruptedException exception ) 870 { 871 throw new FlowException( getName(), "thread interrupted", exception ); 872 } 873 874 // if in #stop and stopping, lets wait till its done in this thread 875 try 876 { 877 stopLock.lock(); 878 } 879 finally 880 { 881 stopLock.unlock(); 882 } 883 884 if( throwable instanceof FlowException ) 885 ( (FlowException) throwable ).setFlowName( getName() ); 886 887 if( throwable instanceof CascadingException ) 888 throw (CascadingException) throwable; 889 890 if( throwable instanceof OutOfMemoryError ) 891 throw (OutOfMemoryError) throwable; 892 893 if( throwable != null ) 894 throw new FlowException( getName(), "unhandled exception", throwable ); 895 896 if( hasListeners() ) 897 { 898 for( SafeFlowListener safeFlowListener : getListeners() ) 899 { 900 if( safeFlowListener.throwable != null ) 901 throw new FlowException( getName(), "unhandled listener exception", throwable ); 902 } 903 } 904 } 905 finally 906 { 907 thread = null; 908 throwable = null; 909 910 try 911 { 912 commitTraps(); 913 914 if( hasListeners() ) 915 { 916 for( SafeFlowListener safeFlowListener : getListeners() ) 917 safeFlowListener.throwable = null; 918 } 919 } 920 finally 921 { 922 flowStats.cleanup(); 923 } 924 } 925 } 926 927 private void commitTraps() 928 { 929 // commit all the traps, don't fail on an error 930 931 for( Tap tap : traps.values() ) 932 { 933 try 934 { 935 if( !tap.commitResource( getConfig() ) ) 936 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), null ); 937 } 938 catch( IOException exception ) 939 { 940 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), exception ); 941 } 942 } 943 } 944 945 @Override 946 @ProcessCleanup 947 public void cleanup() 948 { 949 // do nothing 950 } 951 952 @Override 953 public TupleEntryIterator openSource() throws IOException 954 { 955 return sources.values().iterator().next().openForRead( getFlowProcess() ); 956 } 957 958 @Override 959 public TupleEntryIterator openSource( String name ) throws IOException 960 { 961 if( !sources.containsKey( name ) ) 962 throw new IllegalArgumentException( "source does not exist: " + name ); 963 964 return sources.get( name ).openForRead( getFlowProcess() ); 965 } 966 967 @Override 968 public TupleEntryIterator openSink() throws IOException 969 { 970 return sinks.values().iterator().next().openForRead( getFlowProcess() ); 971 } 972 973 @Override 974 public TupleEntryIterator openSink( String name ) throws IOException 975 { 976 if( !sinks.containsKey( name ) ) 977 throw new IllegalArgumentException( "sink does not exist: " + name ); 978 979 return sinks.get( name ).openForRead( getFlowProcess() ); 980 } 981 982 @Override 983 public TupleEntryIterator openTrap() throws IOException 984 { 985 return traps.values().iterator().next().openForRead( getFlowProcess() ); 986 } 987 988 @Override 989 public TupleEntryIterator openTrap( String name ) throws IOException 990 { 991 if( !traps.containsKey( name ) ) 992 throw new IllegalArgumentException( "trap does not exist: " + name ); 993 994 return traps.get( name ).openForRead( getFlowProcess() ); 995 } 996 997 /** 998 * Method deleteSinks deletes all sinks, whether or not they are configured for {@link cascading.tap.SinkMode#UPDATE}. 999 * <p/> 1000 * Use with caution. 1001 * 1002 * @throws IOException when 1003 * @see BaseFlow#deleteSinksIfNotUpdate() 1004 */ 1005 public void deleteSinks() throws IOException 1006 { 1007 for( Tap tap : sinks.values() ) 1008 deleteOrFail( tap ); 1009 } 1010 1011 private void deleteOrFail( Tap tap ) throws IOException 1012 { 1013 if( !tap.resourceExists( getConfig() ) ) 1014 return; 1015 1016 if( !tap.deleteResource( getConfig() ) ) 1017 throw new FlowException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) ); 1018 } 1019 1020 /** 1021 * Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the {@link cascading.tap.SinkMode#UPDATE} flag. 1022 * <p/> 1023 * Typically used by a {@link Cascade} before executing the flow if the sinks are stale. 1024 * <p/> 1025 * Use with caution. 1026 * 1027 * @throws IOException when 1028 */ 1029 public void deleteSinksIfNotUpdate() throws IOException 1030 { 1031 for( Tap tap : sinks.values() ) 1032 { 1033 if( !tap.isUpdate() ) 1034 deleteOrFail( tap ); 1035 } 1036 } 1037 1038 public void deleteSinksIfReplace() throws IOException 1039 { 1040 for( Tap tap : sinks.values() ) 1041 { 1042 if( tap.isReplace() ) 1043 deleteOrFail( tap ); 1044 } 1045 } 1046 1047 public void deleteTrapsIfNotUpdate() throws IOException 1048 { 1049 for( Tap tap : traps.values() ) 1050 { 1051 if( !tap.isUpdate() ) 1052 deleteOrFail( tap ); 1053 } 1054 } 1055 1056 public void deleteCheckpointsIfNotUpdate() throws IOException 1057 { 1058 for( Tap tap : checkpoints.values() ) 1059 { 1060 if( !tap.isUpdate() ) 1061 deleteOrFail( tap ); 1062 } 1063 } 1064 1065 public void deleteTrapsIfReplace() throws IOException 1066 { 1067 for( Tap tap : traps.values() ) 1068 { 1069 if( tap.isReplace() ) 1070 deleteOrFail( tap ); 1071 } 1072 } 1073 1074 public void deleteCheckpointsIfReplace() throws IOException 1075 { 1076 for( Tap tap : checkpoints.values() ) 1077 { 1078 if( tap.isReplace() ) 1079 deleteOrFail( tap ); 1080 } 1081 } 1082 1083 @Override 1084 public boolean resourceExists( Tap tap ) throws IOException 1085 { 1086 return tap.resourceExists( getConfig() ); 1087 } 1088 1089 @Override 1090 public TupleEntryIterator openTapForRead( Tap tap ) throws IOException 1091 { 1092 return tap.openForRead( getFlowProcess() ); 1093 } 1094 1095 @Override 1096 public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException 1097 { 1098 return tap.openForWrite( getFlowProcess() ); 1099 } 1100 1101 /** Method run implements the Runnable run method and should not be called by users. */ 1102 private void run() 1103 { 1104 if( thread == null ) 1105 throw new IllegalStateException( "to start a Flow call start() or complete(), not Runnable#run()" ); 1106 1107 Version.printBanner(); 1108 Update.checkForUpdate( getPlatformInfo() ); 1109 1110 try 1111 { 1112 if( stop ) 1113 return; 1114 1115 flowStats.markStarted(); 1116 1117 fireOnStarting(); 1118 1119 if( LOG.isInfoEnabled() ) 1120 { 1121 logInfo( "starting" ); 1122 1123 for( Tap source : getSourcesCollection() ) 1124 logInfo( " source: " + source ); 1125 for( Tap sink : getSinksCollection() ) 1126 logInfo( " sink: " + sink ); 1127 } 1128 1129 // if jobs are run local, then only use one thread to force execution serially 1130 //int numThreads = jobsAreLocal() ? 1 : getMaxConcurrentSteps( getJobConf() ); 1131 int numThreads = getMaxNumParallelSteps(); 1132 1133 if( numThreads == 0 ) 1134 numThreads = jobsMap.size(); 1135 1136 if( numThreads == 0 ) 1137 throw new IllegalStateException( "no jobs rendered for flow: " + getName() ); 1138 1139 if( LOG.isInfoEnabled() ) 1140 { 1141 logInfo( " parallel execution is enabled: " + ( getMaxNumParallelSteps() != 1 ) ); 1142 logInfo( " starting jobs: " + jobsMap.size() ); 1143 logInfo( " allocating threads: " + numThreads ); 1144 } 1145 1146 List<Future<Throwable>> futures = spawnJobs( numThreads ); 1147 1148 for( Future<Throwable> future : futures ) 1149 { 1150 throwable = future.get(); 1151 1152 if( throwable != null ) 1153 { 1154 if( !stop ) 1155 internalStopAllJobs(); 1156 1157 handleExecutorShutdown(); 1158 break; 1159 } 1160 } 1161 } 1162 catch( Throwable throwable ) 1163 { 1164 this.throwable = throwable; 1165 } 1166 finally 1167 { 1168 handleThrowableAndMarkFailed(); 1169 1170 if( !stop && !flowStats.isFinished() ) 1171 flowStats.markSuccessful(); 1172 1173 internalClean( stop ); // cleaning temp taps may be determined by success/failure 1174 1175 try 1176 { 1177 fireOnCompleted(); 1178 } 1179 finally 1180 { 1181 flowStats.cleanup(); 1182 internalShutdown(); 1183 deregisterShutdownHook(); 1184 } 1185 } 1186 } 1187 1188 protected abstract int getMaxNumParallelSteps(); 1189 1190 protected abstract void internalShutdown(); 1191 1192 private List<Future<Throwable>> spawnJobs( int numThreads ) throws InterruptedException 1193 { 1194 if( stop ) 1195 return new ArrayList<Future<Throwable>>(); 1196 1197 List<Callable<Throwable>> list = new ArrayList<Callable<Throwable>>(); 1198 1199 for( FlowStepJob<Config> job : jobsMap.values() ) 1200 list.add( job ); 1201 1202 return spawnStrategy.start( this, numThreads, list ); 1203 } 1204 1205 private void handleThrowableAndMarkFailed() 1206 { 1207 if( throwable != null && !stop ) 1208 { 1209 flowStats.markFailed( throwable ); 1210 1211 fireOnThrowable(); 1212 } 1213 } 1214 1215 Map<String, FlowStepJob<Config>> getJobsMap() 1216 { 1217 return jobsMap; 1218 } 1219 1220 protected void initializeNewJobsMap() 1221 { 1222 // keep topo order 1223 jobsMap = new LinkedHashMap<String, FlowStepJob<Config>>(); 1224 TopologicalOrderIterator<FlowStep<Config>, Integer> topoIterator = flowStepGraph.getTopologicalIterator(); 1225 1226 while( topoIterator.hasNext() ) 1227 { 1228 BaseFlowStep<Config> step = (BaseFlowStep<Config>) topoIterator.next(); 1229 FlowStepJob<Config> flowStepJob = step.getFlowStepJob( getFlowProcess(), getConfig() ); 1230 1231 jobsMap.put( step.getName(), flowStepJob ); 1232 1233 List<FlowStepJob<Config>> predecessors = new ArrayList<FlowStepJob<Config>>(); 1234 1235 for( Object flowStep : predecessorListOf( flowStepGraph, step ) ) 1236 predecessors.add( jobsMap.get( ( (FlowStep<Config>) flowStep ).getName() ) ); 1237 1238 flowStepJob.setPredecessors( predecessors ); 1239 1240 flowStats.addStepStats( flowStepJob.getStepStats() ); 1241 } 1242 } 1243 1244 protected void internalStopAllJobs() 1245 { 1246 logInfo( "stopping all jobs" ); 1247 1248 try 1249 { 1250 if( jobsMap == null ) 1251 return; 1252 1253 List<FlowStepJob<Config>> jobs = new ArrayList<FlowStepJob<Config>>( jobsMap.values() ); 1254 1255 Collections.reverse( jobs ); 1256 1257 for( FlowStepJob<Config> job : jobs ) 1258 job.stop(); 1259 } 1260 finally 1261 { 1262 logInfo( "stopped all jobs" ); 1263 } 1264 } 1265 1266 protected void handleExecutorShutdown() 1267 { 1268 if( spawnStrategy.isCompleted( this ) ) 1269 return; 1270 1271 logInfo( "shutting down job executor" ); 1272 1273 try 1274 { 1275 spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS ); 1276 } 1277 catch( InterruptedException exception ) 1278 { 1279 // ignore 1280 } 1281 1282 logInfo( "shutdown complete" ); 1283 } 1284 1285 protected void fireOnCompleted() 1286 { 1287 if( hasListeners() ) 1288 { 1289 if( LOG.isDebugEnabled() ) 1290 logDebug( "firing onCompleted event: " + getListeners().size() ); 1291 1292 for( FlowListener flowListener : getListeners() ) 1293 flowListener.onCompleted( this ); 1294 } 1295 } 1296 1297 protected void fireOnThrowable() 1298 { 1299 if( hasListeners() ) 1300 { 1301 if( LOG.isDebugEnabled() ) 1302 logDebug( "firing onThrowable event: " + getListeners().size() ); 1303 1304 boolean isHandled = false; 1305 1306 for( FlowListener flowListener : getListeners() ) 1307 isHandled = flowListener.onThrowable( this, throwable ) || isHandled; 1308 1309 if( isHandled ) 1310 throwable = null; 1311 } 1312 } 1313 1314 protected void fireOnStopping() 1315 { 1316 if( hasListeners() ) 1317 { 1318 if( LOG.isDebugEnabled() ) 1319 logDebug( "firing onStopping event: " + getListeners().size() ); 1320 1321 for( FlowListener flowListener : getListeners() ) 1322 flowListener.onStopping( this ); 1323 } 1324 } 1325 1326 protected void fireOnStarting() 1327 { 1328 if( hasListeners() ) 1329 { 1330 if( LOG.isDebugEnabled() ) 1331 logDebug( "firing onStarting event: " + getListeners().size() ); 1332 1333 for( FlowListener flowListener : getListeners() ) 1334 flowListener.onStarting( this ); 1335 } 1336 } 1337 1338 @Override 1339 public String toString() 1340 { 1341 StringBuffer buffer = new StringBuffer(); 1342 1343 if( getName() != null ) 1344 buffer.append( getName() ).append( ": " ); 1345 1346 for( FlowStep step : getFlowSteps() ) 1347 buffer.append( step ); 1348 1349 return buffer.toString(); 1350 } 1351 1352 protected void logInfo( String message ) 1353 { 1354 LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message ); 1355 } 1356 1357 private void logDebug( String message ) 1358 { 1359 LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message ); 1360 } 1361 1362 private void logWarn( String message, Throwable throwable ) 1363 { 1364 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable ); 1365 } 1366 1367 private void logError( String message, Throwable throwable ) 1368 { 1369 LOG.error( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable ); 1370 } 1371 1372 @Override 1373 public void writeDOT( String filename ) 1374 { 1375 if( pipeGraph == null ) 1376 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1377 1378 pipeGraph.writeDOT( filename ); 1379 } 1380 1381 @Override 1382 public void writeStepsDOT( String filename ) 1383 { 1384 if( flowStepGraph == null ) 1385 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1386 1387 flowStepGraph.writeDOT( filename ); 1388 } 1389 1390 /** 1391 * Used to return a simple wrapper for use as an edge in a graph where there can only be 1392 * one instance of every edge. 1393 * 1394 * @return FlowHolder 1395 */ 1396 public FlowHolder getHolder() 1397 { 1398 return new FlowHolder( this ); 1399 } 1400 1401 public void setCascade( Cascade cascade ) 1402 { 1403 setConfigProperty( getConfig(), "cascading.cascade.id", cascade.getID() ); 1404 flowStats.recordInfo(); 1405 } 1406 1407 @Override 1408 public String getCascadeID() 1409 { 1410 return getProperty( "cascading.cascade.id" ); 1411 } 1412 1413 @Override 1414 public String getRunID() 1415 { 1416 return runID; 1417 } 1418 1419 protected List<String> getClassPath() 1420 { 1421 return classPath; 1422 } 1423 1424 @Override 1425 public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy ) 1426 { 1427 this.spawnStrategy = spawnStrategy; 1428 } 1429 1430 @Override 1431 public UnitOfWorkSpawnStrategy getSpawnStrategy() 1432 { 1433 return spawnStrategy; 1434 } 1435 1436 protected void registerShutdownHook() 1437 { 1438 if( !isStopJobsOnExit() ) 1439 return; 1440 1441 shutdownHook = new ShutdownUtil.Hook() 1442 { 1443 @Override 1444 public Priority priority() 1445 { 1446 return Priority.WORK_CHILD; 1447 } 1448 1449 @Override 1450 public void execute() 1451 { 1452 logInfo( "shutdown hook calling stop on flow" ); 1453 1454 BaseFlow.this.stop(); 1455 } 1456 }; 1457 1458 ShutdownUtil.addHook( shutdownHook ); 1459 } 1460 1461 private void deregisterShutdownHook() 1462 { 1463 if( !isStopJobsOnExit() || stop ) 1464 return; 1465 1466 ShutdownUtil.removeHook( shutdownHook ); 1467 } 1468 1469 /** Class FlowHolder is a helper class for wrapping Flow instances. */ 1470 public static class FlowHolder 1471 { 1472 /** Field flow */ 1473 public Flow flow; 1474 1475 public FlowHolder() 1476 { 1477 } 1478 1479 public FlowHolder( Flow flow ) 1480 { 1481 this.flow = flow; 1482 } 1483 } 1484 1485 /** 1486 * Class SafeFlowListener safely calls a wrapped FlowListener. 1487 * <p/> 1488 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1489 * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method 1490 * which in turn is run in a new Thread. 1491 */ 1492 private class SafeFlowListener implements FlowListener 1493 { 1494 /** Field flowListener */ 1495 final FlowListener flowListener; 1496 /** Field throwable */ 1497 Throwable throwable; 1498 1499 private SafeFlowListener( FlowListener flowListener ) 1500 { 1501 this.flowListener = flowListener; 1502 } 1503 1504 public void onStarting( Flow flow ) 1505 { 1506 try 1507 { 1508 flowListener.onStarting( flow ); 1509 } 1510 catch( Throwable throwable ) 1511 { 1512 handleThrowable( throwable ); 1513 } 1514 } 1515 1516 public void onStopping( Flow flow ) 1517 { 1518 try 1519 { 1520 flowListener.onStopping( flow ); 1521 } 1522 catch( Throwable throwable ) 1523 { 1524 handleThrowable( throwable ); 1525 } 1526 } 1527 1528 public void onCompleted( Flow flow ) 1529 { 1530 try 1531 { 1532 flowListener.onCompleted( flow ); 1533 } 1534 catch( Throwable throwable ) 1535 { 1536 handleThrowable( throwable ); 1537 } 1538 } 1539 1540 public boolean onThrowable( Flow flow, Throwable flowThrowable ) 1541 { 1542 try 1543 { 1544 return flowListener.onThrowable( flow, flowThrowable ); 1545 } 1546 catch( Throwable throwable ) 1547 { 1548 handleThrowable( throwable ); 1549 } 1550 1551 return false; 1552 } 1553 1554 private void handleThrowable( Throwable throwable ) 1555 { 1556 this.throwable = throwable; 1557 1558 logWarn( String.format( "flow listener %s threw throwable", flowListener ), throwable ); 1559 1560 // stop this flow 1561 stop(); 1562 } 1563 1564 public boolean equals( Object object ) 1565 { 1566 if( object instanceof BaseFlow.SafeFlowListener ) 1567 return flowListener.equals( ( (BaseFlow.SafeFlowListener) object ).flowListener ); 1568 1569 return flowListener.equals( object ); 1570 } 1571 1572 public int hashCode() 1573 { 1574 return flowListener.hashCode(); 1575 } 1576 } 1577 }