/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import cascading.flow.AssemblyPlanner;
import cascading.flow.BaseFlow;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowDef;
import cascading.flow.FlowElement;
import cascading.flow.FlowNode;
import cascading.flow.FlowStep;
import cascading.flow.Flows;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.FlowElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.flow.planner.rule.ProcessLevel;
import cascading.flow.planner.rule.RuleRegistry;
import cascading.flow.planner.rule.RuleRegistrySet;
import cascading.flow.planner.rule.RuleResult;
import cascading.flow.planner.rule.RuleSetExec;
import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
import cascading.flow.planner.rule.util.TraceWriter;
import cascading.operation.AssertionLevel;
import cascading.operation.DebugLevel;
import cascading.pipe.Checkpoint;
import cascading.pipe.OperatorException;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.property.ConfigDef;
import cascading.property.PropertyUtil;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tuple.Fields;
import cascading.util.Update;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.*;
import static java.util.Arrays.asList;

/**
 * Class FlowPlanner is the base class for all planner implementations.
 * <p/>
 * This planner supports tracing execution of each rule. See the appropriate properties on this
 * class to enable.
 *
 * @param <F>      the flow type created by {@link #createFlow(FlowDef)} and returned by {@link #buildFlow(FlowDef, RuleRegistrySet)}
 * @param <Config> the configuration type returned by {@link #getDefaultConfig()}
 */
