001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner; 022 023import java.io.IOException; 024import java.io.Serializable; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.Date; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.LinkedList; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036 037import cascading.flow.Flow; 038import cascading.flow.FlowElement; 039import cascading.flow.FlowException; 040import cascading.flow.FlowNode; 041import cascading.flow.FlowProcess; 042import cascading.flow.FlowStep; 043import cascading.flow.FlowStepListener; 044import cascading.flow.planner.graph.AnnotatedGraph; 045import cascading.flow.planner.graph.ElementGraph; 046import cascading.flow.planner.graph.ElementGraphs; 047import cascading.flow.planner.process.FlowNodeGraph; 048import cascading.flow.stream.annotations.StreamMode; 049import cascading.management.CascadingServices; 050import cascading.management.state.ClientState; 051import cascading.operation.Operation; 052import cascading.pipe.Group; 053import cascading.pipe.Operator; 054import cascading.pipe.Pipe; 055import cascading.pipe.SubAssembly; 056import cascading.property.ConfigDef; 057import cascading.stats.FlowStepStats; 058import cascading.tap.Tap; 059import cascading.util.EnumMultiMap; 060import cascading.util.ProcessLogger; 061import cascading.util.Util; 062 063import static cascading.flow.planner.graph.ElementGraphs.findAllGroups; 064 065/** 066 * Class FlowStep is an internal representation of a given Job to be executed on a remote cluster. During 067 * planning, pipe assemblies are broken down into "steps" and encapsulated in this class. 068 * <p/> 069 * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all 070 * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which 071 * all steps will be submitted for execution. The default submit priority is 5. 072 * <p/> 073 * This class is for internal use, there are no stable public methods. 074 */ 075public abstract class BaseFlowStep<Config> implements FlowStep<Config>, ProcessLogger, Serializable 076 { 077 /** Field flow */ 078 private transient Flow<Config> flow; 079 /** Field flowName */ 080 private String flowName; 081 /** Field flowID */ 082 private String flowID; 083 084 private transient Config flowStepConf; 085 086 /** Field submitPriority */ 087 private int submitPriority = 5; 088 089 /** Field name */ 090 String name; 091 private String id; 092 private int ordinal; 093 private Map<String, String> processAnnotations; 094 095 /** Field step listeners */ 096 private List<SafeFlowStepListener> listeners; 097 098 /** Field graph */ 099 private final ElementGraph elementGraph; 100 101 private FlowNodeGraph flowNodeGraph; 102 103 /** Field sources */ 104 protected final Map<Tap, Set<String>> sources = new HashMap<>(); // all sources 105 /** Field sink */ 106 protected final Map<Tap, Set<String>> sinks = new HashMap<>(); // all sinks 107 108 /** Field mapperTraps */ 109 private final Map<String, Tap> traps = new HashMap<>(); 110 111 /** Field tempSink */ 112 protected Tap tempSink; // used if we need to bypass the filesystem 113 114 /** Field groups */ 115 private final List<Group> groups = new ArrayList<Group>(); 116 117 protected transient FlowStepStats flowStepStats; 118 119 private transient FlowStepJob<Config> flowStepJob; 120 121 /** optional metadata about the FlowStep */ 122 private Map<String, String> flowStepDescriptor = Collections.emptyMap(); 123 124 protected BaseFlowStep( String name, int ordinal ) 125 { 126 this( name, ordinal, null ); 127 } 128 129 protected BaseFlowStep( String name, int ordinal, Map<String, String> flowStepDescriptor ) 130 { 131 this( name, ordinal, null, flowStepDescriptor ); 132 } 133 134 protected BaseFlowStep( String name, int ordinal, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor ) 135 { 136 this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number 137 setName( name ); 138 this.ordinal = ordinal; 139 140 this.elementGraph = null; 141 this.flowNodeGraph = flowNodeGraph; 142 143 setFlowStepDescriptor( flowStepDescriptor ); 144 } 145 146 protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph ) 147 { 148 this( elementStepGraph, flowNodeGraph, null ); 149 } 150 151 protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor ) 152 { 153 this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number 154 this.elementGraph = elementStepGraph; 155 this.flowNodeGraph = flowNodeGraph; // TODO: verify no missing elements in the union of the node graphs 156 157 setFlowStepDescriptor( flowStepDescriptor ); 158 159 configure(); 160 } 161 162 protected void configure() 163 { 164 // todo: remove once FlowMapper/FlowReducer aren't reliant 165 addSources( this, elementGraph, flowNodeGraph.getSourceTaps() ); 166 addSinks( this, elementGraph, flowNodeGraph.getSinkTaps() ); 167 168 addAllGroups(); 169 170 traps.putAll( flowNodeGraph.getTrapsMap() ); 171 } 172 173 protected void addAllGroups() 174 { 175 addGroups( findAllGroups( elementGraph ) ); 176 } 177 178 @Override 179 public String getID() 180 { 181 return id; 182 } 183 184 public void setOrdinal( int ordinal ) 185 { 186 this.ordinal = ordinal; 187 } 188 189 @Override 190 public int getOrdinal() 191 { 192 return ordinal; 193 } 194 195 @Override 196 public String getName() 197 { 198 return name; 199 } 200 201 public void setName( String name ) 202 { 203 if( name == null || name.isEmpty() ) 204 throw new IllegalArgumentException( "step name may not be null or empty" ); 205 206 this.name = name; 207 } 208 209 @Override 210 public Map<String, String> getFlowStepDescriptor() 211 { 212 return Collections.unmodifiableMap( flowStepDescriptor ); 213 } 214 215 protected void setFlowStepDescriptor( Map<String, String> flowStepDescriptor ) 216 { 217 if( flowStepDescriptor != null ) 218 this.flowStepDescriptor = flowStepDescriptor; 219 } 220 221 @Override 222 public Map<String, String> getProcessAnnotations() 223 { 224 if( processAnnotations == null ) 225 return Collections.emptyMap(); 226 227 return Collections.unmodifiableMap( processAnnotations ); 228 } 229 230 @Override 231 public void addProcessAnnotation( Enum annotation ) 232 { 233 if( annotation == null ) 234 return; 235 236 addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() ); 237 } 238 239 @Override 240 public void addProcessAnnotation( String key, String value ) 241 { 242 if( processAnnotations == null ) 243 processAnnotations = new HashMap<>(); 244 245 processAnnotations.put( key, value ); 246 } 247 248 public void setFlow( Flow<Config> flow ) 249 { 250 this.flow = flow; 251 this.flowID = flow.getID(); 252 this.flowName = flow.getName(); 253 } 254 255 @Override 256 public Flow<Config> getFlow() 257 { 258 return flow; 259 } 260 261 @Override 262 public String getFlowID() 263 { 264 return flowID; 265 } 266 267 @Override 268 public String getFlowName() 269 { 270 return flowName; 271 } 272 273 protected void setFlowName( String flowName ) 274 { 275 this.flowName = flowName; 276 } 277 278 @Override 279 public Config getConfig() 280 { 281 return flowStepConf; 282 } 283 284 @Override 285 public Map<Object, Object> getConfigAsProperties() 286 { 287 return Collections.emptyMap(); 288 } 289 290 /** 291 * Set the initialized flowStepConf Config instance 292 * 293 * @param flowStepConf of type Config 294 */ 295 protected void setConfig( Config flowStepConf ) 296 { 297 this.flowStepConf = flowStepConf; 298 } 299 300 @Override 301 public String getStepDisplayName() 302 { 303 return getStepDisplayName( Util.ID_LENGTH ); 304 } 305 306 protected String getStepDisplayName( int idLength ) 307 { 308 if( idLength < 0 || idLength > Util.ID_LENGTH ) 309 idLength = Util.ID_LENGTH; 310 311 if( idLength == 0 ) 312 return String.format( "%s/%s", getFlowName(), getName() ); 313 314 String flowID = getFlowID().substring( 0, idLength ); 315 String stepID = getID().substring( 0, idLength ); 316 317 return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() ); 318 } 319 320 protected String getNodeDisplayName( FlowNode flowNode, int idLength ) 321 { 322 if( idLength > Util.ID_LENGTH ) 323 idLength = Util.ID_LENGTH; 324 325 String flowID = getFlowID().substring( 0, idLength ); 326 String stepID = getID().substring( 0, idLength ); 327 String nodeID = flowNode.getID().substring( 0, idLength ); 328 329 return String.format( "[%s/%s/%s] %s/%s", flowID, stepID, nodeID, getFlowName(), getName() ); 330 } 331 332 @Override 333 public int getSubmitPriority() 334 { 335 return submitPriority; 336 } 337 338 @Override 339 public void setSubmitPriority( int submitPriority ) 340 { 341 if( submitPriority < 1 || submitPriority > 10 ) 342 throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority ); 343 344 this.submitPriority = submitPriority; 345 } 346 347 @Override 348 public void setFlowStepStats( FlowStepStats flowStepStats ) 349 { 350 this.flowStepStats = flowStepStats; 351 } 352 353 @Override 354 public FlowStepStats getFlowStepStats() 355 { 356 return flowStepStats; 357 } 358 359 @Override 360 public ElementGraph getElementGraph() 361 { 362 return elementGraph; 363 } 364 365 protected EnumMultiMap getAnnotations() 366 { 367 return ( (AnnotatedGraph) elementGraph ).getAnnotations(); 368 } 369 370 @Override 371 public FlowNodeGraph getFlowNodeGraph() 372 { 373 return flowNodeGraph; 374 } 375 376 @Override 377 public int getNumFlowNodes() 378 { 379 return flowNodeGraph.vertexSet().size(); 380 } 381 382 public Set<FlowElement> getSourceElements() 383 { 384 return ElementGraphs.findSources( getElementGraph(), FlowElement.class ); 385 } 386 387 public Set<FlowElement> getSinkElements() 388 { 389 return ElementGraphs.findSinks( getElementGraph(), FlowElement.class ); 390 } 391 392 @Override 393 public Group getGroup() 394 { 395 if( groups.isEmpty() ) 396 return null; 397 398 if( groups.size() > 1 ) 399 throw new IllegalStateException( "more than one group" ); 400 401 return groups.get( 0 ); 402 } 403 404 @Override 405 public Collection<Group> getGroups() 406 { 407 return groups; 408 } 409 410 public void addGroups( Collection<Group> groups ) 411 { 412 for( Group group : groups ) 413 addGroup( group ); 414 } 415 416 public void addGroup( Group group ) 417 { 418 if( !groups.contains( group ) ) 419 groups.add( group ); 420 } 421 422 public Set<Tap> getAllAccumulatedSources() 423 { 424 return Util.narrowIdentitySet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Accumulated ) ); 425 } 426 427 public void addSource( String name, Tap source ) 428 { 429 if( !sources.containsKey( source ) ) 430 sources.put( source, new HashSet<String>() ); 431 432 sources.get( source ).add( name ); 433 } 434 435 public void addSink( String name, Tap sink ) 436 { 437 if( !sinks.containsKey( sink ) ) 438 sinks.put( sink, new HashSet<String>() ); 439 440 sinks.get( sink ).add( name ); 441 } 442 443 @Override 444 public Set<Tap> getSourceTaps() 445 { 446 return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) ); 447 } 448 449 @Override 450 public Set<Tap> getSinkTaps() 451 { 452 return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) ); 453 } 454 455 @Override 456 public Tap getSink() 457 { 458 if( sinks.size() == 0 ) 459 return null; 460 461 if( sinks.size() > 1 ) 462 throw new IllegalStateException( "more than one sink" ); 463 464 return sinks.keySet().iterator().next(); 465 } 466 467 @Override 468 public Set<String> getSourceName( Tap source ) 469 { 470 return Collections.unmodifiableSet( sources.get( source ) ); 471 } 472 473 @Override 474 public Set<String> getSinkName( Tap sink ) 475 { 476 return Collections.unmodifiableSet( sinks.get( sink ) ); 477 } 478 479 @Override 480 public Tap getSourceWith( String identifier ) 481 { 482 if( Util.isEmpty( identifier ) ) 483 return null; 484 485 for( Tap tap : sources.keySet() ) 486 { 487 if( identifier.equalsIgnoreCase( tap.getIdentifier() ) ) 488 return tap; 489 } 490 491 return null; 492 } 493 494 @Override 495 public Tap getSinkWith( String identifier ) 496 { 497 if( Util.isEmpty( identifier ) ) 498 return null; 499 500 for( Tap tap : sinks.keySet() ) 501 { 502 if( identifier.equalsIgnoreCase( tap.getIdentifier() ) ) 503 return tap; 504 } 505 506 return null; 507 } 508 509 @Override 510 public Map<String, Tap> getTrapMap() 511 { 512 return traps; 513 } 514 515 @Override 516 public Set<Tap> getTraps() 517 { 518 return Collections.unmodifiableSet( new HashSet<Tap>( traps.values() ) ); 519 } 520 521 public Tap getTrap( String name ) 522 { 523 return getTrapMap().get( name ); 524 } 525 526 boolean allSourcesExist() throws IOException 527 { 528 for( Tap tap : sources.keySet() ) 529 { 530 if( !tap.resourceExists( getConfig() ) ) 531 return false; 532 } 533 534 return true; 535 } 536 537 boolean areSourcesNewer( long sinkModified ) throws IOException 538 { 539 Config config = getConfig(); 540 Iterator<Tap> values = sources.keySet().iterator(); 541 542 long sourceModified = 0; 543 544 try 545 { 546 sourceModified = Util.getSourceModified( config, values, sinkModified ); 547 548 if( sinkModified < sourceModified ) 549 return true; 550 551 return false; 552 } 553 finally 554 { 555 if( isInfoEnabled() ) 556 logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all 557 } 558 } 559 560 long getSinkModified() throws IOException 561 { 562 long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() ); 563 564 if( isInfoEnabled() ) 565 { 566 if( sinkModified == -1L ) 567 logInfo( "at least one sink is marked for delete" ); 568 if( sinkModified == 0L ) 569 logInfo( "at least one sink does not exist" ); 570 else 571 logInfo( "sink oldest modified date: " + new Date( sinkModified ) ); 572 } 573 574 return sinkModified; 575 } 576 577 protected Throwable prepareResources() 578 { 579 Throwable throwable = prepareResources( getSourceTaps(), false ); 580 581 if( throwable == null ) 582 throwable = prepareResources( getSinkTaps(), true ); 583 584 if( throwable == null ) 585 throwable = prepareResources( getTraps(), true ); 586 587 return throwable; 588 } 589 590 private Throwable prepareResources( Collection<Tap> taps, boolean forWrite ) 591 { 592 Throwable throwable = null; 593 594 for( Tap tap : taps ) 595 { 596 throwable = prepareResource( tap, forWrite ); 597 598 if( throwable != null ) 599 break; 600 } 601 602 return throwable; 603 } 604 605 private Throwable prepareResource( Tap tap, boolean forWrite ) 606 { 607 Throwable throwable = null; 608 609 try 610 { 611 boolean result; 612 613 if( forWrite ) 614 result = tap.prepareResourceForWrite( getConfig() ); 615 else 616 result = tap.prepareResourceForRead( getConfig() ); 617 618 if( !result ) 619 { 620 String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) ); 621 622 logError( message ); 623 624 throwable = new FlowException( message ); 625 } 626 } 627 catch( Throwable exception ) 628 { 629 String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) ); 630 631 logError( message, exception ); 632 633 throwable = new FlowException( message, exception ); 634 } 635 636 return throwable; 637 } 638 639 protected Throwable commitSinks() 640 { 641 Throwable throwable = null; 642 643 for( Tap tap : sinks.keySet() ) 644 { 645 if( throwable != null ) 646 rollbackResource( tap ); 647 else 648 throwable = commitResource( tap ); 649 } 650 651 return throwable; 652 } 653 654 private Throwable commitResource( Tap tap ) 655 { 656 Throwable throwable = null; 657 658 try 659 { 660 if( !tap.commitResource( getConfig() ) ) 661 { 662 String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() ); 663 664 logError( message ); 665 666 throwable = new FlowException( message ); 667 } 668 } 669 catch( Throwable exception ) 670 { 671 String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() ); 672 673 logError( message, exception ); 674 675 throwable = new FlowException( message, exception ); 676 } 677 678 return throwable; 679 } 680 681 private Throwable rollbackResource( Tap tap ) 682 { 683 Throwable throwable = null; 684 685 try 686 { 687 if( !tap.rollbackResource( getConfig() ) ) 688 { 689 String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() ); 690 691 logError( message ); 692 693 throwable = new FlowException( message ); 694 } 695 } 696 catch( Throwable exception ) 697 { 698 String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() ); 699 700 logError( message, exception ); 701 702 throwable = new FlowException( message, exception ); 703 } 704 705 return throwable; 706 } 707 708 protected Throwable rollbackSinks() 709 { 710 Throwable throwable = null; 711 712 for( Tap tap : sinks.keySet() ) 713 { 714 if( throwable != null ) 715 rollbackResource( tap ); 716 else 717 throwable = rollbackResource( tap ); 718 } 719 720 return throwable; 721 } 722 723 /** 724 * Public for testing. 725 * 726 * @param flowProcess 727 * @param parentConfig 728 * @return 729 */ 730 public abstract Config createInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig ); 731 732 /** 733 * Method getPreviousScopes returns the previous Scope instances. If the flowElement is a Group (specifically a CoGroup), 734 * there will be more than one instance. 735 * 736 * @param flowElement of type FlowElement 737 * @return Set<Scope> 738 */ 739 public Set<Scope> getPreviousScopes( FlowElement flowElement ) 740 { 741 return getElementGraph().incomingEdgesOf( flowElement ); 742 } 743 744 /** 745 * Method getNextScope returns the next Scope instance in the graph. There will always only be one next. 746 * 747 * @param flowElement of type FlowElement 748 * @return Scope 749 */ 750 public Scope getNextScope( FlowElement flowElement ) 751 { 752 Set<Scope> set = getElementGraph().outgoingEdgesOf( flowElement ); 753 754 if( set.size() != 1 ) 755 throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() ); 756 757 return set.iterator().next(); 758 } 759 760 public FlowElement getNextFlowElement( Scope scope ) 761 { 762 return getElementGraph().getEdgeTarget( scope ); 763 } 764 765 public Collection<Operation> getAllOperations() 766 { 767 Set<FlowElement> vertices = getElementGraph().vertexSet(); 768 List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same 769 770 for( FlowElement vertex : vertices ) 771 { 772 if( vertex instanceof Operator ) 773 operations.add( ( (Operator) vertex ).getOperation() ); 774 } 775 776 return operations; 777 } 778 779 @Override 780 public boolean containsPipeNamed( String pipeName ) 781 { 782 Set<FlowElement> vertices = getElementGraph().vertexSet(); 783 784 for( FlowElement vertex : vertices ) 785 { 786 if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) ) 787 return true; 788 } 789 790 return false; 791 } 792 793 public void clean() 794 { 795 // use step config by default 796 clean( getConfig() ); 797 } 798 799 public abstract void clean( Config config ); 800 801 List<SafeFlowStepListener> getListeners() 802 { 803 if( listeners == null ) 804 listeners = new LinkedList<SafeFlowStepListener>(); 805 806 return listeners; 807 } 808 809 @Override 810 public boolean hasListeners() 811 { 812 return listeners != null && !listeners.isEmpty(); 813 } 814 815 @Override 816 public void addListener( FlowStepListener flowStepListener ) 817 { 818 getListeners().add( new SafeFlowStepListener( flowStepListener ) ); 819 } 820 821 @Override 822 public boolean removeListener( FlowStepListener flowStepListener ) 823 { 824 return getListeners().remove( new SafeFlowStepListener( flowStepListener ) ); 825 } 826 827 protected void fireOnCompleted() 828 { 829 if( hasListeners() ) 830 { 831 if( isDebugEnabled() ) 832 logDebug( "firing onCompleted event: " + getListeners().size() ); 833 834 for( Object flowStepListener : getListeners() ) 835 ( (FlowStepListener) flowStepListener ).onStepCompleted( this ); 836 } 837 } 838 839 protected void fireOnThrowable( Throwable throwable ) 840 { 841 if( hasListeners() ) 842 { 843 if( isDebugEnabled() ) 844 logDebug( "firing onThrowable event: " + getListeners().size() ); 845 846 for( Object flowStepListener : getListeners() ) 847 ( (FlowStepListener) flowStepListener ).onStepThrowable( this, throwable ); 848 } 849 } 850 851 protected void fireOnStopping() 852 { 853 if( hasListeners() ) 854 { 855 if( isDebugEnabled() ) 856 logDebug( "firing onStopping event: " + getListeners() ); 857 858 for( Object flowStepListener : getListeners() ) 859 ( (FlowStepListener) flowStepListener ).onStepStopping( this ); 860 } 861 } 862 863 protected void fireOnStarting() 864 { 865 if( hasListeners() ) 866 { 867 if( isDebugEnabled() ) 868 logDebug( "firing onStarting event: " + getListeners().size() ); 869 870 for( Object flowStepListener : getListeners() ) 871 ( (FlowStepListener) flowStepListener ).onStepStarting( this ); 872 } 873 } 874 875 protected void fireOnRunning() 876 { 877 if( hasListeners() ) 878 { 879 if( isDebugEnabled() ) 880 logDebug( "firing onRunning event: " + getListeners().size() ); 881 882 for( Object flowStepListener : getListeners() ) 883 ( (FlowStepListener) flowStepListener ).onStepRunning( this ); 884 } 885 } 886 887 protected ClientState createClientState( FlowProcess flowProcess ) 888 { 889 CascadingServices services = flowProcess.getCurrentSession().getCascadingServices(); 890 891 if( services == null ) 892 return ClientState.NULL; 893 894 return services.createClientState( getID() ); 895 } 896 897 public FlowStepJob<Config> getFlowStepJob() 898 { 899 return flowStepJob; 900 } 901 902 public FlowStepJob<Config> getCreateFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig ) 903 { 904 if( flowStepJob != null ) 905 return flowStepJob; 906 907 if( flowProcess == null ) 908 return null; 909 910 Config initializedConfig = createInitializedConfig( flowProcess, parentConfig ); 911 912 setConfig( initializedConfig ); 913 914 ClientState clientState = createClientState( flowProcess ); 915 916 flowStepJob = createFlowStepJob( clientState, flowProcess, initializedConfig ); 917 918 return flowStepJob; 919 } 920 921 protected abstract FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<Config> flowProcess, Config initializedStepConfig ); 922 923 protected void initConfFromNodeConfigDef( ElementGraph nodeElementGraph, ConfigDef.Setter setter ) 924 { 925 nodeElementGraph = ElementGraphs.asExtentMaskedSubGraph( nodeElementGraph ); 926 927 ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() ); 928 929 // applies each mode in order, topologically 930 for( ConfigDef.Mode mode : ConfigDef.Mode.values() ) 931 { 932 Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( nodeElementGraph ); 933 934 while( iterator.hasNext() ) 935 { 936 FlowElement element = iterator.next(); 937 938 while( element != null ) 939 { 940 // intentionally skip any element that spans downstream nodes, like a GroupBy 941 // this way GroupBy is applied on the inbound side (where partitioning happens) 942 // not the outbound side. 943 // parent sub-assemblies (like Unique) will be applied if they have leading Pipes to the current spanning Pipe 944 if( elementSpansDownStream( stepElementGraph, nodeElementGraph, element ) ) 945 { 946 element = null; 947 continue; 948 } 949 950 if( element instanceof ScopedElement && ( (ScopedElement) element ).hasNodeConfigDef() ) 951 ( (ScopedElement) element ).getNodeConfigDef().apply( mode, setter ); 952 953 // walk up the sub-assembly parent hierarchy 954 if( element instanceof Pipe ) 955 element = ( (Pipe) element ).getParent(); 956 else 957 element = null; 958 } 959 } 960 } 961 } 962 963 private boolean elementSpansDownStream( ElementGraph stepElementGraph, ElementGraph nodeElementGraph, FlowElement element ) 964 { 965 boolean spansNodes = !( element instanceof SubAssembly ); 966 967 if( spansNodes ) 968 spansNodes = nodeElementGraph.outDegreeOf( element ) == 0 && stepElementGraph.outDegreeOf( element ) > 0; 969 970 return spansNodes; 971 } 972 973 protected void initConfFromStepConfigDef( ConfigDef.Setter setter ) 974 { 975 ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() ); 976 977 // applies each mode in order, topologically 978 for( ConfigDef.Mode mode : ConfigDef.Mode.values() ) 979 { 980 Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( stepElementGraph ); 981 982 while( iterator.hasNext() ) 983 { 984 FlowElement element = iterator.next(); 985 986 while( element != null ) 987 { 988 if( element instanceof ScopedElement && ( (ScopedElement) element ).hasStepConfigDef() ) 989 ( (ScopedElement) element ).getStepConfigDef().apply( mode, setter ); 990 991 // walk up the sub-assembly parent hierarchy 992 if( element instanceof Pipe ) 993 element = ( (Pipe) element ).getParent(); 994 else 995 element = null; 996 } 997 } 998 } 999 } 1000 1001 protected static void addSources( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sources ) 1002 { 1003 for( Tap tap : sources ) 1004 { 1005 for( Scope scope : elementGraph.outgoingEdgesOf( tap ) ) 1006 flowStep.addSource( scope.getName(), tap ); 1007 } 1008 } 1009 1010 protected static void addSinks( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sinks ) 1011 { 1012 for( Tap tap : sinks ) 1013 { 1014 for( Scope scope : elementGraph.incomingEdgesOf( tap ) ) 1015 flowStep.addSink( scope.getName(), tap ); 1016 } 1017 } 1018 1019 @Override 1020 public boolean equals( Object object ) 1021 { 1022 if( this == object ) 1023 return true; 1024 if( object == null || getClass() != object.getClass() ) 1025 return false; 1026 1027 BaseFlowStep flowStep = (BaseFlowStep) object; 1028 1029 if( id != null ? !id.equals( flowStep.id ) : flowStep.id != null ) 1030 return false; 1031 1032 return true; 1033 } 1034 1035 @Override 1036 public int hashCode() 1037 { 1038 return id != null ? id.hashCode() : 0; 1039 } 1040 1041 @Override 1042 public String toString() 1043 { 1044 StringBuffer buffer = new StringBuffer(); 1045 1046 buffer.append( getClass().getSimpleName() ); 1047 buffer.append( "[name: " ).append( getName() ).append( "]" ); 1048 1049 return buffer.toString(); 1050 } 1051 1052 @Override 1053 public final boolean isInfoEnabled() 1054 { 1055 return getLogger().isInfoEnabled(); 1056 } 1057 1058 private ProcessLogger getLogger() 1059 { 1060 if( flow != null && flow instanceof ProcessLogger ) 1061 return (ProcessLogger) flow; 1062 1063 return ProcessLogger.NULL; 1064 } 1065 1066 @Override 1067 public final boolean isDebugEnabled() 1068 { 1069 return ( getLogger() ).isDebugEnabled(); 1070 } 1071 1072 @Override 1073 public void logDebug( String message, Object... arguments ) 1074 { 1075 getLogger().logDebug( message, arguments ); 1076 } 1077 1078 @Override 1079 public void logInfo( String message, Object... arguments ) 1080 { 1081 getLogger().logInfo( message, arguments ); 1082 } 1083 1084 @Override 1085 public void logWarn( String message ) 1086 { 1087 getLogger().logWarn( message ); 1088 } 1089 1090 @Override 1091 public void logWarn( String message, Throwable throwable ) 1092 { 1093 getLogger().logWarn( message, throwable ); 1094 } 1095 1096 @Override 1097 public void logWarn( String message, Object... arguments ) 1098 { 1099 getLogger().logWarn( message, arguments ); 1100 } 1101 1102 @Override 1103 public void logError( String message, Object... arguments ) 1104 { 1105 getLogger().logError( message, arguments ); 1106 } 1107 1108 @Override 1109 public void logError( String message, Throwable throwable ) 1110 { 1111 getLogger().logError( message, throwable ); 1112 } 1113 1114 /** 1115 * Class SafeFlowStepListener safely calls a wrapped FlowStepListener. 1116 * <p/> 1117 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1118 * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method 1119 * which in turn is run in a new Thread. 1120 */ 1121 private class SafeFlowStepListener implements FlowStepListener 1122 { 1123 /** Field flowListener */ 1124 final FlowStepListener flowStepListener; 1125 /** Field throwable */ 1126 Throwable throwable; 1127 1128 private SafeFlowStepListener( FlowStepListener flowStepListener ) 1129 { 1130 this.flowStepListener = flowStepListener; 1131 } 1132 1133 public void onStepStarting( FlowStep flowStep ) 1134 { 1135 try 1136 { 1137 flowStepListener.onStepStarting( flowStep ); 1138 } 1139 catch( Throwable throwable ) 1140 { 1141 handleThrowable( throwable ); 1142 } 1143 } 1144 1145 public void onStepStopping( FlowStep flowStep ) 1146 { 1147 try 1148 { 1149 flowStepListener.onStepStopping( flowStep ); 1150 } 1151 catch( Throwable throwable ) 1152 { 1153 handleThrowable( throwable ); 1154 } 1155 } 1156 1157 public void onStepCompleted( FlowStep flowStep ) 1158 { 1159 try 1160 { 1161 flowStepListener.onStepCompleted( flowStep ); 1162 } 1163 catch( Throwable throwable ) 1164 { 1165 handleThrowable( throwable ); 1166 } 1167 } 1168 1169 public void onStepRunning( FlowStep flowStep ) 1170 { 1171 try 1172 { 1173 flowStepListener.onStepRunning( flowStep ); 1174 } 1175 catch( Throwable throwable ) 1176 { 1177 handleThrowable( throwable ); 1178 } 1179 } 1180 1181 public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable ) 1182 { 1183 try 1184 { 1185 return flowStepListener.onStepThrowable( flowStep, flowStepThrowable ); 1186 } 1187 catch( Throwable throwable ) 1188 { 1189 handleThrowable( throwable ); 1190 } 1191 1192 return false; 1193 } 1194 1195 private void handleThrowable( Throwable throwable ) 1196 { 1197 this.throwable = throwable; 1198 1199 logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable ); 1200 } 1201 1202 public boolean equals( Object object ) 1203 { 1204 if( object instanceof BaseFlowStep.SafeFlowStepListener ) 1205 return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener ); 1206 1207 return flowStepListener.equals( object ); 1208 } 1209 1210 public int hashCode() 1211 { 1212 return flowStepListener.hashCode(); 1213 } 1214 } 1215 }