001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.util.ArrayList;
024    import java.util.Arrays;
025    import java.util.Collection;
026    import java.util.Collections;
027    import java.util.HashSet;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Set;
031    
032    import cascading.flow.AssemblyPlanner;
033    import cascading.flow.Flow;
034    import cascading.flow.FlowConnector;
035    import cascading.flow.FlowConnectorProps;
036    import cascading.flow.FlowDef;
037    import cascading.flow.FlowElement;
038    import cascading.operation.AssertionLevel;
039    import cascading.operation.DebugLevel;
040    import cascading.pipe.Checkpoint;
041    import cascading.pipe.CoGroup;
042    import cascading.pipe.Each;
043    import cascading.pipe.Every;
044    import cascading.pipe.Group;
045    import cascading.pipe.GroupBy;
046    import cascading.pipe.HashJoin;
047    import cascading.pipe.Merge;
048    import cascading.pipe.OperatorException;
049    import cascading.pipe.Pipe;
050    import cascading.pipe.Splice;
051    import cascading.pipe.SubAssembly;
052    import cascading.property.ConfigDef;
053    import cascading.property.PropertyUtil;
054    import cascading.scheme.Scheme;
055    import cascading.tap.DecoratorTap;
056    import cascading.tap.Tap;
057    import cascading.tap.TapException;
058    import cascading.tuple.Fields;
059    import cascading.util.Util;
060    import org.jgrapht.GraphPath;
061    import org.jgrapht.Graphs;
062    import org.slf4j.Logger;
063    import org.slf4j.LoggerFactory;
064    
065    import static cascading.flow.planner.ElementGraphs.*;
066    import static java.util.Arrays.asList;
067    
068    /** Class FlowPlanner is the base class for all planner implementations. */
069    public abstract class FlowPlanner<F extends Flow, Config>
070      {
  /** Logger used for planner diagnostics (tap insertion, checkpoints, decorators, duplicate names). */
  private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class );

  /** Planner properties, assigned in {@link #initialize(FlowConnector, Map)}. */
  protected Map<Object, Object> properties;

  /**
   * Root path handed to {@link #makeTempTap(String, String)} for anonymous Checkpoint taps. Recomputed by
   * createElementGraph as {@code flowName + "/" + runID}; null when the FlowDef has no run id.
   */
  protected String checkpointRootPath = null;

  /** Default AssertionLevel read from the properties in initialize; a non-null FlowDef level takes precedence. */
  protected AssertionLevel assertionLevel;
  /** Default DebugLevel read from the properties in initialize; a non-null FlowDef level takes precedence. */
  protected DebugLevel debugLevel;
083    
084      /**
085       * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}.
086       *
087       * @param properties of type Map<Object, Object>
088       * @return AssertionLevel the configured AssertionLevel
089       */
090      static AssertionLevel getAssertionLevel( Map<Object, Object> properties )
091        {
092        String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() );
093    
094        return AssertionLevel.valueOf( assertionLevel );
095        }
096    
097      /**
098       * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}.
099       *
100       * @param properties of type Map<Object, Object>
101       * @return DebugLevel the configured DebugLevel
102       */
103      static DebugLevel getDebugLevel( Map<Object, Object> properties )
104        {
105        String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() );
106    
107        return DebugLevel.valueOf( debugLevel );
108        }
109    
110      public Map<Object, Object> getProperties()
111        {
112        return properties;
113        }
114    
  /**
   * Returns the configuration object for this planner, of the {@code Config} type parameter.
   * NOTE(review): presumably the platform's native configuration type — confirm in the concrete subclasses.
   *
   * @return the planner configuration
   */
  public abstract Config getConfig();
116    
  /**
   * Returns information describing the platform this planner targets. The value is passed to every
   * ElementGraph built by createElementGraph.
   *
   * @return the PlatformInfo for this planner
   */
  public abstract PlatformInfo getPlatformInfo();
