001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner;
022
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
031
032import cascading.flow.AssemblyPlanner;
033import cascading.flow.BaseFlow;
034import cascading.flow.Flow;
035import cascading.flow.FlowConnector;
036import cascading.flow.FlowConnectorProps;
037import cascading.flow.FlowDef;
038import cascading.flow.FlowElement;
039import cascading.flow.FlowNode;
040import cascading.flow.FlowStep;
041import cascading.flow.Flows;
042import cascading.flow.planner.graph.ElementGraph;
043import cascading.flow.planner.graph.FlowElementGraph;
044import cascading.flow.planner.process.FlowNodeGraph;
045import cascading.flow.planner.process.FlowStepGraph;
046import cascading.flow.planner.rule.ProcessLevel;
047import cascading.flow.planner.rule.RuleRegistry;
048import cascading.flow.planner.rule.RuleRegistrySet;
049import cascading.flow.planner.rule.RuleResult;
050import cascading.flow.planner.rule.RuleSetExec;
051import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
052import cascading.flow.planner.rule.util.TraceWriter;
053import cascading.operation.AssertionLevel;
054import cascading.operation.DebugLevel;
055import cascading.pipe.Checkpoint;
056import cascading.pipe.OperatorException;
057import cascading.pipe.Pipe;
058import cascading.pipe.SubAssembly;
059import cascading.property.ConfigDef;
060import cascading.property.PropertyUtil;
061import cascading.scheme.Scheme;
062import cascading.tap.Tap;
063import cascading.tap.TapException;
064import cascading.tuple.Fields;
065import cascading.util.Update;
066import cascading.util.Util;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import static cascading.util.Util.*;
071import static java.util.Arrays.asList;
072
073/**
074 * Class FlowPlanner is the base class for all planner implementations.
075 * <p/>
076 * This planner support tracing execution of each rule. See the appropriate properties on this
077 * class to enable.
078 */
079public abstract class FlowPlanner<F extends BaseFlow, Config>
080  {
081  /**
082   * Enables the planner to write out basic planner information including the initial element-graph,
083   * completed element-graph, and the completed step-graph dot files.
084   */
085  public static final String TRACE_PLAN_PATH = "cascading.planner.plan.path";
086
087  /**
088   * Enables the planner to write out detail level planner information for each rule, including recursive
089   * transforms.
090   * <p/>
091   * Use this to debug rules. This does increase overhead during planning.
092   */
093  public static final String TRACE_PLAN_TRANSFORM_PATH = "cascading.planner.plan.transforms.path";
094
095  /**
096   * Enables the planner to write out planner statistics for each planner phase and rule.
097   */
098  public static final String TRACE_STATS_PATH = "cascading.planner.stats.path";
099
100  /** Field LOG */
101  private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class );
102
103  /** Field properties */
104  protected Map<Object, Object> defaultProperties;
105
106  protected String checkpointTapRootPath = null;
107
108  /** Field assertionLevel */
109  protected AssertionLevel defaultAssertionLevel;
110  /** Field debugLevel */
111  protected DebugLevel defaultDebugLevel;
112
113  /**
114   * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}.
115   *
116   * @param properties of type Map<Object, Object>
117   * @return AssertionLevel the configured AssertionLevel
118   */
119  static AssertionLevel getAssertionLevel( Map<Object, Object> properties )
120    {
121    String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() );
122
123    return AssertionLevel.valueOf( assertionLevel );
124    }
125
126  /**
127   * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}.
128   *
129   * @param properties of type Map<Object, Object>
130   * @return DebugLevel the configured DebugLevel
131   */
132  static DebugLevel getDebugLevel( Map<Object, Object> properties )
133    {
134    String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() );
135
136    return DebugLevel.valueOf( debugLevel );
137    }
138
139  {
140  Update.registerPlanner( getClass() );
141  }
142
143  public Map<Object, Object> getDefaultProperties()
144    {
145    return defaultProperties;
146    }
147
148  public abstract Config getDefaultConfig();
149
150  public abstract PlannerInfo getPlannerInfo( String name );
151
152  public abstract PlatformInfo getPlatformInfo();
153
154  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
155    {
156    this.defaultProperties = properties;
157    this.defaultAssertionLevel = getAssertionLevel( properties );
158    this.defaultDebugLevel = getDebugLevel( properties );
159    }
160
161  public F buildFlow( FlowDef flowDef, RuleRegistrySet ruleRegistrySet )
162    {
163    FlowElementGraph flowElementGraph = null;
164
165    try
166      {
167      flowDef = normalizeTaps( flowDef );
168
169      verifyAllTaps( flowDef );
170
171      F flow = createFlow( flowDef );
172
173      Pipe[] tails = resolveTails( flowDef, flow );
174
175      verifyAssembly( flowDef, tails );
176
177      flowElementGraph = createFlowElementGraph( flowDef, tails );
178
179      TraceWriter traceWriter = new TraceWriter( flow );
180      RuleSetExec ruleSetExec = new RuleSetExec( traceWriter, this, flow, ruleRegistrySet, flowDef, flowElementGraph );
181
182      RuleResult ruleResult = ruleSetExec.exec();
183
184      traceWriter.writeTracePlan( null, "0-initial-flow-element-graph", flowElementGraph );
185
186      FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();
187
188      finalFlowElementGraph = flow.updateSchemes( finalFlowElementGraph );
189
190      Map<ElementGraph, List<? extends ElementGraph>> stepToNodes = ruleResult.getStepToNodeGraphMap();
191      Map<ElementGraph, List<? extends ElementGraph>> nodeToPipeline = ruleResult.getNodeToPipelineGraphMap();
192
193      FlowStepGraph flowStepGraph = new FlowStepGraph( this, finalFlowElementGraph, stepToNodes, nodeToPipeline );
194
195      traceWriter.writeFinal( "1-final-flow-registry", ruleResult );
196      traceWriter.writeTracePlan( null, "2-final-flow-element-graph", finalFlowElementGraph );
197      traceWriter.writeTracePlan( null, "3-final-flow-step-graph", flowStepGraph );
198      traceWriter.writeTracePlanSteps( "4-final-flow-steps", flowStepGraph );
199
200      flow.setPlannerInfo( getPlannerInfo( ruleResult.getRegistry().getName() ) );
201
202      flow.initialize( finalFlowElementGraph, flowStepGraph );
203
204      return flow;
205      }
206    catch( Exception exception )
207      {
208      throw handleExceptionDuringPlanning( flowDef, exception, flowElementGraph );
209      }
210    }
211
212  protected abstract F createFlow( FlowDef flowDef );
213
214  public abstract FlowStep<Config> createFlowStep( ElementGraph stepElementGraph, FlowNodeGraph flowNodeGraph );
215
216  public FlowNode createFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs )
217    {
218    return new BaseFlowNode( flowElementGraph, nodeSubGraph, pipelineGraphs );
219    }
220
221  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
222    {
223
224    }
225
226  protected Pipe[] resolveTails( FlowDef flowDef, F flow )
227    {
228    Pipe[] tails = flowDef.getTailsArray();
229
230    tails = resolveAssemblyPlanners( flowDef, flow, tails );
231
232    return tails;
233    }
234
235  protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes )
236    {
237    List<Pipe> tails = Arrays.asList( pipes );
238
239    List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners();
240
241    for( AssemblyPlanner assemblyPlanner : assemblyPlanners )
242      {
243      tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) );
244
245      if( tails.isEmpty() )
246        throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" );
247
248      tails = Collections.unmodifiableList( tails );
249      }
250
251    return tails.toArray( new Pipe[ tails.size() ] );
252    }
253
254  protected void verifyAssembly( FlowDef flowDef, Pipe[] tails )
255    {
256    verifyPipeAssemblyEndPoints( flowDef, tails );
257    verifyTraps( flowDef, tails );
258    verifyCheckpoints( flowDef, tails );
259    }
260
261  protected void verifyAllTaps( FlowDef flowDef )
262    {
263    verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() );
264
265    verifyTaps( flowDef.getSources(), true, true );
266    verifyTaps( flowDef.getSinks(), false, true );
267    verifyTaps( flowDef.getTraps(), false, false );
268
269    // are both sources and sinks
270    verifyTaps( flowDef.getCheckpoints(), true, false );
271    verifyTaps( flowDef.getCheckpoints(), false, false );
272    }
273
274  protected FlowElementGraph createFlowElementGraph( FlowDef flowDef, Pipe[] flowTails )
275    {
276    Map<String, Tap> sources = flowDef.getSourcesCopy();
277    Map<String, Tap> sinks = flowDef.getSinksCopy();
278    Map<String, Tap> traps = flowDef.getTrapsCopy();
279    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();
280
281    checkpointTapRootPath = makeCheckpointRootPath( flowDef );
282
283    return new FlowElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointTapRootPath != null );
284    }
285
286  private FlowDef normalizeTaps( FlowDef flowDef )
287    {
288    Set<Tap> taps = new HashSet<>();
289
290    Map<String, Tap> sources = flowDef.getSourcesCopy();
291    Map<String, Tap> sinks = flowDef.getSinksCopy();
292    Map<String, Tap> traps = flowDef.getTrapsCopy();
293    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();
294
295    boolean sourcesHasDupes = addTaps( sources, taps );
296    boolean sinksHasDupes = addTaps( sinks, taps );
297    boolean trapsHasDupes = addTaps( traps, taps );
298    boolean checkpointsHasDupes = addTaps( checkpoints, taps );
299
300    if( sourcesHasDupes )
301      normalize( taps, sources );
302
303    if( sinksHasDupes )
304      normalize( taps, sinks );
305
306    if( trapsHasDupes )
307      normalize( taps, traps );
308
309    if( checkpointsHasDupes )
310      normalize( taps, checkpoints );
311
312    return Flows.copy( flowDef, sources, sinks, traps, checkpoints );
313    }
314
315  private boolean addTaps( Map<String, Tap> current, Set<Tap> taps )
316    {
317    int size = taps.size();
318
319    taps.addAll( current.values() );
320
321    // if all the added values are not unique, taps.size will be less than original size + num tap instances
322    return size + current.size() != taps.size();
323    }
324
325  private void normalize( Set<Tap> taps, Map<String, Tap> current )
326    {
327    for( Tap tap : taps )
328      {
329      for( Map.Entry<String, Tap> entry : current.entrySet() )
330        {
331        if( entry.getValue().equals( tap ) ) // force equivalent instance to being the same instance
332          entry.setValue( tap );
333        }
334      }
335    }
336
337  private String makeCheckpointRootPath( FlowDef flowDef )
338    {
339    String flowName = flowDef.getName();
340    String runID = flowDef.getRunID();
341
342    if( runID == null )
343      return null;
344
345    if( flowName == null )
346      throw new PlannerException( "flow name is required when providing a run id" );
347
348    return flowName + "/" + runID;
349    }
350
351  protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks )
352    {
353    Collection<Tap> sourcesSet = sources.values();
354
355    for( Tap tap : sinks.values() )
356      {
357      if( sourcesSet.contains( tap ) )
358        throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap );
359      }
360    }
361
362  /**
363   * Method verifyTaps ...
364   *
365   * @param taps          of type Map<String, Tap>
366   * @param areSources    of type boolean
367   * @param mayNotBeEmpty of type boolean
368   */
369  protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty )
370    {
371    if( mayNotBeEmpty && taps.isEmpty() )
372      throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" );
373
374    for( String tapName : taps.keySet() )
375      {
376      if( areSources && !taps.get( tapName ).isSource() )
377        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + taps.get( tapName ) );
378      else if( !areSources && !taps.get( tapName ).isSink() )
379        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + taps.get( tapName ) );
380      }
381    }
382
383  /**
384   * Method verifyEndPoints verifies
385   * <p/>
386   * there aren't dupe names in heads or tails.
387   * all the sink and source tap names match up with tail and head pipes
388   */
389  // todo: force dupe names to throw exceptions
390  protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails )
391    {
392    Set<String> tapNames = new HashSet<String>();
393
394    tapNames.addAll( flowDef.getSources().keySet() );
395    tapNames.addAll( flowDef.getSinks().keySet() );
396
397    // handle tails
398    Set<Pipe> tails = new HashSet<Pipe>();
399    Set<String> tailNames = new HashSet<String>();
400
401    for( Pipe pipe : flowTails )
402      {
403      if( pipe instanceof SubAssembly )
404        {
405        for( Pipe tail : ( (SubAssembly) pipe ).getTails() )
406          {
407          String tailName = tail.getName();
408
409          if( !tapNames.contains( tailName ) )
410            throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" );
411
412          if( tailNames.contains( tailName ) && !tails.contains( tail ) )
413            throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
414
415          tailNames.add( tailName );
416          tails.add( tail );
417          }
418        }
419      else
420        {
421        String tailName = pipe.getName();
422
423        if( !tapNames.contains( tailName ) )
424          throw new PlannerException( pipe, "pipe name not found in either sink or source map: '" + tailName + "'" );
425
426        if( tailNames.contains( tailName ) && !tails.contains( pipe ) )
427          throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
428
429        tailNames.add( tailName );
430        tails.add( pipe );
431        }
432      }
433
434    tailNames.removeAll( flowDef.getSinks().keySet() );
435    Set<String> remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
436    remainingSinks.removeAll( tailNames );
437
438    if( tailNames.size() != 0 )
439      throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + join( quote( tailNames, "'" ), ", " ) + "], remaining sink tap names: [" + join( quote( remainingSinks, "'" ), ", " ) + "]" );
440
441    // unlike heads, pipes can input to another pipe and simultaneously be a sink
442    // so there is no way to know all the intentional tails, so they aren't listed below in the exception
443    remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
444    remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) );
445
446    if( remainingSinks.size() != 0 )
447      throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + join( quote( remainingSinks, "'" ), ", " ) + "]" );
448
449    // handle heads
450    Set<Pipe> heads = new HashSet<Pipe>();
451    Set<String> headNames = new HashSet<String>();
452
453    for( Pipe pipe : flowTails )
454      {
455      for( Pipe head : pipe.getHeads() )
456        {
457        String headName = head.getName();
458
459        if( !tapNames.contains( headName ) )
460          throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" );
461
462        if( headNames.contains( headName ) && !heads.contains( head ) )
463          LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName );
464
465        headNames.add( headName );
466        heads.add( head );
467        }
468      }
469
470    Set<String> allHeadNames = new HashSet<String>( headNames );
471    headNames.removeAll( flowDef.getSources().keySet() );
472    Set<String> remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
473    remainingSources.removeAll( headNames );
474
475    if( headNames.size() != 0 )
476      throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + join( quote( headNames, "'" ), ", " ) + "], remaining source tap names: [" + join( quote( remainingSources, "'" ), ", " ) + "]" );
477
478    remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
479    remainingSources.removeAll( allHeadNames );
480
481    if( remainingSources.size() != 0 )
482      throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + join( quote( remainingSources, "'" ), ", " ) + "], remaining head pipe names: [" + join( quote( headNames, "'" ), ", " ) + "]" );
483
484    }
485
486  protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails )
487    {
488    verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" );
489
490    Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
491
492    for( String name : flowDef.getTraps().keySet() )
493      {
494      if( !names.contains( name ) )
495        throw new PlannerException( "trap name not found in assembly: '" + name + "'" );
496      }
497    }
498
499  protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails )
500    {
501    verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" );
502
503    for( Tap checkpointTap : flowDef.getCheckpoints().values() )
504      {
505      Scheme scheme = checkpointTap.getScheme();
506
507      if( scheme.getSourceFields().equals( Fields.UNKNOWN ) && scheme.getSinkFields().equals( Fields.ALL ) )
508        continue;
509
510      throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() );
511      }
512
513    Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
514
515    for( String name : flowDef.getCheckpoints().keySet() )
516      {
517      if( !names.contains( name ) )
518        throw new PlannerException( "named checkpoint declared in FlowDef, but no named branch found in pipe assembly: '" + name + "'" );
519
520      Set<Pipe> pipes = new HashSet<Pipe>( asList( Pipe.named( name, flowTails ) ) );
521
522      int count = 0;
523
524      for( Pipe pipe : pipes )
525        {
526        if( pipe instanceof Checkpoint )
527          count++;
528        }
529
530      if( count == 0 )
531        throw new PlannerException( "no checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );
532
533      if( count > 1 )
534        throw new PlannerException( "more than one checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );
535      }
536    }
537
538  private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role )
539    {
540    Collection<Tap> sourceTaps = sources.values();
541    Collection<Tap> sinkTaps = sinks.values();
542
543    for( Tap tap : taps.values() )
544      {
545      if( sourceTaps.contains( tap ) )
546        throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap );
547
548      if( sinkTaps.contains( tap ) )
549        throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap );
550      }
551    }
552
553  /**
554   * If there are rules for a given {@link cascading.flow.planner.rule.ProcessLevel} on the current platform
555   * there must be sub-graphs partitioned at that level.
556   */
557  public Exception verifyResult( RuleResult ruleResult )
558    {
559    try
560      {
561      verifyResultInternal( ruleResult );
562      }
563    catch( Exception exception )
564      {
565      return exception;
566      }
567
568    return null;
569    }
570
571  protected void verifyResultInternal( RuleResult ruleResult )
572    {
573    Set<ProcessLevel> processLevels = getReverseOrderedProcessLevels( ruleResult );
574
575    for( ProcessLevel processLevel : processLevels )
576      {
577      String registryName = ruleResult.getRegistry().getName();
578
579      switch( processLevel )
580        {
581        case Assembly:
582
583          FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();
584
585          if( finalFlowElementGraph.vertexSet().isEmpty() )
586            throw new PlannerException( "final assembly graph is empty: " + registryName );
587
588          break;
589
590        case Step:
591
592          Map<ElementGraph, List<? extends ElementGraph>> assemblyToSteps = ruleResult.getAssemblyToStepGraphMap();
593
594          if( assemblyToSteps.isEmpty() )
595            throw new PlannerException( "no steps partitioned: " + registryName );
596
597          for( ElementGraph assembly : assemblyToSteps.keySet() )
598            {
599            List<? extends ElementGraph> steps = assemblyToSteps.get( assembly );
600
601            if( steps.isEmpty() )
602              throw new PlannerException( "no steps partitioned from assembly: " + registryName, assembly );
603
604            Set<ElementGraph> stepSet = new HashSet<>( steps.size() );
605
606            for( ElementGraph step : steps )
607              {
608              if( !stepSet.add( step ) )
609                throw new PlannerException( "found duplicate step in flow: " + registryName, step );
610              }
611
612            Set<FlowElement> elements = createIdentitySet();
613
614            for( ElementGraph step : steps )
615              elements.addAll( step.vertexSet() );
616
617            Set<FlowElement> missing = differenceIdentity( assembly.vertexSet(), elements );
618
619            if( !missing.isEmpty() )
620              {
621              String message = "union of steps have " + missing.size() + " fewer elements than parent assembly: " + registryName + ", missing: [" + join( missing, ", " ) + "]";
622              throw new PlannerException( message, assembly );
623              }
624            }
625
626          break;
627
628        case Node:
629
630          Map<ElementGraph, List<? extends ElementGraph>> stepToNodes = ruleResult.getStepToNodeGraphMap();
631
632          if( stepToNodes.isEmpty() )
633            throw new PlannerException( "no nodes partitioned: " + registryName );
634
635          for( ElementGraph step : stepToNodes.keySet() )
636            {
637            List<? extends ElementGraph> nodes = stepToNodes.get( step );
638
639            if( nodes.isEmpty() )
640              throw new PlannerException( "no nodes partitioned from step: " + registryName, step );
641
642            Set<ElementGraph> nodesSet = new HashSet<>( nodes.size() );
643
644            for( ElementGraph node : nodes )
645              {
646              if( !nodesSet.add( node ) )
647                throw new PlannerException( "found duplicate node in step: " + registryName, node );
648              }
649
650            Set<FlowElement> elements = createIdentitySet();
651
652            for( ElementGraph node : nodes )
653              elements.addAll( node.vertexSet() );
654
655            Set<FlowElement> missing = differenceIdentity( step.vertexSet(), elements );
656
657            if( !missing.isEmpty() )
658              {
659              String message = "union of nodes have " + missing.size() + " fewer elements than parent step: " + registryName + ", missing: [" + join( missing, ", " ) + "]";
660              throw new PlannerException( message, step );
661              }
662            }
663
664          break;
665
666        case Pipeline:
667
668          // all nodes are partitioned into pipelines, but if partitioned all elements should be represented
669          Map<ElementGraph, List<? extends ElementGraph>> nodeToPipeline = ruleResult.getNodeToPipelineGraphMap();
670
671          if( nodeToPipeline.isEmpty() )
672            throw new PlannerException( "no pipelines partitioned: " + registryName );
673
674          for( ElementGraph node : nodeToPipeline.keySet() )
675            {
676            List<? extends ElementGraph> pipelines = nodeToPipeline.get( node );
677
678            if( pipelines.isEmpty() )
679              throw new PlannerException( "no pipelines partitioned from node: " + registryName, node );
680
681            Set<ElementGraph> pipelineSet = new HashSet<>( pipelines.size() );
682
683            for( ElementGraph pipeline : pipelines )
684              {
685              if( !pipelineSet.add( pipeline ) )
686                throw new PlannerException( "found duplicate pipeline in node: " + registryName, pipeline );
687              }
688
689            Set<FlowElement> elements = createIdentitySet();
690
691            for( ElementGraph pipeline : pipelines )
692              elements.addAll( pipeline.vertexSet() );
693
694            Set<FlowElement> missing = differenceIdentity( node.vertexSet(), elements );
695
696            if( !missing.isEmpty() )
697              {
698              String message = "union of pipelines have " + missing.size() + " fewer elements than parent node: " + registryName + ", missing: [" + join( missing, ", " ) + "]";
699              throw new PlannerException( message, node );
700              }
701            }
702
703          break;
704        }
705      }
706    }
707
708  protected PlannerException handleExceptionDuringPlanning( FlowDef flowDef, Exception exception, FlowElementGraph flowElementGraph )
709    {
710    if( exception instanceof PlannerException )
711      {
712      if( ( (PlannerException) exception ).elementGraph == null )
713        ( (PlannerException) exception ).elementGraph = flowElementGraph;
714
715      return (PlannerException) exception;
716      }
717    else if( exception instanceof ElementGraphException )
718      {
719      Throwable cause = exception.getCause();
720
721      if( cause == null )
722        cause = exception;
723
724      // captures pipegraph for debugging
725      // forward message in case cause or trace is lost
726      String message = String.format( "[%s] could not build flow from assembly", Util.truncate( flowDef.getName(), 25 ) );
727
728      if( cause.getMessage() != null )
729        message = String.format( "%s: [%s]", message, cause.getMessage() );
730
731      if( cause instanceof OperatorException )
732        return new PlannerException( message, cause, flowElementGraph );
733
734      if( cause instanceof TapException )
735        return new PlannerException( message, cause, flowElementGraph );
736
737      return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, flowElementGraph );
738      }
739    else
740      {
741      // captures pipegraph for debugging
742      // forward message in case cause or trace is lost
743      String message = String.format( "[%s] could not build flow from assembly", Util.truncate( flowDef.getName(), 25 ) );
744
745      if( exception.getMessage() != null )
746        message = String.format( "%s: [%s]", message, exception.getMessage() );
747
748      return new PlannerException( message, exception, flowElementGraph );
749      }
750    }
751
752  public class TempTapElementFactory extends IntermediateTapElementFactory
753    {
754    @Override
755    public FlowElement create( ElementGraph graph, FlowElement flowElement )
756      {
757      return makeTempTap( (FlowElementGraph) graph, (Pipe) flowElement );
758      }
759    }
760
761  private Tap makeTempTap( FlowElementGraph graph, Pipe pipe )
762    {
763    Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() );
764
765    if( checkpointTap != null )
766      {
767      LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap );
768      checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
769      }
770
771    if( checkpointTap == null )
772      {
773      // only restart from a checkpoint pipe or checkpoint tap below
774      if( pipe instanceof Checkpoint )
775        {
776        checkpointTap = makeTempTap( checkpointTapRootPath, pipe.getName() );
777        checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
778        // mark as an anonymous checkpoint
779        checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
780        }
781      else
782        {
783        checkpointTap = makeTempTap( pipe.getName() );
784        }
785      }
786
787    return decorateTap( pipe, checkpointTap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS );
788    }
789
790  private Tap decorateTap( Pipe pipe, Tap tempTap, String decoratorClass )
791    {
792    String decoratorClassName = PropertyUtil.getProperty( defaultProperties, pipe, decoratorClass );
793
794    if( Util.isEmpty( decoratorClassName ) )
795      return tempTap;
796
797    LOG.info( "found decorator: {}, wrapping tap: {}", decoratorClass, tempTap );
798
799    tempTap = Util.newInstance( decoratorClassName, tempTap );
800
801    return tempTap;
802    }
803
804  protected Tap makeTempTap( String name )
805    {
806    return makeTempTap( null, name );
807    }
808
809  protected DebugLevel getDebugLevel( FlowDef flowDef )
810    {
811    return flowDef.getDebugLevel() == null ? this.defaultDebugLevel : flowDef.getDebugLevel();
812    }
813
814  protected AssertionLevel getAssertionLevel( FlowDef flowDef )
815    {
816    return flowDef.getAssertionLevel() == null ? this.defaultAssertionLevel : flowDef.getAssertionLevel();
817    }
818
819  public String makeFlowNodeName( FlowNode flowNode, int size, int ordinal )
820    {
821    return String.format( "(%d/%d)", ordinal + 1, size );
822    }
823
824  public String makeFlowStepName( FlowStep flowStep, int numSteps, int stepNum )
825    {
826    Tap sink = Util.getFirst( flowStep.getSinkTaps() );
827
828    stepNum++; // number more sensical (5/5)
829
830    if( sink == null || sink.isTemporary() )
831      return String.format( "(%d/%d)", stepNum, numSteps );
832
833    String identifier = sink.getIdentifier();
834
835    if( identifier.length() > 25 )
836      identifier = String.format( "...%25s", identifier.substring( identifier.length() - 25 ) );
837
838    return String.format( "(%d/%d) %s", stepNum, numSteps, identifier );
839    }
840
841  protected abstract Tap makeTempTap( String prefix, String name );
842
843  private Set<ProcessLevel> getReverseOrderedProcessLevels( RuleResult ruleResult )
844    {
845    Set<ProcessLevel> ordered = new TreeSet<>( Collections.reverseOrder() );
846
847    ordered.addAll( ruleResult.getRegistry().getProcessLevels() );
848
849    return ordered;
850    }
851  }