001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.rule;
022
023import java.util.ArrayList;
024import java.util.LinkedHashMap;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import cascading.flow.FlowElement;
031import cascading.flow.planner.PlannerContext;
032import cascading.flow.planner.PlannerException;
033import cascading.flow.planner.graph.AnnotatedGraph;
034import cascading.flow.planner.graph.BoundedElementMultiGraph;
035import cascading.flow.planner.graph.ElementDirectedGraph;
036import cascading.flow.planner.graph.ElementGraph;
037import cascading.flow.planner.graph.FlowElementGraph;
038import cascading.flow.planner.graph.IgnoreAnnotationsHashSet;
039import cascading.flow.planner.iso.GraphResult;
040import cascading.flow.planner.iso.assertion.Asserted;
041import cascading.flow.planner.iso.assertion.GraphAssert;
042import cascading.flow.planner.iso.subgraph.Partitions;
043import cascading.flow.planner.iso.transformer.GraphTransformer;
044import cascading.flow.planner.iso.transformer.Transformed;
045import cascading.flow.planner.rule.util.TraceWriter;
046import cascading.util.EnumMultiMap;
047import cascading.util.ProcessLogger;
048
049import static cascading.util.Util.createIdentitySet;
050import static cascading.util.Util.formatDurationFromMillis;
051import static java.lang.String.format;
052
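/**
 * Applies the rules held by a {@link RuleRegistry} to a {@link FlowElementGraph}, one {@link PlanPhase}
 * at a time, collecting the resulting graphs, durations and any planner failure in a {@link RuleResult}.
 */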
056public class RuleExec
057  {
058  private static final int ELEMENT_THRESHOLD = 600;
059
060  final TraceWriter traceWriter;
061  final RuleRegistry registry;
062
063  public RuleExec( TraceWriter traceWriter, RuleRegistry registry )
064    {
065    this.traceWriter = traceWriter;
066    this.registry = registry;
067    }
068
069  public RuleResult exec( PlannerContext plannerContext, FlowElementGraph flowElementGraph )
070    {
071    RuleResult ruleResult = new RuleResult( registry, flowElementGraph );
072
073    ProcessLogger logger = plannerContext.getLogger();
074    int size = flowElementGraph.vertexSet().size();
075    boolean logAsInfo = size >= ELEMENT_THRESHOLD;
076
077    if( logAsInfo )
078      logger.logInfo( "elements in graph: {}, info logging threshold: {}, logging planner execution status", size, ELEMENT_THRESHOLD );
079
080    long beginExec = System.currentTimeMillis();
081
082    try
083      {
084      planPhases( plannerContext, logAsInfo, ruleResult );
085      }
086    catch( Exception exception )
087      {
088      ruleResult.setPlannerException( exception );
089      }
090    finally
091      {
092      long endExec = System.currentTimeMillis();
093
094      ruleResult.setDuration( beginExec, endExec );
095
096      RuleResult.ResultStatus status = ruleResult.getResultStatus();
097      String duration = formatDurationFromMillis( endExec - beginExec );
098      logPhase( logger, logAsInfo, "rule registry completed: {}, with status: {}, and duration: {}", registry.getName(), status, duration );
099      }
100
101    return ruleResult;
102    }
103
104  protected void planPhases( PlannerContext plannerContext, boolean logAsInfo, RuleResult ruleResult )
105    {
106    ProcessLogger logger = plannerContext.getLogger();
107
108    for( PlanPhase phase : PlanPhase.values() ) // iterate in order, all planner phases
109      {
110      long beginPhase = System.currentTimeMillis();
111
112      logPhase( logger, logAsInfo, "starting rule phase: {}", phase );
113
114      try
115        {
116        switch( phase.getAction() )
117          {
118          case Resolve:
119            resolveElements( ruleResult );
120            break;
121
122          case Rule:
123            executeRulePhase( phase, plannerContext, ruleResult );
124            break;
125          }
126        }
127      finally
128        {
129        long endPhase = System.currentTimeMillis();
130
131        ruleResult.setPhaseDuration( phase, beginPhase, endPhase );
132
133        logPhase( logger, logAsInfo, "ending rule phase: {}, duration: {}", phase, formatDurationFromMillis( endPhase - beginPhase ) );
134        }
135      }
136    }
137
138  private void resolveElements( RuleResult ruleResult )
139    {
140    if( !registry.enabledResolveElements() )
141      return;
142
143    FlowElementGraph elementGraph = ruleResult.getAssemblyGraph();
144
145    elementGraph = (FlowElementGraph) elementGraph.copyElementGraph();
146
147    ScopeResolver.resolveFields( elementGraph );
148
149    ruleResult.setLevelResults( ProcessLevel.Assembly, ruleResult.initialAssembly, elementGraph );
150    }
151
152  public RuleResult executeRulePhase( PlanPhase phase, PlannerContext plannerContext, RuleResult ruleResult )
153    {
154    ProcessLogger logger = plannerContext.getLogger();
155
156    logger.logDebug( "executing plan phase: {}", phase );
157
158    LinkedList<Rule> rules = registry.getRulesFor( phase );
159
160    writePhaseInitPlan( phase, ruleResult );
161
162    try
163      {
164      // within this phase, execute all rules in declared order
165      for( Rule rule : rules )
166        {
167        logger.logDebug( "executing rule: {}", rule );
168
169        long begin = System.currentTimeMillis();
170
171        try
172          {
173          switch( phase.getMode() )
174            {
175            case Mutate:
176              performMutation( plannerContext, ruleResult, phase, rule );
177              break;
178
179            case Partition:
180              performPartition( plannerContext, ruleResult, phase, rule );
181              break;
182            }
183          }
184        catch( UnsupportedPlanException exception )
185          {
186          logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() );
187
188          throw new UnsupportedPlanException( rule, exception );
189          }
190        catch( PlannerException exception )
191          {
192          logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() );
193
194          throw exception; // rethrow
195          }
196        catch( Exception exception )
197          {
198          logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() );
199
200          throw new PlannerException( registry, phase, rule, exception );
201          }
202        finally
203          {
204          long end = System.currentTimeMillis();
205
206          ruleResult.setRuleDuration( rule, begin, end );
207
208          logger.logDebug( "completed rule: {}", rule );
209          }
210        }
211
212      return ruleResult;
213      }
214    finally
215      {
216      logger.logDebug( "completed plan phase: {}", phase );
217      writePhaseResultPlan( phase, ruleResult );
218      }
219    }
220
221  protected void performMutation( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, Rule rule )
222    {
223    if( rule instanceof GraphTransformer )
224      performTransform( plannerContext, ruleResult, phase, (GraphTransformer) rule );
225    else if( rule instanceof GraphAssert )
226      performAssertion( plannerContext, ruleResult, phase, (GraphAssert) rule );
227    else
228      throw new PlannerException( "unexpected rule: " + rule.getRuleName() );
229    }
230
231  private void performPartition( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, Rule rule )
232    {
233    if( !( rule instanceof RulePartitioner ) )
234      throw new PlannerException( "unexpected rule: " + rule.getRuleName() );
235
236    RulePartitioner partitioner = (RulePartitioner) rule;
237
238    if( partitioner.getPartitionSource() == RulePartitioner.PartitionSource.PartitionParent )
239      handleParentPartitioning( plannerContext, ruleResult, phase, partitioner );
240    else if( partitioner.getPartitionSource() == RulePartitioner.PartitionSource.PartitionCurrent )
241      handleCurrentPartitioning( plannerContext, ruleResult, phase, partitioner );
242    else
243      throw new IllegalStateException( "unknown partitioning type: " + partitioner.getPartitionSource() );
244    }
245
246  private void handleCurrentPartitioning( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, RulePartitioner partitioner )
247    {
248    Map<ElementGraph, List<? extends ElementGraph>> priorResults = ruleResult.getLevelResults( phase.getLevel() );
249
250    Map<ElementGraph, List<? extends ElementGraph>> subGraphs = new LinkedHashMap<>();
251
252    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : priorResults.entrySet() )
253      {
254      ElementGraph parent = entry.getKey();
255      List<? extends ElementGraph> priors = entry.getValue();
256
257      List<ElementGraph> resultChildren = new ArrayList<>( priors );
258
259      Set<FlowElement> exclusions = getExclusions( priors, partitioner.getAnnotationExcludes() );
260
261      for( ElementGraph child : priors )
262        {
263        ElementGraph priorAnnotated = annotateWithPriors( child, priors );
264
265        Partitions partitions;
266
267        try
268          {
269          partitions = partitioner.partition( plannerContext, priorAnnotated, exclusions );
270          }
271        catch( Throwable throwable )
272          {
273          throw new PlannerException( registry, phase, partitioner, priorAnnotated, throwable );
274          }
275
276        writeTransformTrace( ruleResult, phase, partitioner, parent, child, partitions );
277
278        List<ElementGraph> results = makeBoundedOn( ruleResult.getAssemblyGraph(), partitions.getAnnotatedSubGraphs() );
279
280        if( results.isEmpty() )
281          continue;
282
283        // ignore annotations on equality, but replace an newer graph with prior
284        IgnoreAnnotationsHashSet uniques = new IgnoreAnnotationsHashSet( results );
285
286        if( uniques.size() != results.size() )
287          throw new PlannerException( "rule created duplicate element graphs" );
288
289        // replace child with partitioned results
290        resultChildren.remove( child );
291
292        for( ElementGraph prior : resultChildren )
293          {
294          if( !uniques.add( prior ) ) // todo: setting to force failure on duplicates
295            plannerContext.getLogger().logDebug( "re-partition rule created duplicate element graph to prior partitioner: {}, replacing duplicate result", partitioner.getRuleName() );
296          }
297
298        // order no longer preserved
299        resultChildren = uniques.asList();
300        }
301
302      subGraphs.put( parent, resultChildren );
303      }
304
305    ruleResult.setLevelResults( phase.getLevel(), subGraphs );
306    }
307
308  private void handleParentPartitioning( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, RulePartitioner partitioner )
309    {
310    Map<ElementGraph, List<? extends ElementGraph>> priorResults = ruleResult.getLevelResults( phase.getLevel() );
311
312    Map<ElementGraph, List<? extends ElementGraph>> subGraphs = new LinkedHashMap<>();
313
314    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : priorResults.entrySet() )
315      {
316      ElementGraph parent = entry.getKey();
317      List<? extends ElementGraph> priors = entry.getValue();
318
319      Set<FlowElement> exclusions = getExclusions( priors, partitioner.getAnnotationExcludes() );
320      ElementGraph priorAnnotated = annotateWithPriors( parent, priors );
321
322      Partitions partitions;
323
324      try
325        {
326        partitions = partitioner.partition( plannerContext, priorAnnotated, exclusions );
327        }
328      catch( Throwable throwable )
329        {
330        throw new PlannerException( registry, phase, partitioner, priorAnnotated, throwable );
331        }
332
333      writeTransformTrace( ruleResult, phase, partitioner, parent, null, partitions );
334
335      List<ElementGraph> results = makeBoundedOn( ruleResult.getAssemblyGraph(), partitions.getAnnotatedSubGraphs() );
336
337      // ignore annotations on equality, but replace an newer graph with prior
338      IgnoreAnnotationsHashSet uniques = new IgnoreAnnotationsHashSet( results );
339
340      if( uniques.size() != results.size() )
341        throw new PlannerException( "rule created duplicate element graphs" );
342
343      for( ElementGraph prior : priors )
344        {
345        if( !uniques.add( prior ) ) // todo: setting to force failure on duplicates
346          plannerContext.getLogger().logDebug( "partition rule created duplicate element graph to prior partitioner: {}, replacing duplicate result", partitioner.getRuleName() );
347        }
348
349      // order no longer preserved
350      subGraphs.put( parent, uniques.asList() );
351      }
352
353    ruleResult.setLevelResults( phase.getLevel(), subGraphs );
354    }
355
356  private void performAssertion( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, GraphAssert asserter )
357    {
358    plannerContext.getLogger().logDebug( "applying assertion: {}", ( (Rule) asserter ).getRuleName() );
359
360    Map<ElementGraph, List<? extends ElementGraph>> levelResults = ruleResult.getLevelResults( phase.getLevel() );
361
362    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : levelResults.entrySet() )
363      {
364      ElementGraph parent = entry.getKey(); // null for root case
365      List<? extends ElementGraph> children = entry.getValue();
366
367      for( ElementGraph child : children )
368        {
369        Asserted asserted;
370
371        try
372          {
373          asserted = asserter.assertion( plannerContext, child );
374          }
375        catch( Throwable throwable )
376          {
377          throw new PlannerException( registry, phase, (Rule) asserter, child, throwable );
378          }
379
380        writeTransformTrace( ruleResult, phase, (Rule) asserter, parent, child, asserted );
381
382        FlowElement primary = asserted.getFirstAnchor();
383
384        if( primary == null )
385          continue;
386
387        if( asserted.getAssertionType() == GraphAssert.AssertionType.Unsupported )
388          throw new UnsupportedPlanException( asserted.getFirstAnchor(), asserted.getMessage() );
389        else // only two options
390          throw new PlannerException( asserted.getFirstAnchor(), asserted.getMessage() );
391        }
392      }
393    }
394
395  private void performTransform( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, GraphTransformer transformer )
396    {
397    plannerContext.getLogger().logDebug( "applying transform: {}", ( (Rule) transformer ).getRuleName() );
398
399    Map<ElementGraph, List<? extends ElementGraph>> levelResults = ruleResult.getLevelResults( phase.getLevel() );
400
401    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : levelResults.entrySet() )
402      {
403      ElementGraph parent = entry.getKey(); // null for root case
404      List<? extends ElementGraph> children = entry.getValue();
405
406      List<ElementGraph> results = new ArrayList<>();
407
408      for( ElementGraph child : children )
409        {
410        Transformed transformed;
411
412        try
413          {
414          transformed = transformer.transform( plannerContext, child );
415          }
416        catch( Throwable throwable )
417          {
418          throw new PlannerException( registry, phase, (Rule) transformer, child, throwable );
419          }
420
421        writeTransformTrace( ruleResult, phase, (Rule) transformer, parent, child, transformed );
422
423        ElementGraph endGraph = transformed.getEndGraph();
424
425        if( endGraph != null )
426          results.add( endGraph );
427        else
428          results.add( child );
429        }
430
431      ruleResult.setLevelResults( phase.getLevel(), parent, results );
432      }
433    }
434
435  private ElementGraph annotateWithPriors( ElementGraph elementGraph, List<? extends ElementGraph> priorResults )
436    {
437    if( priorResults == null )
438      return elementGraph;
439
440    // the results are sub-graphs of the elementGraph, so guaranteed to exist in graph
441    AnnotatedGraph resultGraph = new ElementDirectedGraph( elementGraph );
442
443    for( ElementGraph result : priorResults )
444      {
445      if( !( result instanceof AnnotatedGraph ) || !( (AnnotatedGraph) result ).hasAnnotations() )
446        continue;
447
448      EnumMultiMap<FlowElement> annotations = ( (AnnotatedGraph) result ).getAnnotations();
449
450      resultGraph.getAnnotations().addAll( annotations );
451      }
452
453    return (ElementGraph) resultGraph;
454    }
455
456  private Set<FlowElement> getExclusions( List<? extends ElementGraph> elementGraphs, Enum[] annotationExcludes )
457    {
458    if( elementGraphs == null )
459      return null;
460
461    Set<FlowElement> exclusions = createIdentitySet();
462
463    for( ElementGraph elementGraph : elementGraphs )
464      {
465      if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
466        continue;
467
468      for( Enum annotationExclude : annotationExcludes )
469        {
470        Set<FlowElement> flowElements = ( (AnnotatedGraph) elementGraph ).getAnnotations().getValues( annotationExclude );
471
472        if( flowElements != null )
473          exclusions.addAll( flowElements );
474        }
475      }
476
477    return exclusions;
478    }
479
480  // use the final assembly graph so we can get Scopes for heads and tails
481  private List<ElementGraph> makeBoundedOn( ElementGraph currentElementGraph, Map<ElementGraph, EnumMultiMap> subGraphs )
482    {
483    List<ElementGraph> results = new ArrayList<>( subGraphs.size() );
484
485    for( ElementGraph subGraph : subGraphs.keySet() )
486      results.add( new BoundedElementMultiGraph( currentElementGraph, subGraph, subGraphs.get( subGraph ) ) );
487
488    return results;
489    }
490
491  private void writePhaseInitPlan( PlanPhase phase, RuleResult ruleResult )
492    {
493    switch( phase.getLevel() )
494      {
495      case Assembly:
496        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyGraph(), format( "%02d-%s-init.dot", phase.ordinal(), phase ) );
497        break;
498      case Step:
499        break;
500      case Node:
501        break;
502      case Pipeline:
503        break;
504      }
505    }
506
507  private void writePhaseResultPlan( PlanPhase phase, RuleResult ruleResult )
508    {
509    switch( phase.getLevel() )
510      {
511      case Assembly:
512        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyGraph(), format( "%02d-%s-result.dot", phase.ordinal(), phase ) );
513        break;
514      case Step:
515        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyToStepGraphMap().get( ruleResult.getAssemblyGraph() ), phase, "result" );
516        break;
517      case Node:
518        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getStepToNodeGraphMap(), phase, "result" );
519        break;
520      case Pipeline:
521        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getStepToNodeGraphMap(), ruleResult.getNodeToPipelineGraphMap(), phase, "result" );
522        break;
523      }
524    }
525
526  private void logPhase( ProcessLogger logger, boolean logAsInfo, String message, Object... items )
527    {
528    if( logAsInfo )
529      logger.logInfo( message, items );
530    else
531      logger.logDebug( message, items );
532    }
533
534  private void writeTransformTrace( RuleResult ruleResult, PlanPhase phase, Rule rule, ElementGraph parent, ElementGraph child, GraphResult result )
535    {
536    if( traceWriter.isTransformTraceDisabled() )
537      return;
538
539    int[] path = child != null ? ruleResult.getPathFor( parent, child ) : ruleResult.getPathFor( parent );
540
541    traceWriter.writeTransformPlan( registry.getName(), phase, rule, path, result );
542    }
543  }