001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.planner; 022 023 import java.util.ArrayList; 024 import java.util.Arrays; 025 import java.util.Collection; 026 import java.util.Collections; 027 import java.util.HashSet; 028 import java.util.List; 029 import java.util.Map; 030 import java.util.Set; 031 032 import cascading.flow.AssemblyPlanner; 033 import cascading.flow.Flow; 034 import cascading.flow.FlowConnector; 035 import cascading.flow.FlowDef; 036 import cascading.flow.FlowElement; 037 import cascading.operation.AssertionLevel; 038 import cascading.operation.DebugLevel; 039 import cascading.pipe.Checkpoint; 040 import cascading.pipe.CoGroup; 041 import cascading.pipe.Each; 042 import cascading.pipe.Every; 043 import cascading.pipe.Group; 044 import cascading.pipe.GroupBy; 045 import cascading.pipe.HashJoin; 046 import cascading.pipe.Merge; 047 import cascading.pipe.OperatorException; 048 import cascading.pipe.Pipe; 049 import cascading.pipe.Splice; 050 import cascading.pipe.SubAssembly; 051 import cascading.property.ConfigDef; 052 import cascading.property.PropertyUtil; 053 import cascading.scheme.Scheme; 054 import cascading.tap.Tap; 055 import cascading.tap.TapException; 056 import cascading.tuple.Fields; 057 import cascading.util.Util; 058 import org.jgrapht.GraphPath; 059 import org.jgrapht.Graphs; 060 import org.slf4j.Logger; 061 import org.slf4j.LoggerFactory; 062 063 import static cascading.flow.planner.ElementGraphs.*; 064 import static java.util.Arrays.asList; 065 066 /** Class FlowPlanner is the base class for all planner implementations. */ 067 public abstract class FlowPlanner<F extends Flow, Config> 068 { 069 /** Field LOG */ 070 private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class ); 071 072 /** Field properties */ 073 protected Map<Object, Object> properties; 074 075 protected String checkpointRootPath = null; 076 077 /** Field assertionLevel */ 078 protected AssertionLevel assertionLevel; 079 /** Field debugLevel */ 080 protected DebugLevel debugLevel; 081 082 /** 083 * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}. 084 * 085 * @param properties of type Map<Object, Object> 086 * @return AssertionLevel the configured AssertionLevel 087 */ 088 static AssertionLevel getAssertionLevel( Map<Object, Object> properties ) 089 { 090 String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() ); 091 092 return AssertionLevel.valueOf( assertionLevel ); 093 } 094 095 /** 096 * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}. 097 * 098 * @param properties of type Map<Object, Object> 099 * @return DebugLevel the configured DebugLevel 100 */ 101 static DebugLevel getDebugLevel( Map<Object, Object> properties ) 102 { 103 String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() ); 104 105 return DebugLevel.valueOf( debugLevel ); 106 } 107 108 public Map<Object, Object> getProperties() 109 { 110 return properties; 111 } 112 113 public abstract Config getConfig(); 114 115 public abstract PlatformInfo getPlatformInfo(); 116 117 public void initialize( FlowConnector flowConnector, Map<Object, Object> properties ) 118 { 119 this.properties = properties; 120 this.assertionLevel = getAssertionLevel( properties ); 121 this.debugLevel = getDebugLevel( properties ); 122 } 123 124 protected abstract Flow createFlow( FlowDef flowDef ); 125 126 /** 127 * Method buildFlow renders the actual Flow instance. 128 * 129 * @param flowDef 130 * @return Flow 131 */ 132 public abstract F buildFlow( FlowDef flowDef ); 133 134 protected Pipe[] resolveTails( FlowDef flowDef, Flow<Config> flow ) 135 { 136 Pipe[] tails = flowDef.getTailsArray(); 137 138 tails = resolveAssemblyPlanners( flowDef, flow, tails ); 139 140 return tails; 141 } 142 143 protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes ) 144 { 145 List<Pipe> tails = Arrays.asList( pipes ); 146 147 List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners(); 148 149 for( AssemblyPlanner assemblyPlanner : assemblyPlanners ) 150 { 151 tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) ); 152 153 if( tails.isEmpty() ) 154 throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" ); 155 156 tails = Collections.unmodifiableList( tails ); 157 } 158 159 return tails.toArray( new Pipe[ tails.size() ] ); 160 } 161 162 protected void verifyAssembly( FlowDef flowDef, Pipe[] tails ) 163 { 164 verifyPipeAssemblyEndPoints( flowDef, tails ); 165 verifyTraps( flowDef, tails ); 166 verifyCheckpoints( flowDef, tails ); 167 } 168 169 protected void verifyAllTaps( FlowDef flowDef ) 170 { 171 verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() ); 172 173 verifyTaps( flowDef.getSources(), true, true ); 174 verifyTaps( flowDef.getSinks(), false, true ); 175 verifyTaps( flowDef.getTraps(), false, false ); 176 177 // are both sources and sinks 178 verifyTaps( flowDef.getCheckpoints(), true, false ); 179 verifyTaps( flowDef.getCheckpoints(), false, false ); 180 } 181 182 protected ElementGraph createElementGraph( FlowDef flowDef, Pipe[] flowTails ) 183 { 184 Map<String, Tap> sources = flowDef.getSourcesCopy(); 185 Map<String, Tap> sinks = flowDef.getSinksCopy(); 186 Map<String, Tap> traps = flowDef.getTrapsCopy(); 187 Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy(); 188 189 AssertionLevel assertionLevel = flowDef.getAssertionLevel() == null ? this.assertionLevel : flowDef.getAssertionLevel(); 190 DebugLevel debugLevel = flowDef.getDebugLevel() == null ? this.debugLevel : flowDef.getDebugLevel(); 191 192 checkpointRootPath = makeCheckpointRootPath( flowDef ); 193 194 return new ElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointRootPath != null, assertionLevel, debugLevel ); 195 } 196 197 private String makeCheckpointRootPath( FlowDef flowDef ) 198 { 199 String flowName = flowDef.getName(); 200 String runID = flowDef.getRunID(); 201 202 if( runID == null ) 203 return null; 204 205 if( flowName == null ) 206 throw new PlannerException( "flow name is required when providing a run id" ); 207 208 return flowName + "/" + runID; 209 } 210 211 212 protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks ) 213 { 214 Collection<Tap> sourcesSet = sources.values(); 215 216 for( Tap tap : sinks.values() ) 217 { 218 if( sourcesSet.contains( tap ) ) 219 throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap ); 220 } 221 } 222 223 /** 224 * Method verifyTaps ... 225 * 226 * @param taps of type Map<String, Tap> 227 * @param areSources of type boolean 228 * @param mayNotBeEmpty of type boolean 229 */ 230 protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty ) 231 { 232 if( mayNotBeEmpty && taps.isEmpty() ) 233 throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" ); 234 235 for( String tapName : taps.keySet() ) 236 { 237 if( areSources && !taps.get( tapName ).isSource() ) 238 throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + taps.get( tapName ) ); 239 else if( !areSources && !taps.get( tapName ).isSink() ) 240 throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + taps.get( tapName ) ); 241 } 242 } 243 244 /** 245 * Method verifyEndPoints verifies 246 * <p/> 247 * there aren't dupe names in heads or tails. 248 * all the sink and source tap names match up with tail and head pipes 249 */ 250 // todo: force dupe names to throw exceptions 251 protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails ) 252 { 253 Set<String> tapNames = new HashSet<String>(); 254 255 tapNames.addAll( flowDef.getSources().keySet() ); 256 tapNames.addAll( flowDef.getSinks().keySet() ); 257 258 // handle tails 259 Set<Pipe> tails = new HashSet<Pipe>(); 260 Set<String> tailNames = new HashSet<String>(); 261 262 for( Pipe pipe : flowTails ) 263 { 264 if( pipe instanceof SubAssembly ) 265 { 266 for( Pipe tail : ( (SubAssembly) pipe ).getTails() ) 267 { 268 String tailName = tail.getName(); 269 270 if( !tapNames.contains( tailName ) ) 271 throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" ); 272 273 if( tailNames.contains( tailName ) && !tails.contains( tail ) ) 274 LOG.warn( "duplicate tail name found: '{}'", tailName ); 275 // throw new PlannerException( pipe, "duplicate tail name found: " + tailName ); 276 277 tailNames.add( tailName ); 278 tails.add( tail ); 279 } 280 } 281 else 282 { 283 String tailName = pipe.getName(); 284 285 if( !tapNames.contains( tailName ) ) 286 throw new PlannerException( pipe, "pipe name not found in either sink or source map: '" + tailName + "'" ); 287 288 if( tailNames.contains( tailName ) && !tails.contains( pipe ) ) 289 LOG.warn( "duplicate tail name found: '{}'", tailName ); 290 // throw new PlannerException( pipe, "duplicate tail name found: " + tailName ); 291 292 tailNames.add( tailName ); 293 tails.add( pipe ); 294 } 295 } 296 297 // Set<String> allTailNames = new HashSet<String>( tailNames ); 298 tailNames.removeAll( flowDef.getSinks().keySet() ); 299 Set<String> remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() ); 300 remainingSinks.removeAll( tailNames ); 301 302 if( tailNames.size() != 0 ) 303 throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + Util.join( Util.quote( tailNames, "'" ), ", " ) + "], remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" ); 304 305 // unlike heads, pipes can input to another pipe and simultaneously be a sink 306 // so there is no way to know all the intentional tails, so they aren't listed below in the exception 307 remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() ); 308 remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) ); 309 310 if( remainingSinks.size() != 0 ) 311 throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" ); 312 313 // handle heads 314 Set<Pipe> heads = new HashSet<Pipe>(); 315 Set<String> headNames = new HashSet<String>(); 316 317 for( Pipe pipe : flowTails ) 318 { 319 for( Pipe head : pipe.getHeads() ) 320 { 321 String headName = head.getName(); 322 323 if( !tapNames.contains( headName ) ) 324 throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" ); 325 326 if( headNames.contains( headName ) && !heads.contains( head ) ) 327 LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName ); 328 // throw new PlannerException( pipe, "duplicate head name found: " + headName ); 329 330 headNames.add( headName ); 331 heads.add( head ); 332 } 333 } 334 335 Set<String> allHeadNames = new HashSet<String>( headNames ); 336 headNames.removeAll( flowDef.getSources().keySet() ); 337 Set<String> remainingSources = new HashSet<String>( flowDef.getSources().keySet() ); 338 remainingSources.removeAll( headNames ); 339 340 if( headNames.size() != 0 ) 341 throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "], remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "]" ); 342 343 remainingSources = new HashSet<String>( flowDef.getSources().keySet() ); 344 remainingSources.removeAll( allHeadNames ); 345 346 if( remainingSources.size() != 0 ) 347 throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "], remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "]" ); 348 349 } 350 351 protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails ) 352 { 353 verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" ); 354 355 Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) ); 356 357 for( String name : flowDef.getTraps().keySet() ) 358 { 359 if( !names.contains( name ) ) 360 throw new PlannerException( "trap name not found in assembly: '" + name + "'" ); 361 } 362 } 363 364 protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails ) 365 { 366 verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" ); 367 368 for( Tap checkpointTap : flowDef.getCheckpoints().values() ) 369 { 370 Scheme scheme = checkpointTap.getScheme(); 371 372 if( scheme.getSourceFields().equals( Fields.UNKNOWN ) && scheme.getSinkFields().equals( Fields.ALL ) ) 373 continue; 374 375 throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() ); 376 } 377 378 Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) ); 379 380 for( String name : flowDef.getCheckpoints().keySet() ) 381 { 382 if( !names.contains( name ) ) 383 throw new PlannerException( "checkpoint name not found in assembly: '" + name + "'" ); 384 385 Set<Pipe> pipes = new HashSet<Pipe>( asList( Pipe.named( name, flowTails ) ) ); 386 387 int count = 0; 388 389 for( Pipe pipe : pipes ) 390 { 391 if( pipe instanceof Checkpoint ) 392 count++; 393 } 394 395 if( count == 0 ) 396 throw new PlannerException( "no checkpoint with name found in assembly: '" + name + "'" ); 397 398 if( count > 1 ) 399 throw new PlannerException( "more than one checkpoint with name found in assembly: '" + name + "'" ); 400 } 401 } 402 403 private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role ) 404 { 405 Collection<Tap> sourceTaps = sources.values(); 406 Collection<Tap> sinkTaps = sinks.values(); 407 408 for( Tap tap : taps.values() ) 409 { 410 if( sourceTaps.contains( tap ) ) 411 throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap ); 412 413 if( sinkTaps.contains( tap ) ) 414 throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap ); 415 } 416 } 417 418 /** 419 * Verifies that there are not only GroupAssertions following any given Group instance. This will adversely 420 * affect the stream entering any subsequent Tap of Each instances. 421 */ 422 protected void failOnLoneGroupAssertion( ElementGraph elementGraph ) 423 { 424 List<Group> groups = elementGraph.findAllGroups(); 425 426 // walk Every instances after Group 427 for( Group group : groups ) 428 { 429 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsFrom( group ) ) 430 { 431 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is tail 432 433 int everies = 0; 434 int assertions = 0; 435 436 for( FlowElement flowElement : flowElements ) 437 { 438 if( flowElement instanceof Group ) 439 continue; 440 441 if( !( flowElement instanceof Every ) ) 442 break; 443 444 everies++; 445 446 Every every = (Every) flowElement; 447 448 if( every.getPlannerLevel() != null ) 449 assertions++; 450 } 451 452 if( everies != 0 && everies == assertions ) 453 throw new PlannerException( "group assertions must be accompanied by aggregator operations" ); 454 } 455 } 456 } 457 458 protected void failOnMissingGroup( ElementGraph elementGraph ) 459 { 460 List<Every> everies = elementGraph.findAllEveries(); 461 462 // walk Every instances after Group 463 for( Every every : everies ) 464 { 465 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) ) 466 { 467 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every 468 Collections.reverse( flowElements ); // first element is every 469 470 for( FlowElement flowElement : flowElements ) 471 { 472 if( flowElement instanceof Every || flowElement.getClass() == Pipe.class ) 473 continue; 474 475 if( flowElement instanceof GroupBy || flowElement instanceof CoGroup ) 476 break; 477 478 throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a Group pipe, found: " + flowElement ); 479 } 480 } 481 } 482 } 483 484 protected void failOnMisusedBuffer( ElementGraph elementGraph ) 485 { 486 List<Every> everies = elementGraph.findAllEveries(); 487 488 // walk Every instances after Group 489 for( Every every : everies ) 490 { 491 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) ) 492 { 493 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every 494 Collections.reverse( flowElements ); // first element is every 495 496 Every last = null; 497 boolean foundBuffer = false; 498 int foundEveries = -1; 499 500 for( FlowElement flowElement : flowElements ) 501 { 502 if( flowElement instanceof Each ) 503 throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a GroupBy or CoGroup pipe, found: " + flowElement ); 504 505 if( flowElement instanceof Every ) 506 { 507 foundEveries++; 508 509 boolean isBuffer = ( (Every) flowElement ).isBuffer(); 510 511 if( foundEveries != 0 && ( isBuffer || foundBuffer ) ) 512 throw new PlannerException( (Pipe) flowElement, "Only one Every with a Buffer may follow a GroupBy or CoGroup pipe, no other Every instances are allowed immediately before or after, found: " + flowElement + " before: " + last ); 513 514 if( !foundBuffer ) 515 foundBuffer = isBuffer; 516 517 last = (Every) flowElement; 518 } 519 520 if( flowElement instanceof Group ) 521 break; 522 } 523 } 524 } 525 } 526 527 protected void failOnGroupEverySplit( ElementGraph elementGraph ) 528 { 529 List<Group> groups = new ArrayList<Group>(); 530 531 elementGraph.findAllOfType( 1, 2, Group.class, groups ); 532 533 for( Group group : groups ) 534 { 535 Set<FlowElement> children = elementGraph.getAllChildrenNotExactlyType( group, Pipe.class ); 536 537 for( FlowElement flowElement : children ) 538 { 539 if( flowElement instanceof Every ) 540 throw new PlannerException( (Every) flowElement, "Every instances may not split after a GroupBy or CoGroup pipe, found: " + flowElement + " after: " + group ); 541 } 542 } 543 } 544 545 protected PlannerException handleExceptionDuringPlanning( Exception exception, ElementGraph elementGraph ) 546 { 547 if( exception instanceof PlannerException ) 548 { 549 ( (PlannerException) exception ).elementGraph = elementGraph; 550 551 return (PlannerException) exception; 552 } 553 else if( exception instanceof ElementGraphException ) 554 { 555 Throwable cause = exception.getCause(); 556 557 if( cause == null ) 558 cause = exception; 559 560 // captures pipegraph for debugging 561 // forward message in case cause or trace is lost 562 String message = String.format( "could not build flow from assembly: [%s]", cause.getMessage() ); 563 564 if( cause instanceof OperatorException ) 565 return new PlannerException( message, cause, elementGraph ); 566 567 if( cause instanceof TapException ) 568 return new PlannerException( message, cause, elementGraph ); 569 570 return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, elementGraph ); 571 } 572 else 573 { 574 // captures pipegraph for debugging 575 // forward message in case cause or trace is lost 576 String message = String.format( "could not build flow from assembly: [%s]", exception.getMessage() ); 577 return new PlannerException( message, exception, elementGraph ); 578 } 579 } 580 581 protected void handleNonSafeOperations( ElementGraph elementGraph ) 582 { 583 // if there was a graph change, iterate paths again. 584 while( !internalNonSafeOperations( elementGraph ) ) 585 ; 586 } 587 588 private boolean internalNonSafeOperations( ElementGraph elementGraph ) 589 { 590 Set<Pipe> tapInsertions = new HashSet<Pipe>(); 591 592 List<Pipe> splits = elementGraph.findAllPipeSplits(); 593 594 // if any predecessor is unsafe, insert temp 595 for( Pipe split : splits ) 596 { 597 List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsTo( split ); 598 599 for( GraphPath<FlowElement, Scope> path : paths ) 600 { 601 List<FlowElement> elements = Graphs.getPathVertexList( path ); 602 Collections.reverse( elements ); 603 604 for( FlowElement element : elements ) 605 { 606 if( !( element instanceof Each ) && element.getClass() != Pipe.class ) 607 break; 608 609 if( element.getClass() == Pipe.class ) 610 continue; 611 612 if( !( (Each) element ).getOperation().isSafe() ) 613 { 614 tapInsertions.add( split ); 615 break; 616 } 617 } 618 } 619 } 620 621 for( Pipe pipe : tapInsertions ) 622 insertTempTapAfter( elementGraph, pipe ); 623 624 return tapInsertions.isEmpty(); 625 } 626 627 /** 628 * Method insertTapAfter ... 629 * 630 * @param graph of type PipeGraph 631 * @param pipe of type Pipe 632 */ 633 protected void insertTempTapAfter( ElementGraph graph, Pipe pipe ) 634 { 635 LOG.debug( "inserting tap after: {}", pipe ); 636 637 Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() ); 638 639 if( checkpointTap != null ) 640 LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap ); 641 642 if( checkpointTap == null ) 643 { 644 // only restart from a checkpoint pipe or checkpoint tap below 645 if( pipe instanceof Checkpoint ) 646 { 647 checkpointTap = makeTempTap( checkpointRootPath, pipe.getName() ); 648 // mark as an anonymous checkpoint 649 checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" ); 650 } 651 else 652 { 653 checkpointTap = makeTempTap( pipe.getName() ); 654 } 655 } 656 657 graph.insertFlowElementAfter( pipe, checkpointTap ); 658 } 659 660 protected Tap makeTempTap( String name ) 661 { 662 return makeTempTap( null, name ); 663 } 664 665 protected abstract Tap makeTempTap( String prefix, String name ); 666 667 /** 668 * Inserts a temporary Tap between logical MR jobs. 669 * <p/> 670 * Since all joins are at groups or splices, depth first search is safe 671 * <p/> 672 * todo: refactor so that rules are applied to path segments bounded by taps 673 * todo: this would allow balancing of operations within paths instead of pushing 674 * todo: all operations up. may allow for consolidation of rules 675 * 676 * @param elementGraph of type PipeGraph 677 */ 678 protected void handleJobPartitioning( ElementGraph elementGraph ) 679 { 680 // if there was a graph change, iterate paths again. prevents many temp taps from being inserted in front of a group 681 while( !internalJobPartitioning( elementGraph ) ) 682 ; 683 } 684 685 private boolean internalJobPartitioning( ElementGraph elementGraph ) 686 { 687 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsBetweenExtents() ) 688 { 689 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); 690 List<Pipe> tapInsertions = new ArrayList<Pipe>(); 691 692 boolean foundGroup = false; 693 694 for( int i = 0; i < flowElements.size(); i++ ) 695 { 696 FlowElement flowElement = flowElements.get( i ); 697 698 if( flowElement instanceof ElementGraph.Extent ) // is an extent: head or tail 699 continue; 700 else if( flowElement instanceof Tap && flowElements.get( i - 1 ) instanceof ElementGraph.Extent ) // is a source tap 701 continue; 702 703 if( flowElement instanceof Group && !foundGroup ) 704 { 705 foundGroup = true; 706 } 707 else if( flowElement instanceof Splice && foundGroup ) // add tap between groups, push joins/merge map side 708 { 709 tapInsertions.add( (Pipe) flowElements.get( i - 1 ) ); 710 711 if( !( flowElement instanceof Group ) ) 712 foundGroup = false; 713 } 714 else if( flowElement instanceof Checkpoint ) // add tap after checkpoint 715 { 716 if( flowElements.get( i + 1 ) instanceof Tap ) // don't keep inserting 717 continue; 718 719 tapInsertions.add( (Pipe) flowElement ); 720 foundGroup = false; 721 } 722 else if( flowElement instanceof Tap ) 723 { 724 foundGroup = false; 725 } 726 } 727 728 for( Pipe pipe : tapInsertions ) 729 insertTempTapAfter( elementGraph, pipe ); 730 731 if( !tapInsertions.isEmpty() ) 732 return false; 733 } 734 735 return true; 736 } 737 738 /** 739 * Prevent leftmost sources from sourcing a downstream join on the rightmost side intra-task by inserting a 740 * temp tap between the left-sourced join and right-sourced join. 741 * 742 * @param elementGraph 743 */ 744 protected void handleJoins( ElementGraph elementGraph ) 745 { 746 while( !internalJoins( elementGraph ) ) 747 ; 748 } 749 750 private boolean internalJoins( ElementGraph elementGraph ) 751 { 752 List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsBetweenExtents(); 753 754 // large to small 755 Collections.reverse( paths ); 756 757 for( GraphPath<FlowElement, Scope> path : paths ) 758 { 759 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); 760 List<Pipe> tapInsertions = new ArrayList<Pipe>(); 761 List<HashJoin> joins = new ArrayList<HashJoin>(); 762 List<Merge> merges = new ArrayList<Merge>(); 763 764 FlowElement lastSourceElement = null; 765 766 for( int i = 0; i < flowElements.size(); i++ ) 767 { 768 FlowElement flowElement = flowElements.get( i ); 769 770 if( flowElement instanceof Merge ) 771 { 772 merges.add( (Merge) flowElement ); 773 } 774 else if( flowElement instanceof HashJoin ) 775 { 776 HashJoin join = (HashJoin) flowElement; 777 778 Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true ); 779 780 // is this path streamed 781 int pathPosition = pathPositionInto( path, join ); 782 boolean thisPathIsStreamed = pathPosition == 0; 783 784 boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths 785 int pathCount = countPaths( pathCounts ); 786 787 int priorJoins = countTypesBetween( elementGraph, lastSourceElement, join, HashJoin.class ); 788 789 if( priorJoins == 0 ) 790 { 791 // if same source is leading into the hashjoin, insert tap on the accumulated side 792 if( pathCount == 2 && isAccumulatedAndStreamed && !thisPathIsStreamed ) 793 { 794 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 795 break; 796 } 797 798 // if more than one path into streamed and accumulated branches, insert tap on streamed side 799 if( pathCount > 2 && isAccumulatedAndStreamed && thisPathIsStreamed ) 800 { 801 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 802 break; 803 } 804 } 805 806 if( !merges.isEmpty() ) 807 { 808 // if a Merge is prior to a HashJoin, and its an accumulated path, force Merge results to disk 809 int joinPos = flowElements.indexOf( join ); 810 int mergePos = nearest( flowElements, joinPos, merges ); 811 812 if( mergePos != -1 && joinPos > mergePos ) 813 { 814 // if all paths are accumulated and streamed, insert 815 // else if just if this path is accumulated 816 if( ( isAccumulatedAndStreamed && thisPathIsStreamed ) || !thisPathIsStreamed ) 817 { 818 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 819 break; 820 } 821 } 822 } 823 824 joins.add( (HashJoin) flowElement ); 825 } 826 else if( flowElement instanceof Tap || flowElement instanceof Group ) 827 { 828 // added for JoinFieldedPipesPlatformTest.testJoinMergeGroupBy where Merge hides streamed nature of path 829 if( flowElement instanceof Group && !joins.isEmpty() ) 830 { 831 List<Splice> splices = new ArrayList<Splice>(); 832 833 splices.addAll( merges ); 834 splices.add( (Splice) flowElement ); 835 836 Collections.reverse( splices ); 837 838 for( Splice splice : splices ) 839 { 840 Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, splice, true ); 841 842 if( isBothAccumulatedAndStreamedPath( pathCounts ) ) 843 { 844 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( splice ) - 1 ) ); 845 break; 846 } 847 } 848 849 if( !tapInsertions.isEmpty() ) 850 break; 851 } 852 853 for( int j = 0; j < joins.size(); j++ ) 854 { 855 HashJoin join = joins.get( j ); 856 857 int pathPosition = pathPositionInto( path, join ); 858 boolean thisPathIsStreamed = pathPosition == 0; 859 860 Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true ); 861 862 boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths 863 int pathCount = countPaths( pathCounts ); 864 865 if( pathCount >= 2 && isAccumulatedAndStreamed && thisPathIsStreamed ) 866 { 867 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 868 break; 869 } 870 871 if( thisPathIsStreamed ) 872 continue; 873 874 if( j == 0 ) // is accumulated on first join 875 break; 876 877 // prevent a streamed path from being accumulated by injecting a tap before the 878 // current HashJoin 879 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 880 break; 881 } 882 883 if( !tapInsertions.isEmpty() ) 884 break; 885 886 lastSourceElement = flowElement; 887 merges.clear(); 888 joins.clear(); 889 } 890 } 891 892 for( Pipe pipe : tapInsertions ) 893 insertTempTapAfter( elementGraph, pipe ); 894 895 if( !tapInsertions.isEmpty() ) 896 return false; 897 } 898 899 return true; 900 } 901 902 private int nearest( List<FlowElement> flowElements, int index, List<Merge> merges ) 903 { 904 List<Merge> reversed = new ArrayList<Merge>( merges ); 905 Collections.reverse( reversed ); 906 907 for( Merge merge : reversed ) 908 { 909 int pos = flowElements.indexOf( merge ); 910 if( pos < index ) 911 return pos; 912 } 913 914 return -1; 915 } 916 }