118    
119      public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
120        {
121        this.properties = properties;
122        this.assertionLevel = getAssertionLevel( properties );
123        this.debugLevel = getDebugLevel( properties );
124        }
125    
  /**
   * Creates the platform-specific Flow instance for the given FlowDef.
   * NOTE(review): how this relates to buildFlow is not visible in this class — confirm in the subclasses.
   *
   * @param flowDef the FlowDef describing the flow
   * @return a new Flow
   */
  protected abstract Flow createFlow( FlowDef flowDef );
127    
128      /**
129       * Method buildFlow renders the actual Flow instance.
130       *
131       * @param flowDef
132       * @return Flow
133       */
134      public abstract F buildFlow( FlowDef flowDef );
135    
136      protected Pipe[] resolveTails( FlowDef flowDef, Flow<Config> flow )
137        {
138        Pipe[] tails = flowDef.getTailsArray();
139    
140        tails = resolveAssemblyPlanners( flowDef, flow, tails );
141    
142        return tails;
143        }
144    
145      protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes )
146        {
147        List<Pipe> tails = Arrays.asList( pipes );
148    
149        List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners();
150    
151        for( AssemblyPlanner assemblyPlanner : assemblyPlanners )
152          {
153          tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) );
154    
155          if( tails.isEmpty() )
156            throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" );
157    
158          tails = Collections.unmodifiableList( tails );
159          }
160    
161        return tails.toArray( new Pipe[ tails.size() ] );
162        }
163    
164      protected void verifyAssembly( FlowDef flowDef, Pipe[] tails )
165        {
166        verifyPipeAssemblyEndPoints( flowDef, tails );
167        verifyTraps( flowDef, tails );
168        verifyCheckpoints( flowDef, tails );
169        }
170    
171      protected void verifyAllTaps( FlowDef flowDef )
172        {
173        verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() );
174    
175        verifyTaps( flowDef.getSources(), true, true );
176        verifyTaps( flowDef.getSinks(), false, true );
177        verifyTaps( flowDef.getTraps(), false, false );
178    
179        // are both sources and sinks
180        verifyTaps( flowDef.getCheckpoints(), true, false );
181        verifyTaps( flowDef.getCheckpoints(), false, false );
182        }
183    
184      protected ElementGraph createElementGraph( FlowDef flowDef, Pipe[] flowTails )
185        {
186        Map<String, Tap> sources = flowDef.getSourcesCopy();
187        Map<String, Tap> sinks = flowDef.getSinksCopy();
188        Map<String, Tap> traps = flowDef.getTrapsCopy();
189        Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();
190    
191        AssertionLevel assertionLevel = flowDef.getAssertionLevel() == null ? this.assertionLevel : flowDef.getAssertionLevel();
192        DebugLevel debugLevel = flowDef.getDebugLevel() == null ? this.debugLevel : flowDef.getDebugLevel();
193    
194        checkpointRootPath = makeCheckpointRootPath( flowDef );
195    
196        return new ElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointRootPath != null, assertionLevel, debugLevel );
197        }
198    
199      private String makeCheckpointRootPath( FlowDef flowDef )
200        {
201        String flowName = flowDef.getName();
202        String runID = flowDef.getRunID();
203    
204        if( runID == null )
205          return null;
206    
207        if( flowName == null )
208          throw new PlannerException( "flow name is required when providing a run id" );
209    
210        return flowName + "/" + runID;
211        }
212    
213      protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks )
214        {
215        Collection<Tap> sourcesSet = sources.values();
216    
217        for( Tap tap : sinks.values() )
218          {
219          if( sourcesSet.contains( tap ) )
220            throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap );
221          }
222        }
223    
224      /**
225       * Method verifyTaps ...
226       *
227       * @param taps          of type Map<String, Tap>
228       * @param areSources    of type boolean
229       * @param mayNotBeEmpty of type boolean
230       */
231      protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty )
232        {
233        if( mayNotBeEmpty && taps.isEmpty() )
234          throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" );
235    
236        for( String tapName : taps.keySet() )
237          {
238          if( areSources && !taps.get( tapName ).isSource() )
239            throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + taps.get( tapName ) );
240          else if( !areSources && !taps.get( tapName ).isSink() )
241            throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + taps.get( tapName ) );
242          }
243        }
244    
245      /**
246       * Method verifyEndPoints verifies
247       * <p/>
248       * there aren't dupe names in heads or tails.
249       * all the sink and source tap names match up with tail and head pipes
250       */
251      // todo: force dupe names to throw exceptions
252      protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails )
253        {
254        Set<String> tapNames = new HashSet<String>();
255    
256        tapNames.addAll( flowDef.getSources().keySet() );
257        tapNames.addAll( flowDef.getSinks().keySet() );
258    
259        // handle tails
260        Set<Pipe> tails = new HashSet<Pipe>();
261        Set<String> tailNames = new HashSet<String>();
262    
263        for( Pipe pipe : flowTails )
264          {
265          if( pipe instanceof SubAssembly )
266            {
267            for( Pipe tail : ( (SubAssembly) pipe ).getTails() )
268              {
269              String tailName = tail.getName();
270    
271              if( !tapNames.contains( tailName ) )
272                throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" );
273    
274              if( tailNames.contains( tailName ) && !tails.contains( tail ) )
275                LOG.warn( "duplicate tail name found: '{}'", tailName );
276    //            throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
277    
278              tailNames.add( tailName );
279              tails.add( tail );
280              }
281            }
282          else
283            {
284            String tailName = pipe.getName();
285    
286            if( !tapNames.contains( tailName ) )
287              throw new PlannerException( pipe, "pipe name not found in either sink or source map: '" + tailName + "'" );
288    
289            if( tailNames.contains( tailName ) && !tails.contains( pipe ) )
290              LOG.warn( "duplicate tail name found: '{}'", tailName );
291    //            throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
292    
293            tailNames.add( tailName );
294            tails.add( pipe );
295            }
296          }
297    
298    //    Set<String> allTailNames = new HashSet<String>( tailNames );
299        tailNames.removeAll( flowDef.getSinks().keySet() );
300        Set<String> remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
301        remainingSinks.removeAll( tailNames );
302    
303        if( tailNames.size() != 0 )
304          throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + Util.join( Util.quote( tailNames, "'" ), ", " ) + "], remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" );
305    
306        // unlike heads, pipes can input to another pipe and simultaneously be a sink
307        // so there is no way to know all the intentional tails, so they aren't listed below in the exception
308        remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
309        remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) );
310    
311        if( remainingSinks.size() != 0 )
312          throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" );
313    
314        // handle heads
315        Set<Pipe> heads = new HashSet<Pipe>();
316        Set<String> headNames = new HashSet<String>();
317    
318        for( Pipe pipe : flowTails )
319          {
320          for( Pipe head : pipe.getHeads() )
321            {
322            String headName = head.getName();
323    
324            if( !tapNames.contains( headName ) )
325              throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" );
326    
327            if( headNames.contains( headName ) && !heads.contains( head ) )
328              LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName );
329    //          throw new PlannerException( pipe, "duplicate head name found: " + headName );
330    
331            headNames.add( headName );
332            heads.add( head );
333            }
334          }
335    
336        Set<String> allHeadNames = new HashSet<String>( headNames );
337        headNames.removeAll( flowDef.getSources().keySet() );
338        Set<String> remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
339        remainingSources.removeAll( headNames );
340    
341        if( headNames.size() != 0 )
342          throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "], remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "]" );
343    
344        remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
345        remainingSources.removeAll( allHeadNames );
346    
347        if( remainingSources.size() != 0 )
348          throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "], remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "]" );
349    
350        }
351    
352      protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails )
353        {
354        verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" );
355    
356        Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
357    
358        for( String name : flowDef.getTraps().keySet() )
359          {
360          if( !names.contains( name ) )
361            throw new PlannerException( "trap name not found in assembly: '" + name + "'" );
362          }
363        }
364    
365      protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails )
366        {
367        verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" );
368    
369        for( Tap checkpointTap : flowDef.getCheckpoints().values() )
370          {
371          Scheme scheme = checkpointTap.getScheme();
372    
373          if( scheme.getSourceFields().equals( Fields.UNKNOWN ) && scheme.getSinkFields().equals( Fields.ALL ) )
374            continue;
375    
376          throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() );
377          }
378    
379        Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
380    
381        for( String name : flowDef.getCheckpoints().keySet() )
382          {
383          if( !names.contains( name ) )
384            throw new PlannerException( "checkpoint name not found in assembly: '" + name + "'" );
385    
386          Set<Pipe> pipes = new HashSet<Pipe>( asList( Pipe.named( name, flowTails ) ) );
387    
388          int count = 0;
389    
390          for( Pipe pipe : pipes )
391            {
392            if( pipe instanceof Checkpoint )
393              count++;
394            }
395    
396          if( count == 0 )
397            throw new PlannerException( "no checkpoint with name found in assembly: '" + name + "'" );
398    
399          if( count > 1 )
400            throw new PlannerException( "more than one checkpoint with name found in assembly: '" + name + "'" );
401          }
402        }
403    
404      private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role )
405        {
406        Collection<Tap> sourceTaps = sources.values();
407        Collection<Tap> sinkTaps = sinks.values();
408    
409        for( Tap tap : taps.values() )
410          {
411          if( sourceTaps.contains( tap ) )
412            throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap );
413    
414          if( sinkTaps.contains( tap ) )
415            throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap );
416          }
417        }
418    
419      /**
420       * Verifies that there are not only GroupAssertions following any given Group instance. This will adversely
421       * affect the stream entering any subsequent Tap of Each instances.
422       */
423      protected void failOnLoneGroupAssertion( ElementGraph elementGraph )
424        {
425        List<Group> groups = elementGraph.findAllGroups();
426    
427        // walk Every instances after Group
428        for( Group group : groups )
429          {
430          for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsFrom( group ) )
431            {
432            List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is tail
433    
434            int everies = 0;
435            int assertions = 0;
436    
437            for( FlowElement flowElement : flowElements )
438              {
439              if( flowElement instanceof Group )
440                continue;
441    
442              if( !( flowElement instanceof Every ) )
443                break;
444    
445              everies++;
446    
447              Every every = (Every) flowElement;
448    
449              if( every.getPlannerLevel() != null )
450                assertions++;
451              }
452    
453            if( everies != 0 && everies == assertions )
454              throw new PlannerException( "group assertions must be accompanied by aggregator operations" );
455            }
456          }
457        }
458    
459      protected void failOnMissingGroup( ElementGraph elementGraph )
460        {
461        List<Every> everies = elementGraph.findAllEveries();
462    
463        // walk Every instances after Group
464        for( Every every : everies )
465          {
466          for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) )
467            {
468            List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every
469            Collections.reverse( flowElements ); // first element is every
470    
471            for( FlowElement flowElement : flowElements )
472              {
473              if( flowElement instanceof Every || flowElement.getClass() == Pipe.class )
474                continue;
475    
476              if( flowElement instanceof GroupBy || flowElement instanceof CoGroup )
477                break;
478    
479              throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a Group pipe, found: " + flowElement );
480              }
481            }
482          }
483        }
484    
485      protected void failOnMisusedBuffer( ElementGraph elementGraph )
486        {
487        List<Every> everies = elementGraph.findAllEveries();
488    
489        // walk Every instances after Group
490        for( Every every : everies )
491          {
492          for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) )
493            {
494            List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every
495            Collections.reverse( flowElements ); // first element is every
496    
497            Every last = null;
498            boolean foundBuffer = false;
499            int foundEveries = -1;
500    
501            for( FlowElement flowElement : flowElements )
502              {
503              if( flowElement instanceof Each )
504                throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a GroupBy or CoGroup pipe, found: " + flowElement );
505    
506              if( flowElement instanceof Every )
507                {
508                foundEveries++;
509    
510                boolean isBuffer = ( (Every) flowElement ).isBuffer();
511    
512                if( foundEveries != 0 && ( isBuffer || foundBuffer ) )
513                  throw new PlannerException( (Pipe) flowElement, "Only one Every with a Buffer may follow a GroupBy or CoGroup pipe, no other Every instances are allowed immediately before or after, found: " + flowElement + " before: " + last );
514    
515                if( !foundBuffer )
516                  foundBuffer = isBuffer;
517    
518                last = (Every) flowElement;
519                }
520    
521              if( flowElement instanceof Group )
522                break;
523              }
524            }
525          }
526        }
527    
528      protected void failOnGroupEverySplit( ElementGraph elementGraph )
529        {
530        List<Group> groups = new ArrayList<Group>();
531    
532        elementGraph.findAllOfType( 1, 2, Group.class, groups );
533    
534        for( Group group : groups )
535          {
536          Set<FlowElement> children = elementGraph.getAllChildrenNotExactlyType( group, Pipe.class );
537    
538          for( FlowElement flowElement : children )
539            {
540            if( flowElement instanceof Every )
541              throw new PlannerException( (Every) flowElement, "Every instances may not split after a GroupBy or CoGroup pipe, found: " + flowElement + " after: " + group );
542            }
543          }
544        }
545    
546      protected PlannerException handleExceptionDuringPlanning( Exception exception, ElementGraph elementGraph )
547        {
548        if( exception instanceof PlannerException )
549          {
550          ( (PlannerException) exception ).elementGraph = elementGraph;
551    
552          return (PlannerException) exception;
553          }
554        else if( exception instanceof ElementGraphException )
555          {
556          Throwable cause = exception.getCause();
557    
558          if( cause == null )
559            cause = exception;
560    
561          // captures pipegraph for debugging
562          // forward message in case cause or trace is lost
563          String message = String.format( "could not build flow from assembly: [%s]", cause.getMessage() );
564    
565          if( cause instanceof OperatorException )
566            return new PlannerException( message, cause, elementGraph );
567    
568          if( cause instanceof TapException )
569            return new PlannerException( message, cause, elementGraph );
570    
571          return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, elementGraph );
572          }
573        else
574          {
575          // captures pipegraph for debugging
576          // forward message in case cause or trace is lost
577          String message = String.format( "could not build flow from assembly: [%s]", exception.getMessage() );
578          return new PlannerException( message, exception, elementGraph );
579          }
580        }
581    
582      protected void handleNonSafeOperations( ElementGraph elementGraph )
583        {
584        // if there was a graph change, iterate paths again.
585        while( !internalNonSafeOperations( elementGraph ) )
586          ;
587        }
588    
589      private boolean internalNonSafeOperations( ElementGraph elementGraph )
590        {
591        Set<Pipe> tapInsertions = new HashSet<Pipe>();
592    
593        List<Pipe> splits = elementGraph.findAllPipeSplits();
594    
595        // if any predecessor is unsafe, insert temp
596        for( Pipe split : splits )
597          {
598          List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsTo( split );
599    
600          for( GraphPath<FlowElement, Scope> path : paths )
601            {
602            List<FlowElement> elements = Graphs.getPathVertexList( path );
603            Collections.reverse( elements );
604    
605            for( FlowElement element : elements )
606              {
607              if( !( element instanceof Each ) && element.getClass() != Pipe.class )
608                break;
609    
610              if( element.getClass() == Pipe.class )
611                continue;
612    
613              if( !( (Each) element ).getOperation().isSafe() )
614                {
615                tapInsertions.add( split );
616                break;
617                }
618              }
619            }
620          }
621    
622        for( Pipe pipe : tapInsertions )
623          insertTempTapAfter( elementGraph, pipe );
624    
625        return tapInsertions.isEmpty();
626        }
627    
628      /**
629       * Method insertTapAfter ...
630       *
631       * @param graph of type PipeGraph
632       * @param pipe  of type Pipe
633       */
634      protected void insertTempTapAfter( ElementGraph graph, Pipe pipe )
635        {
636        LOG.debug( "inserting tap after: {}", pipe );
637    
638        Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() );
639    
640        if( checkpointTap != null )
641          {
642          LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap );
643          checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
644          }
645    
646        if( checkpointTap == null )
647          {
648          // only restart from a checkpoint pipe or checkpoint tap below
649          if( pipe instanceof Checkpoint )
650            {
651            checkpointTap = makeTempTap( checkpointRootPath, pipe.getName() );
652            checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
653            // mark as an anonymous checkpoint
654            checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
655            }
656          else
657            {
658            checkpointTap = makeTempTap( pipe.getName() );
659            }
660          }
661    
662        checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS );
663    
664        graph.insertFlowElementAfter( pipe, checkpointTap );
665        }
666    
667      private Tap decorateTap( Pipe pipe, Tap tempTap, String decoratorClass )
668        {
669        String decoratorClassName = PropertyUtil.getProperty( properties, pipe, decoratorClass );
670    
671        if( Util.isEmpty( decoratorClassName ) )
672          return tempTap;
673    
674        LOG.info( "found decorator: {}, wrapping tap: {}", decoratorClass, tempTap );
675    
676        tempTap = Util.newInstance( decoratorClassName, tempTap );
677    
678        return tempTap;
679        }
680    
681      protected Tap makeTempTap( String name )
682        {
683        return makeTempTap( null, name );
684        }
685    
  /**
   * Creates a temporary Tap for intermediate data.
   *
   * @param prefix an optional path prefix. The single-argument overload passes null; anonymous Checkpoint pipes
   *               pass {@code checkpointRootPath}, which may itself be null.
   * @param name   the name the tap is created for (callers in this class pass the pipe's name)
   * @return a new Tap
   */
  protected abstract Tap makeTempTap( String prefix, String name );
