/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import cascading.flow.Flow;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.FlowStepListener;
import cascading.flow.planner.graph.AnnotatedGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.stream.annotations.StreamMode;
import cascading.management.CascadingServices;
import cascading.management.state.ClientState;
import cascading.operation.Operation;
import cascading.pipe.Group;
import cascading.pipe.Operator;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.property.ConfigDef;
import cascading.stats.FlowStepStats;
import cascading.tap.Tap;
import cascading.util.EnumMultiMap;
import cascading.util.ProcessLogger;
import cascading.util.Util;

import static cascading.flow.planner.graph.ElementGraphs.findAllGroups;

/**
 * Class BaseFlowStep is an internal representation of a given Job to be executed on a remote cluster. During
 * planning, pipe assemblies are broken down into "steps" and encapsulated in this class.
 * <p>
 * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all
 * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which
 * all steps will be submitted for execution. The default submit priority is 5.
 * <p>
 * This class is for internal use, there are no stable public methods.
 */
public abstract class BaseFlowStep<Config> implements FlowStep<Config>, ProcessLogger, Serializable
  {
  /** Field flow */
  private transient Flow<Config> flow;
  /** Field flowName */
  private String flowName;
  /** Field flowID */
  private String flowID;

  /** Field flowStepConf */
  private transient Config flowStepConf;

  /** Field submitPriority */
  private int submitPriority = 5;

  /** Field name */
  String name;
  /** Field id */
  private String id;
  /** Field ordinal */
  private int ordinal;
  /** Field processAnnotations, created lazily on the first call to {@link #addProcessAnnotation(String, String)} */
  private Map<String, String> processAnnotations;

  /** Field step listeners, created lazily by {@link #getListeners()} */
  private List<SafeFlowStepListener> listeners;

  /** Field graph */
  private final ElementGraph elementGraph;

  /** Field flowNodeGraph */
  private FlowNodeGraph flowNodeGraph;

  /** Field sources */
  protected final Map<Tap, Set<String>> sources = new HashMap<>(); // all sources
  /** Field sinks */
  protected final Map<Tap, Set<String>> sinks = new HashMap<>(); // all sinks

  /** Field traps */
  private final Map<String, Tap> traps = new HashMap<>();

  /** Field tempSink */
  protected Tap tempSink; // used if we need to bypass the filesystem

  /** Field groups */
  private final List<Group> groups = new ArrayList<Group>();

  /** Field flowStepStats */
  protected transient FlowStepStats flowStepStats;

  /** Field flowStepJob, created on demand by {@link #getCreateFlowStepJob(FlowProcess, Object)} */
  private transient FlowStepJob<Config> flowStepJob;

  /**
   * Constructor for testing. Leaves both the element graph and the node graph {@code null}, so any
   * method that dereferences them must not be called on an instance created this way.
   *
   * @param name    of type String, may not be null or empty
   * @param ordinal of type int
   */
  protected BaseFlowStep( String name, int ordinal )
    {
    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
    setName( name );
    this.ordinal = ordinal;

    this.elementGraph = null;
    this.flowNodeGraph = null;
    }

  /**
   * Creates a step over the given graphs and immediately calls {@link #configure()}.
   *
   * @param elementStepGraph of type ElementGraph
   * @param flowNodeGraph    of type FlowNodeGraph
   */
  protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph )
    {
    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
    this.elementGraph = elementStepGraph;
    this.flowNodeGraph = flowNodeGraph; // TODO: verify no missing elements in the union of the node graphs

    configure();
    }

  /**
   * Populates the sources, sinks, groups and traps of this step from the element graph and node graph.
   * <p>
   * Note this method is invoked from the constructor, before any subclass constructor body has run.
   */
  protected void configure()
    {
    // todo: remove once FlowMapper/FlowReducer aren't reliant
    ElementGraphs.addSources( this, elementGraph, flowNodeGraph.getSourceTaps() );
    ElementGraphs.addSinks( this, elementGraph, flowNodeGraph.getSinkTaps() );

    addGroups( findAllGroups( elementGraph ) );

    traps.putAll( flowNodeGraph.getTrapsMap() );
    }

  @Override
  public String getID()
    {
    return id;
    }

  public void setOrdinal( int ordinal )
    {
    this.ordinal = ordinal;
    }

  @Override
  public int getOrdinal()
    {
    return ordinal;
    }

  @Override
  public String getName()
    {
    return name;
    }

  /**
   * Method setName sets the name of this step.
   *
   * @param name of type String
   * @throws IllegalArgumentException if the given name is null or empty
   */
  public void setName( String name )
    {
    if( name == null || name.isEmpty() )
      throw new IllegalArgumentException( "step name may not be null or empty" );

    this.name = name;
    }

  /**
   * Method getProcessAnnotations returns a read-only view of the annotations added so far, or an
   * empty map if none were added.
   *
   * @return Map of annotation key to value, never null
   */
  @Override
  public Map<String, String> getProcessAnnotations()
    {
    if( processAnnotations == null )
      return Collections.emptyMap();

    return Collections.unmodifiableMap( processAnnotations );
    }

  /**
   * Method addProcessAnnotation adds the given enum as an annotation, keyed by the name of its declaring
   * class with the enum constant name as the value. A null annotation is ignored.
   *
   * @param annotation of type Enum, may be null
   */
  @Override
  public void addProcessAnnotation( Enum annotation )
    {
    if( annotation == null )
      return;

    addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
    }

  /**
   * Method addProcessAnnotation stores the given key and value, replacing any value previously stored
   * under the same key.
   *
   * @param key   of type String
   * @param value of type String
   */
  @Override
  public void addProcessAnnotation( String key, String value )
    {
    if( processAnnotations == null )
      processAnnotations = new HashMap<>();

    processAnnotations.put( key, value );
    }

  /**
   * Method setFlow binds this step to the given flow and captures the flow id and name. The id and name
   * are kept in non-transient fields, the flow reference itself is transient.
   *
   * @param flow of type Flow, may not be null
   */
  public void setFlow( Flow<Config> flow )
    {
    this.flow = flow;
    this.flowID = flow.getID();
    this.flowName = flow.getName();
    }

  @Override
  public Flow<Config> getFlow()
    {
    return flow;
    }

  @Override
  public String getFlowID()
    {
    return flowID;
    }

  @Override
  public String getFlowName()
    {
    return flowName;
    }

  protected void setFlowName( String flowName )
    {
    this.flowName = flowName;
    }

  @Override
  public Config getConfig()
    {
    return flowStepConf;
    }

  /**
   * Method getConfigAsProperties returns an empty map in this base implementation.
   *
   * @return an empty Map
   */
  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return Collections.emptyMap();
    }

  /**
   * Set the initialized flowStepConf Config instance
   *
   * @param flowStepConf of type Config
   */
  protected void setConfig( Config flowStepConf )
    {
    this.flowStepConf = flowStepConf;
    }

  @Override
  public String getStepDisplayName()
    {
    return getStepDisplayName( Util.ID_LENGTH );
    }

  /**
   * Method getStepDisplayName returns a display name of the form {@code [flowID/stepID] flowName/stepName}
   * where both ids are truncated to the given length.
   * <p>
   * A length that is negative or greater than {@code Util.ID_LENGTH} is replaced by {@code Util.ID_LENGTH}.
   * A length of zero drops the bracketed id prefix entirely.
   *
   * @param idLength number of leading id characters to show
   * @return the display name
   */
  protected String getStepDisplayName( int idLength )
    {
    if( idLength < 0 || idLength > Util.ID_LENGTH )
      idLength = Util.ID_LENGTH;

    if( idLength == 0 )
      return String.format( "%s/%s", getFlowName(), getName() );

    String flowID = getFlowID().substring( 0, idLength );
    String stepID = getID().substring( 0, idLength );

    return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() );
    }

  /**
   * Method getNodeDisplayName returns a display name of the form
   * {@code [flowID/stepID/nodeID] flowName/stepName} where all ids are truncated to the given length.
   * <p>
   * A length that is negative or greater than {@code Util.ID_LENGTH} is replaced by {@code Util.ID_LENGTH},
   * matching {@link #getStepDisplayName(int)}. Without the negative guard the substring calls below
   * would throw.
   *
   * @param flowNode of type FlowNode
   * @param idLength number of leading id characters to show
   * @return the display name
   */
  protected String getNodeDisplayName( FlowNode flowNode, int idLength )
    {
    if( idLength < 0 || idLength > Util.ID_LENGTH )
      idLength = Util.ID_LENGTH;

    String flowID = getFlowID().substring( 0, idLength );
    String stepID = getID().substring( 0, idLength );
    String nodeID = flowNode.getID().substring( 0, idLength );

    return String.format( "[%s/%s/%s] %s/%s", flowID, stepID, nodeID, getFlowName(), getName() );
    }

  @Override
  public int getSubmitPriority()
    {
    return submitPriority;
    }

  /**
   * Method setSubmitPriority sets the submit priority of this step.
   *
   * @param submitPriority of type int, between 1 and 10 inclusive
   * @throws IllegalArgumentException if the value is out of range
   */
  @Override
  public void setSubmitPriority( int submitPriority )
    {
    if( submitPriority < 1 || submitPriority > 10 )
      throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );

    this.submitPriority = submitPriority;
    }

  @Override
  public void setFlowStepStats( FlowStepStats flowStepStats )
    {
    this.flowStepStats = flowStepStats;
    }

  @Override
  public FlowStepStats getFlowStepStats()
    {
    return flowStepStats;
    }

  @Override
  public ElementGraph getElementGraph()
    {
    return elementGraph;
    }

  /**
   * Method getAnnotations returns the annotations of the element graph. The element graph is cast to
   * {@link AnnotatedGraph} without a check, so this fails with a ClassCastException otherwise.
   *
   * @return EnumMultiMap
   */
  protected EnumMultiMap getAnnotations()
    {
    return ( (AnnotatedGraph) elementGraph ).getAnnotations();
    }

  @Override
  public FlowNodeGraph getFlowNodeGraph()
    {
    return flowNodeGraph;
    }

  @Override
  public int getNumFlowNodes()
    {
    return flowNodeGraph.vertexSet().size();
    }

  public Set<FlowElement> getSourceElements()
    {
    return ElementGraphs.findSources( getElementGraph(), FlowElement.class );
    }

  public Set<FlowElement> getSinkElements()
    {
    return ElementGraphs.findSinks( getElementGraph(), FlowElement.class );
    }

  /**
   * Method getGroup returns the single Group of this step, or null if there is none.
   *
   * @return Group, or null
   * @throws IllegalStateException if this step has more than one group
   */
  @Override
  public Group getGroup()
    {
    if( groups.isEmpty() )
      return null;

    if( groups.size() > 1 )
      throw new IllegalStateException( "more than one group" );

    return groups.get( 0 );
    }

  /**
   * Method getGroups returns the live, internal collection of groups (not a copy).
   *
   * @return Collection of Group
   */
  @Override
  public Collection<Group> getGroups()
    {
    return groups;
    }

  public void addGroups( Collection<Group> groups )
    {
    for( Group group : groups )
      addGroup( group );
    }

  /**
   * Method addGroup adds the given group unless an equal group is already present.
   *
   * @param group of type Group
   */
  public void addGroup( Group group )
    {
    if( !groups.contains( group ) )
      groups.add( group );
    }

  public Set<Tap> getAllAccumulatedSources()
    {
    return Util.narrowSet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Accumulated ) );
    }

  /**
   * Method addSource registers the given name against the given source Tap. A Tap may be registered
   * under several names.
   *
   * @param name   of type String
   * @param source of type Tap
   */
  public void addSource( String name, Tap source )
    {
    Set<String> names = sources.get( source );

    if( names == null )
      {
      names = new HashSet<String>();
      sources.put( source, names );
      }

    names.add( name );
    }

  /**
   * Method addSink registers the given name against the given sink Tap. A Tap may be registered
   * under several names.
   *
   * @param name of type String
   * @param sink of type Tap
   */
  public void addSink( String name, Tap sink )
    {
    Set<String> names = sinks.get( sink );

    if( names == null )
      {
      names = new HashSet<String>();
      sinks.put( sink, names );
      }

    names.add( name );
    }

  /**
   * Method getSourceTaps returns a read-only snapshot of the source Taps.
   *
   * @return Set of Tap
   */
  @Override
  public Set<Tap> getSourceTaps()
    {
    return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) );
    }

  /**
   * Method getSinkTaps returns a read-only snapshot of the sink Taps.
   *
   * @return Set of Tap
   */
  @Override
  public Set<Tap> getSinkTaps()
    {
    return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) );
    }

  /**
   * Method getSink returns the one and only sink Tap of this step.
   *
   * @return Tap
   * @throws IllegalStateException if this step does not have exactly one sink
   */
  @Override
  public Tap getSink()
    {
    // report the empty case separately, "more than one sink" would be misleading
    if( sinks.isEmpty() )
      throw new IllegalStateException( "no sink found" );

    if( sinks.size() != 1 )
      throw new IllegalStateException( "more than one sink" );

    return sinks.keySet().iterator().next();
    }

  /**
   * Method getSourceName returns a read-only view of the names registered for the given source Tap.
   * The Tap must have been registered via {@link #addSource(String, Tap)}, an unknown Tap results in a
   * NullPointerException.
   *
   * @param source of type Tap
   * @return Set of String
   */
  @Override
  public Set<String> getSourceName( Tap source )
    {
    return Collections.unmodifiableSet( sources.get( source ) );
    }

  /**
   * Method getSinkName returns a read-only view of the names registered for the given sink Tap.
   * The Tap must have been registered via {@link #addSink(String, Tap)}, an unknown Tap results in a
   * NullPointerException.
   *
   * @param sink of type Tap
   * @return Set of String
   */
  @Override
  public Set<String> getSinkName( Tap sink )
    {
    return Collections.unmodifiableSet( sinks.get( sink ) );
    }

  /**
   * Method getSourceWith returns the first source Tap whose identifier matches the given one, ignoring case.
   *
   * @param identifier of type String
   * @return Tap, or null if none matches
   */
  @Override
  public Tap getSourceWith( String identifier )
    {
    for( Tap tap : sources.keySet() )
      {
      if( tap.getIdentifier().equalsIgnoreCase( identifier ) )
        return tap;
      }

    return null;
    }

  /**
   * Method getSinkWith returns the first sink Tap whose identifier matches the given one, ignoring case.
   *
   * @param identifier of type String
   * @return Tap, or null if none matches
   */
  @Override
  public Tap getSinkWith( String identifier )
    {
    for( Tap tap : sinks.keySet() )
      {
      if( tap.getIdentifier().equalsIgnoreCase( identifier ) )
        return tap;
      }

    return null;
    }

  /**
   * Method getTrapMap returns the live, internal map of trap name to trap Tap (not a copy).
   *
   * @return Map of String to Tap
   */
  @Override
  public Map<String, Tap> getTrapMap()
    {
    return traps;
    }

  /**
   * Method getTraps returns a read-only snapshot of the trap Taps.
   *
   * @return Set of Tap
   */
  @Override
  public Set<Tap> getTraps()
    {
    return Collections.unmodifiableSet( new HashSet<Tap>( traps.values() ) );
    }

  public Tap getTrap( String name )
    {
    return getTrapMap().get( name );
    }

  /**
   * Method allSourcesExist returns true if every source Tap reports that its resource exists under the
   * current step config. Stops at the first missing resource.
   *
   * @return boolean
   * @throws IOException if thrown by a Tap
   */
  boolean allSourcesExist() throws IOException
    {
    for( Tap tap : sources.keySet() )
      {
      if( !tap.resourceExists( getConfig() ) )
        return false;
      }

    return true;
    }

  /**
   * Method areSourcesNewer returns true if the source modification time reported by
   * {@code Util.getSourceModified} is greater than the given sink modification time.
   * <p>
   * The source modification time is logged at info level even when the lookup throws, in which case
   * the logged value is the initial zero.
   *
   * @param sinkModified of type long
   * @return boolean
   * @throws IOException if thrown while inspecting the sources
   */
  boolean areSourcesNewer( long sinkModified ) throws IOException
    {
    Config config = getConfig();
    Iterator<Tap> values = sources.keySet().iterator();

    long sourceModified = 0;

    try
      {
      sourceModified = Util.getSourceModified( config, values, sinkModified );

      return sinkModified < sourceModified;
      }
    finally
      {
      if( isInfoEnabled() )
        logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
      }
    }

  /**
   * Method getSinkModified returns the value of {@code Util.getSinkModified} for all sinks of this step
   * and logs what the value represents at info level.
   *
   * @return long
   * @throws IOException if thrown while inspecting the sinks
   */
  long getSinkModified() throws IOException
    {
    long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() );

    if( isInfoEnabled() )
      {
      // the three cases are mutually exclusive, -1 and 0 are markers and must not be printed as dates
      if( sinkModified == -1L )
        logInfo( "at least one sink is marked for delete" );
      else if( sinkModified == 0L )
        logInfo( "at least one sink does not exist" );
      else
        logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
      }

    return sinkModified;
    }

  /**
   * Method prepareResources prepares the sources for read, then the sinks and the traps for write.
   * Stops at the first failure.
   *
   * @return the first failure, or null if every Tap was prepared
   */
  protected Throwable prepareResources()
    {
    Throwable throwable = prepareResources( getSourceTaps(), false );

    if( throwable == null )
      throwable = prepareResources( getSinkTaps(), true );

    if( throwable == null )
      throwable = prepareResources( getTraps(), true );

    return throwable;
    }

  /** Prepares each given Tap in iteration order, returning the first failure or null. */
  private Throwable prepareResources( Collection<Tap> taps, boolean forWrite )
    {
    for( Tap tap : taps )
      {
      Throwable throwable = prepareResource( tap, forWrite );

      if( throwable != null )
        return throwable;
      }

    return null;
    }

  /**
   * Prepares a single Tap for read or write. Both a {@code false} result and a thrown exception are
   * logged and converted into a {@link FlowException} that is returned, not thrown.
   */
  private Throwable prepareResource( Tap tap, boolean forWrite )
    {
    Throwable throwable = null;

    try
      {
      boolean result;

      if( forWrite )
        result = tap.prepareResourceForWrite( getConfig() );
      else
        result = tap.prepareResourceForRead( getConfig() );

      if( !result )
        {
        String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );

        logError( message );

        throwable = new FlowException( message );
        }
      }
    catch( Throwable exception )
      {
      String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );

      logError( message, exception );

      throwable = new FlowException( message, exception );
      }

    return throwable;
    }

  /**
   * Method commitSinks commits each sink in turn. Once a commit fails, every remaining sink is rolled
   * back instead of committed. Sinks committed before the failure are left as they are.
   *
   * @return the commit failure, or null if every sink was committed
   */
  protected Throwable commitSinks()
    {
    Throwable throwable = null;

    for( Tap tap : sinks.keySet() )
      {
      if( throwable != null )
        rollbackResource( tap );
      else
        throwable = commitResource( tap );
      }

    return throwable;
    }

  /**
   * Commits a single sink. Both a {@code false} result and a thrown exception are logged and converted
   * into a {@link FlowException} that is returned, not thrown.
   */
  private Throwable commitResource( Tap tap )
    {
    Throwable throwable = null;

    try
      {
      if( !tap.commitResource( getConfig() ) )
        {
        String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );

        logError( message );

        throwable = new FlowException( message );
        }
      }
    catch( Throwable exception )
      {
      String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );

      logError( message, exception );

      throwable = new FlowException( message, exception );
      }

    return throwable;
    }

  /**
   * Rolls back a single sink. Both a {@code false} result and a thrown exception are logged and
   * converted into a {@link FlowException} that is returned, not thrown.
   */
  private Throwable rollbackResource( Tap tap )
    {
    Throwable throwable = null;

    try
      {
      if( !tap.rollbackResource( getConfig() ) )
        {
        String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );

        logError( message );

        throwable = new FlowException( message );
        }
      }
    catch( Throwable exception )
      {
      String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );

      logError( message, exception );

      throwable = new FlowException( message, exception );
      }

    return throwable;
    }

  /**
   * Method rollbackSinks attempts to roll back every sink, continuing past failures.
   *
   * @return the first rollback failure, or null if every sink was rolled back
   */
  protected Throwable rollbackSinks()
    {
    Throwable throwable = null;

    for( Tap tap : sinks.keySet() )
      {
      Throwable current = rollbackResource( tap );

      // only the first failure is reported, later ones have already been logged
      if( throwable == null )
        throwable = current;
      }

    return throwable;
    }

  /**
   * Method createInitializedConfig creates the Config instance for this step.
   * <p>
   * Public for testing.
   *
   * @param flowProcess  of type FlowProcess
   * @param parentConfig of type Config
   * @return Config, stored as the step config by {@link #getCreateFlowStepJob(FlowProcess, Object)}
   */
  public abstract Config createInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig );

  /**
   * Method getPreviousScopes returns the previous Scope instances. If the flowElement is a Group (specifically a CoGroup),
   * there will be more than one instance.
   *
   * @param flowElement of type FlowElement
   * @return the incoming Scope edges of the given element
   */
  public Set<Scope> getPreviousScopes( FlowElement flowElement )
    {
    return getElementGraph().incomingEdgesOf( flowElement );
    }

  /**
   * Method getNextScope returns the next Scope instance in the graph. There will always only be one next.
   *
   * @param flowElement of type FlowElement
   * @return Scope
   * @throws IllegalStateException if the element does not have exactly one outgoing Scope
   */
  public Scope getNextScope( FlowElement flowElement )
    {
    Set<Scope> set = getElementGraph().outgoingEdgesOf( flowElement );

    if( set.size() != 1 )
      throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() );

    return set.iterator().next();
    }

  public FlowElement getNextFlowElement( Scope scope )
    {
    return getElementGraph().getEdgeTarget( scope );
    }

  /**
   * Method getAllOperations returns the Operation of every Operator found in the element graph.
   *
   * @return Collection of Operation, may contain equal instances
   */
  public Collection<Operation> getAllOperations()
    {
    Set<FlowElement> vertices = getElementGraph().vertexSet();
    List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same

    for( FlowElement vertex : vertices )
      {
      if( vertex instanceof Operator )
        operations.add( ( (Operator) vertex ).getOperation() );
      }

    return operations;
    }

  /**
   * Method containsPipeNamed returns true if any Pipe in the element graph has exactly the given name.
   *
   * @param pipeName of type String
   * @return boolean
   */
  @Override
  public boolean containsPipeNamed( String pipeName )
    {
    Set<FlowElement> vertices = getElementGraph().vertexSet();

    for( FlowElement vertex : vertices )
      {
      if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) )
        return true;
      }

    return false;
    }

  public void clean()
    {
    // use step config by default
    clean( getConfig() );
    }

  public abstract void clean( Config config );

  /**
   * Method getListeners returns the internal listener list, creating it on first use.
   *
   * @return List of SafeFlowStepListener, never null
   */
  List<SafeFlowStepListener> getListeners()
    {
    if( listeners == null )
      listeners = new LinkedList<SafeFlowStepListener>();

    return listeners;
    }

  @Override
  public boolean hasListeners()
    {
    return listeners != null && !listeners.isEmpty();
    }

  /**
   * Method addListener registers the given listener, wrapped so that anything it throws is caught and logged.
   *
   * @param flowStepListener of type FlowStepListener
   */
  @Override
  public void addListener( FlowStepListener flowStepListener )
    {
    getListeners().add( new SafeFlowStepListener( flowStepListener ) );
    }

  /**
   * Method removeListener removes the first registered wrapper whose delegate equals the given listener.
   *
   * @param flowStepListener of type FlowStepListener
   * @return true if a listener was removed
   */
  @Override
  public boolean removeListener( FlowStepListener flowStepListener )
    {
    return getListeners().remove( new SafeFlowStepListener( flowStepListener ) );
    }

  /** Notifies every registered listener that this step completed. */
  protected void fireOnCompleted()
    {
    if( hasListeners() )
      {
      if( isDebugEnabled() )
        logDebug( "firing onCompleted event: " + getListeners().size() );

      for( SafeFlowStepListener flowStepListener : getListeners() )
        flowStepListener.onStepCompleted( this );
      }
    }

  /**
   * Notifies every registered listener of the given throwable. The boolean returned by each listener
   * is ignored here.
   *
   * @param throwable of type Throwable
   */
  protected void fireOnThrowable( Throwable throwable )
    {
    if( hasListeners() )
      {
      if( isDebugEnabled() )
        logDebug( "firing onThrowable event: " + getListeners().size() );

      for( SafeFlowStepListener flowStepListener : getListeners() )
        flowStepListener.onStepThrowable( this, throwable );
      }
    }

  /** Notifies every registered listener that this step is stopping. */
  protected void fireOnStopping()
    {
    if( hasListeners() )
      {
      // log the listener count like the sibling fireOn* methods, not the list contents
      if( isDebugEnabled() )
        logDebug( "firing onStopping event: " + getListeners().size() );

      for( SafeFlowStepListener flowStepListener : getListeners() )
        flowStepListener.onStepStopping( this );
      }
    }

  /** Notifies every registered listener that this step is starting. */
  protected void fireOnStarting()
    {
    if( hasListeners() )
      {
      if( isDebugEnabled() )
        logDebug( "firing onStarting event: " + getListeners().size() );

      for( SafeFlowStepListener flowStepListener : getListeners() )
        flowStepListener.onStepStarting( this );
      }
    }

  /** Notifies every registered listener that this step is running. */
  protected void fireOnRunning()
    {
    if( hasListeners() )
      {
      if( isDebugEnabled() )
        logDebug( "firing onRunning event: " + getListeners().size() );

      for( SafeFlowStepListener flowStepListener : getListeners() )
        flowStepListener.onStepRunning( this );
      }
    }

  /**
   * Method createClientState asks the CascadingServices of the current session for a ClientState
   * keyed by the id of this step.
   *
   * @param flowProcess of type FlowProcess
   * @return ClientState
   */
  protected ClientState createClientState( FlowProcess flowProcess )
    {
    CascadingServices services = flowProcess.getCurrentSession().getCascadingServices();

    return services.createClientState( getID() );
    }

  /**
   * Method getFlowStepJob returns the job created by {@link #getCreateFlowStepJob(FlowProcess, Object)},
   * or null if none was created yet.
   *
   * @return FlowStepJob, or null
   */
  public FlowStepJob<Config> getFlowStepJob()
    {
    return flowStepJob;
    }

  /**
   * Method getCreateFlowStepJob returns the existing job if there is one, otherwise it initializes the
   * step config, creates the client state and the job, and remembers the job for later calls.
   *
   * @param flowProcess  of type FlowProcess, if null and no job exists yet, null is returned
   * @param parentConfig of type Config
   * @return FlowStepJob, or null
   */
  public FlowStepJob<Config> getCreateFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig )
    {
    if( flowStepJob != null )
      return flowStepJob;

    if( flowProcess == null )
      return null;

    Config initializedConfig = createInitializedConfig( flowProcess, parentConfig );

    setConfig( initializedConfig );

    ClientState clientState = createClientState( flowProcess );

    flowStepJob = createFlowStepJob( clientState, flowProcess, initializedConfig );

    return flowStepJob;
    }

  protected abstract FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<Config> flowProcess, Config initializedStepConfig );

  /**
   * Method initConfFromNodeConfigDef applies the node level ConfigDef of every element in the given node
   * graph, and of each element's parent Pipes, to the given setter.
   *
   * @param nodeElementGraph of type ElementGraph
   * @param setter           of type ConfigDef.Setter
   */
  protected void initConfFromNodeConfigDef( ElementGraph nodeElementGraph, ConfigDef.Setter setter )
    {
    nodeElementGraph = ElementGraphs.asExtentMaskedSubGraph( nodeElementGraph );

    ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() );

    // applies each mode in order, topologically
    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
      {
      Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( nodeElementGraph );

      while( iterator.hasNext() )
        {
        FlowElement element = iterator.next();

        while( element != null )
          {
          // intentionally skip any element that spans downstream nodes, like a GroupBy
          // this way GroupBy is applied on the inbound side (where partitioning happens)
          // not the outbound side.
          // parent sub-assemblies (like Unique) will be applied if they have leading Pipes to the current spanning Pipe
          if( elementSpansDownStream( stepElementGraph, nodeElementGraph, element ) )
            break;

          if( element.hasNodeConfigDef() )
            element.getNodeConfigDef().apply( mode, setter );

          // walk up the sub-assembly parent hierarchy
          if( element instanceof Pipe )
            element = ( (Pipe) element ).getParent();
          else
            element = null;
          }
        }
      }
    }

  /**
   * Returns true if the element is not a SubAssembly, has no outgoing edges in the node graph, but does
   * have outgoing edges in the step graph.
   */
  private boolean elementSpansDownStream( ElementGraph stepElementGraph, ElementGraph nodeElementGraph, FlowElement element )
    {
    if( element instanceof SubAssembly )
      return false;

    return nodeElementGraph.outDegreeOf( element ) == 0 && stepElementGraph.outDegreeOf( element ) > 0;
    }

  /**
   * Method initConfFromStepConfigDef applies the step level ConfigDef of every element in the step graph,
   * and of each element's parent Pipes, to the given setter.
   *
   * @param setter of type ConfigDef.Setter
   */
  protected void initConfFromStepConfigDef( ConfigDef.Setter setter )
    {
    ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() );

    // applies each mode in order, topologically
    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
      {
      Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( stepElementGraph );

      while( iterator.hasNext() )
        {
        FlowElement element = iterator.next();

        while( element != null )
          {
          if( element.hasStepConfigDef() )
            element.getStepConfigDef().apply( mode, setter );

          // walk up the sub-assembly parent hierarchy
          if( element instanceof Pipe )
            element = ( (Pipe) element ).getParent();
          else
            element = null;
          }
        }
      }
    }

  /**
   * Two steps are equal if they are of the exact same class and have equal ids.
   */
  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( object == null || getClass() != object.getClass() )
      return false;

    BaseFlowStep<?> flowStep = (BaseFlowStep<?>) object;

    return id != null ? id.equals( flowStep.id ) : flowStep.id == null;
    }

  @Override
  public int hashCode()
    {
    return id != null ? id.hashCode() : 0;
    }

  @Override
  public String toString()
    {
    StringBuilder buffer = new StringBuilder();

    buffer.append( getClass().getSimpleName() );
    buffer.append( "[name: " ).append( getName() ).append( "]" );

    return buffer.toString();
    }

  @Override
  public final boolean isInfoEnabled()
    {
    return getLogger().isInfoEnabled();
    }

  /**
   * Returns the parent flow if it is a ProcessLogger, otherwise {@code ProcessLogger.NULL}. All logging
   * methods of this class delegate to the returned instance.
   */
  private ProcessLogger getLogger()
    {
    // instanceof is false for null, so an unset flow falls through to the NULL logger
    if( flow instanceof ProcessLogger )
      return (ProcessLogger) flow;

    return ProcessLogger.NULL;
    }

  @Override
  public final boolean isDebugEnabled()
    {
    return getLogger().isDebugEnabled();
    }

  @Override
  public void logDebug( String message, Object... arguments )
    {
    getLogger().logDebug( message, arguments );
    }

  @Override
  public void logInfo( String message, Object... arguments )
    {
    getLogger().logInfo( message, arguments );
    }

  @Override
  public void logWarn( String message )
    {
    getLogger().logWarn( message );
    }

  @Override
  public void logWarn( String message, Throwable throwable )
    {
    getLogger().logWarn( message, throwable );
    }

  @Override
  public void logWarn( String message, Object... arguments )
    {
    getLogger().logWarn( message, arguments );
    }

  @Override
  public void logError( String message, Object... arguments )
    {
    getLogger().logError( message, arguments );
    }

  @Override
  public void logError( String message, Throwable throwable )
    {
    getLogger().logError( message, throwable );
    }

  /**
   * Class SafeFlowStepListener safely calls a wrapped FlowStepListener.
   * <p>
   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
   * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
   * which in turn is run in a new Thread.
   * <p>
   * Anything thrown by the wrapped listener is stored in {@link #throwable} and logged as a warning
   * through the enclosing step.
   */
  private class SafeFlowStepListener implements FlowStepListener
    {
    /** Field flowStepListener */
    final FlowStepListener flowStepListener;
    /** Field throwable, the most recent Throwable thrown by the wrapped listener, if any */
    Throwable throwable;

    private SafeFlowStepListener( FlowStepListener flowStepListener )
      {
      this.flowStepListener = flowStepListener;
      }

    @Override
    public void onStepStarting( FlowStep flowStep )
      {
      try
        {
        flowStepListener.onStepStarting( flowStep );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }
      }

    @Override
    public void onStepStopping( FlowStep flowStep )
      {
      try
        {
        flowStepListener.onStepStopping( flowStep );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }
      }

    @Override
    public void onStepCompleted( FlowStep flowStep )
      {
      try
        {
        flowStepListener.onStepCompleted( flowStep );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }
      }

    @Override
    public void onStepRunning( FlowStep flowStep )
      {
      try
        {
        flowStepListener.onStepRunning( flowStep );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }
      }

    /**
     * Delegates to the wrapped listener, returning false if the wrapped listener itself throws.
     */
    @Override
    public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable )
      {
      try
        {
        return flowStepListener.onStepThrowable( flowStep, flowStepThrowable );
        }
      catch( Throwable throwable )
        {
        handleThrowable( throwable );
        }

      return false;
      }

    /** Remembers the given throwable and logs it as a warning. */
    private void handleThrowable( Throwable throwable )
      {
      this.throwable = throwable;

      logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable );
      }

    /**
     * Compares the wrapped listeners when given another wrapper, otherwise compares the wrapped
     * listener directly against the given object.
     */
    @Override
    public boolean equals( Object object )
      {
      if( object instanceof BaseFlowStep.SafeFlowStepListener )
        return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener );

      return flowStepListener.equals( object );
      }

    @Override
    public int hashCode()
      {
      return flowStepListener.hashCode();
      }
    }
  }