001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner;
023
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.TreeSet;
032
033import cascading.flow.AssemblyPlanner;
034import cascading.flow.BaseFlow;
035import cascading.flow.Flow;
036import cascading.flow.FlowConnector;
037import cascading.flow.FlowConnectorProps;
038import cascading.flow.FlowDef;
039import cascading.flow.FlowElement;
040import cascading.flow.Flows;
041import cascading.flow.planner.graph.ElementGraph;
042import cascading.flow.planner.graph.FlowElementGraph;
043import cascading.flow.planner.process.FlowNodeFactory;
044import cascading.flow.planner.process.FlowStepFactory;
045import cascading.flow.planner.process.FlowStepGraph;
046import cascading.flow.planner.rule.ProcessLevel;
047import cascading.flow.planner.rule.RuleRegistry;
048import cascading.flow.planner.rule.RuleRegistrySet;
049import cascading.flow.planner.rule.RuleResult;
050import cascading.flow.planner.rule.RuleSetExec;
051import cascading.flow.planner.rule.transformer.IntermediatePipeElementFactory;
052import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
053import cascading.flow.planner.rule.util.TraceWriter;
054import cascading.operation.AssertionLevel;
055import cascading.operation.DebugLevel;
056import cascading.operation.Identity;
057import cascading.pipe.Checkpoint;
058import cascading.pipe.Each;
059import cascading.pipe.OperatorException;
060import cascading.pipe.Pipe;
061import cascading.pipe.SubAssembly;
062import cascading.property.ConfigDef;
063import cascading.property.PropertyUtil;
064import cascading.scheme.Scheme;
065import cascading.tap.Tap;
066import cascading.tap.TapException;
067import cascading.tuple.Fields;
068import cascading.util.Update;
069import cascading.util.Util;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import static cascading.util.TraceUtil.formatTraces;
074import static cascading.util.Util.*;
075import static java.util.Arrays.asList;
076
077/**
078 * Class FlowPlanner is the base class for all planner implementations.
079 * <p/>
 * This planner supports tracing execution of each rule. See the appropriate properties on this
081 * class to enable.
082 */
083public abstract class FlowPlanner<F extends BaseFlow, Config>
084  {
085  /**
086   * Enables the planner to write out basic planner information including the initial element-graph,
087   * completed element-graph, and the completed step-graph dot files.
088   */
089  public static final String TRACE_PLAN_PATH = "cascading.planner.plan.path";
090
091  /**
092   * Enables the planner to write out detail level planner information for each rule, including recursive
093   * transforms.
094   * <p/>
095   * Use this to debug rules. This does increase overhead during planning.
096   */
097  public static final String TRACE_PLAN_TRANSFORM_PATH = "cascading.planner.plan.transforms.path";
098
099  /**
100   * Enables the planner to write out planner statistics for each planner phase and rule.
101   */
102  public static final String TRACE_STATS_PATH = "cascading.planner.stats.path";
103
104  /** Field LOG */
105  private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class );
106
107  /** Field properties */
108  protected Map<Object, Object> defaultProperties;
109
110  protected String checkpointTapRootPath = null;
111
112  /** Field assertionLevel */
113  protected AssertionLevel defaultAssertionLevel;
114  /** Field debugLevel */
115  protected DebugLevel defaultDebugLevel;
116
117  /**
118   * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}.
119   *
120   * @param properties of type Map<Object, Object>
121   * @return AssertionLevel the configured AssertionLevel
122   */
123  static AssertionLevel getAssertionLevel( Map<Object, Object> properties )
124    {
125    String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() );
126
127    return AssertionLevel.valueOf( assertionLevel );
128    }
129
130  /**
131   * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}.
132   *
133   * @param properties of type Map<Object, Object>
134   * @return DebugLevel the configured DebugLevel
135   */
136  static DebugLevel getDebugLevel( Map<Object, Object> properties )
137    {
138    String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() );
139
140    return DebugLevel.valueOf( debugLevel );
141    }
142
143  {
144  Update.registerPlanner( getClass() );
145  }
146
147  public Map<Object, Object> getDefaultProperties()
148    {
149    return defaultProperties;
150    }
151
  /**
   * Method getDefaultConfig returns the default {@code Config} instance supplied by the concrete planner.
   *
   * @return Config
   */
  public abstract Config getDefaultConfig();

  /**
   * Method getPlannerInfo returns the {@link PlannerInfo} for the given name.
   * <p/>
   * {@link #buildFlow(FlowDef, RuleRegistrySet)} calls this with the name of the rule registry that produced
   * the plan and sets the result on the new flow.
   *
   * @param name of type String
   * @return PlannerInfo
   */
  public abstract PlannerInfo getPlannerInfo( String name );

  /**
   * Method getPlatformInfo returns the {@link PlatformInfo} of the concrete planner, it is handed to the
   * {@link FlowElementGraph} created for every flow being planned.
   *
   * @return PlatformInfo
   */
  public abstract PlatformInfo getPlatformInfo();