687    
688      /**
689       * Inserts a temporary Tap between logical MR jobs.
690       * <p/>
691       * Since all joins are at groups or splices, depth first search is safe
692       * <p/>
693       * todo: refactor so that rules are applied to path segments bounded by taps
694       * todo: this would allow balancing of operations within paths instead of pushing
695       * todo: all operations up. may allow for consolidation of rules
696       *
697       * @param elementGraph of type PipeGraph
698       */
699      protected void handleJobPartitioning( ElementGraph elementGraph )
700        {
701        // if there was a graph change, iterate paths again. prevents many temp taps from being inserted in front of a group
702        while( !internalJobPartitioning( elementGraph ) )
703          ;
704        }
705    
  /**
   * Performs one pass of job partitioning over the graph.
   * <p/>
   * Walks each shortest path between the graph extents, skipping the extents and the source tap that directly follows
   * an extent. It then applies these rules:
   * <ul>
   * <li>After a first {@link Group} is seen, the next {@link Splice} (another Group, or a join or merge) schedules a
   * temp tap after the element immediately preceding it.</li>
   * <li>A {@link Checkpoint} not already followed by a Tap schedules a temp tap after itself.</li>
   * <li>Any other Tap clears the group tracking.</li>
   * </ul>
   * As soon as one path schedules any insertions, they are applied and the pass stops, so the caller can rescan the
   * modified graph.
   *
   * @param elementGraph the graph to modify
   * @return true if no temp taps were inserted during this pass, false otherwise
   */
  private boolean internalJobPartitioning( ElementGraph elementGraph )
    {
    for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsBetweenExtents() )
      {
      List<FlowElement> flowElements = Graphs.getPathVertexList( path );
      List<Pipe> tapInsertions = new ArrayList<Pipe>();

      // whether a Group has been seen on this path since the last reset (a Tap, Checkpoint insertion, or non-Group splice insertion)
      boolean foundGroup = false;

      for( int i = 0; i < flowElements.size(); i++ )
        {
        FlowElement flowElement = flowElements.get( i );

        // NOTE(review): the get( i - 1 ) below assumes index 0 is always an Extent (handled by the first check) — confirm
        if( flowElement instanceof ElementGraph.Extent ) // is an extent: head or tail
          continue;
        else if( flowElement instanceof Tap && flowElements.get( i - 1 ) instanceof ElementGraph.Extent )  // is a source tap
          continue;

        if( flowElement instanceof Group && !foundGroup )
          {
          foundGroup = true;
          }
        else if( flowElement instanceof Splice && foundGroup ) // add tap between groups, push joins/merge map side
          {
          // schedule a tap after the element preceding this splice; NOTE(review): the cast assumes that element is a Pipe
          tapInsertions.add( (Pipe) flowElements.get( i - 1 ) );

          // a Group keeps foundGroup set, any other splice clears it
          if( !( flowElement instanceof Group ) )
            foundGroup = false;
          }
        else if( flowElement instanceof Checkpoint ) // add tap after checkpoint
          {
          // NOTE(review): get( i + 1 ) assumes a Checkpoint is never the last path element (paths presumably end at the tail extent)
          if( flowElements.get( i + 1 ) instanceof Tap ) // don't keep inserting
            continue;

          tapInsertions.add( (Pipe) flowElement );
          foundGroup = false;
          }
        else if( flowElement instanceof Tap )
          {
          foundGroup = false;
          }
        }

      for( Pipe pipe : tapInsertions )
        insertTempTapAfter( elementGraph, pipe );

      // the graph changed, so stop here and let the caller rescan the new paths
      if( !tapInsertions.isEmpty() )
        return false;
      }

    return true;
    }
