/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.cascade;

import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

import cascading.CascadingException;
import cascading.cascade.planner.FlowGraph;
import cascading.cascade.planner.IdentifierGraph;
import cascading.cascade.planner.TapGraph;
import cascading.flow.BaseFlow;
import cascading.flow.Flow;
import cascading.flow.FlowException;
import cascading.flow.FlowSkipStrategy;
import cascading.management.CascadingServices;
import cascading.management.UnitOfWork;
import cascading.management.UnitOfWorkExecutorStrategy;
import cascading.management.UnitOfWorkSpawnStrategy;
import cascading.management.state.ClientState;
import cascading.stats.CascadeStats;
import cascading.tap.Tap;
import cascading.util.ShutdownUtil;
import cascading.util.Util;
import cascading.util.Version;
import org.jgrapht.Graphs;
import org.jgrapht.ext.EdgeNameProvider;
import org.jgrapht.ext.IntegerNameProvider;
import org.jgrapht.ext.VertexNameProvider;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.property.PropertyUtil.getProperty;

/**
 * A Cascade is an assembly of {@link cascading.flow.Flow} instances that share or depend on equivalent {@link Tap} instances and are executed as
 * a single group. The most common case is where one Flow instance depends on a Tap created by a second Flow instance. This
 * dependency chain can continue as practical.
 * <p/>
 * Note Flow instances that have no shared dependencies will be executed in parallel.
 * <p/>
 * Additionally, a Cascade allows for incremental builds of complex data processing processes. If a given source {@link Tap} is newer than
 * a subsequent sink {@link Tap} in the assembly, the connecting {@link cascading.flow.Flow}(s) will be executed
 * when the Cascade is executed. If all the targets (sinks) are up to date, the Cascade exits immediately and does nothing.
 * <p/>
 * The concept of 'stale' is pluggable, see the {@link cascading.flow.FlowSkipStrategy} class.
 * <p/>
 * When a Cascade starts up, it first verifies which Flow instances have stale sinks, if the sinks are not stale, the
 * method {@link cascading.flow.BaseFlow#deleteSinksIfNotUpdate()} is called. Before appends/updates were supported (logically)
 * the Cascade deleted all the sinks in a Flow.
 * <p/>
 * The new consequence of this is if the Cascade fails, but does complete a Flow that appended or updated data, re-running
 * the Cascade (and the successful append/update Flow) will re-update data to the source. Some systems may be idempotent and
 * may not have any side-effects. So plan accordingly.
 * <p/>
 * Use the {@link CascadeListener} to receive any events on the life-cycle of the Cascade as it executes. Any
 * {@link Tap} instances owned by managed Flows also implementing CascadeListener will automatically be added to the
 * set of listeners.
 *
 * @see CascadeListener
 * @see cascading.flow.Flow
 * @see cascading.flow.FlowSkipStrategy
 */