157
158  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
159    {
160    this.defaultProperties = properties;
161    this.defaultAssertionLevel = getAssertionLevel( properties );
162    this.defaultDebugLevel = getDebugLevel( properties );
163    }
164
165  public F buildFlow( FlowDef flowDef, RuleRegistrySet ruleRegistrySet )
166    {
167    FlowElementGraph flowElementGraph = null;
168
169    try
170      {
171      flowDef = normalizeTaps( flowDef );
172
173      verifyAllTaps( flowDef );
174
175      F flow = createFlow( flowDef );
176
177      Pipe[] tails = resolveTails( flowDef, flow );
178
179      verifyAssembly( flowDef, tails );
180
181      flowElementGraph = createFlowElementGraph( flowDef, tails );
182
183      TraceWriter traceWriter = new TraceWriter( flow );
184      RuleSetExec ruleSetExec = new RuleSetExec( traceWriter, this, flow, ruleRegistrySet, flowDef, flowElementGraph );
185
186      RuleResult ruleResult = ruleSetExec.exec();
187
188      traceWriter.writeTracePlan( null, "0-initial-flow-element-graph", flowElementGraph );
189
190      FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();
191
192      finalFlowElementGraph = flow.updateSchemes( finalFlowElementGraph );
193
194      Map<ElementGraph, List<? extends ElementGraph>> stepToNodes = ruleResult.getStepToNodeGraphMap();
195      Map<ElementGraph, List<? extends ElementGraph>> nodeToPipeline = ruleResult.getNodeToPipelineGraphMap();
196
197      FlowStepGraph flowStepGraph = new FlowStepGraph( getFlowStepFactory(), finalFlowElementGraph, stepToNodes, nodeToPipeline );
198
199      traceWriter.writeFinal( "1-final-flow-registry", ruleResult );
200      traceWriter.writeTracePlan( null, "2-final-flow-element-graph", finalFlowElementGraph );
201      traceWriter.writeTracePlan( null, "3-final-flow-step-graph", flowStepGraph );
202      traceWriter.writeTracePlanSteps( "4-final-flow-steps", flowStepGraph );
203
204      flow.setPlannerInfo( getPlannerInfo( ruleResult.getRegistry().getName() ) );
205
206      flow.initialize( finalFlowElementGraph, flowStepGraph );
207
208      return flow;
209      }
210    catch( Exception exception )
211      {
212      throw handleExceptionDuringPlanning( flowDef, exception, flowElementGraph );
213      }
214    }
215
  /**
   * Method createFlow creates the flow instance for the given {@link FlowDef}.
   * <p/>
   * {@link #buildFlow(FlowDef, RuleRegistrySet)} calls this after the taps have been normalized and verified,
   * before the pipe assembly is planned.
   *
   * @param flowDef of type FlowDef
   * @return F
   */
  protected abstract F createFlow( FlowDef flowDef );

  /**
   * Method getFlowStepFactory returns the {@link FlowStepFactory} handed to the {@link FlowStepGraph}
   * built for every planned flow.
   *
   * @return FlowStepFactory<Config>
   */
  public abstract FlowStepFactory<Config> getFlowStepFactory();