758    
759      /**
760       * Prevent leftmost sources from sourcing a downstream join on the rightmost side intra-task by inserting a
761       * temp tap between the left-sourced join and right-sourced join.
762       *
763       * @param elementGraph
764       */
765      protected void handleJoins( ElementGraph elementGraph )
766        {
767        while( !internalJoins( elementGraph ) )
768          ;
769        }
770    
  /**
   * Runs one pass over the shortest paths between the graph's extents, looking for a HashJoin (or, once a Group
   * closes a segment containing HashJoins, a Group/Merge) that needs a temporary tap inserted before it.
   * <p>
   * Within a path, every insertion point is followed by a {@code break} out of the element loop, so each path yields
   * at most one insertion. As soon as a path yields one, the tap is inserted via insertTempTapAfter and the method
   * returns immediately. As a result, at most one tap is inserted per call.
   *
   * @param elementGraph the graph to inspect; modified in place when a tap is inserted
   * @return {@code true} if no path required a tap insertion; {@code false} if a tap was inserted, meaning the graph
   *         changed and the caller should run the pass again
   */
  private boolean internalJoins( ElementGraph elementGraph )
    {
    List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsBetweenExtents();

    // large to small
    // NOTE(review): assumes getAllShortestPathsBetweenExtents returns paths ordered small to large, so reversing
    // examines the longest paths first -- confirm against ElementGraph
    Collections.reverse( paths );

    for( GraphPath<FlowElement, Scope> path : paths )
      {
      List<FlowElement> flowElements = Graphs.getPathVertexList( path );
      // pipes after which a temp tap must be inserted; every add below is followed by leaving the element loop,
      // so this holds at most one entry
      List<Pipe> tapInsertions = new ArrayList<Pipe>();
      // HashJoins seen since lastSourceElement that did not trigger an insertion when first reached
      List<HashJoin> joins = new ArrayList<HashJoin>();
      // Merges seen since lastSourceElement, in path order
      List<Merge> merges = new ArrayList<Merge>();

      // the most recent Tap or Group on this path (updated at the end of the Tap/Group branch); path and type
      // counts below are measured from it. NOTE(review): null until the first Tap/Group is reached -- presumably
      // the source Tap right after the head extent, so it is set before any HashJoin; confirm
      FlowElement lastSourceElement = null;

      for( int i = 0; i < flowElements.size(); i++ )
        {
        FlowElement flowElement = flowElements.get( i );

        if( flowElement instanceof Merge )
          {
          merges.add( (Merge) flowElement );
          }
        else if( flowElement instanceof HashJoin )
          {
          HashJoin join = (HashJoin) flowElement;

          Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true );

          // is this path streamed
          // (entering the join at position 0 is treated as the streamed side; any other position as accumulated)
          int pathPosition = pathPositionInto( path, join );
          boolean thisPathIsStreamed = pathPosition == 0;

          boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths
          int pathCount = countPaths( pathCounts );

          int priorJoins = countTypesBetween( elementGraph, lastSourceElement, join, HashJoin.class );

          // these two rules apply only when no other HashJoin lies between lastSourceElement and this join;
          // in every case the insertion point is the element directly before the join on this path, and
          // 'break' leaves the element loop for this path
          if( priorJoins == 0 )
            {
            // if same source is leading into the hashjoin, insert tap on the accumulated side
            if( pathCount == 2 && isAccumulatedAndStreamed && !thisPathIsStreamed )
              {
              tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
              break;
              }

            // if more than one path into streamed and accumulated branches, insert tap on streamed side
            if( pathCount > 2 && isAccumulatedAndStreamed && thisPathIsStreamed )
              {
              tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
              break;
              }
            }

          if( !merges.isEmpty() )
            {
            // if a Merge is prior to a HashJoin, and its an accumulated path, force Merge results to disk
            int joinPos = flowElements.indexOf( join );
            int mergePos = nearest( flowElements, joinPos, merges );

            if( mergePos != -1 && joinPos > mergePos )
              {
              // if all paths are accumulated and streamed, insert
              // else if just if this path is accumulated
              // (the condition below is equivalent to: isAccumulatedAndStreamed || !thisPathIsStreamed)
              if( ( isAccumulatedAndStreamed && thisPathIsStreamed ) || !thisPathIsStreamed )
                {
                tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
                break;
                }
              }
            }

          // no insertion yet; re-examined when the next Tap or Group closes this segment
          joins.add( (HashJoin) flowElement );
          }
        else if( flowElement instanceof Tap || flowElement instanceof Group )
          {
          // a Tap or Group closes the segment that started at lastSourceElement

          // added for JoinFieldedPipesPlatformTest.testJoinMergeGroupBy where Merge hides streamed nature of path
          if( flowElement instanceof Group && !joins.isEmpty() )
            {
            List<Splice> splices = new ArrayList<Splice>();

            splices.addAll( merges );
            splices.add( (Splice) flowElement );

            // check the Group first, then the Merges from the latest on the path back to the earliest
            Collections.reverse( splices );

            for( Splice splice : splices )
              {
              Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, splice, true );

              // insert before the first splice reached by both accumulated and streamed paths
              if( isBothAccumulatedAndStreamedPath( pathCounts ) )
                {
                tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( splice ) - 1 ) );
                break;
                }
              }

            if( !tapInsertions.isEmpty() )
              break;
            }

          // walk the segment's deferred joins in path order
          for( int j = 0; j < joins.size(); j++ )
            {
            HashJoin join = joins.get( j );

            int pathPosition = pathPositionInto( path, join );
            boolean thisPathIsStreamed = pathPosition == 0;

            Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true );

            boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths
            int pathCount = countPaths( pathCounts );

            if( pathCount >= 2 && isAccumulatedAndStreamed && thisPathIsStreamed )
              {
              tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
              break;
              }

            if( thisPathIsStreamed )
              continue;

            if( j == 0 ) // is accumulated on first join: nothing to fix in this segment
              break;

            // prevent a streamed path from being accumulated by injecting a tap before the
            // current HashJoin
            tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
            break;
            }

          if( !tapInsertions.isEmpty() )
            break;

          // segment needs no insertion: start a new segment from this Tap/Group
          lastSourceElement = flowElement;
          merges.clear();
          joins.clear();
          }
        }

      // apply this path's (at most one) insertion, then tell the caller the graph changed
      for( Pipe pipe : tapInsertions )
        insertTempTapAfter( elementGraph, pipe );

      if( !tapInsertions.isEmpty() )
        return false;
      }

    return true;
    }
922    
923      private int nearest( List<FlowElement> flowElements, int index, List<Merge> merges )
924        {
925        List<Merge> reversed = new ArrayList<Merge>( merges );
926        Collections.reverse( reversed );
927    
928        for( Merge merge : reversed )
929          {
930          int pos = flowElements.indexOf( merge );
931          if( pos < index )
932            return pos;
933          }
934    
935        return -1;
936        }
937      }