public abstract class FlowPlanner<F extends BaseFlow, Config>
  {
  /**
   * Enables the planner to write out basic planner information including the initial element-graph,
   * completed element-graph, and the completed step-graph dot files.
   */
  public static final String TRACE_PLAN_PATH = "cascading.planner.plan.path";

  /**
   * Enables the planner to write out detail level planner information for each rule, including recursive
   * transforms.
   * <p/>
   * Use this to debug rules. This does increase overhead during planning.
   */
  public static final String TRACE_PLAN_TRANSFORM_PATH = "cascading.planner.plan.transforms.path";

  /**
   * Enables the planner to write out planner statistics for each planner phase and rule.
   */
  public static final String TRACE_STATS_PATH = "cascading.planner.stats.path";

  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class );

  /** Field properties */
  protected Map<Object, Object> defaultProperties;

  /** Root path for checkpoint taps, {@code null} unless the current FlowDef declares a run id */
  protected String checkpointTapRootPath = null;

  /** Field assertionLevel */
  protected AssertionLevel defaultAssertionLevel;
  /** Field debugLevel */
  protected DebugLevel defaultDebugLevel;

  /**
   * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}.
   * <p/>
   * Reads the property {@code cascading.flowconnector.assertionlevel}, defaulting to {@link AssertionLevel#STRICT}.
   *
   * @param properties of type Map<Object, Object>
   * @return AssertionLevel the configured AssertionLevel
   */
  static AssertionLevel getAssertionLevel( Map<Object, Object> properties )
    {
    String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() );

    return AssertionLevel.valueOf( assertionLevel );
    }

  /**
   * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}.
   * <p/>
   * Reads the property {@code cascading.flowconnector.debuglevel}, defaulting to {@link DebugLevel#DEFAULT}.
   *
   * @param properties of type Map<Object, Object>
   * @return DebugLevel the configured DebugLevel
   */
  static DebugLevel getDebugLevel( Map<Object, Object> properties )
    {
    String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() );

    return DebugLevel.valueOf( debugLevel );
    }

  // runs on every instantiation, registering the concrete planner class
    {
    Update.registerPlanner( getClass() );
    }

  /**
   * Method getDefaultProperties returns the properties handed to {@link #initialize(FlowConnector, Map)}.
   *
   * @return Map<Object, Object> the same map instance, not a copy
   */
  public Map<Object, Object> getDefaultProperties()
    {
    return defaultProperties;
    }

  public abstract Config getDefaultConfig();

  public abstract PlannerInfo getPlannerInfo( String name );

  public abstract PlatformInfo getPlatformInfo();

  /**
   * Method initialize stores the given properties and derives the default assertion and debug levels from them.
   *
   * @param flowConnector of type FlowConnector, not used by this base implementation
   * @param properties    of type Map<Object, Object>
   */
  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
    {
    this.defaultProperties = properties;
    this.defaultAssertionLevel = getAssertionLevel( properties );
    this.defaultDebugLevel = getDebugLevel( properties );
    }

  /**
   * Method buildFlow verifies the given FlowDef, applies the given rule registries to the resulting
   * element graph, and returns an initialized flow.
   * <p/>
   * Any exception raised along the way is converted by
   * {@link #handleExceptionDuringPlanning(FlowDef, Exception, FlowElementGraph)} and rethrown.
   *
   * @param flowDef         of type FlowDef
   * @param ruleRegistrySet of type RuleRegistrySet
   * @return F the planned and initialized flow
   * @throws PlannerException if verification or planning fails
   */
  public F buildFlow( FlowDef flowDef, RuleRegistrySet ruleRegistrySet )
    {
    FlowElementGraph flowElementGraph = null;

    try
      {
      flowDef = normalizeTaps( flowDef );

      verifyAllTaps( flowDef );

      F flow = createFlow( flowDef );

      Pipe[] tails = resolveTails( flowDef, flow );

      verifyAssembly( flowDef, tails );

      flowElementGraph = createFlowElementGraph( flowDef, tails );

      TraceWriter traceWriter = new TraceWriter( flow );
      RuleSetExec ruleSetExec = new RuleSetExec( traceWriter, this, flow, ruleRegistrySet, flowDef, flowElementGraph );

      RuleResult ruleResult = ruleSetExec.exec();

      traceWriter.writeTracePlan( null, "0-initial-flow-element-graph", flowElementGraph );

      FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();

      finalFlowElementGraph = flow.updateSchemes( finalFlowElementGraph );

      Map<ElementGraph, List<? extends ElementGraph>> stepToNodes = ruleResult.getStepToNodeGraphMap();
      Map<ElementGraph, List<? extends ElementGraph>> nodeToPipeline = ruleResult.getNodeToPipelineGraphMap();

      FlowStepGraph flowStepGraph = new FlowStepGraph( this, finalFlowElementGraph, stepToNodes, nodeToPipeline );

      traceWriter.writeFinal( "1-final-flow-registry", ruleResult );
      traceWriter.writeTracePlan( null, "2-final-flow-element-graph", finalFlowElementGraph );
      traceWriter.writeTracePlan( null, "3-final-flow-step-graph", flowStepGraph );
      traceWriter.writeTracePlanSteps( "4-final-flow-steps", flowStepGraph );

      flow.setPlannerInfo( getPlannerInfo( ruleResult.getRegistry().getName() ) );

      flow.initialize( finalFlowElementGraph, flowStepGraph );

      return flow;
      }
    catch( Exception exception )
      {
      throw handleExceptionDuringPlanning( flowDef, exception, flowElementGraph );
      }
    }

  protected abstract F createFlow( FlowDef flowDef );

  public abstract FlowStep<Config> createFlowStep( ElementGraph stepElementGraph, FlowNodeGraph flowNodeGraph );

  /**
   * Method createFlowNode creates a new {@link BaseFlowNode} for the given node sub-graph and its pipelines.
   *
   * @param flowElementGraph of type FlowElementGraph
   * @param nodeSubGraph     of type ElementGraph
   * @param pipelineGraphs   of type List<? extends ElementGraph>
   * @return FlowNode
   */
  public FlowNode createFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs )
    {
    return new BaseFlowNode( flowElementGraph, nodeSubGraph, pipelineGraphs );
    }

  /**
   * Method configRuleRegistryDefaults is a hook for sub-classes, this base implementation does nothing.
   *
   * @param ruleRegistry of type RuleRegistry
   */
  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
    {

    }

  /**
   * Method resolveTails returns the tails declared on the FlowDef after every registered
   * {@link AssemblyPlanner} has been applied to them.
   *
   * @param flowDef of type FlowDef
   * @param flow    of type F
   * @return Pipe[] the resolved tails
   */
  protected Pipe[] resolveTails( FlowDef flowDef, F flow )
    {
    Pipe[] tails = flowDef.getTailsArray();

    tails = resolveAssemblyPlanners( flowDef, flow, tails );

    return tails;
    }

  /**
   * Method resolveAssemblyPlanners hands the current tails to each {@link AssemblyPlanner} in turn, each
   * planner receiving the tails returned by the previous one.
   *
   * @param flowDef of type FlowDef
   * @param flow    of type Flow
   * @param pipes   of type Pipe[], the initial tails
   * @return Pipe[] the tails returned by the last planner, or the given pipes if there are no planners
   * @throws PlannerException if a planner returns no tails
   */
  protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes )
    {
    List<Pipe> tails = Arrays.asList( pipes );

    List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners();

    for( AssemblyPlanner assemblyPlanner : assemblyPlanners )
      {
      tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) );

      if( tails.isEmpty() )
        throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" );

      // the next planner must not be able to modify the list it is handed
      tails = Collections.unmodifiableList( tails );
      }

    return tails.toArray( new Pipe[ tails.size() ] );
    }

  /**
   * Method verifyAssembly verifies the end points, traps, and checkpoints of the given assembly.
   *
   * @param flowDef of type FlowDef
   * @param tails   of type Pipe[]
   */
  protected void verifyAssembly( FlowDef flowDef, Pipe[] tails )
    {
    verifyPipeAssemblyEndPoints( flowDef, tails );
    verifyTraps( flowDef, tails );
    verifyCheckpoints( flowDef, tails );
    }

  /**
   * Method verifyAllTaps verifies the source, sink, trap, and checkpoint taps declared on the FlowDef.
   *
   * @param flowDef of type FlowDef
   */
  protected void verifyAllTaps( FlowDef flowDef )
    {
    verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() );

    verifyTaps( flowDef.getSources(), true, true );
    verifyTaps( flowDef.getSinks(), false, true );
    verifyTaps( flowDef.getTraps(), false, false );

    // are both sources and sinks
    verifyTaps( flowDef.getCheckpoints(), true, false );
    verifyTaps( flowDef.getCheckpoints(), false, false );
    }

  /**
   * Method createFlowElementGraph creates the initial element graph from copies of the FlowDef tap maps.
   * <p/>
   * As a side effect {@link #checkpointTapRootPath} is updated for the given FlowDef.
   *
   * @param flowDef   of type FlowDef
   * @param flowTails of type Pipe[]
   * @return FlowElementGraph
   */
  protected FlowElementGraph createFlowElementGraph( FlowDef flowDef, Pipe[] flowTails )
    {
    Map<String, Tap> sources = flowDef.getSourcesCopy();
    Map<String, Tap> sinks = flowDef.getSinksCopy();
    Map<String, Tap> traps = flowDef.getTrapsCopy();
    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();

    checkpointTapRootPath = makeCheckpointRootPath( flowDef );

    return new FlowElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointTapRootPath != null );
    }

  /**
   * Returns a copy of the given FlowDef in which taps that are equal to each other are replaced by a
   * single shared instance.
   */
  private FlowDef normalizeTaps( FlowDef flowDef )
    {
    Set<Tap> taps = new HashSet<>();

    Map<String, Tap> sources = flowDef.getSourcesCopy();
    Map<String, Tap> sinks = flowDef.getSinksCopy();
    Map<String, Tap> traps = flowDef.getTrapsCopy();
    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();

    boolean sourcesHasDupes = addTaps( sources, taps );
    boolean sinksHasDupes = addTaps( sinks, taps );
    boolean trapsHasDupes = addTaps( traps, taps );
    boolean checkpointsHasDupes = addTaps( checkpoints, taps );

    if( sourcesHasDupes )
      normalize( taps, sources );

    if( sinksHasDupes )
      normalize( taps, sinks );

    if( trapsHasDupes )
      normalize( taps, traps );

    if( checkpointsHasDupes )
      normalize( taps, checkpoints );

    return Flows.copy( flowDef, sources, sinks, traps, checkpoints );
    }

  /**
   * Adds all values of current to taps.
   *
   * @return true if at least one value was equal to a tap already in the set
   */
  private boolean addTaps( Map<String, Tap> current, Set<Tap> taps )
    {
    int size = taps.size();

    taps.addAll( current.values() );

    // if all the added values are not unique, taps.size will be less than original size + num tap instances
    return size + current.size() != taps.size();
    }

  /** Replaces every value in current with the equal instance held in taps. */
  private void normalize( Set<Tap> taps, Map<String, Tap> current )
    {
    for( Tap tap : taps )
      {
      for( Map.Entry<String, Tap> entry : current.entrySet() )
        {
        if( entry.getValue().equals( tap ) ) // force equivalent instance to being the same instance
          entry.setValue( tap );
        }
      }
    }

  /**
   * Returns {@code flowName/runID}, or {@code null} if the FlowDef has no run id.
   *
   * @throws PlannerException if a run id is given without a flow name
   */
  private String makeCheckpointRootPath( FlowDef flowDef )
    {
    String flowName = flowDef.getName();
    String runID = flowDef.getRunID();

    if( runID == null )
      return null;

    if( flowName == null )
      throw new PlannerException( "flow name is required when providing a run id" );

    return flowName + "/" + runID;
    }

  /**
   * Method verifySourceNotSinks verifies no tap is declared as both a source and a sink.
   *
   * @param sources of type Map<String, Tap>
   * @param sinks   of type Map<String, Tap>
   * @throws PlannerException if a sink tap is also found in the sources
   */
  protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks )
    {
    Collection<Tap> sourcesSet = sources.values();

    for( Tap tap : sinks.values() )
      {
      if( sourcesSet.contains( tap ) )
        throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap );
      }
    }

  /**
   * Method verifyTaps verifies every given tap can be used in the requested role.
   *
   * @param taps          of type Map<String, Tap>
   * @param areSources    of type boolean, true to verify the taps as sources, false to verify them as sinks
   * @param mayNotBeEmpty of type boolean, true if an empty map is an error
   * @throws PlannerException if the map is empty when it may not be, or a tap cannot fulfill the role
   */
  protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty )
    {
    if( mayNotBeEmpty && taps.isEmpty() )
      throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" );

    for( Map.Entry<String, Tap> entry : taps.entrySet() )
      {
      String tapName = entry.getKey();
      Tap tap = entry.getValue();

      if( areSources && !tap.isSource() )
        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + tap );
      else if( !areSources && !tap.isSink() )
        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + tap );
      }
    }

  /**
   * Method verifyPipeAssemblyEndPoints verifies
   * <p/>
   * there aren't dupe names in heads or tails.
   * all the sink and source tap names match up with tail and head pipes
   *
   * @param flowDef   of type FlowDef
   * @param flowTails of type Pipe[]
   * @throws PlannerException if a head or tail name is unknown, a tail name is duplicated, or taps and
   *                          pipes are not fully bound to each other
   */
  // todo: force dupe names to throw exceptions
  protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails )
    {
    Set<String> sourceNames = flowDef.getSources().keySet();
    Set<String> sinkNames = flowDef.getSinks().keySet();

    Set<String> tapNames = new HashSet<String>();

    tapNames.addAll( sourceNames );
    tapNames.addAll( sinkNames );

    // handle tails
    Set<Pipe> tails = new HashSet<Pipe>();
    Set<String> tailNames = new HashSet<String>();

    for( Pipe pipe : flowTails )
      {
      if( pipe instanceof SubAssembly )
        {
        for( Pipe tail : ( (SubAssembly) pipe ).getTails() )
          registerTail( tail, pipe, tapNames, tailNames, tails );
        }
      else
        {
        registerTail( pipe, pipe, tapNames, tailNames, tails );
        }
      }

    // sinks having no tail of the same name, must be computed while tailNames still holds every tail name
    Set<String> remainingSinks = new HashSet<String>( sinkNames );
    remainingSinks.removeAll( tailNames );

    // leaves only the tail names having no sink
    tailNames.removeAll( sinkNames );

    if( tailNames.size() != 0 )
      throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + join( quote( tailNames, "'" ), ", " ) + "], remaining sink tap names: [" + join( quote( remainingSinks, "'" ), ", " ) + "]" );

    // unlike heads, pipes can input to another pipe and simultaneously be a sink
    // so there is no way to know all the intentional tails, so they aren't listed below in the exception
    remainingSinks = new HashSet<String>( sinkNames );
    remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) );

    if( remainingSinks.size() != 0 )
      throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + join( quote( remainingSinks, "'" ), ", " ) + "]" );

    // handle heads
    Set<Pipe> heads = new HashSet<Pipe>();
    Set<String> headNames = new HashSet<String>();

    for( Pipe pipe : flowTails )
      {
      for( Pipe head : pipe.getHeads() )
        {
        String headName = head.getName();

        if( !tapNames.contains( headName ) )
          throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" );

        if( headNames.contains( headName ) && !heads.contains( head ) )
          LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName );

        headNames.add( headName );
        heads.add( head );
        }
      }

    Set<String> allHeadNames = new HashSet<String>( headNames );

    // leaves only the head names having no source
    headNames.removeAll( sourceNames );

    // sources having no head of the same name, subtract the full head name set, not the stripped one
    Set<String> remainingSources = new HashSet<String>( sourceNames );
    remainingSources.removeAll( allHeadNames );

    if( headNames.size() != 0 )
      throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + join( quote( headNames, "'" ), ", " ) + "], remaining source tap names: [" + join( quote( remainingSources, "'" ), ", " ) + "]" );

    // headNames is empty from here on, the message text is retained for compatibility
    if( remainingSources.size() != 0 )
      throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + join( quote( remainingSources, "'" ), ", " ) + "], remaining head pipe names: [" + join( quote( headNames, "'" ), ", " ) + "]" );
    }

  /**
   * Verifies a single tail against the known tap names and the tails seen so far, then records it.
   *
   * @param tail  the tail pipe to verify
   * @param owner the pipe reported on a duplicate name, the enclosing SubAssembly or the tail itself
   */
  private void registerTail( Pipe tail, Pipe owner, Set<String> tapNames, Set<String> tailNames, Set<Pipe> tails )
    {
    String tailName = tail.getName();

    if( !tapNames.contains( tailName ) )
      throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" );

    // the same pipe instance may legitimately be seen more than once
    if( tailNames.contains( tailName ) && !tails.contains( tail ) )
      throw new PlannerException( owner, "duplicate tail name found: " + tailName );

    tailNames.add( tailName );
    tails.add( tail );
    }

  /**
   * Method verifyTraps verifies trap taps are not also sources or sinks, and that every trap name
   * is a pipe name found in the assembly.
   *
   * @param flowDef   of type FlowDef
   * @param flowTails of type Pipe[]
   * @throws PlannerException if a trap is invalid
   */
  protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails )
    {
    verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" );

    Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );

    for( String name : flowDef.getTraps().keySet() )
      {
      if( !names.contains( name ) )
        throw new PlannerException( "trap name not found in assembly: '" + name + "'" );
      }
    }

  /**
   * Method verifyCheckpoints verifies checkpoint taps are not also sources or sinks, have an undeclared
   * scheme, and that each is bound to exactly one {@link Checkpoint} pipe of the same name.
   *
   * @param flowDef   of type FlowDef
   * @param flowTails of type Pipe[]
   * @throws PlannerException if a checkpoint is invalid
   */
  protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails )
    {
    verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" );

    for( Tap checkpointTap : flowDef.getCheckpoints().values() )
      {
      Scheme scheme = checkpointTap.getScheme();

      boolean isUndeclared = scheme.getSourceFields().equals( Fields.UNKNOWN ) && scheme.getSinkFields().equals( Fields.ALL );

      if( !isUndeclared )
        throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() );
      }

    Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );

    for( String name : flowDef.getCheckpoints().keySet() )
      {
      if( !names.contains( name ) )
        throw new PlannerException( "named checkpoint declared in FlowDef, but no named branch found in pipe assembly: '" + name + "'" );

      Set<Pipe> pipes = new HashSet<Pipe>( asList( Pipe.named( name, flowTails ) ) );

      int count = 0;

      for( Pipe pipe : pipes )
        {
        if( pipe instanceof Checkpoint )
          count++;
        }

      if( count == 0 )
        throw new PlannerException( "no checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );

      if( count > 1 )
        throw new PlannerException( "more than one checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );
      }
    }

  /** Verifies none of the given taps is also a source or a sink, role names the taps in the error message. */
  private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role )
    {
    Collection<Tap> sourceTaps = sources.values();
    Collection<Tap> sinkTaps = sinks.values();

    for( Tap tap : taps.values() )
      {
      if( sourceTaps.contains( tap ) )
        throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap );

      if( sinkTaps.contains( tap ) )
        throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap );
      }
    }

  /**
   * If there are rules for a given {@link cascading.flow.planner.rule.ProcessLevel} on the current platform
   * there must be sub-graphs partitioned at that level.
   *
   * @param ruleResult of type RuleResult
   * @return Exception the verification failure, or null if the result is valid
   */
  public Exception verifyResult( RuleResult ruleResult )
    {
    try
      {
      verifyResultInternal( ruleResult );
      }
    catch( Exception exception )
      {
      return exception;
      }

    return null;
    }

  /**
   * Method verifyResultInternal verifies the given result at each {@link ProcessLevel} of its registry,
   * visiting the levels in reverse of their natural order.
   *
   * @param ruleResult of type RuleResult
   * @throws PlannerException on the first failed verification
   */
  protected void verifyResultInternal( RuleResult ruleResult )
    {
    Set<ProcessLevel> processLevels = getReverseOrderedProcessLevels( ruleResult );
    String registryName = ruleResult.getRegistry().getName();

    for( ProcessLevel processLevel : processLevels )
      {
      switch( processLevel )
        {
        case Assembly:

          FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();

          if( finalFlowElementGraph.vertexSet().isEmpty() )
            throw new PlannerException( "final assembly graph is empty: " + registryName );

          break;

        case Step:
          verifyPartitions( ruleResult.getAssemblyToStepGraphMap(), registryName, "step", "assembly", "flow" );
          break;

        case Node:
          verifyPartitions( ruleResult.getStepToNodeGraphMap(), registryName, "node", "step", "step" );
          break;

        case Pipeline:
          // all nodes are partitioned into pipelines, but if partitioned all elements should be represented
          verifyPartitions( ruleResult.getNodeToPipelineGraphMap(), registryName, "pipeline", "node", "node" );
          break;
        }
      }
    }

  /**
   * Verifies every parent graph was partitioned into at least one child graph, that no child is listed
   * twice, and that the children together hold every element of their parent.
   *
   * @param parentToChildren the partitions to verify
   * @param registryName     name of the rule registry, appended to every error message
   * @param child            singular name of the child level, used in error messages
   * @param parent           singular name of the parent level, used in error messages
   * @param container        name used for the parent in the duplicate child message
   */
  private void verifyPartitions( Map<ElementGraph, List<? extends ElementGraph>> parentToChildren, String registryName, String child, String parent, String container )
    {
    if( parentToChildren.isEmpty() )
      throw new PlannerException( "no " + child + "s partitioned: " + registryName );

    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : parentToChildren.entrySet() )
      {
      ElementGraph parentGraph = entry.getKey();
      List<? extends ElementGraph> childGraphs = entry.getValue();

      if( childGraphs.isEmpty() )
        throw new PlannerException( "no " + child + "s partitioned from " + parent + ": " + registryName, parentGraph );

      Set<ElementGraph> uniqueGraphs = new HashSet<>( childGraphs.size() );

      for( ElementGraph childGraph : childGraphs )
        {
        if( !uniqueGraphs.add( childGraph ) )
          throw new PlannerException( "found duplicate " + child + " in " + container + ": " + registryName, childGraph );
        }

      Set<FlowElement> elements = createIdentitySet();

      for( ElementGraph childGraph : childGraphs )
        elements.addAll( childGraph.vertexSet() );

      Set<FlowElement> missing = differenceIdentity( parentGraph.vertexSet(), elements );

      if( !missing.isEmpty() )
        {
        String message = "union of " + child + "s have " + missing.size() + " fewer elements than parent " + parent + ": " + registryName + ", missing: [" + join( missing, ", " ) + "]";
        throw new PlannerException( message, parentGraph );
        }
      }
    }

  /**
   * Method handleExceptionDuringPlanning converts any exception raised during planning into a
   * {@link PlannerException} carrying the given element graph.
   * <p/>
   * A PlannerException is returned as is, receiving the element graph only if it has none.
   *
   * @param flowDef          of type FlowDef
   * @param exception        of type Exception
   * @param flowElementGraph of type FlowElementGraph, may be null if planning failed before it was created
   * @return PlannerException to be thrown by the caller
   */
  protected PlannerException handleExceptionDuringPlanning( FlowDef flowDef, Exception exception, FlowElementGraph flowElementGraph )
    {
    if( exception instanceof PlannerException )
      {
      PlannerException plannerException = (PlannerException) exception;

      if( plannerException.elementGraph == null )
        plannerException.elementGraph = flowElementGraph;

      return plannerException;
      }

    if( exception instanceof ElementGraphException )
      {
      Throwable cause = exception.getCause();

      if( cause == null )
        cause = exception;

      String message = makePlanningFailureMessage( flowDef, cause );

      // operator and tap failures are reported without the pipe held by the ElementGraphException
      if( cause instanceof OperatorException || cause instanceof TapException )
        return new PlannerException( message, cause, flowElementGraph );

      return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, flowElementGraph );
      }

    return new PlannerException( makePlanningFailureMessage( flowDef, exception ), exception, flowElementGraph );
    }

  /**
   * Creates the planning failure message, prefixed with the truncated flow name.
   * <p/>
   * The message of the given throwable is forwarded in case the cause or trace is lost.
   */
  private String makePlanningFailureMessage( FlowDef flowDef, Throwable throwable )
    {
    String message = String.format( "[%s] could not build flow from assembly", Util.truncate( flowDef.getName(), 25 ) );
    String detail = throwable.getMessage();

    if( detail != null )
      message = String.format( "%s: [%s]", message, detail );

    return message;
    }

  /**
   * Class TempTapElementFactory creates the intermediate tap for a given {@link Pipe} element,
   * delegating to the enclosing planner.
   */
  public class TempTapElementFactory extends IntermediateTapElementFactory
    {
    @Override
    public FlowElement create( ElementGraph graph, FlowElement flowElement )
      {
      return makeTempTap( (FlowElementGraph) graph, (Pipe) flowElement );
      }
    }

  /**
   * Returns the tap to use for the given pipe: the checkpoint tap registered under the pipe name if any,
   * else a new checkpoint tap if the pipe is a {@link Checkpoint}, else a plain temporary tap.
   * <p/>
   * Checkpoint taps are wrapped by the checkpoint decorator, and every result by the temporary tap
   * decorator, where those are configured.
   */
  private Tap makeTempTap( FlowElementGraph graph, Pipe pipe )
    {
    Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() );

    if( checkpointTap != null )
      {
      LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap );
      checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
      }

    if( checkpointTap == null )
      {
      // only restart from a checkpoint pipe or checkpoint tap below
      if( pipe instanceof Checkpoint )
        {
        checkpointTap = makeTempTap( checkpointTapRootPath, pipe.getName() );
        checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
        // mark as an anonymous checkpoint
        checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
        }
      else
        {
        checkpointTap = makeTempTap( pipe.getName() );
        }
      }

    return decorateTap( pipe, checkpointTap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS );
    }

  /**
   * Wraps the given tap in a new instance of the decorator class named by the given property.
   *
   * @param decoratorClass the property key holding the decorator class name
   * @return the decorated tap, or the given tap if the property is empty
   */
  private Tap decorateTap( Pipe pipe, Tap tempTap, String decoratorClass )
    {
    String decoratorClassName = PropertyUtil.getProperty( defaultProperties, pipe, decoratorClass );

    if( Util.isEmpty( decoratorClassName ) )
      return tempTap;

    // report the resolved class, not the property key it was looked up under
    LOG.info( "found decorator: {}, wrapping tap: {}", decoratorClassName, tempTap );

    tempTap = Util.newInstance( decoratorClassName, tempTap );

    return tempTap;
    }

  /**
   * Method makeTempTap creates a temporary tap with the given name and no prefix.
   *
   * @param name of type String
   * @return Tap
   */
  protected Tap makeTempTap( String name )
    {
    return makeTempTap( null, name );
    }

  /**
   * Method getDebugLevel returns the FlowDef debug level, or the planner default if the FlowDef has none.
   *
   * @param flowDef of type FlowDef
   * @return DebugLevel
   */
  protected DebugLevel getDebugLevel( FlowDef flowDef )
    {
    return flowDef.getDebugLevel() == null ? this.defaultDebugLevel : flowDef.getDebugLevel();
    }

  /**
   * Method getAssertionLevel returns the FlowDef assertion level, or the planner default if the FlowDef has none.
   *
   * @param flowDef of type FlowDef
   * @return AssertionLevel
   */
  protected AssertionLevel getAssertionLevel( FlowDef flowDef )
    {
    return flowDef.getAssertionLevel() == null ? this.defaultAssertionLevel : flowDef.getAssertionLevel();
    }

  /**
   * Method makeFlowNodeName returns a name of the form {@code (ordinal+1/size)}.
   *
   * @param flowNode of type FlowNode, not used by this base implementation
   * @param size     of type int
   * @param ordinal  of type int, zero based
   * @return String
   */
  public String makeFlowNodeName( FlowNode flowNode, int size, int ordinal )
    {
    return String.format( "(%d/%d)", ordinal + 1, size );
    }

  /**
   * Method makeFlowStepName returns a name of the form {@code (stepNum+1/numSteps)}, followed by the
   * identifier of the first sink tap if that tap is not temporary.
   * <p/>
   * Identifiers longer than 25 characters are reduced to their last 25 characters, prefixed by {@code ...}.
   *
   * @param flowStep of type FlowStep
   * @param numSteps of type int
   * @param stepNum  of type int, zero based
   * @return String
   */
  public String makeFlowStepName( FlowStep flowStep, int numSteps, int stepNum )
    {
    Tap sink = Util.getFirst( flowStep.getSinkTaps() );

    stepNum++; // number more sensical (5/5)

    if( sink == null || sink.isTemporary() )
      return String.format( "(%d/%d)", stepNum, numSteps );

    String identifier = sink.getIdentifier();

    if( identifier.length() > 25 )
      identifier = String.format( "...%25s", identifier.substring( identifier.length() - 25 ) );

    return String.format( "(%d/%d) %s", stepNum, numSteps, identifier );
    }

  protected abstract Tap makeTempTap( String prefix, String name );

  /** Returns the process levels of the result registry, sorted in reverse of their natural order. */
  private Set<ProcessLevel> getReverseOrderedProcessLevels( RuleResult ruleResult )
    {
    Set<ProcessLevel> ordered = new TreeSet<>( Collections.reverseOrder() );

    ordered.addAll( ruleResult.getRegistry().getProcessLevels() );

    return ordered;
    }
  }