219
220  public FlowNodeFactory getFlowNodeFactory()
221    {
222    return new BaseFlowNodeFactory();
223    }
224
225  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
226    {
227    ruleRegistry.addDefaultElementFactory( IntermediatePipeElementFactory.IDENTITY, new IdentityElementFactgory() );
228    }
229
230  protected Pipe[] resolveTails( FlowDef flowDef, F flow )
231    {
232    Pipe[] tails = flowDef.getTailsArray();
233
234    tails = resolveAssemblyPlanners( flowDef, flow, tails );
235
236    return tails;
237    }
238
239  protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes )
240    {
241    List<Pipe> tails = Arrays.asList( pipes );
242
243    List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners();
244
245    for( AssemblyPlanner assemblyPlanner : assemblyPlanners )
246      {
247      tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) );
248
249      if( tails.isEmpty() )
250        throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" );
251
252      tails = Collections.unmodifiableList( tails );
253      }
254
255    return tails.toArray( new Pipe[ tails.size() ] );
256    }
257
  /**
   * Method verifyAssembly runs the pipe assembly level checks in order: end point to tap name binding,
   * trap declarations, then checkpoint declarations. The first failing check throws.
   *
   * @param flowDef of type FlowDef
   * @param tails   of type Pipe[]
   */
  protected void verifyAssembly( FlowDef flowDef, Pipe[] tails )
    {
    verifyPipeAssemblyEndPoints( flowDef, tails );
    verifyTraps( flowDef, tails );
    verifyCheckpoints( flowDef, tails );
    }
