/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.FlowNode;
import cascading.flow.FlowStep;
import cascading.flow.planner.graph.AnnotatedGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.flow.planner.graph.Extent;
import cascading.flow.planner.graph.FlowElementGraph;
import cascading.flow.stream.annotations.StreamMode;
import cascading.pipe.Group;
import cascading.pipe.Pipe;
import cascading.stats.FlowNodeStats;
import cascading.tap.Tap;
import cascading.util.ProcessLogger;
import cascading.util.Util;

import static cascading.util.Util.createIdentitySet;

/**
 * Base {@link FlowNode} implementation backed by an {@link ElementGraph} sub-graph and an optional
 * list of pipeline graphs.
 * <p>
 * Source and sink elements, and the taps among them, are looked up lazily from the node sub-graph
 * and cached. Tap names and traps are only resolved when a {@link FlowElementGraph} is handed to the
 * constructor; otherwise the trap map stays empty and no tap names are known.
 * <p>
 * Two instances are equal when they are of the same class and share the same id. All
 * {@link ProcessLogger} calls are forwarded to the owning {@link FlowStep} when it is itself a
 * {@link ProcessLogger}, and to {@link ProcessLogger#NULL} otherwise.
 */
public class BaseFlowNode implements Serializable, FlowNode, ProcessLogger
  {
  private final String id;
  private int ordinal;
  private String name;
  /** created on first call to {@link #addProcessAnnotation(String, String)}, null until then */
  private Map<String, String> processAnnotations;

  private transient FlowStep flowStep;

  protected ElementGraph nodeSubGraph;
  protected List<? extends ElementGraph> pipelineGraphs = Collections.emptyList();

  /** lazily populated caches, see {@link #getSourceElements()} and {@link #getSinkElements()} */
  private transient Set<FlowElement> sourceElements;
  private transient Set<FlowElement> sinkElements;
  private Map<String, Tap> trapMap = Collections.emptyMap();
  protected Set<Tap> sourceTaps;
  protected Set<Tap> sinkTaps;

  /** tap to names lookups, only assigned when a FlowElementGraph was given to the constructor */
  private Map<Tap, Set<String>> reverseSourceTaps;
  private Map<Tap, Set<String>> reverseSinkTaps;
  private Map<FlowElement, ElementGraph> streamPipelineMap = Collections.emptyMap();

  /** optional metadata about the FlowNode */
  private Map<String, String> flowNodeDescriptor = Collections.emptyMap();

  protected transient FlowNodeStats flowNodeStats;

  /**
   * Creates a named node without a sub-graph or descriptor.
   *
   * @param name    the node name
   * @param ordinal the node ordinal
   */
  public BaseFlowNode( String name, int ordinal )
    {
    this( name, ordinal, null );
    }

  /**
   * Creates a named node without a sub-graph.
   *
   * @param name               the node name
   * @param ordinal            the node ordinal
   * @param flowNodeDescriptor optional metadata, ignored when null
   */
  public BaseFlowNode( String name, int ordinal, Map<String, String> flowNodeDescriptor )
    {
    this( null, name, ordinal, flowNodeDescriptor );
    }

  /**
   * Creates a named node over the given sub-graph.
   *
   * @param nodeSubGraph the elements making up this node
   * @param name         the node name
   * @param ordinal      the node ordinal
   */
  public BaseFlowNode( ElementGraph nodeSubGraph, String name, int ordinal )
    {
    this( nodeSubGraph, name, ordinal, null );
    }

  /**
   * Creates a named node over the given sub-graph. No pipeline graphs, tap names or traps are
   * assigned by this constructor.
   *
   * @param nodeSubGraph       the elements making up this node
   * @param name               the node name
   * @param ordinal            the node ordinal
   * @param flowNodeDescriptor optional metadata, ignored when null
   */
  public BaseFlowNode( ElementGraph nodeSubGraph, String name, int ordinal, Map<String, String> flowNodeDescriptor )
    {
    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
    this.nodeSubGraph = nodeSubGraph;
    setName( name );
    this.ordinal = ordinal;

    // trapMap keeps the empty map assigned by its field initializer

    setFlowNodeDescriptor( flowNodeDescriptor );
    }

  /**
   * Creates an unnamed node over the given sub-graph.
   *
   * @param nodeSubGraph the elements making up this node
   */
  public BaseFlowNode( ElementGraph nodeSubGraph )
    {
    this( null, nodeSubGraph, null, null );
    }

  /**
   * Creates an unnamed node over the given sub-graph.
   *
   * @param nodeSubGraph       the elements making up this node
   * @param flowNodeDescriptor optional metadata, ignored when null
   */
  public BaseFlowNode( ElementGraph nodeSubGraph, Map<String, String> flowNodeDescriptor )
    {
    this( null, nodeSubGraph, flowNodeDescriptor );
    }

  /**
   * Creates an unnamed node over the given sub-graph and pipeline graphs.
   *
   * @param nodeSubGraph   the elements making up this node
   * @param pipelineGraphs the pipeline graphs of this node, ignored when null
   */
  public BaseFlowNode( ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs )
    {
    this( null, nodeSubGraph, pipelineGraphs, null );
    }

  /**
   * Creates an unnamed node, resolving tap names and traps against the given flow element graph.
   *
   * @param flowElementGraph   the graph used to resolve tap names and traps, may be null
   * @param nodeSubGraph       the elements making up this node
   * @param flowNodeDescriptor optional metadata, ignored when null
   */
  public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, Map<String, String> flowNodeDescriptor )
    {
    this( flowElementGraph, nodeSubGraph, null, flowNodeDescriptor );
    }

  /**
   * Creates an unnamed node, resolving tap names and traps against the given flow element graph.
   *
   * @param flowElementGraph the graph used to resolve tap names and traps, may be null
   * @param nodeSubGraph     the elements making up this node
   * @param pipelineGraphs   the pipeline graphs of this node, ignored when null
   */
  public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs )
    {
    this( flowElementGraph, nodeSubGraph, pipelineGraphs, null );
    }

  /**
   * Creates an unnamed node. The pipeline graphs are verified against the node sub-graph and
   * indexed by their streamed elements; when {@code flowElementGraph} is not null, tap names and
   * traps are resolved from it.
   *
   * @param flowElementGraph   the graph used to resolve tap names and traps, may be null
   * @param nodeSubGraph       the elements making up this node
   * @param pipelineGraphs     the pipeline graphs of this node, ignored when null
   * @param flowNodeDescriptor optional metadata, ignored when null
   * @throws IllegalStateException if the pipeline graphs do not cover the node sub-graph, are not
   *                               {@link AnnotatedGraph} instances, or share a streamed element
   */
  public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs, Map<String, String> flowNodeDescriptor )
    {
    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
    this.nodeSubGraph = nodeSubGraph;

    setPipelineGraphs( pipelineGraphs );

    setFlowNodeDescriptor( flowNodeDescriptor );

    verifyPipelines();
    createPipelineMap();

    if( flowElementGraph != null )
      {
      // names must be assigned first, assignTraps looks traps up by tap name
      assignTrappableNames( flowElementGraph );
      assignTraps( flowElementGraph.getTrapMap() );
      }
    }

  public void setOrdinal( int ordinal )
    {
    this.ordinal = ordinal;
    }

  @Override
  public int getOrdinal()
    {
    return ordinal;
    }

  @Override
  public String getID()
    {
    return id;
    }

  public void setName( String name )
    {
    this.name = name;
    }

  @Override
  public String getName()
    {
    return name;
    }

  /**
   * @return the descriptor map given at construction, or an empty map if none was given
   */
  @Override
  public Map<String, String> getFlowNodeDescriptor()
    {
    return flowNodeDescriptor;
    }

  /**
   * Replaces the descriptor, keeping the current one when the argument is null.
   *
   * @param flowNodeDescriptor the new descriptor, or null to leave the current value untouched
   */
  protected void setFlowNodeDescriptor( Map<String, String> flowNodeDescriptor )
    {
    if( flowNodeDescriptor != null )
      this.flowNodeDescriptor = flowNodeDescriptor;
    }

  /**
   * @return an unmodifiable view of the annotations added so far, empty if none were added
   */
  @Override
  public Map<String, String> getProcessAnnotations()
    {
    if( processAnnotations == null )
      return Collections.emptyMap();

    return Collections.unmodifiableMap( processAnnotations );
    }

  /**
   * Adds an annotation keyed by the declaring class name of the enum, valued by the constant name.
   *
   * @param annotation the annotation to add, ignored when null
   */
  @Override
  public void addProcessAnnotation( Enum annotation )
    {
    if( annotation == null )
      return;

    addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
    }

  /**
   * Adds or replaces the annotation stored under the given key.
   *
   * @param key   the annotation key
   * @param value the annotation value
   */
  @Override
  public void addProcessAnnotation( String key, String value )
    {
    if( processAnnotations == null )
      processAnnotations = new HashMap<>();

    processAnnotations.put( key, value );
    }

  public void setFlowNodeStats( FlowNodeStats flowNodeStats )
    {
    this.flowNodeStats = flowNodeStats;
    }

  @Override
  public FlowNodeStats getFlowNodeStats()
    {
    return flowNodeStats;
    }

  public void setFlowStep( FlowStep flowStep )
    {
    this.flowStep = flowStep;
    }

  @Override
  public FlowStep getFlowStep()
    {
    return flowStep;
    }

  @Override
  public ElementGraph getElementGraph()
    {
    return nodeSubGraph;
    }

  /**
   * Collects the names of all source elements. A Tap contributes every name recorded for it, any
   * other element is treated as a Pipe and contributes its own name.
   *
   * @return a new, mutable set of names
   */
  @Override
  public Set<String> getSourceElementNames()
    {
    Set<String> results = new HashSet<>();

    for( FlowElement flowElement : getSourceElements() )
      {
      if( flowElement instanceof Tap )
        {
        Set<String> tapNames = getSourceTapNames( (Tap) flowElement );

        // a tap has no names when none were assigned for it, addAll( null ) would throw
        if( tapNames != null )
          results.addAll( tapNames );
        }
      else
        {
        results.add( ( (Pipe) flowElement ).getName() );
        }
      }

    return results;
    }

  /**
   * @return an unmodifiable set of the sources found in the node sub-graph, cached after first use
   */
  public Set<FlowElement> getSourceElements()
    {
    if( sourceElements == null )
      sourceElements = Collections.unmodifiableSet( ElementGraphs.findSources( nodeSubGraph, FlowElement.class ) );

    return sourceElements;
    }

  /**
   * @param annotation the annotation to filter by
   * @return a new set holding the source elements that also carry the given annotation
   */
  @Override
  public Set<? extends FlowElement> getSourceElements( Enum annotation )
    {
    Set<? extends FlowElement> annotated = getFlowElementsFor( annotation );

    Set<FlowElement> results = new HashSet<>();

    for( FlowElement candidate : getSourceElements() )
      {
      if( annotated.contains( candidate ) )
        results.add( candidate );
      }

    return results;
    }

  /**
   * Collects the names of all sink elements. A Tap contributes every name recorded for it, any
   * other element is treated as a Pipe and contributes its own name.
   *
   * @return a new, mutable set of names
   */
  @Override
  public Set<String> getSinkElementNames()
    {
    Set<String> results = new HashSet<>();

    for( FlowElement flowElement : getSinkElements() )
      {
      if( flowElement instanceof Tap )
        {
        Set<String> tapNames = getSinkTapNames( (Tap) flowElement );

        // a tap has no names when none were assigned for it, addAll( null ) would throw
        if( tapNames != null )
          results.addAll( tapNames );
        }
      else
        {
        results.add( ( (Pipe) flowElement ).getName() );
        }
      }

    return results;
    }

  /**
   * @return an unmodifiable set of the sinks found in the node sub-graph, cached after first use
   */
  @Override
  public Set<FlowElement> getSinkElements()
    {
    if( sinkElements == null )
      sinkElements = Collections.unmodifiableSet( ElementGraphs.findSinks( nodeSubGraph, FlowElement.class ) );

    return sinkElements;
    }

  /**
   * @param annotation the annotation to filter by
   * @return a new set holding the sink elements that also carry the given annotation
   */
  public Set<? extends FlowElement> getSinkElements( Enum annotation )
    {
    Set<? extends FlowElement> annotated = getFlowElementsFor( annotation );

    Set<FlowElement> results = new HashSet<>();

    for( FlowElement candidate : getSinkElements() )
      {
      if( annotated.contains( candidate ) )
        results.add( candidate );
      }

    return results;
    }

  /**
   * @return the pipeline graphs of this node, an empty list if none were given
   */
  @Override
  public List<? extends ElementGraph> getPipelineGraphs()
    {
    return pipelineGraphs;
    }

  /**
   * Replaces the pipeline graphs, keeping the current list when the argument is null.
   *
   * @param pipelineGraphs the new pipeline graphs, or null to leave the current value untouched
   */
  protected void setPipelineGraphs( List<? extends ElementGraph> pipelineGraphs )
    {
    if( pipelineGraphs != null )
      this.pipelineGraphs = pipelineGraphs;
    }

  /**
   * @param streamedSource the streamed element to look up
   * @return the pipeline graph registered for the element, or null if there is none
   */
  @Override
  public ElementGraph getPipelineGraphFor( FlowElement streamedSource )
    {
    return streamPipelineMap.get( streamedSource );
    }

  @Override
  public Collection<Group> getGroups()
    {
    return ElementGraphs.findAllGroups( nodeSubGraph );
    }

  /**
   * @return an unmodifiable set of the source elements narrowed to Tap, cached after first use
   */
  @Override
  public Set<Tap> getSourceTaps()
    {
    if( sourceTaps == null )
      sourceTaps = Collections.unmodifiableSet( Util.narrowIdentitySet( Tap.class, getSourceElements() ) );

    return sourceTaps;
    }

  /**
   * @return an unmodifiable set of the sink elements narrowed to Tap, cached after first use
   */
  @Override
  public Set<Tap> getSinkTaps()
    {
    if( sinkTaps == null )
      sinkTaps = Collections.unmodifiableSet( Util.narrowIdentitySet( Tap.class, getSinkElements() ) );

    return sinkTaps;
    }

  /**
   * @return always 0
   */
  @Override
  public int getSubmitPriority()
    {
    return 0;
    }

  /**
   * @param source the source tap to look up
   * @return the names recorded for the tap, or null when none are known, which includes nodes
   * created without a FlowElementGraph
   */
  @Override
  public Set<String> getSourceTapNames( Tap source )
    {
    // only assigned by assignTrappableNames, which is skipped when no FlowElementGraph was given
    if( reverseSourceTaps == null )
      return null;

    return reverseSourceTaps.get( source );
    }

  /**
   * @param sink the sink tap to look up
   * @return the names recorded for the tap, or null when none are known, which includes nodes
   * created without a FlowElementGraph
   */
  @Override
  public Set<String> getSinkTapNames( Tap sink )
    {
    // only assigned by assignTrappableNames, which is skipped when no FlowElementGraph was given
    if( reverseSinkTaps == null )
      return null;

    return reverseSinkTaps.get( sink );
    }

  /**
   * Builds the tap to names lookups. Every source tap is named after its outgoing scopes and every
   * sink tap after its incoming scopes in the given graph, plus the keys under which the tap is
   * registered in the graph's source and sink maps.
   *
   * @param flowElementGraph the graph to read scopes and tap maps from, nothing is done when null
   */
  private void assignTrappableNames( FlowElementGraph flowElementGraph )
    {
    if( flowElementGraph == null )
      return;

    reverseSourceTaps = new HashMap<>();
    reverseSinkTaps = new HashMap<>();

    Set<Tap> sources = getSourceTaps();

    for( Tap source : sources )
      {
      Set<Scope> scopes = flowElementGraph.outgoingEdgesOf( source );

      for( Scope scope : scopes )
        addSourceName( scope.getName(), source );
      }

    for( Map.Entry<String, Tap> entry : flowElementGraph.getSourceMap().entrySet() )
      {
      if( sources.contains( entry.getValue() ) )
        addSourceName( entry.getKey(), entry.getValue() );
      }

    Set<Tap> sinks = getSinkTaps();

    for( Tap sink : sinks )
      {
      Set<Scope> scopes = flowElementGraph.incomingEdgesOf( sink );

      for( Scope scope : scopes )
        addSinkName( scope.getName(), sink );
      }

    for( Map.Entry<String, Tap> entry : flowElementGraph.getSinkMap().entrySet() )
      {
      if( sinks.contains( entry.getValue() ) )
        addSinkName( entry.getKey(), entry.getValue() );
      }
    }

  /** Records the given name for the source tap, creating its name set on first use. */
  private void addSourceName( String name, Tap source )
    {
    Set<String> names = reverseSourceTaps.get( source );

    if( names == null )
      {
      names = new HashSet<>();
      reverseSourceTaps.put( source, names );
      }

    names.add( name );
    }

  /** Records the given name for the sink tap, creating its name set on first use. */
  private void addSinkName( String name, Tap sink )
    {
    Set<String> names = reverseSinkTaps.get( sink );

    if( names == null )
      {
      names = new HashSet<>();
      reverseSinkTaps.put( sink, names );
      }

    names.add( name );
    }

  /**
   * @return the traps applicable to this node keyed by name, empty if none were assigned
   */
  @Override
  public Map<String, Tap> getTrapMap()
    {
    return trapMap;
    }

  @Override
  public Collection<? extends Tap> getTraps()
    {
    return getTrapMap().values();
    }

  /**
   * Selects the traps whose key matches the name of an element in the node sub-graph. A Pipe is
   * matched by its own name, any other non-Extent element is treated as a Tap and matched by
   * its recorded source and sink names.
   *
   * @param traps all known traps keyed by name
   */
  private void assignTraps( Map<String, Tap> traps )
    {
    trapMap = new HashMap<>();

    for( FlowElement flowElement : nodeSubGraph.vertexSet() )
      {
      if( flowElement instanceof Extent )
        continue;

      Set<String> names = new HashSet<>();

      if( flowElement instanceof Pipe )
        {
        names.add( ( (Pipe) flowElement ).getName() );
        }
      else
        {
        Set<String> sourceTapNames = getSourceTapNames( (Tap) flowElement );

        if( sourceTapNames != null )
          names.addAll( sourceTapNames );

        Set<String> sinkTapNames = getSinkTapNames( (Tap) flowElement );

        if( sinkTapNames != null )
          names.addAll( sinkTapNames );
        }

      for( String name : names )
        {
        if( traps.containsKey( name ) )
          trapMap.put( name, traps.get( name ) );
        }
      }
    }

  /**
   * Checks that every element of the node sub-graph appears in at least one pipeline graph. Nothing
   * is checked when there are no pipeline graphs.
   *
   * @throws IllegalStateException if any element is missing from all pipeline graphs
   */
  private void verifyPipelines()
    {
    if( pipelineGraphs == null || pipelineGraphs.isEmpty() )
      return;

    Set<FlowElement> allElements = createIdentitySet( nodeSubGraph.vertexSet() );

    for( ElementGraph pipelineGraph : pipelineGraphs )
      allElements.removeAll( pipelineGraph.vertexSet() );

    // whatever is left was not covered by any pipeline graph
    if( !allElements.isEmpty() )
      throw new IllegalStateException( "union of pipeline graphs for flow node are missing elements: " + Util.join( allElements, ", " ) );
    }

  /**
   * Indexes the pipeline graphs by their streamed elements. A graph with annotations contributes
   * the elements annotated with {@link StreamMode#Streamed}, a graph without annotations
   * contributes its sources. Nothing is done when there are no pipeline graphs.
   *
   * @throws IllegalStateException if a pipeline graph is not an {@link AnnotatedGraph}, or if the
   *                               same element is streamed by more than one pipeline graph
   */
  private void createPipelineMap()
    {
    if( pipelineGraphs == null || pipelineGraphs.isEmpty() )
      return;

    Map<FlowElement, ElementGraph> map = new HashMap<>( pipelineGraphs.size() );

    for( ElementGraph pipelineGraph : pipelineGraphs )
      {
      if( !( pipelineGraph instanceof AnnotatedGraph ) )
        throw new IllegalStateException( "pipeline graphs must be of type AnnotatedGraph, got: " + pipelineGraph.getClass().getName() );

      Set<FlowElement> flowElements;

      if( ( (AnnotatedGraph) pipelineGraph ).hasAnnotations() )
        flowElements = ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( StreamMode.Streamed );
      else
        flowElements = ElementGraphs.findSources( pipelineGraph, FlowElement.class );

      for( FlowElement flowElement : flowElements )
        {
        if( map.containsKey( flowElement ) )
          throw new IllegalStateException( "duplicate streamable elements, found: " + flowElement );

        map.put( flowElement, pipelineGraph );
        }
      }

    this.streamPipelineMap = map;
    }

  /**
   * @param branchName the name to look up
   * @return the trap assigned under the given name, or null if there is none
   */
  @Override
  public Tap getTrap( String branchName )
    {
    return trapMap.get( branchName );
    }

  @Override
  public Collection<? extends Scope> getPreviousScopes( FlowElement flowElement )
    {
    return nodeSubGraph.incomingEdgesOf( flowElement );
    }

  @Override
  public Collection<? extends Scope> getNextScopes( FlowElement flowElement )
    {
    return nodeSubGraph.outgoingEdgesOf( flowElement );
    }

  /**
   * Two nodes are equal when they are of exactly the same class and have equal ids.
   */
  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;

    if( object == null || getClass() != object.getClass() )
      return false;

    BaseFlowNode flowNode = (BaseFlowNode) object;

    return id != null ? id.equals( flowNode.id ) : flowNode.id == null;
    }

  @Override
  public int hashCode()
    {
    return id != null ? id.hashCode() : 0;
    }

  /**
   * Finds the elements carrying the given annotation. Without pipeline graphs the node's element
   * graph is consulted, otherwise the results of all pipeline graphs are combined.
   *
   * @param annotation the annotation to look up
   * @return the annotated elements
   */
  @Override
  public Set<? extends FlowElement> getFlowElementsFor( Enum annotation )
    {
    if( pipelineGraphs.isEmpty() )
      return ( (AnnotatedGraph) getElementGraph() ).getAnnotations().getValues( annotation );

    Set<FlowElement> results = createIdentitySet();

    for( ElementGraph pipelineGraph : pipelineGraphs )
      results.addAll( ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( annotation ) );

    return results;
    }

  /**
   * @return the owning flow step when it is a ProcessLogger, otherwise {@link ProcessLogger#NULL}
   */
  private ProcessLogger getLogger()
    {
    // instanceof is false for null, so an unset flowStep falls through to the NULL logger
    if( flowStep instanceof ProcessLogger )
      return (ProcessLogger) flowStep;

    return ProcessLogger.NULL;
    }

  @Override
  public boolean isInfoEnabled()
    {
    return getLogger().isInfoEnabled();
    }

  @Override
  public boolean isDebugEnabled()
    {
    return getLogger().isDebugEnabled();
    }

  @Override
  public void logInfo( String message, Object... arguments )
    {
    getLogger().logInfo( message, arguments );
    }

  @Override
  public void logDebug( String message, Object... arguments )
    {
    getLogger().logDebug( message, arguments );
    }

  @Override
  public void logWarn( String message )
    {
    getLogger().logWarn( message );
    }

  @Override
  public void logWarn( String message, Object... arguments )
    {
    getLogger().logWarn( message, arguments );
    }

  @Override
  public void logWarn( String message, Throwable throwable )
    {
    getLogger().logWarn( message, throwable );
    }

  @Override
  public void logError( String message, Object... arguments )
    {
    getLogger().logError( message, arguments );
    }

  @Override
  public void logError( String message, Throwable throwable )
    {
    getLogger().logError( message, throwable );
    }
  }