001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.rule; 022 023import java.util.ArrayList; 024import java.util.LinkedHashMap; 025import java.util.LinkedList; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029 030import cascading.flow.FlowElement; 031import cascading.flow.planner.PlannerContext; 032import cascading.flow.planner.PlannerException; 033import cascading.flow.planner.graph.AnnotatedGraph; 034import cascading.flow.planner.graph.BoundedElementMultiGraph; 035import cascading.flow.planner.graph.ElementDirectedGraph; 036import cascading.flow.planner.graph.ElementGraph; 037import cascading.flow.planner.graph.FlowElementGraph; 038import cascading.flow.planner.graph.IgnoreAnnotationsHashSet; 039import cascading.flow.planner.iso.GraphResult; 040import cascading.flow.planner.iso.assertion.Asserted; 041import cascading.flow.planner.iso.assertion.GraphAssert; 042import cascading.flow.planner.iso.subgraph.Partitions; 043import cascading.flow.planner.iso.transformer.GraphTransformer; 044import cascading.flow.planner.iso.transformer.Transformed; 045import cascading.flow.planner.rule.util.TraceWriter; 046import cascading.util.EnumMultiMap; 047import cascading.util.ProcessLogger; 048 049import static cascading.util.Util.createIdentitySet; 050import static cascading.util.Util.formatDurationFromMillis; 051import static java.lang.String.format; 052 053/** 054 * 055 */ 056public class RuleExec 057 { 058 private static final int ELEMENT_THRESHOLD = 600; 059 060 final TraceWriter traceWriter; 061 final RuleRegistry registry; 062 063 public RuleExec( TraceWriter traceWriter, RuleRegistry registry ) 064 { 065 this.traceWriter = traceWriter; 066 this.registry = registry; 067 } 068 069 public RuleResult exec( PlannerContext plannerContext, FlowElementGraph flowElementGraph ) 070 { 071 RuleResult ruleResult = new RuleResult( registry, flowElementGraph ); 072 073 ProcessLogger logger = plannerContext.getLogger(); 074 int size = flowElementGraph.vertexSet().size(); 075 boolean logAsInfo = size >= ELEMENT_THRESHOLD; 076 077 if( logAsInfo ) 078 logger.logInfo( "elements in graph: {}, info logging threshold: {}, logging planner execution status", size, ELEMENT_THRESHOLD ); 079 080 long beginExec = System.currentTimeMillis(); 081 082 try 083 { 084 planPhases( plannerContext, logAsInfo, ruleResult ); 085 } 086 catch( Exception exception ) 087 { 088 ruleResult.setPlannerException( exception ); 089 } 090 finally 091 { 092 long endExec = System.currentTimeMillis(); 093 094 ruleResult.setDuration( beginExec, endExec ); 095 096 RuleResult.ResultStatus status = ruleResult.getResultStatus(); 097 String duration = formatDurationFromMillis( endExec - beginExec ); 098 logPhase( logger, logAsInfo, "rule registry completed: {}, with status: {}, and duration: {}", registry.getName(), status, duration ); 099 } 100 101 return ruleResult; 102 } 103 104 protected void planPhases( PlannerContext plannerContext, boolean logAsInfo, RuleResult ruleResult ) 105 { 106 ProcessLogger logger = plannerContext.getLogger(); 107 108 for( PlanPhase phase : PlanPhase.values() ) // iterate in order, all planner phases 109 { 110 long beginPhase = System.currentTimeMillis(); 111 112 logPhase( logger, logAsInfo, "starting rule phase: {}", phase ); 113 114 try 115 { 116 switch( phase.getAction() ) 117 { 118 case Resolve: 119 resolveElements( ruleResult ); 120 break; 121 122 case Rule: 123 executeRulePhase( phase, plannerContext, ruleResult ); 124 break; 125 } 126 } 127 finally 128 { 129 long endPhase = System.currentTimeMillis(); 130 131 ruleResult.setPhaseDuration( phase, beginPhase, endPhase ); 132 133 logPhase( logger, logAsInfo, "ending rule phase: {}, duration: {}", phase, formatDurationFromMillis( endPhase - beginPhase ) ); 134 } 135 } 136 } 137 138 private void resolveElements( RuleResult ruleResult ) 139 { 140 if( !registry.enabledResolveElements() ) 141 return; 142 143 FlowElementGraph elementGraph = ruleResult.getAssemblyGraph(); 144 145 elementGraph = (FlowElementGraph) elementGraph.copyElementGraph(); 146 147 elementGraph.resolveFields(); 148 149 ruleResult.setLevelResults( ProcessLevel.Assembly, ruleResult.initialAssembly, elementGraph ); 150 } 151 152 public RuleResult executeRulePhase( PlanPhase phase, PlannerContext plannerContext, RuleResult ruleResult ) 153 { 154 ProcessLogger logger = plannerContext.getLogger(); 155 156 logger.logDebug( "executing plan phase: {}", phase ); 157 158 LinkedList<Rule> rules = registry.getRulesFor( phase ); 159 160 writePhaseInitPlan( phase, ruleResult ); 161 162 try 163 { 164 // within this phase, execute all rules in declared order 165 for( Rule rule : rules ) 166 { 167 logger.logDebug( "executing rule: {}", rule ); 168 169 long begin = System.currentTimeMillis(); 170 171 try 172 { 173 switch( phase.getMode() ) 174 { 175 case Mutate: 176 performMutation( plannerContext, ruleResult, phase, rule ); 177 break; 178 179 case Partition: 180 performPartition( plannerContext, ruleResult, phase, rule ); 181 break; 182 } 183 } 184 catch( UnsupportedPlanException exception ) 185 { 186 logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() ); 187 188 throw new UnsupportedPlanException( rule, exception ); 189 } 190 catch( PlannerException exception ) 191 { 192 logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() ); 193 194 throw exception; // rethrow 195 } 196 catch( Exception exception ) 197 { 198 logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() ); 199 200 throw new PlannerException( registry, phase, rule, exception ); 201 } 202 finally 203 { 204 long end = System.currentTimeMillis(); 205 206 ruleResult.setRuleDuration( rule, begin, end ); 207 208 logger.logDebug( "completed rule: {}", rule ); 209 } 210 } 211 212 return ruleResult; 213 } 214 finally 215 { 216 logger.logDebug( "completed plan phase: {}", phase ); 217 writePhaseResultPlan( phase, ruleResult ); 218 } 219 } 220 221 protected void performMutation( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, Rule rule ) 222 { 223 if( rule instanceof GraphTransformer ) 224 performTransform( plannerContext, ruleResult, phase, (GraphTransformer) rule ); 225 else if( rule instanceof GraphAssert ) 226 performAssertion( plannerContext, ruleResult, phase, (GraphAssert) rule ); 227 else 228 throw new PlannerException( "unexpected rule: " + rule.getRuleName() ); 229 } 230 231 private void performPartition( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, Rule rule ) 232 { 233 if( !( rule instanceof RulePartitioner ) ) 234 throw new PlannerException( "unexpected rule: " + rule.getRuleName() ); 235 236 RulePartitioner partitioner = (RulePartitioner) rule; 237 238 if( partitioner.getPartitionSource() == RulePartitioner.PartitionSource.PartitionParent ) 239 handleParentPartitioning( plannerContext, ruleResult, phase, partitioner ); 240 else if( partitioner.getPartitionSource() == RulePartitioner.PartitionSource.PartitionCurrent ) 241 handleCurrentPartitioning( plannerContext, ruleResult, phase, partitioner ); 242 else 243 throw new IllegalStateException( "unknown partitioning type: " + partitioner.getPartitionSource() ); 244 } 245 246 private void handleCurrentPartitioning( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, RulePartitioner partitioner ) 247 { 248 Map<ElementGraph, List<? extends ElementGraph>> priorResults = ruleResult.getLevelResults( phase.getLevel() ); 249 250 Map<ElementGraph, List<? extends ElementGraph>> subGraphs = new LinkedHashMap<>(); 251 252 for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : priorResults.entrySet() ) 253 { 254 ElementGraph parent = entry.getKey(); 255 List<? extends ElementGraph> priors = entry.getValue(); 256 257 List<ElementGraph> resultChildren = new ArrayList<>( priors ); 258 259 Set<FlowElement> exclusions = getExclusions( priors, partitioner.getAnnotationExcludes() ); 260 261 for( ElementGraph child : priors ) 262 { 263 ElementGraph priorAnnotated = annotateWithPriors( child, priors ); 264 265 Partitions partitions; 266 267 try 268 { 269 partitions = partitioner.partition( plannerContext, priorAnnotated, exclusions ); 270 } 271 catch( Throwable throwable ) 272 { 273 throw new PlannerException( registry, phase, partitioner, priorAnnotated, throwable ); 274 } 275 276 writeTransformTrace( ruleResult, phase, partitioner, parent, child, partitions ); 277 278 List<ElementGraph> results = makeBoundedOn( ruleResult.getAssemblyGraph(), partitions.getAnnotatedSubGraphs() ); 279 280 if( results.isEmpty() ) 281 continue; 282 283 // ignore annotations on equality, but replace an newer graph with prior 284 IgnoreAnnotationsHashSet uniques = new IgnoreAnnotationsHashSet( results ); 285 286 if( uniques.size() != results.size() ) 287 throw new PlannerException( "rule created duplicate element graphs" ); 288 289 // replace child with partitioned results 290 resultChildren.remove( child ); 291 292 for( ElementGraph prior : resultChildren ) 293 { 294 if( !uniques.add( prior ) ) // todo: setting to force failure on duplicates 295 plannerContext.getLogger().logDebug( "re-partition rule created duplicate element graph to prior partitioner: {}, replacing duplicate result", partitioner.getRuleName() ); 296 } 297 298 // order no longer preserved 299 resultChildren = uniques.asList(); 300 } 301 302 subGraphs.put( parent, resultChildren ); 303 } 304 305 ruleResult.setLevelResults( phase.getLevel(), subGraphs ); 306 } 307 308 private void handleParentPartitioning( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, RulePartitioner partitioner ) 309 { 310 Map<ElementGraph, List<? extends ElementGraph>> priorResults = ruleResult.getLevelResults( phase.getLevel() ); 311 312 Map<ElementGraph, List<? extends ElementGraph>> subGraphs = new LinkedHashMap<>(); 313 314 for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : priorResults.entrySet() ) 315 { 316 ElementGraph parent = entry.getKey(); 317 List<? extends ElementGraph> priors = entry.getValue(); 318 319 Set<FlowElement> exclusions = getExclusions( priors, partitioner.getAnnotationExcludes() ); 320 ElementGraph priorAnnotated = annotateWithPriors( parent, priors ); 321 322 Partitions partitions; 323 324 try 325 { 326 partitions = partitioner.partition( plannerContext, priorAnnotated, exclusions ); 327 } 328 catch( Throwable throwable ) 329 { 330 throw new PlannerException( registry, phase, partitioner, priorAnnotated, throwable ); 331 } 332 333 writeTransformTrace( ruleResult, phase, partitioner, parent, null, partitions ); 334 335 List<ElementGraph> results = makeBoundedOn( ruleResult.getAssemblyGraph(), partitions.getAnnotatedSubGraphs() ); 336 337 // ignore annotations on equality, but replace an newer graph with prior 338 IgnoreAnnotationsHashSet uniques = new IgnoreAnnotationsHashSet( results ); 339 340 if( uniques.size() != results.size() ) 341 throw new PlannerException( "rule created duplicate element graphs" ); 342 343 for( ElementGraph prior : priors ) 344 { 345 if( !uniques.add( prior ) ) // todo: setting to force failure on duplicates 346 plannerContext.getLogger().logDebug( "partition rule created duplicate element graph to prior partitioner: {}, replacing duplicate result", partitioner.getRuleName() ); 347 } 348 349 // order no longer preserved 350 subGraphs.put( parent, uniques.asList() ); 351 } 352 353 ruleResult.setLevelResults( phase.getLevel(), subGraphs ); 354 } 355 356 private void performAssertion( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, GraphAssert asserter ) 357 { 358 plannerContext.getLogger().logDebug( "applying assertion: {}", ( (Rule) asserter ).getRuleName() ); 359 360 Map<ElementGraph, List<? extends ElementGraph>> levelResults = ruleResult.getLevelResults( phase.getLevel() ); 361 362 for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : levelResults.entrySet() ) 363 { 364 ElementGraph parent = entry.getKey(); // null for root case 365 List<? extends ElementGraph> children = entry.getValue(); 366 367 for( ElementGraph child : children ) 368 { 369 Asserted asserted; 370 371 try 372 { 373 asserted = asserter.assertion( plannerContext, child ); 374 } 375 catch( Throwable throwable ) 376 { 377 throw new PlannerException( registry, phase, (Rule) asserter, child, throwable ); 378 } 379 380 writeTransformTrace( ruleResult, phase, (Rule) asserter, parent, child, asserted ); 381 382 FlowElement primary = asserted.getFirstAnchor(); 383 384 if( primary == null ) 385 continue; 386 387 if( asserted.getAssertionType() == GraphAssert.AssertionType.Unsupported ) 388 throw new UnsupportedPlanException( asserted.getFirstAnchor(), asserted.getMessage() ); 389 else // only two options 390 throw new PlannerException( asserted.getFirstAnchor(), asserted.getMessage() ); 391 } 392 } 393 } 394 395 private void performTransform( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, GraphTransformer transformer ) 396 { 397 plannerContext.getLogger().logDebug( "applying transform: {}", ( (Rule) transformer ).getRuleName() ); 398 399 Map<ElementGraph, List<? extends ElementGraph>> levelResults = ruleResult.getLevelResults( phase.getLevel() ); 400 401 for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : levelResults.entrySet() ) 402 { 403 ElementGraph parent = entry.getKey(); // null for root case 404 List<? extends ElementGraph> children = entry.getValue(); 405 406 List<ElementGraph> results = new ArrayList<>(); 407 408 for( ElementGraph child : children ) 409 { 410 Transformed transformed; 411 412 try 413 { 414 transformed = transformer.transform( plannerContext, child ); 415 } 416 catch( Throwable throwable ) 417 { 418 throw new PlannerException( registry, phase, (Rule) transformer, child, throwable ); 419 } 420 421 writeTransformTrace( ruleResult, phase, (Rule) transformer, parent, child, transformed ); 422 423 ElementGraph endGraph = transformed.getEndGraph(); 424 425 if( endGraph != null ) 426 results.add( endGraph ); 427 else 428 results.add( child ); 429 } 430 431 ruleResult.setLevelResults( phase.getLevel(), parent, results ); 432 } 433 } 434 435 private ElementGraph annotateWithPriors( ElementGraph elementGraph, List<? extends ElementGraph> priorResults ) 436 { 437 if( priorResults == null ) 438 return elementGraph; 439 440 // the results are sub-graphs of the elementGraph, so guaranteed to exist in graph 441 AnnotatedGraph resultGraph = new ElementDirectedGraph( elementGraph ); 442 443 for( ElementGraph result : priorResults ) 444 { 445 if( !( result instanceof AnnotatedGraph ) || !( (AnnotatedGraph) result ).hasAnnotations() ) 446 continue; 447 448 EnumMultiMap<FlowElement> annotations = ( (AnnotatedGraph) result ).getAnnotations(); 449 450 resultGraph.getAnnotations().addAll( annotations ); 451 } 452 453 return (ElementGraph) resultGraph; 454 } 455 456 private Set<FlowElement> getExclusions( List<? extends ElementGraph> elementGraphs, Enum[] annotationExcludes ) 457 { 458 if( elementGraphs == null ) 459 return null; 460 461 Set<FlowElement> exclusions = createIdentitySet(); 462 463 for( ElementGraph elementGraph : elementGraphs ) 464 { 465 if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() ) 466 continue; 467 468 for( Enum annotationExclude : annotationExcludes ) 469 { 470 Set<FlowElement> flowElements = ( (AnnotatedGraph) elementGraph ).getAnnotations().getValues( annotationExclude ); 471 472 if( flowElements != null ) 473 exclusions.addAll( flowElements ); 474 } 475 } 476 477 return exclusions; 478 } 479 480 // use the final assembly graph so we can get Scopes for heads and tails 481 private List<ElementGraph> makeBoundedOn( ElementGraph currentElementGraph, Map<ElementGraph, EnumMultiMap> subGraphs ) 482 { 483 List<ElementGraph> results = new ArrayList<>( subGraphs.size() ); 484 485 for( ElementGraph subGraph : subGraphs.keySet() ) 486 results.add( new BoundedElementMultiGraph( currentElementGraph, subGraph, subGraphs.get( subGraph ) ) ); 487 488 return results; 489 } 490 491 private void writePhaseInitPlan( PlanPhase phase, RuleResult ruleResult ) 492 { 493 switch( phase.getLevel() ) 494 { 495 case Assembly: 496 traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyGraph(), format( "%02d-%s-init.dot", phase.ordinal(), phase ) ); 497 break; 498 case Step: 499 break; 500 case Node: 501 break; 502 case Pipeline: 503 break; 504 } 505 } 506 507 private void writePhaseResultPlan( PlanPhase phase, RuleResult ruleResult ) 508 { 509 switch( phase.getLevel() ) 510 { 511 case Assembly: 512 traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyGraph(), format( "%02d-%s-result.dot", phase.ordinal(), phase ) ); 513 break; 514 case Step: 515 traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyToStepGraphMap().get( ruleResult.getAssemblyGraph() ), phase, "result" ); 516 break; 517 case Node: 518 traceWriter.writeTransformPlan( registry.getName(), ruleResult.getStepToNodeGraphMap(), phase, "result" ); 519 break; 520 case Pipeline: 521 traceWriter.writeTransformPlan( registry.getName(), ruleResult.getStepToNodeGraphMap(), ruleResult.getNodeToPipelineGraphMap(), phase, "result" ); 522 break; 523 } 524 } 525 526 private void logPhase( ProcessLogger logger, boolean logAsInfo, String message, Object... items ) 527 { 528 if( logAsInfo ) 529 logger.logInfo( message, items ); 530 else 531 logger.logDebug( message, items ); 532 } 533 534 private void writeTransformTrace( RuleResult ruleResult, PlanPhase phase, Rule rule, ElementGraph parent, ElementGraph child, GraphResult result ) 535 { 536 if( traceWriter.isTransformTraceDisabled() ) 537 return; 538 539 int[] path = child != null ? ruleResult.getPathFor( parent, child ) : ruleResult.getPathFor( parent ); 540 541 traceWriter.writeTransformPlan( registry.getName(), phase, rule, path, result ); 542 } 543 }