264
265  protected void verifyAllTaps( FlowDef flowDef )
266    {
267    verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() );
268
269    verifyTaps( flowDef.getSources(), true, true );
270    verifyTaps( flowDef.getSinks(), false, true );
271    verifyTaps( flowDef.getTraps(), false, false );
272
273    // are both sources and sinks
274    verifyTaps( flowDef.getCheckpoints(), true, false );
275    verifyTaps( flowDef.getCheckpoints(), false, false );
276    }
277
278  protected FlowElementGraph createFlowElementGraph( FlowDef flowDef, Pipe[] flowTails )
279    {
280    Map<String, Tap> sources = flowDef.getSourcesCopy();
281    Map<String, Tap> sinks = flowDef.getSinksCopy();
282    Map<String, Tap> traps = flowDef.getTrapsCopy();
283    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();
284
285    checkpointTapRootPath = makeCheckpointRootPath( flowDef );
286
287    return new FlowElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointTapRootPath != null );
288    }
289
290  private FlowDef normalizeTaps( FlowDef flowDef )
291    {
292    Set<Tap> taps = new HashSet<>();
293
294    Map<String, Tap> sources = flowDef.getSourcesCopy();
295    Map<String, Tap> sinks = flowDef.getSinksCopy();
296    Map<String, Tap> traps = flowDef.getTrapsCopy();
297    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();
298
299    boolean sourcesHasDupes = addTaps( sources, taps );
300    boolean sinksHasDupes = addTaps( sinks, taps );
301    boolean trapsHasDupes = addTaps( traps, taps );
302    boolean checkpointsHasDupes = addTaps( checkpoints, taps );
303
304    if( sourcesHasDupes )
305      normalize( taps, sources );
306
307    if( sinksHasDupes )
308      normalize( taps, sinks );
309
310    if( trapsHasDupes )
311      normalize( taps, traps );
312
313    if( checkpointsHasDupes )
314      normalize( taps, checkpoints );
315
316    return Flows.copy( flowDef, sources, sinks, traps, checkpoints );
317    }
318
319  private boolean addTaps( Map<String, Tap> current, Set<Tap> taps )
320    {
321    int size = taps.size();
322
323    taps.addAll( current.values() );
324
325    // if all the added values are not unique, taps.size will be less than original size + num tap instances
326    return size + current.size() != taps.size();
327    }
328
329  private void normalize( Set<Tap> taps, Map<String, Tap> current )
330    {
331    for( Tap tap : taps )
332      {
333      for( Map.Entry<String, Tap> entry : current.entrySet() )
334        {
335        if( entry.getValue().equals( tap ) ) // force equivalent instance to being the same instance
336          entry.setValue( tap );
337        }
338      }
339    }
340
341  private String makeCheckpointRootPath( FlowDef flowDef )
342    {
343    String flowName = flowDef.getName();
344    String runID = flowDef.getRunID();
345
346    if( runID == null )
347      return null;
348
349    if( flowName == null )
350      throw new PlannerException( "flow name is required when providing a run id" );
351
352    return flowName + "/" + runID;
353    }
354
355  protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks )
356    {
357    Collection<Tap> sourcesSet = sources.values();
358
359    for( Tap tap : sinks.values() )
360      {
361      if( sourcesSet.contains( tap ) )
362        throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap );
363      }
364    }
365
366  /**
367   * Method verifyTaps ...
368   *
369   * @param taps          of type Map<String, Tap>
370   * @param areSources    of type boolean
371   * @param mayNotBeEmpty of type boolean
372   */
373  protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty )
374    {
375    if( mayNotBeEmpty && taps.isEmpty() )
376      throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" );
377
378    for( String tapName : taps.keySet() )
379      {
380      if( areSources && !taps.get( tapName ).isSource() )
381        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + taps.get( tapName ) );
382      else if( !areSources && !taps.get( tapName ).isSink() )
383        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + taps.get( tapName ) );
384      }
385    }
386
387  /**
388   * Method verifyEndPoints verifies
389   * <p/>
390   * there aren't dupe names in heads or tails.
391   * all the sink and source tap names match up with tail and head pipes
392   */
393  // todo: force dupe names to throw exceptions
394  protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails )
395    {
396    Set<String> tapNames = new HashSet<String>();
397
398    tapNames.addAll( flowDef.getSources().keySet() );
399    tapNames.addAll( flowDef.getSinks().keySet() );
400
401    // handle tails
402    Set<Pipe> tails = new HashSet<Pipe>();
403    Set<String> tailNames = new HashSet<String>();
404
405    for( Pipe pipe : flowTails )
406      {
407      if( pipe instanceof SubAssembly )
408        {
409        for( Pipe tail : ( (SubAssembly) pipe ).getTails() )
410          {
411          String tailName = tail.getName();
412
413          if( !tapNames.contains( tailName ) )
414            throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" );
415
416          if( tailNames.contains( tailName ) && !tails.contains( tail ) )
417            throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
418
419          tailNames.add( tailName );
420          tails.add( tail );
421          }
422        }
423      else
424        {
425        String tailName = pipe.getName();
426
427        if( !tapNames.contains( tailName ) )
428          throw new PlannerException( pipe, "pipe name not found in either sink or source map: '" + tailName + "'" );
429
430        if( tailNames.contains( tailName ) && !tails.contains( pipe ) )
431          throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
432
433        tailNames.add( tailName );
434        tails.add( pipe );
435        }
436      }
437
438    tailNames.removeAll( flowDef.getSinks().keySet() );
439    Set<String> remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
440    remainingSinks.removeAll( tailNames );
441
442    if( tailNames.size() != 0 )
443      throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + join( quote( tailNames, "'" ), ", " ) + "], remaining sink tap names: [" + join( quote( remainingSinks, "'" ), ", " ) + "]" );
444
445    // unlike heads, pipes can input to another pipe and simultaneously be a sink
446    // so there is no way to know all the intentional tails, so they aren't listed below in the exception
447    remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
448    remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) );
449
450    if( remainingSinks.size() != 0 )
451      throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + join( quote( remainingSinks, "'" ), ", " ) + "]" );
452
453    // handle heads
454    Set<Pipe> heads = new HashSet<Pipe>();
455    Set<String> headNames = new HashSet<String>();
456
457    for( Pipe pipe : flowTails )
458      {
459      for( Pipe head : pipe.getHeads() )
460        {
461        String headName = head.getName();
462
463        if( !tapNames.contains( headName ) )
464          throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" );
465
466        if( headNames.contains( headName ) && !heads.contains( head ) )
467          LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName );
468
469        headNames.add( headName );
470        heads.add( head );
471        }
472      }
473
474    Set<String> allHeadNames = new HashSet<String>( headNames );
475    headNames.removeAll( flowDef.getSources().keySet() );
476    Set<String> remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
477    remainingSources.removeAll( headNames );
478
479    if( headNames.size() != 0 )
480      throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + join( quote( headNames, "'" ), ", " ) + "], remaining source tap names: [" + join( quote( remainingSources, "'" ), ", " ) + "]" );
481
482    remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
483    remainingSources.removeAll( allHeadNames );
484
485    if( remainingSources.size() != 0 )
486      throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + join( quote( remainingSources, "'" ), ", " ) + "], remaining head pipe names: [" + join( quote( headNames, "'" ), ", " ) + "]" );
487
488    }
489
490  protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails )
491    {
492    verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" );
493
494    Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
495
496    for( String name : flowDef.getTraps().keySet() )
497      {
498      if( !names.contains( name ) )
499        throw new PlannerException( "trap name not found in assembly: '" + name + "'" );
500      }
501    }
502
503  protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails )
504    {
505    verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" );
506
507    for( Tap checkpointTap : flowDef.getCheckpoints().values() )
508      {
509      Scheme scheme = checkpointTap.getScheme();
510
511      if( scheme.getSourceFields().equals( Fields.UNKNOWN ) && scheme.getSinkFields().equals( Fields.ALL ) )
512        continue;
513
514      throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() );
515      }
516
517    Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
518
519    for( String name : flowDef.getCheckpoints().keySet() )
520      {
521      if( !names.contains( name ) )
522        throw new PlannerException( "named checkpoint declared in FlowDef, but no named branch found in pipe assembly: '" + name + "'" );
523
524      Set<Pipe> pipes = new HashSet<Pipe>( asList( Pipe.named( name, flowTails ) ) );
525
526      int count = 0;
527
528      for( Pipe pipe : pipes )
529        {
530        if( pipe instanceof Checkpoint )
531          count++;
532        }
533
534      if( count == 0 )
535        throw new PlannerException( "no checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );
536
537      if( count > 1 )
538        throw new PlannerException( "more than one checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );
539      }
540    }
541
542  private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role )
543    {
544    Collection<Tap> sourceTaps = sources.values();
545    Collection<Tap> sinkTaps = sinks.values();
546
547    for( Tap tap : taps.values() )
548      {
549      if( sourceTaps.contains( tap ) )
550        throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap );
551
552      if( sinkTaps.contains( tap ) )
553        throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap );
554      }
555    }
556
557  /**
558   * If there are rules for a given {@link cascading.flow.planner.rule.ProcessLevel} on the current platform
559   * there must be sub-graphs partitioned at that level.
560   */
561  public Exception verifyResult( RuleResult ruleResult )
562    {
563    try
564      {
565      verifyResultInternal( ruleResult );
566      }
567    catch( Exception exception )
568      {
569      return exception;
570      }
571
572    return null;
573    }
574
575  protected void verifyResultInternal( RuleResult ruleResult )
576    {
577    Set<ProcessLevel> processLevels = getReverseOrderedProcessLevels( ruleResult );
578
579    for( ProcessLevel processLevel : processLevels )
580      {
581      String registryName = ruleResult.getRegistry().getName();
582
583      switch( processLevel )
584        {
585        case Assembly:
586
587          FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();
588
589          if( finalFlowElementGraph.vertexSet().isEmpty() )
590            throw new PlannerException( "final assembly graph is empty: " + registryName );
591
592          break;
593
594        case Step:
595
596          Map<ElementGraph, List<? extends ElementGraph>> assemblyToSteps = ruleResult.getAssemblyToStepGraphMap();
597
598          if( assemblyToSteps.isEmpty() )
599            throw new PlannerException( "no steps partitioned: " + registryName );
600
601          for( ElementGraph assembly : assemblyToSteps.keySet() )
602            {
603            List<? extends ElementGraph> steps = assemblyToSteps.get( assembly );
604
605            if( steps.isEmpty() )
606              throw new PlannerException( "no steps partitioned from assembly: " + registryName, assembly );
607
608            Set<ElementGraph> stepSet = new HashSet<>( steps.size() );
609
610            for( ElementGraph step : steps )
611              {
612              if( !stepSet.add( step ) )
613                throw new PlannerException( "found duplicate step in flow: " + registryName, step );
614              }
615
616            Set<FlowElement> elements = createIdentitySet();
617
618            for( ElementGraph step : steps )
619              elements.addAll( step.vertexSet() );
620
621            Set<FlowElement> missing = differenceIdentity( assembly.vertexSet(), elements );
622
623            if( !missing.isEmpty() )
624              {
625              String message = "union of steps have " + missing.size() + " fewer elements than parent assembly: " + registryName + ", missing: [" + formatTraces( missing, ", " ) + "]";
626              throw new PlannerException( message, assembly );
627              }
628            }
629
630          break;
631
632        case Node:
633
634          Map<ElementGraph, List<? extends ElementGraph>> stepToNodes = ruleResult.getStepToNodeGraphMap();
635
636          if( stepToNodes.isEmpty() )
637            throw new PlannerException( "no nodes partitioned: " + registryName );
638
639          for( ElementGraph step : stepToNodes.keySet() )
640            {
641            List<? extends ElementGraph> nodes = stepToNodes.get( step );
642
643            if( nodes.isEmpty() )
644              throw new PlannerException( "no nodes partitioned from step: " + registryName, step );
645
646            Set<ElementGraph> nodesSet = new HashSet<>( nodes.size() );
647
648            for( ElementGraph node : nodes )
649              {
650              if( !nodesSet.add( node ) )
651                throw new PlannerException( "found duplicate node in step: " + registryName, node );
652              }
653
654            Set<FlowElement> elements = createIdentitySet();
655
656            for( ElementGraph node : nodes )
657              elements.addAll( node.vertexSet() );
658
659            Set<FlowElement> missing = differenceIdentity( step.vertexSet(), elements );
660
661            if( !missing.isEmpty() )
662              {
663              String message = "union of nodes have " + missing.size() + " fewer elements than parent step: " + registryName + ", missing: [" + formatTraces( missing, ", " ) + "]";
664              throw new PlannerException( message, step );
665              }
666            }
667
668          break;
669
670        case Pipeline:
671
672          // all nodes are partitioned into pipelines, but if partitioned all elements should be represented
673          Map<ElementGraph, List<? extends ElementGraph>> nodeToPipeline = ruleResult.getNodeToPipelineGraphMap();
674
675          if( nodeToPipeline.isEmpty() )
676            throw new PlannerException( "no pipelines partitioned: " + registryName );
677
678          for( ElementGraph node : nodeToPipeline.keySet() )
679            {
680            List<? extends ElementGraph> pipelines = nodeToPipeline.get( node );
681
682            if( pipelines.isEmpty() )
683              throw new PlannerException( "no pipelines partitioned from node: " + registryName, node );
684
685            Set<ElementGraph> pipelineSet = new HashSet<>( pipelines.size() );
686
687            for( ElementGraph pipeline : pipelines )
688              {
689              if( !pipelineSet.add( pipeline ) )
690                throw new PlannerException( "found duplicate pipeline in node: " + registryName, pipeline );
691              }
692
693            Set<FlowElement> elements = createIdentitySet();
694
695            for( ElementGraph pipeline : pipelines )
696              elements.addAll( pipeline.vertexSet() );
697
698            Set<FlowElement> missing = differenceIdentity( node.vertexSet(), elements );
699
700            if( !missing.isEmpty() )
701              {
702              String message = "union of pipelines have " + missing.size() + " fewer elements than parent node: " + registryName + ", missing: [" + formatTraces( missing, ", " ) + "]";
703              throw new PlannerException( message, node );
704              }
705            }
706
707          break;
708        }
709      }
710    }
711
712  protected PlannerException handleExceptionDuringPlanning( FlowDef flowDef, Exception exception, FlowElementGraph flowElementGraph )
713    {
714    if( exception instanceof PlannerException )
715      {
716      if( ( (PlannerException) exception ).elementGraph == null )
717        ( (PlannerException) exception ).elementGraph = flowElementGraph;
718
719      return (PlannerException) exception;
720      }
721    else if( exception instanceof ElementGraphException )
722      {
723      Throwable cause = exception.getCause();
724
725      if( cause == null )
726        cause = exception;
727
728      // captures pipegraph for debugging
729      // forward message in case cause or trace is lost
730      String message = String.format( "[%s] could not build flow from assembly", Util.truncate( flowDef.getName(), 25 ) );
731
732      if( cause.getMessage() != null )
733        message = String.format( "%s: [%s]", message, cause.getMessage() );
734
735      if( cause instanceof OperatorException )
736        return new PlannerException( message, cause, flowElementGraph );
737
738      if( cause instanceof TapException )
739        return new PlannerException( message, cause, flowElementGraph );
740
741      return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, flowElementGraph );
742      }
743    else
744      {
745      // captures pipegraph for debugging
746      // forward message in case cause or trace is lost
747      String message = String.format( "[%s] could not build flow from assembly", Util.truncate( flowDef.getName(), 25 ) );
748
749      if( exception.getMessage() != null )
750        message = String.format( "%s: [%s]", message, exception.getMessage() );
751
752      return new PlannerException( message, exception, flowElementGraph );
753      }
754    }
755
756  public class IdentityElementFactgory extends IntermediatePipeElementFactory
757    {
758    @Override
759    public FlowElement create( ElementGraph graph, FlowElement flowElement )
760      {
761      return new Each( (Pipe) flowElement, Fields.ALL, new Identity( Fields.ALL ), Fields.RESULTS );
762      }
763    }
764
765  public class TempTapElementFactory extends IntermediateTapElementFactory
766    {
767    private String defaultDecoratorClassName;
768
769    public TempTapElementFactory()
770      {
771      }
772
773    public TempTapElementFactory( String defaultDecoratorClassName )
774      {
775      this.defaultDecoratorClassName = defaultDecoratorClassName;
776      }
777
778    @Override
779    public FlowElement create( ElementGraph graph, FlowElement flowElement )
780      {
781      if( flowElement instanceof Pipe )
782        return makeTempTap( (FlowElementGraph) graph, (Pipe) flowElement, defaultDecoratorClassName );
783
784      if( flowElement instanceof Tap )
785        return decorateTap( null, (Tap) flowElement, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS, defaultDecoratorClassName );
786
787      throw new IllegalStateException( "unknown flow element type: " + flowElement );
788      }
789    }
790
791  private Tap makeTempTap( FlowElementGraph graph, Pipe pipe, String defaultDecoratorClassName )
792    {
793    Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() );
794
795    if( checkpointTap != null )
796      {
797      LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap );
798      checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS, null );
799      }
800
801    if( checkpointTap == null )
802      {
803      // only restart from a checkpoint pipe or checkpoint tap below
804      if( pipe instanceof Checkpoint )
805        {
806        checkpointTap = makeTempTap( checkpointTapRootPath, pipe.getName() );
807        checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS, null );
808        // mark as an anonymous checkpoint
809        checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
810        }
811      else
812        {
813        checkpointTap = makeTempTap( pipe.getName() );
814        }
815      }
816
817    return decorateTap( pipe, checkpointTap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS, defaultDecoratorClassName );
818    }
819
820  private Tap decorateTap( Pipe pipe, Tap tempTap, String decoratorClassProp, String defaultDecoratorClassName )
821    {
822    String decoratorClassName = PropertyUtil.getProperty( defaultProperties, pipe, decoratorClassProp );
823
824    if( Util.isEmpty( decoratorClassName ) )
825      decoratorClassName = defaultDecoratorClassName;
826
827    if( Util.isEmpty( decoratorClassName ) )
828      return tempTap;
829
830    LOG.info( "found decorator property: {}, with value: {}, wrapping tap: {}", decoratorClassProp, decoratorClassName, tempTap );
831
832    tempTap = Util.newInstance( decoratorClassName, tempTap );
833
834    return tempTap;
835    }
836
837  protected Tap makeTempTap( String name )
838    {
839    return makeTempTap( null, name );
840    }
841
842  protected DebugLevel getDebugLevel( FlowDef flowDef )
843    {
844    return flowDef.getDebugLevel() == null ? this.defaultDebugLevel : flowDef.getDebugLevel();
845    }
846
847  protected AssertionLevel getAssertionLevel( FlowDef flowDef )
848    {
849    return flowDef.getAssertionLevel() == null ? this.defaultAssertionLevel : flowDef.getAssertionLevel();
850    }
851
  /**
   * Method makeTempTap makes a platform specific temporary tap.
   * <p/>
   * This planner passes a null prefix for plain temporary taps, and the checkpoint root path, which is
   * null when the flow has no run id, for anonymous checkpoints.
   *
   * @param prefix of type String, may be null
   * @param name   of type String
   * @return Tap
   */
  protected abstract Tap makeTempTap( String prefix, String name );
853
854  private Set<ProcessLevel> getReverseOrderedProcessLevels( RuleResult ruleResult )
855    {
856    Set<ProcessLevel> ordered = new TreeSet<>( Collections.reverseOrder() );
857
858    ordered.addAll( ruleResult.getRegistry().getProcessLevels() );
859
860    return ordered;
861    }
862  }