001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.planner; 023 024import java.io.IOException; 025import java.io.Serializable; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.Date; 030import java.util.HashMap; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.LinkedList; 034import java.util.List; 035import java.util.Map; 036import java.util.Set; 037 038import cascading.flow.Flow; 039import cascading.flow.FlowElement; 040import cascading.flow.FlowException; 041import cascading.flow.FlowNode; 042import cascading.flow.FlowProcess; 043import cascading.flow.FlowStep; 044import cascading.flow.FlowStepListener; 045import cascading.flow.planner.graph.AnnotatedGraph; 046import cascading.flow.planner.graph.ElementGraph; 047import cascading.flow.planner.graph.ElementGraphs; 048import cascading.flow.planner.process.FlowNodeGraph; 049import cascading.flow.stream.annotations.StreamMode; 050import cascading.management.CascadingServices; 051import cascading.management.state.ClientState; 052import cascading.operation.Operation; 053import cascading.pipe.Group; 054import cascading.pipe.Operator; 055import cascading.pipe.Pipe; 056import cascading.pipe.SubAssembly; 057import cascading.property.ConfigDef; 058import cascading.stats.FlowStepStats; 059import cascading.tap.Tap; 060import cascading.util.EnumMultiMap; 061import cascading.util.ProcessLogger; 062import cascading.util.Util; 063 064import static cascading.flow.planner.graph.ElementGraphs.findAllGroups; 065 066/** 067 * Class FlowStep is an internal representation of a given Job to be executed on a remote cluster. During 068 * planning, pipe assemblies are broken down into "steps" and encapsulated in this class. 069 * <p/> 070 * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all 071 * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which 072 * all steps will be submitted for execution. The default submit priority is 5. 073 * <p/> 074 * This class is for internal use, there are no stable public methods. 075 */ 076public abstract class BaseFlowStep<Config> implements FlowStep<Config>, ProcessLogger, Serializable 077 { 078 /** Field flow */ 079 private transient Flow<Config> flow; 080 /** Field flowName */ 081 private String flowName; 082 /** Field flowID */ 083 private String flowID; 084 085 private transient Config flowStepConf; 086 087 /** Field submitPriority */ 088 private int submitPriority = 5; 089 090 /** Field name */ 091 String name; 092 private String id; 093 private int ordinal; 094 private Map<String, String> processAnnotations; 095 096 /** Field step listeners */ 097 private List<SafeFlowStepListener> listeners; 098 099 /** Field elementGraph */ 100 protected ElementGraph elementGraph; 101 /** Field flowNodeGraph */ 102 protected FlowNodeGraph flowNodeGraph; 103 104 /** Field sources */ 105 protected final Map<Tap, Set<String>> sources = new HashMap<>(); // all sources 106 /** Field sink */ 107 protected final Map<Tap, Set<String>> sinks = new HashMap<>(); // all sinks 108 /** Field traps */ 109 private final Map<String, Tap> traps = new HashMap<>(); 110 111 /** Field tempSink */ 112 protected Tap tempSink; // used if we need to bypass the filesystem 113 114 /** Field groups */ 115 private final List<Group> groups = new ArrayList<Group>(); 116 117 protected transient FlowStepStats flowStepStats; 118 119 private transient FlowStepJob<Config> flowStepJob; 120 121 /** optional metadata about the FlowStep */ 122 private Map<String, String> flowStepDescriptor = Collections.emptyMap(); 123 124 protected BaseFlowStep( String name, int ordinal ) 125 { 126 this( name, ordinal, null ); 127 } 128 129 protected BaseFlowStep( String name, int ordinal, Map<String, String> flowStepDescriptor ) 130 { 131 this( name, ordinal, null, flowStepDescriptor ); 132 } 133 134 protected BaseFlowStep( String name, int ordinal, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor ) 135 { 136 this(); 137 setName( name ); 138 this.ordinal = ordinal; 139 140 this.elementGraph = null; 141 this.flowNodeGraph = flowNodeGraph; 142 143 setFlowStepDescriptor( flowStepDescriptor ); 144 } 145 146 protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph ) 147 { 148 this( elementStepGraph, flowNodeGraph, null ); 149 } 150 151 protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor ) 152 { 153 this(); 154 this.elementGraph = elementStepGraph; 155 this.flowNodeGraph = flowNodeGraph; // TODO: verify no missing elements in the union of the node graphs 156 157 setFlowStepDescriptor( flowStepDescriptor ); 158 159 configure(); 160 } 161 162 protected BaseFlowStep() 163 { 164 this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number 165 } 166 167 protected void configure() 168 { 169 addSources( this, getElementGraph(), getFlowNodeGraph().getSourceTaps() ); 170 addSinks( this, getElementGraph(), getFlowNodeGraph().getSinkTaps() ); 171 172 addAllGroups(); 173 174 traps.putAll( getFlowNodeGraph().getTrapsMap() ); 175 } 176 177 protected void addAllGroups() 178 { 179 addGroups( findAllGroups( getElementGraph() ) ); 180 } 181 182 @Override 183 public String getID() 184 { 185 return id; 186 } 187 188 public void setOrdinal( int ordinal ) 189 { 190 this.ordinal = ordinal; 191 } 192 193 @Override 194 public int getOrdinal() 195 { 196 return ordinal; 197 } 198 199 @Override 200 public String getName() 201 { 202 return name; 203 } 204 205 public void setName( String name ) 206 { 207 if( name == null || name.isEmpty() ) 208 throw new IllegalArgumentException( "step name may not be null or empty" ); 209 210 this.name = name; 211 } 212 213 @Override 214 public Map<String, String> getFlowStepDescriptor() 215 { 216 return Collections.unmodifiableMap( flowStepDescriptor ); 217 } 218 219 protected void setFlowStepDescriptor( Map<String, String> flowStepDescriptor ) 220 { 221 if( flowStepDescriptor != null ) 222 this.flowStepDescriptor = flowStepDescriptor; 223 } 224 225 @Override 226 public Map<String, String> getProcessAnnotations() 227 { 228 if( processAnnotations == null ) 229 return Collections.emptyMap(); 230 231 return Collections.unmodifiableMap( processAnnotations ); 232 } 233 234 @Override 235 public void addProcessAnnotation( Enum annotation ) 236 { 237 if( annotation == null ) 238 return; 239 240 addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() ); 241 } 242 243 @Override 244 public void addProcessAnnotation( String key, String value ) 245 { 246 if( processAnnotations == null ) 247 processAnnotations = new HashMap<>(); 248 249 processAnnotations.put( key, value ); 250 } 251 252 public void setFlow( Flow<Config> flow ) 253 { 254 this.flow = flow; 255 this.flowID = flow.getID(); 256 this.flowName = flow.getName(); 257 } 258 259 @Override 260 public Flow<Config> getFlow() 261 { 262 return flow; 263 } 264 265 @Override 266 public String getFlowID() 267 { 268 return flowID; 269 } 270 271 @Override 272 public String getFlowName() 273 { 274 return flowName; 275 } 276 277 protected void setFlowName( String flowName ) 278 { 279 this.flowName = flowName; 280 } 281 282 @Override 283 public Config getConfig() 284 { 285 return flowStepConf; 286 } 287 288 @Override 289 public Map<Object, Object> getConfigAsProperties() 290 { 291 return Collections.emptyMap(); 292 } 293 294 /** 295 * Set the initialized flowStepConf Config instance 296 * 297 * @param flowStepConf of type Config 298 */ 299 protected void setConfig( Config flowStepConf ) 300 { 301 this.flowStepConf = flowStepConf; 302 } 303 304 @Override 305 public String getStepDisplayName() 306 { 307 return getStepDisplayName( Util.ID_LENGTH ); 308 } 309 310 protected String getStepDisplayName( int idLength ) 311 { 312 if( idLength < 0 || idLength > Util.ID_LENGTH ) 313 idLength = Util.ID_LENGTH; 314 315 if( idLength == 0 ) 316 return String.format( "%s/%s", getFlowName(), getName() ); 317 318 String flowID = getFlowID().substring( 0, idLength ); 319 String stepID = getID().substring( 0, idLength ); 320 321 return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() ); 322 } 323 324 protected String getNodeDisplayName( FlowNode flowNode, int idLength ) 325 { 326 if( idLength > Util.ID_LENGTH ) 327 idLength = Util.ID_LENGTH; 328 329 String flowID = getFlowID().substring( 0, idLength ); 330 String stepID = getID().substring( 0, idLength ); 331 String nodeID = flowNode.getID().substring( 0, idLength ); 332 333 return String.format( "[%s/%s/%s] %s/%s", flowID, stepID, nodeID, getFlowName(), getName() ); 334 } 335 336 @Override 337 public int getSubmitPriority() 338 { 339 return submitPriority; 340 } 341 342 @Override 343 public void setSubmitPriority( int submitPriority ) 344 { 345 if( submitPriority < 1 || submitPriority > 10 ) 346 throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority ); 347 348 this.submitPriority = submitPriority; 349 } 350 351 @Override 352 public void setFlowStepStats( FlowStepStats flowStepStats ) 353 { 354 this.flowStepStats = flowStepStats; 355 } 356 357 @Override 358 public FlowStepStats getFlowStepStats() 359 { 360 return flowStepStats; 361 } 362 363 @Override 364 public ElementGraph getElementGraph() 365 { 366 return elementGraph; 367 } 368 369 protected EnumMultiMap getAnnotations() 370 { 371 return ( (AnnotatedGraph) elementGraph ).getAnnotations(); 372 } 373 374 @Override 375 public FlowNodeGraph getFlowNodeGraph() 376 { 377 return flowNodeGraph; 378 } 379 380 @Override 381 public int getNumFlowNodes() 382 { 383 return flowNodeGraph.vertexSet().size(); 384 } 385 386 public Set<FlowElement> getSourceElements() 387 { 388 return ElementGraphs.findSources( getElementGraph(), FlowElement.class ); 389 } 390 391 public Set<FlowElement> getSinkElements() 392 { 393 return ElementGraphs.findSinks( getElementGraph(), FlowElement.class ); 394 } 395 396 @Override 397 public Group getGroup() 398 { 399 if( groups.isEmpty() ) 400 return null; 401 402 if( groups.size() > 1 ) 403 throw new IllegalStateException( "more than one group" ); 404 405 return groups.get( 0 ); 406 } 407 408 @Override 409 public Collection<Group> getGroups() 410 { 411 return groups; 412 } 413 414 public void addGroups( Collection<Group> groups ) 415 { 416 for( Group group : groups ) 417 addGroup( group ); 418 } 419 420 public void addGroup( Group group ) 421 { 422 if( !groups.contains( group ) ) 423 groups.add( group ); 424 } 425 426 /** 427 * Returns all source Tap instances annotated by the planner as being {@link StreamMode#Accumulated}. 428 * 429 * @return Set of accumulated Tap instances 430 */ 431 public Set<Tap> getAllAccumulatedSources() 432 { 433 return Util.narrowIdentitySet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Accumulated ) ); 434 } 435 436 /** 437 * Returns all source Tap instances annotated by the planner as being {@link StreamMode#Streamed}. 438 * 439 * @return Set of streamed Tap instances 440 */ 441 public Set<Tap> getAllStreamedSources() 442 { 443 return Util.narrowIdentitySet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Streamed ) ); 444 } 445 446 public void addSource( String name, Tap source ) 447 { 448 if( !sources.containsKey( source ) ) 449 sources.put( source, new HashSet<String>() ); 450 451 sources.get( source ).add( name ); 452 } 453 454 public void addSink( String name, Tap sink ) 455 { 456 if( !sinks.containsKey( sink ) ) 457 sinks.put( sink, new HashSet<String>() ); 458 459 sinks.get( sink ).add( name ); 460 } 461 462 @Override 463 public Set<Tap> getSourceTaps() 464 { 465 return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) ); 466 } 467 468 @Override 469 public Set<Tap> getSinkTaps() 470 { 471 return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) ); 472 } 473 474 @Override 475 public Tap getSink() 476 { 477 if( sinks.size() == 0 ) 478 return null; 479 480 if( sinks.size() > 1 ) 481 throw new IllegalStateException( "more than one sink" ); 482 483 return sinks.keySet().iterator().next(); 484 } 485 486 @Override 487 public Set<String> getSourceName( Tap source ) 488 { 489 return Collections.unmodifiableSet( sources.get( source ) ); 490 } 491 492 @Override 493 public Set<String> getSinkName( Tap sink ) 494 { 495 return Collections.unmodifiableSet( sinks.get( sink ) ); 496 } 497 498 @Override 499 public Tap getSourceWith( String identifier ) 500 { 501 if( Util.isEmpty( identifier ) ) 502 return null; 503 504 for( Tap tap : sources.keySet() ) 505 { 506 if( identifier.equalsIgnoreCase( tap.getIdentifier() ) ) 507 return tap; 508 } 509 510 return null; 511 } 512 513 @Override 514 public Tap getSinkWith( String identifier ) 515 { 516 if( Util.isEmpty( identifier ) ) 517 return null; 518 519 for( Tap tap : sinks.keySet() ) 520 { 521 if( identifier.equalsIgnoreCase( tap.getIdentifier() ) ) 522 return tap; 523 } 524 525 return null; 526 } 527 528 @Override 529 public Map<String, Tap> getTrapMap() 530 { 531 return traps; 532 } 533 534 @Override 535 public Set<Tap> getTraps() 536 { 537 return Collections.unmodifiableSet( new HashSet<Tap>( traps.values() ) ); 538 } 539 540 public Tap getTrap( String name ) 541 { 542 return getTrapMap().get( name ); 543 } 544 545 boolean allSourcesExist() throws IOException 546 { 547 for( Tap tap : sources.keySet() ) 548 { 549 if( !tap.resourceExists( getConfig() ) ) 550 return false; 551 } 552 553 return true; 554 } 555 556 boolean areSourcesNewer( long sinkModified ) throws IOException 557 { 558 Config config = getConfig(); 559 Iterator<Tap> values = sources.keySet().iterator(); 560 561 long sourceModified = 0; 562 563 try 564 { 565 sourceModified = Util.getSourceModified( config, values, sinkModified ); 566 567 if( sinkModified < sourceModified ) 568 return true; 569 570 return false; 571 } 572 finally 573 { 574 if( isInfoEnabled() ) 575 logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all 576 } 577 } 578 579 long getSinkModified() throws IOException 580 { 581 long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() ); 582 583 if( isInfoEnabled() ) 584 { 585 if( sinkModified == -1L ) 586 logInfo( "at least one sink is marked for delete" ); 587 if( sinkModified == 0L ) 588 logInfo( "at least one sink does not exist" ); 589 else 590 logInfo( "sink oldest modified date: " + new Date( sinkModified ) ); 591 } 592 593 return sinkModified; 594 } 595 596 protected Throwable prepareResources() 597 { 598 Throwable throwable = prepareResources( getSourceTaps(), false ); 599 600 if( throwable == null ) 601 throwable = prepareResources( getSinkTaps(), true ); 602 603 if( throwable == null ) 604 throwable = prepareResources( getTraps(), true ); 605 606 return throwable; 607 } 608 609 private Throwable prepareResources( Collection<Tap> taps, boolean forWrite ) 610 { 611 Throwable throwable = null; 612 613 for( Tap tap : taps ) 614 { 615 throwable = prepareResource( tap, forWrite ); 616 617 if( throwable != null ) 618 break; 619 } 620 621 return throwable; 622 } 623 624 private Throwable prepareResource( Tap tap, boolean forWrite ) 625 { 626 Throwable throwable = null; 627 628 try 629 { 630 boolean result; 631 632 if( forWrite ) 633 result = tap.prepareResourceForWrite( getConfig() ); 634 else 635 result = tap.prepareResourceForRead( getConfig() ); 636 637 if( !result ) 638 { 639 String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) ); 640 641 logError( message ); 642 643 throwable = new FlowException( message ); 644 } 645 } 646 catch( Throwable exception ) 647 { 648 String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) ); 649 650 logError( message, exception ); 651 652 throwable = new FlowException( message, exception ); 653 } 654 655 return throwable; 656 } 657 658 protected Throwable commitSinks() 659 { 660 Throwable throwable = null; 661 662 for( Tap tap : sinks.keySet() ) 663 { 664 if( throwable != null ) 665 rollbackResource( tap ); 666 else 667 throwable = commitResource( tap ); 668 } 669 670 return throwable; 671 } 672 673 private Throwable commitResource( Tap tap ) 674 { 675 Throwable throwable = null; 676 677 try 678 { 679 if( !tap.commitResource( getConfig() ) ) 680 { 681 String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() ); 682 683 logError( message ); 684 685 throwable = new FlowException( message ); 686 } 687 } 688 catch( Throwable exception ) 689 { 690 String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() ); 691 692 logError( message, exception ); 693 694 throwable = new FlowException( message, exception ); 695 } 696 697 return throwable; 698 } 699 700 private Throwable rollbackResource( Tap tap ) 701 { 702 Throwable throwable = null; 703 704 try 705 { 706 if( !tap.rollbackResource( getConfig() ) ) 707 { 708 String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() ); 709 710 logError( message ); 711 712 throwable = new FlowException( message ); 713 } 714 } 715 catch( Throwable exception ) 716 { 717 String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() ); 718 719 logError( message, exception ); 720 721 throwable = new FlowException( message, exception ); 722 } 723 724 return throwable; 725 } 726 727 protected Throwable rollbackSinks() 728 { 729 Throwable throwable = null; 730 731 for( Tap tap : sinks.keySet() ) 732 { 733 if( throwable != null ) 734 rollbackResource( tap ); 735 else 736 throwable = rollbackResource( tap ); 737 } 738 739 return throwable; 740 } 741 742 /** 743 * Public for testing. 744 * 745 * @param flowProcess 746 * @param parentConfig 747 * @return 748 */ 749 public abstract Config createInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig ); 750 751 /** 752 * Method getPreviousScopes returns the previous Scope instances. If the flowElement is a Group (specifically a CoGroup), 753 * there will be more than one instance. 754 * 755 * @param flowElement of type FlowElement 756 * @return Set<Scope> 757 */ 758 public Set<Scope> getPreviousScopes( FlowElement flowElement ) 759 { 760 return getElementGraph().incomingEdgesOf( flowElement ); 761 } 762 763 /** 764 * Method getNextScope returns the next Scope instance in the graph. There will always only be one next. 765 * 766 * @param flowElement of type FlowElement 767 * @return Scope 768 */ 769 public Scope getNextScope( FlowElement flowElement ) 770 { 771 Set<Scope> set = getElementGraph().outgoingEdgesOf( flowElement ); 772 773 if( set.size() != 1 ) 774 throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() ); 775 776 return set.iterator().next(); 777 } 778 779 public FlowElement getNextFlowElement( Scope scope ) 780 { 781 return getElementGraph().getEdgeTarget( scope ); 782 } 783 784 public Collection<Operation> getAllOperations() 785 { 786 Set<FlowElement> vertices = getElementGraph().vertexSet(); 787 List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same 788 789 for( FlowElement vertex : vertices ) 790 { 791 if( vertex instanceof Operator ) 792 operations.add( ( (Operator) vertex ).getOperation() ); 793 } 794 795 return operations; 796 } 797 798 @Override 799 public boolean containsPipeNamed( String pipeName ) 800 { 801 Set<FlowElement> vertices = getElementGraph().vertexSet(); 802 803 for( FlowElement vertex : vertices ) 804 { 805 if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) ) 806 return true; 807 } 808 809 return false; 810 } 811 812 public void clean() 813 { 814 // use step config by default 815 clean( getConfig() ); 816 } 817 818 public abstract void clean( Config config ); 819 820 List<SafeFlowStepListener> getListeners() 821 { 822 if( listeners == null ) 823 listeners = new LinkedList<SafeFlowStepListener>(); 824 825 return listeners; 826 } 827 828 @Override 829 public boolean hasListeners() 830 { 831 return listeners != null && !listeners.isEmpty(); 832 } 833 834 @Override 835 public void addListener( FlowStepListener flowStepListener ) 836 { 837 getListeners().add( new SafeFlowStepListener( flowStepListener ) ); 838 } 839 840 @Override 841 public boolean removeListener( FlowStepListener flowStepListener ) 842 { 843 return getListeners().remove( new SafeFlowStepListener( flowStepListener ) ); 844 } 845 846 protected void fireOnCompleted() 847 { 848 if( hasListeners() ) 849 { 850 if( isDebugEnabled() ) 851 logDebug( "firing onCompleted event: " + getListeners().size() ); 852 853 for( Object flowStepListener : getListeners() ) 854 ( (FlowStepListener) flowStepListener ).onStepCompleted( this ); 855 } 856 } 857 858 protected void fireOnThrowable( Throwable throwable ) 859 { 860 if( hasListeners() ) 861 { 862 if( isDebugEnabled() ) 863 logDebug( "firing onThrowable event: " + getListeners().size() ); 864 865 for( Object flowStepListener : getListeners() ) 866 ( (FlowStepListener) flowStepListener ).onStepThrowable( this, throwable ); 867 } 868 } 869 870 protected void fireOnStopping() 871 { 872 if( hasListeners() ) 873 { 874 if( isDebugEnabled() ) 875 logDebug( "firing onStopping event: " + getListeners() ); 876 877 for( Object flowStepListener : getListeners() ) 878 ( (FlowStepListener) flowStepListener ).onStepStopping( this ); 879 } 880 } 881 882 protected void fireOnStarting() 883 { 884 if( hasListeners() ) 885 { 886 if( isDebugEnabled() ) 887 logDebug( "firing onStarting event: " + getListeners().size() ); 888 889 for( Object flowStepListener : getListeners() ) 890 ( (FlowStepListener) flowStepListener ).onStepStarting( this ); 891 } 892 } 893 894 protected void fireOnRunning() 895 { 896 if( hasListeners() ) 897 { 898 if( isDebugEnabled() ) 899 logDebug( "firing onRunning event: " + getListeners().size() ); 900 901 for( Object flowStepListener : getListeners() ) 902 ( (FlowStepListener) flowStepListener ).onStepRunning( this ); 903 } 904 } 905 906 protected ClientState createClientState( FlowProcess flowProcess ) 907 { 908 CascadingServices services = flowProcess.getCurrentSession().getCascadingServices(); 909 910 if( services == null ) 911 return ClientState.NULL; 912 913 return services.createClientState( getID() ); 914 } 915 916 public FlowStepJob<Config> getFlowStepJob() 917 { 918 return flowStepJob; 919 } 920 921 public FlowStepJob<Config> getCreateFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig ) 922 { 923 if( flowStepJob != null ) 924 return flowStepJob; 925 926 if( flowProcess == null ) 927 return null; 928 929 Config initializedConfig = createInitializedConfig( flowProcess, parentConfig ); 930 931 setConfig( initializedConfig ); 932 933 ClientState clientState = createClientState( flowProcess ); 934 935 flowStepJob = createFlowStepJob( clientState, flowProcess, initializedConfig ); 936 937 return flowStepJob; 938 } 939 940 protected abstract FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<Config> flowProcess, Config initializedStepConfig ); 941 942 protected void initConfFromNodeConfigDef( ElementGraph nodeElementGraph, ConfigDef.Setter setter ) 943 { 944 nodeElementGraph = ElementGraphs.asExtentMaskedSubGraph( nodeElementGraph ); 945 946 ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() ); 947 948 // applies each mode in order, topologically 949 for( ConfigDef.Mode mode : ConfigDef.Mode.values() ) 950 { 951 Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( nodeElementGraph ); 952 953 while( iterator.hasNext() ) 954 { 955 FlowElement element = iterator.next(); 956 957 while( element != null ) 958 { 959 // intentionally skip any element that spans downstream nodes, like a GroupBy 960 // this way GroupBy is applied on the inbound side (where partitioning happens) 961 // not the outbound side. 962 // parent sub-assemblies (like Unique) will be applied if they have leading Pipes to the current spanning Pipe 963 if( elementSpansDownStream( stepElementGraph, nodeElementGraph, element ) ) 964 { 965 element = null; 966 continue; 967 } 968 969 if( element instanceof ScopedElement && ( (ScopedElement) element ).hasNodeConfigDef() ) 970 ( (ScopedElement) element ).getNodeConfigDef().apply( mode, setter ); 971 972 // walk up the sub-assembly parent hierarchy 973 if( element instanceof Pipe ) 974 element = ( (Pipe) element ).getParent(); 975 else 976 element = null; 977 } 978 } 979 } 980 } 981 982 private boolean elementSpansDownStream( ElementGraph stepElementGraph, ElementGraph nodeElementGraph, FlowElement element ) 983 { 984 boolean spansNodes = !( element instanceof SubAssembly ); 985 986 if( spansNodes ) 987 spansNodes = nodeElementGraph.outDegreeOf( element ) == 0 && stepElementGraph.outDegreeOf( element ) > 0; 988 989 return spansNodes; 990 } 991 992 protected void initConfFromStepConfigDef( ConfigDef.Setter setter ) 993 { 994 ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() ); 995 996 // applies each mode in order, topologically 997 for( ConfigDef.Mode mode : ConfigDef.Mode.values() ) 998 { 999 Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( stepElementGraph ); 1000 1001 while( iterator.hasNext() ) 1002 { 1003 FlowElement element = iterator.next(); 1004 1005 while( element != null ) 1006 { 1007 if( element instanceof ScopedElement && ( (ScopedElement) element ).hasStepConfigDef() ) 1008 ( (ScopedElement) element ).getStepConfigDef().apply( mode, setter ); 1009 1010 // walk up the sub-assembly parent hierarchy 1011 if( element instanceof Pipe ) 1012 element = ( (Pipe) element ).getParent(); 1013 else 1014 element = null; 1015 } 1016 } 1017 } 1018 } 1019 1020 protected static void addSources( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sources ) 1021 { 1022 for( Tap tap : sources ) 1023 { 1024 for( Scope scope : elementGraph.outgoingEdgesOf( tap ) ) 1025 flowStep.addSource( scope.getName(), tap ); 1026 } 1027 } 1028 1029 protected static void addSinks( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sinks ) 1030 { 1031 for( Tap tap : sinks ) 1032 { 1033 for( Scope scope : elementGraph.incomingEdgesOf( tap ) ) 1034 flowStep.addSink( scope.getName(), tap ); 1035 } 1036 } 1037 1038 @Override 1039 public boolean equals( Object object ) 1040 { 1041 if( this == object ) 1042 return true; 1043 if( object == null || getClass() != object.getClass() ) 1044 return false; 1045 1046 BaseFlowStep flowStep = (BaseFlowStep) object; 1047 1048 if( id != null ? !id.equals( flowStep.id ) : flowStep.id != null ) 1049 return false; 1050 1051 return true; 1052 } 1053 1054 @Override 1055 public int hashCode() 1056 { 1057 return id != null ? id.hashCode() : 0; 1058 } 1059 1060 @Override 1061 public String toString() 1062 { 1063 StringBuffer buffer = new StringBuffer(); 1064 1065 buffer.append( getClass().getSimpleName() ); 1066 buffer.append( "[name: " ).append( getName() ).append( "]" ); 1067 1068 return buffer.toString(); 1069 } 1070 1071 @Override 1072 public final boolean isInfoEnabled() 1073 { 1074 return getLogger().isInfoEnabled(); 1075 } 1076 1077 private ProcessLogger getLogger() 1078 { 1079 if( flow != null && flow instanceof ProcessLogger ) 1080 return (ProcessLogger) flow; 1081 1082 return ProcessLogger.NULL; 1083 } 1084 1085 @Override 1086 public final boolean isDebugEnabled() 1087 { 1088 return ( getLogger() ).isDebugEnabled(); 1089 } 1090 1091 @Override 1092 public void logDebug( String message, Object... arguments ) 1093 { 1094 getLogger().logDebug( message, arguments ); 1095 } 1096 1097 @Override 1098 public void logInfo( String message, Object... arguments ) 1099 { 1100 getLogger().logInfo( message, arguments ); 1101 } 1102 1103 @Override 1104 public void logWarn( String message ) 1105 { 1106 getLogger().logWarn( message ); 1107 } 1108 1109 @Override 1110 public void logWarn( String message, Throwable throwable ) 1111 { 1112 getLogger().logWarn( message, throwable ); 1113 } 1114 1115 @Override 1116 public void logWarn( String message, Object... arguments ) 1117 { 1118 getLogger().logWarn( message, arguments ); 1119 } 1120 1121 @Override 1122 public void logError( String message, Object... arguments ) 1123 { 1124 getLogger().logError( message, arguments ); 1125 } 1126 1127 @Override 1128 public void logError( String message, Throwable throwable ) 1129 { 1130 getLogger().logError( message, throwable ); 1131 } 1132 1133 /** 1134 * Class SafeFlowStepListener safely calls a wrapped FlowStepListener. 1135 * <p/> 1136 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1137 * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method 1138 * which in turn is run in a new Thread. 1139 */ 1140 private class SafeFlowStepListener implements FlowStepListener 1141 { 1142 /** Field flowListener */ 1143 final FlowStepListener flowStepListener; 1144 /** Field throwable */ 1145 Throwable throwable; 1146 1147 private SafeFlowStepListener( FlowStepListener flowStepListener ) 1148 { 1149 this.flowStepListener = flowStepListener; 1150 } 1151 1152 public void onStepStarting( FlowStep flowStep ) 1153 { 1154 try 1155 { 1156 flowStepListener.onStepStarting( flowStep ); 1157 } 1158 catch( Throwable throwable ) 1159 { 1160 handleThrowable( throwable ); 1161 } 1162 } 1163 1164 public void onStepStopping( FlowStep flowStep ) 1165 { 1166 try 1167 { 1168 flowStepListener.onStepStopping( flowStep ); 1169 } 1170 catch( Throwable throwable ) 1171 { 1172 handleThrowable( throwable ); 1173 } 1174 } 1175 1176 public void onStepCompleted( FlowStep flowStep ) 1177 { 1178 try 1179 { 1180 flowStepListener.onStepCompleted( flowStep ); 1181 } 1182 catch( Throwable throwable ) 1183 { 1184 handleThrowable( throwable ); 1185 } 1186 } 1187 1188 public void onStepRunning( FlowStep flowStep ) 1189 { 1190 try 1191 { 1192 flowStepListener.onStepRunning( flowStep ); 1193 } 1194 catch( Throwable throwable ) 1195 { 1196 handleThrowable( throwable ); 1197 } 1198 } 1199 1200 public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable ) 1201 { 1202 try 1203 { 1204 return flowStepListener.onStepThrowable( flowStep, flowStepThrowable ); 1205 } 1206 catch( Throwable throwable ) 1207 { 1208 handleThrowable( throwable ); 1209 } 1210 1211 return false; 1212 } 1213 1214 private void handleThrowable( Throwable throwable ) 1215 { 1216 this.throwable = throwable; 1217 1218 logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable ); 1219 } 1220 1221 public boolean equals( Object object ) 1222 { 1223 if( object instanceof BaseFlowStep.SafeFlowStepListener ) 1224 return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener ); 1225 1226 return flowStepListener.equals( object ); 1227 } 1228 1229 public int hashCode() 1230 { 1231 return flowStepListener.hashCode(); 1232 } 1233 } 1234 }