001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner; 022 023import java.io.Serializable; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031 032import cascading.flow.FlowElement; 033import cascading.flow.FlowNode; 034import cascading.flow.FlowStep; 035import cascading.flow.planner.graph.AnnotatedGraph; 036import cascading.flow.planner.graph.ElementGraph; 037import cascading.flow.planner.graph.ElementGraphs; 038import cascading.flow.planner.graph.Extent; 039import cascading.flow.planner.graph.FlowElementGraph; 040import cascading.flow.stream.annotations.StreamMode; 041import cascading.pipe.Group; 042import cascading.pipe.Pipe; 043import cascading.tap.Tap; 044import cascading.util.ProcessLogger; 045import cascading.util.Util; 046 047import static cascading.util.Util.createIdentitySet; 048 049/** 050 * 051 */ 052public class BaseFlowNode implements Serializable, FlowNode, ProcessLogger 053 { 054 private final String id; 055 private int ordinal; 056 private String name; 057 private Map<String, String> processAnnotations; 058 059 private transient FlowStep flowStep; 060 061 protected ElementGraph nodeSubGraph; 062 protected List<? extends ElementGraph> pipelineGraphs = Collections.emptyList(); 063 064 private transient Set<FlowElement> sourceElements; 065 private transient Set<FlowElement> sinkElements; 066 private Map<String, Tap> trapMap; 067 private Set<Tap> sourceTaps; 068 private Set<Tap> sinkTaps; 069 070 private Map<Tap, Set<String>> reverseSourceTaps; 071 private Map<Tap, Set<String>> reverseSinkTaps; 072 private Map<FlowElement, ElementGraph> streamPipelineMap = Collections.emptyMap(); 073 074 // for testing 075 public BaseFlowNode( String name, int ordinal ) 076 { 077 this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number 078 setName( name ); 079 this.ordinal = ordinal; 080 this.trapMap = Collections.emptyMap(); 081 } 082 083 public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs ) 084 { 085 this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number 086 this.nodeSubGraph = nodeSubGraph; 087 088 if( pipelineGraphs != null ) 089 this.pipelineGraphs = pipelineGraphs; 090 091 verifyPipelines(); 092 createPipelineMap(); 093 094 assignTrappableNames( flowElementGraph ); 095 assignTraps( flowElementGraph.getTrapMap() ); 096 } 097 098 public void setOrdinal( int ordinal ) 099 { 100 this.ordinal = ordinal; 101 } 102 103 @Override 104 public int getOrdinal() 105 { 106 return ordinal; 107 } 108 109 @Override 110 public String getID() 111 { 112 return id; 113 } 114 115 public void setName( String name ) 116 { 117 this.name = name; 118 } 119 120 @Override 121 public String getName() 122 { 123 return name; 124 } 125 126 @Override 127 public Map<String, String> getProcessAnnotations() 128 { 129 if( processAnnotations == null ) 130 return Collections.emptyMap(); 131 132 return Collections.unmodifiableMap( processAnnotations ); 133 } 134 135 @Override 136 public void addProcessAnnotation( Enum annotation ) 137 { 138 if( annotation == null ) 139 return; 140 141 addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() ); 142 } 143 144 @Override 145 public void addProcessAnnotation( String key, String value ) 146 { 147 if( processAnnotations == null ) 148 processAnnotations = new HashMap<>(); 149 150 processAnnotations.put( key, value ); 151 } 152 153 public void setFlowStep( FlowStep flowStep ) 154 { 155 this.flowStep = flowStep; 156 } 157 158 @Override 159 public FlowStep getFlowStep() 160 { 161 return flowStep; 162 } 163 164 @Override 165 public ElementGraph getElementGraph() 166 { 167 return nodeSubGraph; 168 } 169 170 @Override 171 public Set<String> getSourceElementNames() 172 { 173 Set<String> results = new HashSet<>(); 174 175 for( FlowElement flowElement : getSourceElements() ) 176 { 177 if( flowElement instanceof Tap ) 178 results.addAll( getSourceTapNames( (Tap) flowElement ) ); 179 else 180 results.add( ( (Pipe) flowElement ).getName() ); 181 } 182 183 return results; 184 } 185 186 public Set<FlowElement> getSourceElements() 187 { 188 if( sourceElements == null ) 189 sourceElements = Collections.unmodifiableSet( ElementGraphs.findSources( nodeSubGraph, FlowElement.class ) ); 190 191 return sourceElements; 192 } 193 194 @Override 195 public Set<? extends FlowElement> getSourceElements( Enum annotation ) 196 { 197 Set<? extends FlowElement> annotated = getFlowElementsFor( annotation ); 198 Set<FlowElement> sourceElements = getSourceElements(); 199 200 Set<FlowElement> results = new HashSet<>(); 201 202 for( FlowElement sourceElement : sourceElements ) 203 { 204 if( annotated.contains( sourceElement ) ) 205 results.add( sourceElement ); 206 } 207 208 return results; 209 } 210 211 @Override 212 public Set<String> getSinkElementNames() 213 { 214 Set<String> results = new HashSet<>(); 215 216 for( FlowElement flowElement : getSinkElements() ) 217 { 218 if( flowElement instanceof Tap ) 219 results.addAll( getSinkTapNames( (Tap) flowElement ) ); 220 else 221 results.add( ( (Pipe) flowElement ).getName() ); 222 } 223 224 return results; 225 } 226 227 @Override 228 public Set<FlowElement> getSinkElements() 229 { 230 if( sinkElements == null ) 231 sinkElements = Collections.unmodifiableSet( ElementGraphs.findSinks( nodeSubGraph, FlowElement.class ) ); 232 233 return sinkElements; 234 } 235 236 public Set<? extends FlowElement> getSinkElements( Enum annotation ) 237 { 238 Set<? extends FlowElement> annotated = getFlowElementsFor( annotation ); 239 Set<FlowElement> sinkElements = getSinkElements(); 240 241 Set<FlowElement> results = new HashSet<>(); 242 243 for( FlowElement sinkElement : sinkElements ) 244 { 245 if( annotated.contains( sinkElement ) ) 246 results.add( sinkElement ); 247 } 248 249 return results; 250 } 251 252 @Override 253 public List<? extends ElementGraph> getPipelineGraphs() 254 { 255 return pipelineGraphs; 256 } 257 258 @Override 259 public ElementGraph getPipelineGraphFor( FlowElement streamedSource ) 260 { 261 return streamPipelineMap.get( streamedSource ); 262 } 263 264 @Override 265 public Collection<Group> getGroups() 266 { 267 return ElementGraphs.findAllGroups( nodeSubGraph ); 268 } 269 270 @Override 271 public Set<Tap> getSourceTaps() 272 { 273 if( sourceTaps != null ) 274 return sourceTaps; 275 276 sourceTaps = Collections.unmodifiableSet( Util.narrowSet( Tap.class, getSourceElements() ) ); 277 278 return sourceTaps; 279 } 280 281 @Override 282 public Set<Tap> getSinkTaps() 283 { 284 if( sinkTaps != null ) 285 return sinkTaps; 286 287 sinkTaps = Collections.unmodifiableSet( Util.narrowSet( Tap.class, getSinkElements() ) ); 288 289 return sinkTaps; 290 } 291 292 @Override 293 public int getSubmitPriority() 294 { 295 return 0; 296 } 297 298 @Override 299 public Set<String> getSourceTapNames( Tap source ) 300 { 301 return reverseSourceTaps.get( source ); 302 } 303 304 @Override 305 public Set<String> getSinkTapNames( Tap sink ) 306 { 307 return reverseSinkTaps.get( sink ); 308 } 309 310 private void assignTrappableNames( FlowElementGraph flowElementGraph ) 311 { 312 reverseSourceTaps = new HashMap<>(); 313 reverseSinkTaps = new HashMap<>(); 314 315 Set<Tap> sources = getSourceTaps(); 316 317 for( Tap source : sources ) 318 { 319 Set<Scope> scopes = flowElementGraph.outgoingEdgesOf( source ); 320 321 for( Scope scope : scopes ) 322 addSourceName( scope.getName(), source ); 323 } 324 325 for( Map.Entry<String, Tap> entry : flowElementGraph.getSourceMap().entrySet() ) 326 { 327 if( sources.contains( entry.getValue() ) ) 328 addSourceName( entry.getKey(), entry.getValue() ); 329 } 330 331 Set<Tap> sinks = getSinkTaps(); 332 333 for( Tap sink : sinks ) 334 { 335 Set<Scope> scopes = flowElementGraph.incomingEdgesOf( sink ); 336 337 for( Scope scope : scopes ) 338 addSinkName( scope.getName(), sink ); 339 } 340 341 for( Map.Entry<String, Tap> entry : flowElementGraph.getSinkMap().entrySet() ) 342 { 343 if( sinks.contains( entry.getValue() ) ) 344 addSinkName( entry.getKey(), entry.getValue() ); 345 } 346 } 347 348 private void addSourceName( String name, Tap source ) 349 { 350 if( !reverseSourceTaps.containsKey( source ) ) 351 reverseSourceTaps.put( source, new HashSet<String>() ); 352 353 reverseSourceTaps.get( source ).add( name ); 354 } 355 356 private void addSinkName( String name, Tap sink ) 357 { 358 if( !reverseSinkTaps.containsKey( sink ) ) 359 reverseSinkTaps.put( sink, new HashSet<String>() ); 360 361 reverseSinkTaps.get( sink ).add( name ); 362 } 363 364 @Override 365 public Map<String, Tap> getTrapMap() 366 { 367 return trapMap; 368 } 369 370 @Override 371 public Collection<? extends Tap> getTraps() 372 { 373 return getTrapMap().values(); 374 } 375 376 private void assignTraps( Map<String, Tap> traps ) 377 { 378 trapMap = new HashMap<>(); 379 380 for( FlowElement flowElement : nodeSubGraph.vertexSet() ) 381 { 382 Set<String> names = new HashSet<>(); 383 384 if( flowElement instanceof Extent ) 385 continue; 386 387 if( flowElement instanceof Pipe ) 388 { 389 names.add( ( (Pipe) flowElement ).getName() ); 390 } 391 else 392 { 393 Set<String> sourceTapNames = getSourceTapNames( (Tap) flowElement ); 394 395 if( sourceTapNames != null ) 396 names.addAll( sourceTapNames ); 397 398 Set<String> sinkTapNames = getSinkTapNames( (Tap) flowElement ); 399 400 if( sinkTapNames != null ) 401 names.addAll( sinkTapNames ); 402 } 403 404 for( String name : names ) 405 { 406 if( traps.containsKey( name ) ) 407 trapMap.put( name, traps.get( name ) ); 408 } 409 } 410 } 411 412 private void verifyPipelines() 413 { 414 if( pipelineGraphs == null || pipelineGraphs.isEmpty() ) 415 return; 416 417 Set<FlowElement> allElements = createIdentitySet( nodeSubGraph.vertexSet() ); 418 419 for( ElementGraph pipelineGraph : pipelineGraphs ) 420 allElements.removeAll( pipelineGraph.vertexSet() ); 421 422 if( !allElements.isEmpty() ) 423 throw new IllegalStateException( "union of pipeline graphs for flow node are missing elements: " + Util.join( allElements, ", " ) ); 424 } 425 426 private void createPipelineMap() 427 { 428 if( pipelineGraphs == null || pipelineGraphs.isEmpty() ) 429 return; 430 431 Map<FlowElement, ElementGraph> map = new HashMap<>( pipelineGraphs.size() ); 432 433 for( ElementGraph pipelineGraph : pipelineGraphs ) 434 { 435 if( !( pipelineGraph instanceof AnnotatedGraph ) ) 436 throw new IllegalStateException( "pipeline graphs must be of type AnnotatedGraph, got: " + pipelineGraph.getClass().getName() ); 437 438 Set<FlowElement> flowElements; 439 440 if( ( (AnnotatedGraph) pipelineGraph ).hasAnnotations() ) 441 flowElements = ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( StreamMode.Streamed ); 442 else 443 flowElements = ElementGraphs.findSources( pipelineGraph, FlowElement.class ); 444 445 for( FlowElement flowElement : flowElements ) 446 { 447 if( map.containsKey( flowElement ) ) 448 throw new IllegalStateException( "duplicate streamable elements, found: " + flowElement ); 449 450 map.put( flowElement, pipelineGraph ); 451 } 452 } 453 454 this.streamPipelineMap = map; 455 } 456 457 @Override 458 public Tap getTrap( String branchName ) 459 { 460 return trapMap.get( branchName ); 461 } 462 463 @Override 464 public Collection<? extends Scope> getPreviousScopes( FlowElement flowElement ) 465 { 466 return nodeSubGraph.incomingEdgesOf( flowElement ); 467 } 468 469 @Override 470 public Collection<? extends Scope> getNextScopes( FlowElement flowElement ) 471 { 472 return nodeSubGraph.outgoingEdgesOf( flowElement ); 473 } 474 475 @Override 476 public boolean equals( Object object ) 477 { 478 if( this == object ) 479 return true; 480 481 if( object == null || getClass() != object.getClass() ) 482 return false; 483 484 BaseFlowNode flowNode = (BaseFlowNode) object; 485 486 if( id != null ? !id.equals( flowNode.id ) : flowNode.id != null ) 487 return false; 488 489 return true; 490 } 491 492 @Override 493 public int hashCode() 494 { 495 return id != null ? id.hashCode() : 0; 496 } 497 498 @Override 499 public Set<? extends FlowElement> getFlowElementsFor( Enum annotation ) 500 { 501 if( pipelineGraphs.isEmpty() ) 502 return ( (AnnotatedGraph) getElementGraph() ).getAnnotations().getValues( annotation ); 503 504 Set<FlowElement> results = createIdentitySet(); 505 506 for( ElementGraph pipelineGraph : pipelineGraphs ) 507 results.addAll( ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( annotation ) ); 508 509 return results; 510 } 511 512 private ProcessLogger getLogger() 513 { 514 if( flowStep != null && flowStep instanceof ProcessLogger ) 515 return (ProcessLogger) flowStep; 516 517 return ProcessLogger.NULL; 518 } 519 520 @Override 521 public boolean isInfoEnabled() 522 { 523 return getLogger().isInfoEnabled(); 524 } 525 526 @Override 527 public boolean isDebugEnabled() 528 { 529 return getLogger().isDebugEnabled(); 530 } 531 532 @Override 533 public void logInfo( String message, Object... arguments ) 534 { 535 getLogger().logInfo( message, arguments ); 536 } 537 538 @Override 539 public void logDebug( String message, Object... arguments ) 540 { 541 getLogger().logDebug( message, arguments ); 542 } 543 544 @Override 545 public void logWarn( String message ) 546 { 547 getLogger().logWarn( message ); 548 } 549 550 @Override 551 public void logWarn( String message, Object... arguments ) 552 { 553 getLogger().logWarn( message, arguments ); 554 } 555 556 @Override 557 public void logWarn( String message, Throwable throwable ) 558 { 559 getLogger().logWarn( message, throwable ); 560 } 561 562 @Override 563 public void logError( String message, Object... arguments ) 564 { 565 getLogger().logError( message, arguments ); 566 } 567 568 @Override 569 public void logError( String message, Throwable throwable ) 570 { 571 getLogger().logError( message, throwable ); 572 } 573 }