public class Cascade implements UnitOfWork<CascadeStats>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( Cascade.class );

  /** Field id */
  private String id;
  /** Field name */
  private final String name;
  /** Field tags */
  private String tags;
  /** Field properties */
  private final Map<Object, Object> properties;
  /** Field listeners */
  private List<SafeCascadeListener> listeners;
  /** Field flowGraph */
  private final FlowGraph flowGraph;
  /** Field identifierGraph */
  private final IdentifierGraph identifierGraph;
  /** Field cascadeStats */
  private final CascadeStats cascadeStats;
  /** Field cascadingServices */
  private CascadingServices cascadingServices;
  /** Field thread */
  private Thread thread;
  /** Field throwable */
  private Throwable throwable;
  /** Field spawnStrategy */
  private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy();
  /** Field shutdownHook */
  private ShutdownUtil.Hook shutdownHook;
  /** Field jobsMap */
  private final Map<String, Callable<Throwable>> jobsMap = new LinkedHashMap<String, Callable<Throwable>>();
  /** Field stop, volatile as it is set by the thread calling {@link #stop()} and read by the thread executing run() */
  private volatile boolean stop;
  /** Field flowSkipStrategy */
  private FlowSkipStrategy flowSkipStrategy = null;
  /** Field maxConcurrentFlows */
  private int maxConcurrentFlows = 0;

  /** Field tapGraph, lazily created by {@link #getTapGraph()} */
  private transient TapGraph tapGraph;

  /**
   * Method getMaxConcurrentFlows resolves the effective maximum number of concurrently running flows.
   * <p/>
   * The given maxConcurrentFlows value is returned unless it is -1, in which case the
   * {@link CascadeProps#MAX_CONCURRENT_FLOWS} property is parsed from the given properties, defaulting to "0".
   *
   * @param properties         of type Map<Object, Object>
   * @param maxConcurrentFlows of type int
   * @return int
   */
  static int getMaxConcurrentFlows( Map<Object, Object> properties, int maxConcurrentFlows )
    {
    if( maxConcurrentFlows != -1 ) // CascadeDef is -1 by default
      return maxConcurrentFlows;

    return Integer.parseInt( getProperty( properties, CascadeProps.MAX_CONCURRENT_FLOWS, "0" ) );
    }

  /** for testing */
  protected Cascade()
    {
    this.name = null;
    this.tags = null;
    this.properties = null;
    this.flowGraph = null;
    this.identifierGraph = null;
    this.cascadeStats = null;
    }

  Cascade( CascadeDef cascadeDef, Map<Object, Object> properties, FlowGraph flowGraph, IdentifierGraph identifierGraph )
    {
    this.name = cascadeDef.getName();
    this.tags = cascadeDef.getTags();
    this.properties = properties;
    this.flowGraph = flowGraph;
    this.identifierGraph = identifierGraph;
    this.cascadeStats = createPrepareCascadeStats();
    setIDOnFlow();
    this.maxConcurrentFlows = cascadeDef.getMaxConcurrentFlows();

    // any Tap that is also a CascadeListener is registered as a listener
    addListeners( getAllTaps() );
    }

  /** Creates the CascadeStats for this instance, then prepares it and marks it pending. */
  private CascadeStats createPrepareCascadeStats()
    {
    CascadeStats cascadeStats = new CascadeStats( this, getClientState() );

    cascadeStats.prepare();
    cascadeStats.markPending();

    return cascadeStats;
    }

  /**
   * Method getName returns the name of this Cascade object.
   *
   * @return the name (type String) of this Cascade object.
   */
  @Override
  public String getName()
    {
    return name;
    }

  /**
   * Method getID returns the ID of this Cascade object.
   * <p/>
   * The ID value is a long HEX String used to identify this instance globally. Subsequent Cascade
   * instances created with identical parameters will not return the same ID.
   * <p/>
   * The ID is created on the first call and cached.
   *
   * @return the ID (type String) of this Cascade object.
   */
  @Override
  public String getID()
    {
    if( id == null )
      id = Util.createUniqueID();

    return id;
    }

  /**
   * Method getTags returns the tags associated with this Cascade object.
   *
   * @return the tags (type String) of this Cascade object.
   */
  @Override
  public String getTags()
    {
    return tags;
    }

  /**
   * Registers every element of the given collection that implements {@link CascadeListener}; other
   * elements are ignored.
   *
   * @param listeners of type Collection
   */
  void addListeners( Collection<?> listeners )
    {
    for( Object listener : listeners )
      {
      if( listener instanceof CascadeListener )
        addListener( (CascadeListener) listener );
      }
    }

  /** Returns the listener list, creating it on first use. */
  List<SafeCascadeListener> getListeners()
    {
    if( listeners == null )
      listeners = new LinkedList<SafeCascadeListener>();

    return listeners;
    }

  /**
   * Method hasListeners returns true if at least one {@link CascadeListener} is registered.
   *
   * @return boolean
   */
  public boolean hasListeners()
    {
    return listeners != null && !listeners.isEmpty();
    }

  /**
   * Method addListener registers the given listener, wrapped so that anything it throws is caught.
   *
   * @param flowListener of type CascadeListener
   */
  public void addListener( CascadeListener flowListener )
    {
    getListeners().add( new SafeCascadeListener( flowListener ) );
    }

  /**
   * Method removeListener removes the given listener.
   *
   * @param flowListener of type CascadeListener
   * @return true if a listener was removed
   */
  public boolean removeListener( CascadeListener flowListener )
    {
    // SafeCascadeListener#equals compares the wrapped listeners, so a fresh wrapper matches the registered one
    return getListeners().remove( new SafeCascadeListener( flowListener ) );
    }

  private void fireOnCompleted()
    {
    if( hasListeners() )
      {
      if( LOG.isDebugEnabled() )
        logDebug( "firing onCompleted event: " + getListeners().size() );

      for( CascadeListener cascadeListener : getListeners() )
        cascadeListener.onCompleted( this );
      }
    }

  /** Notifies all listeners of the current throwable, the throwable is cleared if any listener reports it as handled. */
  private void fireOnThrowable()
    {
    if( hasListeners() )
      {
      if( LOG.isDebugEnabled() )
        logDebug( "firing onThrowable event: " + getListeners().size() );

      boolean isHandled = false;

      // every listener is called, the call is on the left of || so it is never short-circuited
      for( CascadeListener cascadeListener : getListeners() )
        isHandled = cascadeListener.onThrowable( this, throwable ) || isHandled;

      if( isHandled )
        throwable = null;
      }
    }

  protected void fireOnStopping()
    {
    if( hasListeners() )
      {
      if( LOG.isDebugEnabled() )
        logDebug( "firing onStopping event: " + getListeners().size() );

      for( CascadeListener cascadeListener : getListeners() )
        cascadeListener.onStopping( this );
      }
    }

  protected void fireOnStarting()
    {
    if( hasListeners() )
      {
      if( LOG.isDebugEnabled() )
        logDebug( "firing onStarting event: " + getListeners().size() );

      for( CascadeListener cascadeListener : getListeners() )
        cascadeListener.onStarting( this );
      }
    }

  /** Returns the CascadingServices instance, creating it from the properties on first use. */
  private CascadingServices getCascadingServices()
    {
    if( cascadingServices == null )
      cascadingServices = new CascadingServices( properties );

    return cascadingServices;
    }

  private ClientState getClientState()
    {
    return getCascadingServices().createClientState( getID() );
    }

  /**
   * Method getCascadeStats returns the cascadeStats of this Cascade object.
   *
   * @return the cascadeStats (type CascadeStats) of this Cascade object.
   */
  public CascadeStats getCascadeStats()
    {
    return cascadeStats;
    }

  @Override
  public CascadeStats getStats()
    {
    return getCascadeStats();
    }

  /** Associates this Cascade with every managed Flow. */
  private void setIDOnFlow()
    {
    for( Flow<?> flow : getFlows() )
      ( (BaseFlow<?>) flow ).setCascade( this );
    }

  protected FlowGraph getFlowGraph()
    {
    return flowGraph;
    }

  protected IdentifierGraph getIdentifierGraph()
    {
    return identifierGraph;
    }

  /**
   * Method getFlows returns the flows managed by this Cascade object. The returned {@link cascading.flow.Flow} instances
   * will be in topological order.
   *
   * @return the flows (type List<Flow>) of this Cascade object.
   */
  public List<Flow> getFlows()
    {
    List<Flow> flows = new LinkedList<Flow>();
    TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator();

    while( topoIterator.hasNext() )
      flows.add( topoIterator.next() );

    return flows;
    }

  /**
   * Method findFlows returns a List of flows whose names match the given regex pattern.
   *
   * @param regex of type String
   * @return List<Flow>
   */
  public List<Flow> findFlows( String regex )
    {
    List<Flow> flows = new ArrayList<Flow>();

    for( Flow flow : getFlows() )
      {
      if( flow.getName().matches( regex ) )
        flows.add( flow );
      }

    return flows;
    }

  /**
   * Method getHeadFlows returns all Flow instances that are at the "head" of the flow graph.
   * <p/>
   * That is, they are the first to execute and have no Tap source dependencies with Flow instances in this Cascade
   * instance.
   *
   * @return Collection<Flow>
   */
  public Collection<Flow> getHeadFlows()
    {
    Set<Flow> flows = new HashSet<Flow>();

    for( Flow flow : flowGraph.vertexSet() )
      {
      if( flowGraph.inDegreeOf( flow ) == 0 )
        flows.add( flow );
      }

    return flows;
    }

  /**
   * Method getTailFlows returns all Flow instances that are at the "tail" of the flow graph.
   * <p/>
   * That is, they are the last to execute and have no Tap sink dependencies with Flow instances in this Cascade
   * instance.
   *
   * @return Collection<Flow>
   */
  public Collection<Flow> getTailFlows()
    {
    Set<Flow> flows = new HashSet<Flow>();

    for( Flow flow : flowGraph.vertexSet() )
      {
      if( flowGraph.outDegreeOf( flow ) == 0 )
        flows.add( flow );
      }

    return flows;
    }

  /**
   * Method getIntermediateFlows returns all Flow instances that are neither at the "head" nor the "tail" of the flow graph.
   *
   * @return Collection<Flow>
   */
  public Collection<Flow> getIntermediateFlows()
    {
    Set<Flow> flows = new HashSet<Flow>( flowGraph.vertexSet() );

    flows.removeAll( getHeadFlows() );
    flows.removeAll( getTailFlows() );

    return flows;
    }

  /** Returns the TapGraph built from all managed Flow instances, creating it on first use. */
  protected TapGraph getTapGraph()
    {
    if( tapGraph == null )
      tapGraph = new TapGraph( flowGraph.vertexSet() );

    return tapGraph;
    }

  /**
   * Method getSourceTaps returns all source Tap instances in this Cascade instance.
   * <p/>
   * That is, none of returned Tap instances are the sinks of other Flow instances in this Cascade.
   * <p/>
   * All {@link cascading.tap.CompositeTap} instances are unwound if addressed directly by a managed Flow instance.
   *
   * @return Collection<Tap>
   */
  public Collection<Tap> getSourceTaps()
    {
    TapGraph tapGraph = getTapGraph();
    Set<Tap> taps = new HashSet<Tap>();

    for( Tap tap : tapGraph.vertexSet() )
      {
      if( tapGraph.inDegreeOf( tap ) == 0 )
        taps.add( tap );
      }

    return taps;
    }

  /**
   * Method getSinkTaps returns all sink Tap instances in this Cascade instance.
   * <p/>
   * That is, none of returned Tap instances are the sources of other Flow instances in this Cascade.
   * <p/>
   * All {@link cascading.tap.CompositeTap} instances are unwound if addressed directly by a managed Flow instance.
   * <p/>
   * This method will return checkpoint Taps managed by Flow instances if not used as a source by other Flow instances.
   *
   * @return Collection<Tap>
   */
  public Collection<Tap> getSinkTaps()
    {
    TapGraph tapGraph = getTapGraph();
    Set<Tap> taps = new HashSet<Tap>();

    for( Tap tap : tapGraph.vertexSet() )
      {
      if( tapGraph.outDegreeOf( tap ) == 0 )
        taps.add( tap );
      }

    return taps;
    }

  /**
   * Method getCheckpointsTaps returns all checkpoint Tap instances from all the Flow instances in this Cascade instance.
   *
   * @return Collection<Tap>
   */
  public Collection<Tap> getCheckpointsTaps()
    {
    Set<Tap> taps = new HashSet<Tap>();

    for( Flow flow : getFlows() )
      taps.addAll( flow.getCheckpointsCollection() );

    return taps;
    }

  /**
   * Method getIntermediateTaps returns all Tap instances that are neither at the source or sink of the flow graph.
   * <p/>
   * This method does consider checkpoint Taps managed by Flow instances in this Cascade instance.
   *
   * @return Collection<Tap>
   */
  public Collection<Tap> getIntermediateTaps()
    {
    TapGraph tapGraph = getTapGraph();
    Set<Tap> taps = new HashSet<Tap>( tapGraph.vertexSet() );

    taps.removeAll( getSourceTaps() );
    taps.removeAll( getSinkTaps() );

    return taps;
    }

  /**
   * Method getAllTaps returns all source, sink, and checkpoint Tap instances associated with the managed
   * Flow instances in this Cascade instance.
   *
   * @return Collection<Tap>
   */
  public Collection<Tap> getAllTaps()
    {
    return new HashSet<Tap>( getTapGraph().vertexSet() );
    }

  /**
   * Method getSuccessorFlows returns a Collection of all the Flow instances that will be
   * executed after the given Flow instance.
   *
   * @param flow of type Flow
   * @return Collection<Flow>
   */
  public Collection<Flow> getSuccessorFlows( Flow flow )
    {
    return Graphs.successorListOf( flowGraph, flow );
    }

  /**
   * Method getPredecessorFlows returns a Collection of all the Flow instances that will be
   * executed before the given Flow instance.
   *
   * @param flow of type Flow
   * @return Collection<Flow>
   */
  public Collection<Flow> getPredecessorFlows( Flow flow )
    {
    return Graphs.predecessorListOf( flowGraph, flow );
    }

  /**
   * Method findFlowsSourcingFrom returns all Flow instances that read from a source with the given identifier.
   * <p/>
   * An empty collection is returned if the lookup throws an exception.
   *
   * @param identifier of type String
   * @return Collection<Flow>
   */
  public Collection<Flow> findFlowsSourcingFrom( String identifier )
    {
    try
      {
      return unwrapFlows( identifierGraph.outgoingEdgesOf( identifier ) );
      }
    catch( Exception exception )
      {
      // best effort, a failed lookup is reported as no matching flows
      return Collections.emptySet();
      }
    }

  /**
   * Method findFlowsSinkingTo returns all Flow instances that write to a sink with the given identifier.
   * <p/>
   * An empty collection is returned if the lookup throws an exception.
   *
   * @param identifier of type String
   * @return Collection<Flow>
   */
  public Collection<Flow> findFlowsSinkingTo( String identifier )
    {
    try
      {
      return unwrapFlows( identifierGraph.incomingEdgesOf( identifier ) );
      }
    catch( Exception exception )
      {
      // best effort, a failed lookup is reported as no matching flows
      return Collections.emptySet();
      }
    }

  /** Collects the Flow held by each of the given holders into a Set. */
  private Collection<Flow> unwrapFlows( Set<BaseFlow.FlowHolder> flowHolders )
    {
    Set<Flow> flows = new HashSet<Flow>();

    for( BaseFlow.FlowHolder flowHolder : flowHolders )
      flows.add( flowHolder.flow );

    return flows;
    }

  /**
   * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Cascade.
   *
   * @return FlowSkipStrategy
   */
  public FlowSkipStrategy getFlowSkipStrategy()
    {
    return flowSkipStrategy;
    }

  /**
   * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy, if any, is returned.
   * If a strategy is given, it will be used as the strategy for all {@link cascading.flow.BaseFlow} instances managed by this Cascade instance.
   * To revert back to consulting the strategies associated with each Flow instance, re-set this value to {@code null}, its
   * default value.
   * <p/>
   * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link cascading.flow.FlowSkipIfSinkNotStale}
   * and is inherited from the Flow instance in question. An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}.
   * <p/>
   * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link cascading.flow.Flow#start()}
   *
   * @param flowSkipStrategy of type FlowSkipStrategy
   * @return FlowSkipStrategy
   */
  public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy )
    {
    FlowSkipStrategy previous = this.flowSkipStrategy;

    this.flowSkipStrategy = flowSkipStrategy;

    return previous;
    }

  @Override
  public void prepare()
    {
    }

  /**
   * Method start begins the current Cascade process. It returns immediately. See method {@link #complete()} to block
   * until the Cascade completes.
   * <p/>
   * Calling this method while a thread is already assigned has no effect.
   */
  public void start()
    {
    if( thread != null )
      return;

    thread = new Thread( new Runnable()
    {
    @Override
    public void run()
      {
      Cascade.this.run();
      }
    }, ( "cascade " + Util.toNull( getName() ) ).trim() );

    thread.start();
    }

  /**
   * Method complete begins the current Cascade process if method {@link #start()} was not previously called. This method
   * blocks until the process completes.
   * <p/>
   * On return, normally or exceptionally, the thread, throwable and shutdown hook references are cleared and the stats
   * are cleaned up.
   *
   * @throws RuntimeException wrapping any exception thrown internally.
   */
  public void complete()
    {
    start();

    try
      {
      try
        {
        thread.join();
        }
      catch( InterruptedException exception )
        {
        throw new FlowException( "thread interrupted", exception );
        }

      // a CascadingException is rethrown as is, anything else is wrapped
      if( throwable instanceof CascadingException )
        throw (CascadingException) throwable;

      if( throwable != null )
        throw new CascadeException( "unhandled exception", throwable );
      }
    finally
      {
      thread = null;
      throwable = null;
      shutdownHook = null;
      cascadeStats.cleanup();
      }
    }

  /**
   * Method stop notifies listeners, marks the stats as stopped if not already finished, stops all flows and
   * shuts down the executor. Subsequent calls have no effect.
   */
  public synchronized void stop()
    {
    if( stop )
      return;

    stop = true;

    fireOnStopping();

    if( !cascadeStats.isFinished() )
      cascadeStats.markStopped();

    internalStopAllFlows();
    handleExecutorShutdown();

    cascadeStats.cleanup();
    }

  @Override
  public void cleanup()
    {
    }

  /**
   * Method run executes all flows via the spawn strategy and waits for each resulting Future in turn.
   * <p/>
   * The first non-null Throwable returned by a job ends the wait. Unless this Cascade was stopped, the stats are then
   * marked failed, all flows are stopped and the listeners are notified.
   */
  private void run()
    {
    Version.printBanner();

    if( LOG.isInfoEnabled() )
      logInfo( "starting" );

    registerShutdownHook();

    try
      {
      if( stop )
        return;

      // mark started, not submitted
      cascadeStats.markStartedThenRunning();

      fireOnStarting();

      initializeNewJobsMap();

      int numThreads = getMaxConcurrentFlows( properties, maxConcurrentFlows );

      // zero means one thread per flow
      if( numThreads == 0 )
        numThreads = jobsMap.size();

      int numLocalFlows = numLocalFlows();

      boolean runFlowsLocal = numLocalFlows > 1;

      // more than one local flow forces serial execution
      if( runFlowsLocal )
        numThreads = 1;

      if( LOG.isInfoEnabled() )
        {
        logInfo( " parallel execution is enabled: " + !runFlowsLocal );
        logInfo( " starting flows: " + jobsMap.size() );
        logInfo( " allocating threads: " + numThreads );
        }

      List<Future<Throwable>> futures = spawnStrategy.start( this, numThreads, jobsMap.values() );

      for( Future<Throwable> future : futures )
        {
        throwable = future.get();

        if( throwable != null )
          {
          if( !stop )
            {
            if( !cascadeStats.isFinished() )
              cascadeStats.markFailed( throwable );
            internalStopAllFlows();
            fireOnThrowable();
            }

          handleExecutorShutdown();
          break;
          }
        }
      }
    catch( Throwable unexpected )
      {
      this.throwable = unexpected;

      // complete() will rethrow this failure, so the stats must not be marked successful in the finally block
      if( !cascadeStats.isFinished() )
        cascadeStats.markFailed( unexpected );
      }
    finally
      {
      if( !cascadeStats.isFinished() )
        cascadeStats.markSuccessful();

      try
        {
        fireOnCompleted();
        }
      finally
        {
        deregisterShutdownHook();
        }
      }
    }

  /** Registers a hook that calls {@link #stop()} on shutdown, unless {@link #isStopJobsOnExit()} is false. */
  private void registerShutdownHook()
    {
    if( !isStopJobsOnExit() )
      return;

    shutdownHook = new ShutdownUtil.Hook()
    {
    @Override
    public Priority priority()
      {
      return Priority.WORK_PARENT;
      }

    @Override
    public void execute()
      {
      logInfo( "shutdown hook calling stop on cascade" );

      Cascade.this.stop();
      }
    };

    ShutdownUtil.addHook( shutdownHook );
    }

  /** Removes the shutdown hook, unless none was registered or this Cascade has been stopped. */
  private void deregisterShutdownHook()
    {
    if( !isStopJobsOnExit() || stop )
      return;

    ShutdownUtil.removeHook( shutdownHook );
    }

  /** Returns the stopJobsOnExit value of the first Flow in topological order, or false if there are no flows. */
  private boolean isStopJobsOnExit()
    {
    // getFlows() walks the graph and builds a new list on every call, so only call it once
    List<Flow> flows = getFlows();

    if( flows.isEmpty() )
      return false; // don't bother registering hook

    return flows.get( 0 ).isStopJobsOnExit();
    }

  /**
   * Counts the managed flows whose steps are local.
   * <p/>
   * If the number of flows that are local is greater than one, the Cascade is forced to run without parallelization.
   *
   * @return of type int
   */
  private int numLocalFlows()
    {
    int countLocalJobs = 0;

    for( Flow flow : getFlows() )
      {
      if( flow.stepsAreLocal() )
        countLocalJobs++;
      }

    return countLocalJobs;
    }

  /** Creates a CascadeJob per Flow, keyed by flow name in topological order, and links each job to its predecessors. */
  private void initializeNewJobsMap()
    {
    synchronized( jobsMap )
      {
      // keep topo order
      TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator();

      while( topoIterator.hasNext() )
        {
        Flow flow = topoIterator.next();

        cascadeStats.addFlowStats( flow.getFlowStats() );

        CascadeJob job = new CascadeJob( flow );

        jobsMap.put( flow.getName(), job );

        List<CascadeJob> predecessors = new ArrayList<CascadeJob>();

        // predecessors were visited earlier in the topological order, so their jobs are already in the map
        for( Flow predecessor : Graphs.predecessorListOf( flowGraph, flow ) )
          predecessors.add( (CascadeJob) jobsMap.get( predecessor.getName() ) );

        job.init( predecessors );
        }
      }
    }

  /** Asks the spawn strategy to complete, waiting up to five minutes, unless it has already completed. */
  private void handleExecutorShutdown()
    {
    if( spawnStrategy.isCompleted( this ) )
      return;

    logInfo( "shutting down flow executor" );

    try
      {
      spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS );
      }
    catch( InterruptedException exception )
      {
      // shutdown is best effort, but restore the interrupt status so callers can still observe it
      Thread.currentThread().interrupt();
      }

    logInfo( "shutdown complete" );
    }

  /** Stops every job in reverse insertion order. */
  private void internalStopAllFlows()
    {
    logInfo( "stopping all flows" );

    synchronized( jobsMap )
      {
      List<Callable<Throwable>> jobs = new ArrayList<Callable<Throwable>>( jobsMap.values() );

      Collections.reverse( jobs );

      for( Callable<Throwable> callable : jobs )
        ( (CascadeJob) callable ).stop();
      }

    logInfo( "stopped all flows" );
    }

  /**
   * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
   *
   * @param filename of type String
   */
  public void writeDOT( String filename )
    {
    printElementGraph( filename, identifierGraph );
    }

  /**
   * Writes the given graph in DOT format to the named file. An IOException is logged, not rethrown.
   *
   * @param filename of type String
   * @param graph    of type SimpleDirectedGraph<String, BaseFlow.FlowHolder>
   */
  protected void printElementGraph( String filename, SimpleDirectedGraph<String, BaseFlow.FlowHolder> graph )
    {
    try
      {
      Writer writer = new FileWriter( filename );

      try
        {
        Util.writeDOT( writer, graph, new IntegerNameProvider<String>(), new VertexNameProvider<String>()
        {
        public String getVertexName( String object )
          {
          return object.toString().replaceAll( "\"", "\'" );
          }
        }, new EdgeNameProvider<BaseFlow.FlowHolder>()
        {
        public String getEdgeName( BaseFlow.FlowHolder object )
          {
          return object.flow.getName().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
          }
        }
        );
        }
      finally
        {
        // always release the file handle, even if writing the graph fails
        writer.close();
        }
      }
    catch( IOException exception )
      {
      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
      }
    }

  @Override
  public String toString()
    {
    return getName();
    }

  private void logDebug( String message )
    {
    LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message );
    }

  private void logInfo( String message )
    {
    LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message );
    }

  private void logWarn( String message )
    {
    logWarn( message, null );
    }

  private void logWarn( String message, Throwable throwable )
    {
    LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable );
    }

  /** Class CascadeJob manages Flow execution in the current Cascade instance. */
  protected class CascadeJob implements Callable<Throwable>
    {
    /** Field flow */
    final Flow flow;
    /** Field predecessors */
    private List<CascadeJob> predecessors;
    /** Field latch, released when {@link #call()} exits */
    private final CountDownLatch latch = new CountDownLatch( 1 );
    /** Field stop, volatile as it is set by the stopping thread and read by executor threads */
    private volatile boolean stop = false;
    /** Field failed, written before the latch is released and read only after awaiting it */
    private boolean failed = false;

    public CascadeJob( Flow flow )
      {
      this.flow = flow;
      }

    public String getName()
      {
      return flow.getName();
      }

    /**
     * Method call waits for all predecessors, then prepares and completes the Flow unless it is skipped.
     *
     * @return null if the Flow completed, was skipped or was not run, otherwise the failure
     */
    public Throwable call()
      {
      try
        {
        // isSuccessful() blocks until the predecessor has exited its own call()
        for( CascadeJob predecessor : predecessors )
          {
          if( !predecessor.isSuccessful() )
            return null;
          }

        if( stop || cascadeStats.isFinished() )
          return null;

        try
          {
          if( LOG.isInfoEnabled() )
            logInfo( "starting flow: " + flow.getName() );

          // a strategy set on the Cascade takes precedence over the one on the Flow
          if( flowSkipStrategy == null ? flow.isSkipFlow() : flowSkipStrategy.skipFlow( flow ) )
            {
            if( LOG.isInfoEnabled() )
              logInfo( "skipping flow: " + flow.getName() );

            flow.getFlowStats().markSkipped();

            return null;
            }

          flow.prepare(); // do not delete append/update mode taps
          flow.complete();

          if( LOG.isInfoEnabled() )
            logInfo( "completed flow: " + flow.getName() );
          }
        catch( Throwable exception )
          {
          failed = true;
          logWarn( "flow failed: " + flow.getName(), exception );

          CascadeException cascadeException = new CascadeException( "flow failed: " + flow.getName(), exception );

          if( !cascadeStats.isFinished() )
            cascadeStats.markFailed( cascadeException );

          return cascadeException;
          }
        finally
          {
          flow.cleanup();
          }
        }
      catch( Throwable throwable )
        {
        failed = true;
        return throwable;
        }
      finally
        {
        // release any successors waiting in isSuccessful()
        latch.countDown();
        }

      return null;
      }

    public void init( List<CascadeJob> predecessors )
      {
      this.predecessors = predecessors;
      }

    public void stop()
      {
      if( LOG.isInfoEnabled() )
        logInfo( "stopping flow: " + flow.getName() );

      stop = true;

      if( flow != null )
        flow.stop();
      }

    /**
     * Method isSuccessful blocks until {@link #call()} has exited.
     *
     * @return true if this job neither failed nor was stopped, false otherwise or if the wait was interrupted
     */
    public boolean isSuccessful()
      {
      try
        {
        latch.await();

        return flow != null && !failed && !stop;
        }
      catch( InterruptedException exception )
        {
        logWarn( "latch interrupted", exception );

        // keep the interrupt status visible to the executing thread
        Thread.currentThread().interrupt();
        }

      return false;
      }
    }

  @Override
  public UnitOfWorkSpawnStrategy getSpawnStrategy()
    {
    return spawnStrategy;
    }

  @Override
  public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy )
    {
    this.spawnStrategy = spawnStrategy;
    }

  /**
   * Class SafeCascadeListener safely calls a wrapped CascadeListener.
   * <p/>
   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
   * can be caught by the calling Thread. Since Cascade is asynchronous, much of the work is done in the run() method
   * which in turn is run in a new Thread.
   */
  private class SafeCascadeListener implements CascadeListener
    {
    /** Field cascadeListener */
    final CascadeListener cascadeListener;
    /** Field throwable, the last throwable thrown by the wrapped listener */
    Throwable throwable;

    private SafeCascadeListener( CascadeListener cascadeListener )
      {
      this.cascadeListener = cascadeListener;
      }

    public void onStarting( Cascade cascade )
      {
      try
        {
        cascadeListener.onStarting( cascade );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }
      }

    public void onStopping( Cascade cascade )
      {
      try
        {
        cascadeListener.onStopping( cascade );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }
      }

    public void onCompleted( Cascade cascade )
      {
      try
        {
        cascadeListener.onCompleted( cascade );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }
      }

    public boolean onThrowable( Cascade cascade, Throwable flowThrowable )
      {
      try
        {
        return cascadeListener.onThrowable( cascade, flowThrowable );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }

      // a listener that throws is treated as not having handled the throwable
      return false;
      }

    /** Records and logs the given throwable, then stops the enclosing Cascade. */
    private void handleThrowable( Throwable throwable )
      {
      this.throwable = throwable;

      logWarn( String.format( "cascade listener %s threw throwable", cascadeListener ), throwable );

      // stop this cascade
      Cascade.this.stop();
      }

    /** Compares the wrapped listeners, so a wrapper equals any other wrapper of an equal listener. */
    public boolean equals( Object object )
      {
      if( object instanceof SafeCascadeListener )
        return cascadeListener.equals( ( (SafeCascadeListener) object ).cascadeListener );

      return cascadeListener.equals( object );
      }

    public int hashCode()
      {
      return cascadeListener.hashCode();
      }
    }
  }