001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner; 022 023import java.io.Serializable; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031 032import cascading.flow.FlowElement; 033import cascading.flow.FlowNode; 034import cascading.flow.FlowStep; 035import cascading.flow.planner.graph.AnnotatedGraph; 036import cascading.flow.planner.graph.ElementGraph; 037import cascading.flow.planner.graph.ElementGraphs; 038import cascading.flow.planner.graph.Extent; 039import cascading.flow.planner.graph.FlowElementGraph; 040import cascading.flow.stream.annotations.StreamMode; 041import cascading.pipe.Group; 042import cascading.pipe.Pipe; 043import cascading.stats.FlowNodeStats; 044import cascading.tap.Tap; 045import cascading.util.ProcessLogger; 046import cascading.util.Util; 047 048import static cascading.util.Util.createIdentitySet; 049 050/** 051 * 052 */ 053public class BaseFlowNode implements Serializable, FlowNode, ProcessLogger 054 { 055 private final String id; 056 private int ordinal; 057 private String name; 058 private Map<String, String> processAnnotations; 059 060 private transient FlowStep flowStep; 061 062 protected ElementGraph nodeSubGraph; 063 protected List<? extends ElementGraph> pipelineGraphs = Collections.emptyList(); 064 065 private transient Set<FlowElement> sourceElements; 066 private transient Set<FlowElement> sinkElements; 067 private Map<String, Tap> trapMap = Collections.emptyMap(); 068 protected Set<Tap> sourceTaps; 069 protected Set<Tap> sinkTaps; 070 071 private Map<Tap, Set<String>> reverseSourceTaps; 072 private Map<Tap, Set<String>> reverseSinkTaps; 073 private Map<FlowElement, ElementGraph> streamPipelineMap = Collections.emptyMap(); 074 075 /** optional metadata about the FlowStep */ 076 private Map<String, String> flowNodeDescriptor = Collections.emptyMap(); 077 078 protected transient FlowNodeStats flowNodeStats; 079 080 public BaseFlowNode( String name, int ordinal ) 081 { 082 this( name, ordinal, null ); 083 } 084 085 public BaseFlowNode( String name, int ordinal, Map<String, String> flowNodeDescriptor ) 086 { 087 this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number 088 setName( name ); 089 this.ordinal = ordinal; 090 this.trapMap = Collections.emptyMap(); 091 092 setFlowNodeDescriptor( flowNodeDescriptor ); 093 } 094 095 public BaseFlowNode( ElementGraph nodeSubGraph ) 096 { 097 this( null, nodeSubGraph, null, null ); 098 } 099 100 public BaseFlowNode( ElementGraph nodeSubGraph, Map<String, String> flowNodeDescriptor ) 101 { 102 this( null, nodeSubGraph, flowNodeDescriptor ); 103 } 104 105 public BaseFlowNode( ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs ) 106 { 107 this( null, nodeSubGraph, pipelineGraphs, null ); 108 } 109 110 public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, Map<String, String> flowNodeDescriptor ) 111 { 112 this( flowElementGraph, nodeSubGraph, null, flowNodeDescriptor ); 113 } 114 115 public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs ) 116 { 117 this( flowElementGraph, nodeSubGraph, pipelineGraphs, null ); 118 } 119 120 public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs, Map<String, String> flowNodeDescriptor ) 121 { 122 this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number 123 this.nodeSubGraph = nodeSubGraph; 124 125 setPipelineGraphs( pipelineGraphs ); 126 127 setFlowNodeDescriptor( flowNodeDescriptor ); 128 129 verifyPipelines(); 130 createPipelineMap(); 131 132 if( flowElementGraph != null ) 133 { 134 assignTrappableNames( flowElementGraph ); 135 assignTraps( flowElementGraph.getTrapMap() ); 136 } 137 } 138 139 public void setOrdinal( int ordinal ) 140 { 141 this.ordinal = ordinal; 142 } 143 144 @Override 145 public int getOrdinal() 146 { 147 return ordinal; 148 } 149 150 @Override 151 public String getID() 152 { 153 return id; 154 } 155 156 public void setName( String name ) 157 { 158 this.name = name; 159 } 160 161 @Override 162 public String getName() 163 { 164 return name; 165 } 166 167 @Override 168 public Map<String, String> getFlowNodeDescriptor() 169 { 170 return flowNodeDescriptor; 171 } 172 173 protected void setFlowNodeDescriptor( Map<String, String> flowNodeDescriptor ) 174 { 175 if( flowNodeDescriptor != null ) 176 this.flowNodeDescriptor = flowNodeDescriptor; 177 } 178 179 @Override 180 public Map<String, String> getProcessAnnotations() 181 { 182 if( processAnnotations == null ) 183 return Collections.emptyMap(); 184 185 return Collections.unmodifiableMap( processAnnotations ); 186 } 187 188 @Override 189 public void addProcessAnnotation( Enum annotation ) 190 { 191 if( annotation == null ) 192 return; 193 194 addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() ); 195 } 196 197 @Override 198 public void addProcessAnnotation( String key, String value ) 199 { 200 if( processAnnotations == null ) 201 processAnnotations = new HashMap<>(); 202 203 processAnnotations.put( key, value ); 204 } 205 206 public void setFlowNodeStats( FlowNodeStats flowNodeStats ) 207 { 208 this.flowNodeStats = flowNodeStats; 209 } 210 211 @Override 212 public FlowNodeStats getFlowNodeStats() 213 { 214 return flowNodeStats; 215 } 216 217 public void setFlowStep( FlowStep flowStep ) 218 { 219 this.flowStep = flowStep; 220 } 221 222 @Override 223 public FlowStep getFlowStep() 224 { 225 return flowStep; 226 } 227 228 @Override 229 public ElementGraph getElementGraph() 230 { 231 return nodeSubGraph; 232 } 233 234 @Override 235 public Set<String> getSourceElementNames() 236 { 237 Set<String> results = new HashSet<>(); 238 239 for( FlowElement flowElement : getSourceElements() ) 240 { 241 if( flowElement instanceof Tap ) 242 results.addAll( getSourceTapNames( (Tap) flowElement ) ); 243 else 244 results.add( ( (Pipe) flowElement ).getName() ); 245 } 246 247 return results; 248 } 249 250 public Set<FlowElement> getSourceElements() 251 { 252 if( sourceElements == null ) 253 sourceElements = Collections.unmodifiableSet( ElementGraphs.findSources( nodeSubGraph, FlowElement.class ) ); 254 255 return sourceElements; 256 } 257 258 @Override 259 public Set<? extends FlowElement> getSourceElements( Enum annotation ) 260 { 261 Set<? extends FlowElement> annotated = getFlowElementsFor( annotation ); 262 Set<FlowElement> sourceElements = getSourceElements(); 263 264 Set<FlowElement> results = new HashSet<>(); 265 266 for( FlowElement sourceElement : sourceElements ) 267 { 268 if( annotated.contains( sourceElement ) ) 269 results.add( sourceElement ); 270 } 271 272 return results; 273 } 274 275 @Override 276 public Set<String> getSinkElementNames() 277 { 278 Set<String> results = new HashSet<>(); 279 280 for( FlowElement flowElement : getSinkElements() ) 281 { 282 if( flowElement instanceof Tap ) 283 results.addAll( getSinkTapNames( (Tap) flowElement ) ); 284 else 285 results.add( ( (Pipe) flowElement ).getName() ); 286 } 287 288 return results; 289 } 290 291 @Override 292 public Set<FlowElement> getSinkElements() 293 { 294 if( sinkElements == null ) 295 sinkElements = Collections.unmodifiableSet( ElementGraphs.findSinks( nodeSubGraph, FlowElement.class ) ); 296 297 return sinkElements; 298 } 299 300 public Set<? extends FlowElement> getSinkElements( Enum annotation ) 301 { 302 Set<? extends FlowElement> annotated = getFlowElementsFor( annotation ); 303 Set<FlowElement> sinkElements = getSinkElements(); 304 305 Set<FlowElement> results = new HashSet<>(); 306 307 for( FlowElement sinkElement : sinkElements ) 308 { 309 if( annotated.contains( sinkElement ) ) 310 results.add( sinkElement ); 311 } 312 313 return results; 314 } 315 316 @Override 317 public List<? extends ElementGraph> getPipelineGraphs() 318 { 319 return pipelineGraphs; 320 } 321 322 protected void setPipelineGraphs( List<? extends ElementGraph> pipelineGraphs ) 323 { 324 if( pipelineGraphs != null ) 325 this.pipelineGraphs = pipelineGraphs; 326 } 327 328 @Override 329 public ElementGraph getPipelineGraphFor( FlowElement streamedSource ) 330 { 331 return streamPipelineMap.get( streamedSource ); 332 } 333 334 @Override 335 public Collection<Group> getGroups() 336 { 337 return ElementGraphs.findAllGroups( nodeSubGraph ); 338 } 339 340 @Override 341 public Set<Tap> getSourceTaps() 342 { 343 if( sourceTaps != null ) 344 return sourceTaps; 345 346 sourceTaps = Collections.unmodifiableSet( Util.narrowIdentitySet( Tap.class, getSourceElements() ) ); 347 348 return sourceTaps; 349 } 350 351 @Override 352 public Set<Tap> getSinkTaps() 353 { 354 if( sinkTaps != null ) 355 return sinkTaps; 356 357 sinkTaps = Collections.unmodifiableSet( Util.narrowIdentitySet( Tap.class, getSinkElements() ) ); 358 359 return sinkTaps; 360 } 361 362 @Override 363 public int getSubmitPriority() 364 { 365 return 0; 366 } 367 368 @Override 369 public Set<String> getSourceTapNames( Tap source ) 370 { 371 return reverseSourceTaps.get( source ); 372 } 373 374 @Override 375 public Set<String> getSinkTapNames( Tap sink ) 376 { 377 return reverseSinkTaps.get( sink ); 378 } 379 380 private void assignTrappableNames( FlowElementGraph flowElementGraph ) 381 { 382 if( flowElementGraph == null ) 383 return; 384 385 reverseSourceTaps = new HashMap<>(); 386 reverseSinkTaps = new HashMap<>(); 387 388 Set<Tap> sources = getSourceTaps(); 389 390 for( Tap source : sources ) 391 { 392 Set<Scope> scopes = flowElementGraph.outgoingEdgesOf( source ); 393 394 for( Scope scope : scopes ) 395 addSourceName( scope.getName(), source ); 396 } 397 398 for( Map.Entry<String, Tap> entry : flowElementGraph.getSourceMap().entrySet() ) 399 { 400 if( sources.contains( entry.getValue() ) ) 401 addSourceName( entry.getKey(), entry.getValue() ); 402 } 403 404 Set<Tap> sinks = getSinkTaps(); 405 406 for( Tap sink : sinks ) 407 { 408 Set<Scope> scopes = flowElementGraph.incomingEdgesOf( sink ); 409 410 for( Scope scope : scopes ) 411 addSinkName( scope.getName(), sink ); 412 } 413 414 for( Map.Entry<String, Tap> entry : flowElementGraph.getSinkMap().entrySet() ) 415 { 416 if( sinks.contains( entry.getValue() ) ) 417 addSinkName( entry.getKey(), entry.getValue() ); 418 } 419 } 420 421 private void addSourceName( String name, Tap source ) 422 { 423 if( !reverseSourceTaps.containsKey( source ) ) 424 reverseSourceTaps.put( source, new HashSet<String>() ); 425 426 reverseSourceTaps.get( source ).add( name ); 427 } 428 429 private void addSinkName( String name, Tap sink ) 430 { 431 if( !reverseSinkTaps.containsKey( sink ) ) 432 reverseSinkTaps.put( sink, new HashSet<String>() ); 433 434 reverseSinkTaps.get( sink ).add( name ); 435 } 436 437 @Override 438 public Map<String, Tap> getTrapMap() 439 { 440 return trapMap; 441 } 442 443 @Override 444 public Collection<? extends Tap> getTraps() 445 { 446 return getTrapMap().values(); 447 } 448 449 private void assignTraps( Map<String, Tap> traps ) 450 { 451 trapMap = new HashMap<>(); 452 453 for( FlowElement flowElement : nodeSubGraph.vertexSet() ) 454 { 455 Set<String> names = new HashSet<>(); 456 457 if( flowElement instanceof Extent ) 458 continue; 459 460 if( flowElement instanceof Pipe ) 461 { 462 names.add( ( (Pipe) flowElement ).getName() ); 463 } 464 else 465 { 466 Set<String> sourceTapNames = getSourceTapNames( (Tap) flowElement ); 467 468 if( sourceTapNames != null ) 469 names.addAll( sourceTapNames ); 470 471 Set<String> sinkTapNames = getSinkTapNames( (Tap) flowElement ); 472 473 if( sinkTapNames != null ) 474 names.addAll( sinkTapNames ); 475 } 476 477 for( String name : names ) 478 { 479 if( traps.containsKey( name ) ) 480 trapMap.put( name, traps.get( name ) ); 481 } 482 } 483 } 484 485 private void verifyPipelines() 486 { 487 if( pipelineGraphs == null || pipelineGraphs.isEmpty() ) 488 return; 489 490 Set<FlowElement> allElements = createIdentitySet( nodeSubGraph.vertexSet() ); 491 492 for( ElementGraph pipelineGraph : pipelineGraphs ) 493 allElements.removeAll( pipelineGraph.vertexSet() ); 494 495 if( !allElements.isEmpty() ) 496 throw new IllegalStateException( "union of pipeline graphs for flow node are missing elements: " + Util.join( allElements, ", " ) ); 497 } 498 499 private void createPipelineMap() 500 { 501 if( pipelineGraphs == null || pipelineGraphs.isEmpty() ) 502 return; 503 504 Map<FlowElement, ElementGraph> map = new HashMap<>( pipelineGraphs.size() ); 505 506 for( ElementGraph pipelineGraph : pipelineGraphs ) 507 { 508 if( !( pipelineGraph instanceof AnnotatedGraph ) ) 509 throw new IllegalStateException( "pipeline graphs must be of type AnnotatedGraph, got: " + pipelineGraph.getClass().getName() ); 510 511 Set<FlowElement> flowElements; 512 513 if( ( (AnnotatedGraph) pipelineGraph ).hasAnnotations() ) 514 flowElements = ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( StreamMode.Streamed ); 515 else 516 flowElements = ElementGraphs.findSources( pipelineGraph, FlowElement.class ); 517 518 for( FlowElement flowElement : flowElements ) 519 { 520 if( map.containsKey( flowElement ) ) 521 throw new IllegalStateException( "duplicate streamable elements, found: " + flowElement ); 522 523 map.put( flowElement, pipelineGraph ); 524 } 525 } 526 527 this.streamPipelineMap = map; 528 } 529 530 @Override 531 public Tap getTrap( String branchName ) 532 { 533 return trapMap.get( branchName ); 534 } 535 536 @Override 537 public Collection<? extends Scope> getPreviousScopes( FlowElement flowElement ) 538 { 539 return nodeSubGraph.incomingEdgesOf( flowElement ); 540 } 541 542 @Override 543 public Collection<? extends Scope> getNextScopes( FlowElement flowElement ) 544 { 545 return nodeSubGraph.outgoingEdgesOf( flowElement ); 546 } 547 548 @Override 549 public boolean equals( Object object ) 550 { 551 if( this == object ) 552 return true; 553 554 if( object == null || getClass() != object.getClass() ) 555 return false; 556 557 BaseFlowNode flowNode = (BaseFlowNode) object; 558 559 if( id != null ? !id.equals( flowNode.id ) : flowNode.id != null ) 560 return false; 561 562 return true; 563 } 564 565 @Override 566 public int hashCode() 567 { 568 return id != null ? id.hashCode() : 0; 569 } 570 571 @Override 572 public Set<? extends FlowElement> getFlowElementsFor( Enum annotation ) 573 { 574 if( pipelineGraphs.isEmpty() ) 575 return ( (AnnotatedGraph) getElementGraph() ).getAnnotations().getValues( annotation ); 576 577 Set<FlowElement> results = createIdentitySet(); 578 579 for( ElementGraph pipelineGraph : pipelineGraphs ) 580 results.addAll( ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( annotation ) ); 581 582 return results; 583 } 584 585 private ProcessLogger getLogger() 586 { 587 if( flowStep != null && flowStep instanceof ProcessLogger ) 588 return (ProcessLogger) flowStep; 589 590 return ProcessLogger.NULL; 591 } 592 593 @Override 594 public boolean isInfoEnabled() 595 { 596 return getLogger().isInfoEnabled(); 597 } 598 599 @Override 600 public boolean isDebugEnabled() 601 { 602 return getLogger().isDebugEnabled(); 603 } 604 605 @Override 606 public void logInfo( String message, Object... arguments ) 607 { 608 getLogger().logInfo( message, arguments ); 609 } 610 611 @Override 612 public void logDebug( String message, Object... arguments ) 613 { 614 getLogger().logDebug( message, arguments ); 615 } 616 617 @Override 618 public void logWarn( String message ) 619 { 620 getLogger().logWarn( message ); 621 } 622 623 @Override 624 public void logWarn( String message, Object... arguments ) 625 { 626 getLogger().logWarn( message, arguments ); 627 } 628 629 @Override 630 public void logWarn( String message, Throwable throwable ) 631 { 632 getLogger().logWarn( message, throwable ); 633 } 634 635 @Override 636 public void logError( String message, Object... arguments ) 637 { 638 getLogger().logError( message, arguments ); 639 } 640 641 @Override 642 public void logError( String message, Throwable throwable ) 643 { 644 getLogger().logError( message, throwable ); 645 } 646 }