001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.util.ArrayList;
024    import java.util.Arrays;
025    import java.util.Collection;
026    import java.util.Collections;
027    import java.util.HashSet;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Set;
031    
032    import cascading.flow.AssemblyPlanner;
033    import cascading.flow.Flow;
034    import cascading.flow.FlowConnector;
035    import cascading.flow.FlowDef;
036    import cascading.flow.FlowElement;
037    import cascading.operation.AssertionLevel;
038    import cascading.operation.DebugLevel;
039    import cascading.pipe.Checkpoint;
040    import cascading.pipe.CoGroup;
041    import cascading.pipe.Each;
042    import cascading.pipe.Every;
043    import cascading.pipe.Group;
044    import cascading.pipe.GroupBy;
045    import cascading.pipe.HashJoin;
046    import cascading.pipe.Merge;
047    import cascading.pipe.OperatorException;
048    import cascading.pipe.Pipe;
049    import cascading.pipe.Splice;
050    import cascading.pipe.SubAssembly;
051    import cascading.property.ConfigDef;
052    import cascading.property.PropertyUtil;
053    import cascading.scheme.Scheme;
054    import cascading.tap.Tap;
055    import cascading.tap.TapException;
056    import cascading.tuple.Fields;
057    import cascading.util.Util;
058    import org.jgrapht.GraphPath;
059    import org.jgrapht.Graphs;
060    import org.slf4j.Logger;
061    import org.slf4j.LoggerFactory;
062    
063    import static cascading.flow.planner.ElementGraphs.*;
064    import static java.util.Arrays.asList;
065    
066    /** Class FlowPlanner is the base class for all planner implementations. */
067    public abstract class FlowPlanner<F extends Flow, Config>
068      {
  /** Field LOG is the logger shared by all FlowPlanner instances. */
  private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class );

  /** Field properties holds the map passed to {@link #initialize(FlowConnector, Map)}; it is kept by reference. */
  protected Map<Object, Object> properties;

  /**
   * Field checkpointRootPath is "flowName/runID" when the FlowDef declares a run id, otherwise null. It is reassigned
   * on every call to {@link #createElementGraph(FlowDef, Pipe[])}. It is used as the prefix for temp taps created
   * for Checkpoint pipes.
   */
  protected String checkpointRootPath = null;

  /** Field assertionLevel is the planner default read in initialize; a non-null FlowDef assertion level overrides it. */
  protected AssertionLevel assertionLevel;
  /** Field debugLevel is the planner default read in initialize; a non-null FlowDef debug level overrides it. */
  protected DebugLevel debugLevel;
081    
082      /**
083       * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}.
084       *
085       * @param properties of type Map<Object, Object>
086       * @return AssertionLevel the configured AssertionLevel
087       */
088      static AssertionLevel getAssertionLevel( Map<Object, Object> properties )
089        {
090        String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() );
091    
092        return AssertionLevel.valueOf( assertionLevel );
093        }
094    
095      /**
096       * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}.
097       *
098       * @param properties of type Map<Object, Object>
099       * @return DebugLevel the configured DebugLevel
100       */
101      static DebugLevel getDebugLevel( Map<Object, Object> properties )
102        {
103        String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() );
104    
105        return DebugLevel.valueOf( debugLevel );
106        }
107    
108      public Map<Object, Object> getProperties()
109        {
110        return properties;
111        }
112    
  /**
   * Returns this planner's configuration object; implemented by subclasses.
   *
   * @return Config the configuration, typed by the {@code Config} type parameter of this class
   */
  public abstract Config getConfig();

  /**
   * Returns the PlatformInfo for this planner; implemented by subclasses. It is passed to every ElementGraph built
   * by {@link #createElementGraph(FlowDef, Pipe[])}.
   *
   * @return PlatformInfo describing the planner's platform
   */
  public abstract PlatformInfo getPlatformInfo();
116    
117      public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
118        {
119        this.properties = properties;
120        this.assertionLevel = getAssertionLevel( properties );
121        this.debugLevel = getDebugLevel( properties );
122        }
123    
  /**
   * Method createFlow creates the Flow instance for the given FlowDef; implemented by subclasses.
   *
   * @param flowDef the flow definition
   * @return Flow the new Flow instance
   */
  protected abstract Flow createFlow( FlowDef flowDef );
125    
126      /**
127       * Method buildFlow renders the actual Flow instance.
128       *
129       * @param flowDef
130       * @return Flow
131       */
132      public abstract F buildFlow( FlowDef flowDef );
133    
134      protected Pipe[] resolveTails( FlowDef flowDef, Flow<Config> flow )
135        {
136        Pipe[] tails = flowDef.getTailsArray();
137    
138        tails = resolveAssemblyPlanners( flowDef, flow, tails );
139    
140        return tails;
141        }
142    
143      protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes )
144        {
145        List<Pipe> tails = Arrays.asList( pipes );
146    
147        List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners();
148    
149        for( AssemblyPlanner assemblyPlanner : assemblyPlanners )
150          {
151          tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) );
152    
153          if( tails.isEmpty() )
154            throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" );
155    
156          tails = Collections.unmodifiableList( tails );
157          }
158    
159        return tails.toArray( new Pipe[ tails.size() ] );
160        }
161    
162      protected void verifyAssembly( FlowDef flowDef, Pipe[] tails )
163        {
164        verifyPipeAssemblyEndPoints( flowDef, tails );
165        verifyTraps( flowDef, tails );
166        verifyCheckpoints( flowDef, tails );
167        }
168    
169      protected void verifyAllTaps( FlowDef flowDef )
170        {
171        verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() );
172    
173        verifyTaps( flowDef.getSources(), true, true );
174        verifyTaps( flowDef.getSinks(), false, true );
175        verifyTaps( flowDef.getTraps(), false, false );
176    
177        // are both sources and sinks
178        verifyTaps( flowDef.getCheckpoints(), true, false );
179        verifyTaps( flowDef.getCheckpoints(), false, false );
180        }
181    
182      protected ElementGraph createElementGraph( FlowDef flowDef, Pipe[] flowTails )
183        {
184        Map<String, Tap> sources = flowDef.getSourcesCopy();
185        Map<String, Tap> sinks = flowDef.getSinksCopy();
186        Map<String, Tap> traps = flowDef.getTrapsCopy();
187        Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();
188    
189        AssertionLevel assertionLevel = flowDef.getAssertionLevel() == null ? this.assertionLevel : flowDef.getAssertionLevel();
190        DebugLevel debugLevel = flowDef.getDebugLevel() == null ? this.debugLevel : flowDef.getDebugLevel();
191    
192        checkpointRootPath = makeCheckpointRootPath( flowDef );
193    
194        return new ElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointRootPath != null, assertionLevel, debugLevel );
195        }
196    
197      private String makeCheckpointRootPath( FlowDef flowDef )
198        {
199        String flowName = flowDef.getName();
200        String runID = flowDef.getRunID();
201    
202        if( runID == null )
203          return null;
204    
205        if( flowName == null )
206          throw new PlannerException( "flow name is required when providing a run id" );
207    
208        return flowName + "/" + runID;
209        }
210    
211    
212      protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks )
213        {
214        Collection<Tap> sourcesSet = sources.values();
215    
216        for( Tap tap : sinks.values() )
217          {
218          if( sourcesSet.contains( tap ) )
219            throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap );
220          }
221        }
222    
223      /**
224       * Method verifyTaps ...
225       *
226       * @param taps          of type Map<String, Tap>
227       * @param areSources    of type boolean
228       * @param mayNotBeEmpty of type boolean
229       */
230      protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty )
231        {
232        if( mayNotBeEmpty && taps.isEmpty() )
233          throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" );
234    
235        for( String tapName : taps.keySet() )
236          {
237          if( areSources && !taps.get( tapName ).isSource() )
238            throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + taps.get( tapName ) );
239          else if( !areSources && !taps.get( tapName ).isSink() )
240            throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + taps.get( tapName ) );
241          }
242        }
243    
244      /**
245       * Method verifyEndPoints verifies
246       * <p/>
247       * there aren't dupe names in heads or tails.
248       * all the sink and source tap names match up with tail and head pipes
249       */
250      // todo: force dupe names to throw exceptions
251      protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails )
252        {
253        Set<String> tapNames = new HashSet<String>();
254    
255        tapNames.addAll( flowDef.getSources().keySet() );
256        tapNames.addAll( flowDef.getSinks().keySet() );
257    
258        // handle tails
259        Set<Pipe> tails = new HashSet<Pipe>();
260        Set<String> tailNames = new HashSet<String>();
261    
262        for( Pipe pipe : flowTails )
263          {
264          if( pipe instanceof SubAssembly )
265            {
266            for( Pipe tail : ( (SubAssembly) pipe ).getTails() )
267              {
268              String tailName = tail.getName();
269    
270              if( !tapNames.contains( tailName ) )
271                throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" );
272    
273              if( tailNames.contains( tailName ) && !tails.contains( tail ) )
274                LOG.warn( "duplicate tail name found: '{}'", tailName );
275    //            throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
276    
277              tailNames.add( tailName );
278              tails.add( tail );
279              }
280            }
281          else
282            {
283            String tailName = pipe.getName();
284    
285            if( !tapNames.contains( tailName ) )
286              throw new PlannerException( pipe, "pipe name not found in either sink or source map: '" + tailName + "'" );
287    
288            if( tailNames.contains( tailName ) && !tails.contains( pipe ) )
289              LOG.warn( "duplicate tail name found: '{}'", tailName );
290    //            throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
291    
292            tailNames.add( tailName );
293            tails.add( pipe );
294            }
295          }
296    
297    //    Set<String> allTailNames = new HashSet<String>( tailNames );
298        tailNames.removeAll( flowDef.getSinks().keySet() );
299        Set<String> remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
300        remainingSinks.removeAll( tailNames );
301    
302        if( tailNames.size() != 0 )
303          throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + Util.join( Util.quote( tailNames, "'" ), ", " ) + "], remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" );
304    
305        // unlike heads, pipes can input to another pipe and simultaneously be a sink
306        // so there is no way to know all the intentional tails, so they aren't listed below in the exception
307        remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
308        remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) );
309    
310        if( remainingSinks.size() != 0 )
311          throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" );
312    
313        // handle heads
314        Set<Pipe> heads = new HashSet<Pipe>();
315        Set<String> headNames = new HashSet<String>();
316    
317        for( Pipe pipe : flowTails )
318          {
319          for( Pipe head : pipe.getHeads() )
320            {
321            String headName = head.getName();
322    
323            if( !tapNames.contains( headName ) )
324              throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" );
325    
326            if( headNames.contains( headName ) && !heads.contains( head ) )
327              LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName );
328    //          throw new PlannerException( pipe, "duplicate head name found: " + headName );
329    
330            headNames.add( headName );
331            heads.add( head );
332            }
333          }
334    
335        Set<String> allHeadNames = new HashSet<String>( headNames );
336        headNames.removeAll( flowDef.getSources().keySet() );
337        Set<String> remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
338        remainingSources.removeAll( headNames );
339    
340        if( headNames.size() != 0 )
341          throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "], remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "]" );
342    
343        remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
344        remainingSources.removeAll( allHeadNames );
345    
346        if( remainingSources.size() != 0 )
347          throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "], remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "]" );
348    
349        }
350    
351      protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails )
352        {
353        verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" );
354    
355        Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
356    
357        for( String name : flowDef.getTraps().keySet() )
358          {
359          if( !names.contains( name ) )
360            throw new PlannerException( "trap name not found in assembly: '" + name + "'" );
361          }
362        }
363    
364      protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails )
365        {
366        verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" );
367    
368        for( Tap checkpointTap : flowDef.getCheckpoints().values() )
369          {
370          Scheme scheme = checkpointTap.getScheme();
371    
372          if( scheme.getSourceFields().equals( Fields.UNKNOWN ) && scheme.getSinkFields().equals( Fields.ALL ) )
373            continue;
374    
375          throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() );
376          }
377    
378        Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
379    
380        for( String name : flowDef.getCheckpoints().keySet() )
381          {
382          if( !names.contains( name ) )
383            throw new PlannerException( "checkpoint name not found in assembly: '" + name + "'" );
384    
385          Set<Pipe> pipes = new HashSet<Pipe>( asList( Pipe.named( name, flowTails ) ) );
386    
387          int count = 0;
388    
389          for( Pipe pipe : pipes )
390            {
391            if( pipe instanceof Checkpoint )
392              count++;
393            }
394    
395          if( count == 0 )
396            throw new PlannerException( "no checkpoint with name found in assembly: '" + name + "'" );
397    
398          if( count > 1 )
399            throw new PlannerException( "more than one checkpoint with name found in assembly: '" + name + "'" );
400          }
401        }
402    
403      private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role )
404        {
405        Collection<Tap> sourceTaps = sources.values();
406        Collection<Tap> sinkTaps = sinks.values();
407    
408        for( Tap tap : taps.values() )
409          {
410          if( sourceTaps.contains( tap ) )
411            throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap );
412    
413          if( sinkTaps.contains( tap ) )
414            throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap );
415          }
416        }
417    
418      /**
419       * Verifies that there are not only GroupAssertions following any given Group instance. This will adversely
420       * affect the stream entering any subsequent Tap of Each instances.
421       */
422      protected void failOnLoneGroupAssertion( ElementGraph elementGraph )
423        {
424        List<Group> groups = elementGraph.findAllGroups();
425    
426        // walk Every instances after Group
427        for( Group group : groups )
428          {
429          for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsFrom( group ) )
430            {
431            List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is tail
432    
433            int everies = 0;
434            int assertions = 0;
435    
436            for( FlowElement flowElement : flowElements )
437              {
438              if( flowElement instanceof Group )
439                continue;
440    
441              if( !( flowElement instanceof Every ) )
442                break;
443    
444              everies++;
445    
446              Every every = (Every) flowElement;
447    
448              if( every.getPlannerLevel() != null )
449                assertions++;
450              }
451    
452            if( everies != 0 && everies == assertions )
453              throw new PlannerException( "group assertions must be accompanied by aggregator operations" );
454            }
455          }
456        }
457    
458      protected void failOnMissingGroup( ElementGraph elementGraph )
459        {
460        List<Every> everies = elementGraph.findAllEveries();
461    
462        // walk Every instances after Group
463        for( Every every : everies )
464          {
465          for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) )
466            {
467            List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every
468            Collections.reverse( flowElements ); // first element is every
469    
470            for( FlowElement flowElement : flowElements )
471              {
472              if( flowElement instanceof Every || flowElement.getClass() == Pipe.class )
473                continue;
474    
475              if( flowElement instanceof GroupBy || flowElement instanceof CoGroup )
476                break;
477    
478              throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a Group pipe, found: " + flowElement );
479              }
480            }
481          }
482        }
483    
484      protected void failOnMisusedBuffer( ElementGraph elementGraph )
485        {
486        List<Every> everies = elementGraph.findAllEveries();
487    
488        // walk Every instances after Group
489        for( Every every : everies )
490          {
491          for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) )
492            {
493            List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every
494            Collections.reverse( flowElements ); // first element is every
495    
496            Every last = null;
497            boolean foundBuffer = false;
498            int foundEveries = -1;
499    
500            for( FlowElement flowElement : flowElements )
501              {
502              if( flowElement instanceof Each )
503                throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a GroupBy or CoGroup pipe, found: " + flowElement );
504    
505              if( flowElement instanceof Every )
506                {
507                foundEveries++;
508    
509                boolean isBuffer = ( (Every) flowElement ).isBuffer();
510    
511                if( foundEveries != 0 && ( isBuffer || foundBuffer ) )
512                  throw new PlannerException( (Pipe) flowElement, "Only one Every with a Buffer may follow a GroupBy or CoGroup pipe, no other Every instances are allowed immediately before or after, found: " + flowElement + " before: " + last );
513    
514                if( !foundBuffer )
515                  foundBuffer = isBuffer;
516    
517                last = (Every) flowElement;
518                }
519    
520              if( flowElement instanceof Group )
521                break;
522              }
523            }
524          }
525        }
526    
527      protected void failOnGroupEverySplit( ElementGraph elementGraph )
528        {
529        List<Group> groups = new ArrayList<Group>();
530    
531        elementGraph.findAllOfType( 1, 2, Group.class, groups );
532    
533        for( Group group : groups )
534          {
535          Set<FlowElement> children = elementGraph.getAllChildrenNotExactlyType( group, Pipe.class );
536    
537          for( FlowElement flowElement : children )
538            {
539            if( flowElement instanceof Every )
540              throw new PlannerException( (Every) flowElement, "Every instances may not split after a GroupBy or CoGroup pipe, found: " + flowElement + " after: " + group );
541            }
542          }
543        }
544    
545      protected PlannerException handleExceptionDuringPlanning( Exception exception, ElementGraph elementGraph )
546        {
547        if( exception instanceof PlannerException )
548          {
549          ( (PlannerException) exception ).elementGraph = elementGraph;
550    
551          return (PlannerException) exception;
552          }
553        else if( exception instanceof ElementGraphException )
554          {
555          Throwable cause = exception.getCause();
556    
557          if( cause == null )
558            cause = exception;
559    
560          // captures pipegraph for debugging
561          // forward message in case cause or trace is lost
562          String message = String.format( "could not build flow from assembly: [%s]", cause.getMessage() );
563    
564          if( cause instanceof OperatorException )
565            return new PlannerException( message, cause, elementGraph );
566    
567          if( cause instanceof TapException )
568            return new PlannerException( message, cause, elementGraph );
569    
570          return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, elementGraph );
571          }
572        else
573          {
574          // captures pipegraph for debugging
575          // forward message in case cause or trace is lost
576          String message = String.format( "could not build flow from assembly: [%s]", exception.getMessage() );
577          return new PlannerException( message, exception, elementGraph );
578          }
579        }
580    
581      protected void handleNonSafeOperations( ElementGraph elementGraph )
582        {
583        // if there was a graph change, iterate paths again.
584        while( !internalNonSafeOperations( elementGraph ) )
585          ;
586        }
587    
588      private boolean internalNonSafeOperations( ElementGraph elementGraph )
589        {
590        Set<Pipe> tapInsertions = new HashSet<Pipe>();
591    
592        List<Pipe> splits = elementGraph.findAllPipeSplits();
593    
594        // if any predecessor is unsafe, insert temp
595        for( Pipe split : splits )
596          {
597          List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsTo( split );
598    
599          for( GraphPath<FlowElement, Scope> path : paths )
600            {
601            List<FlowElement> elements = Graphs.getPathVertexList( path );
602            Collections.reverse( elements );
603    
604            for( FlowElement element : elements )
605              {
606              if( !( element instanceof Each ) && element.getClass() != Pipe.class )
607                break;
608    
609              if( element.getClass() == Pipe.class )
610                continue;
611    
612              if( !( (Each) element ).getOperation().isSafe() )
613                {
614                tapInsertions.add( split );
615                break;
616                }
617              }
618            }
619          }
620    
621        for( Pipe pipe : tapInsertions )
622          insertTempTapAfter( elementGraph, pipe );
623    
624        return tapInsertions.isEmpty();
625        }
626    
627      /**
628       * Method insertTapAfter ...
629       *
630       * @param graph of type PipeGraph
631       * @param pipe  of type Pipe
632       */
633      protected void insertTempTapAfter( ElementGraph graph, Pipe pipe )
634        {
635        LOG.debug( "inserting tap after: {}", pipe );
636    
637        Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() );
638    
639        if( checkpointTap != null )
640          LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap );
641    
642        if( checkpointTap == null )
643          {
644          // only restart from a checkpoint pipe or checkpoint tap below
645          if( pipe instanceof Checkpoint )
646            {
647            checkpointTap = makeTempTap( checkpointRootPath, pipe.getName() );
648            // mark as an anonymous checkpoint
649            checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
650            }
651          else
652            {
653            checkpointTap = makeTempTap( pipe.getName() );
654            }
655          }
656    
657        graph.insertFlowElementAfter( pipe, checkpointTap );
658        }
659    
660      protected Tap makeTempTap( String name )
661        {
662        return makeTempTap( null, name );
663        }
664    
  /**
   * Creates a temporary Tap; implemented by subclasses.
   *
   * @param prefix path prefix. Null when called from {@link #makeTempTap(String)}; the checkpoint root path (which
   *               may itself be null) when creating a tap for a Checkpoint pipe
   * @param name   the name of the pipe the tap will be inserted after
   * @return a new Tap
   */
  protected abstract Tap makeTempTap( String prefix, String name );
666    
667      /**
668       * Inserts a temporary Tap between logical MR jobs.
669       * <p/>
670       * Since all joins are at groups or splices, depth first search is safe
671       * <p/>
672       * todo: refactor so that rules are applied to path segments bounded by taps
673       * todo: this would allow balancing of operations within paths instead of pushing
674       * todo: all operations up. may allow for consolidation of rules
675       *
676       * @param elementGraph of type PipeGraph
677       */
678      protected void handleJobPartitioning( ElementGraph elementGraph )
679        {
680        // if there was a graph change, iterate paths again. prevents many temp taps from being inserted in front of a group
681        while( !internalJobPartitioning( elementGraph ) )
682          ;
683        }
684    
  /**
   * Performs one job-partitioning pass over the shortest paths between the graph's extents.
   * <p/>
   * Temp taps are inserted before a Splice that follows a Group, and after a Checkpoint not already followed by a
   * Tap. The method returns right after the first path that needed insertions, so the caller can recompute paths on
   * the modified graph.
   *
   * @param elementGraph the graph to modify in place
   * @return true if no taps were inserted, false otherwise
   */
  private boolean internalJobPartitioning( ElementGraph elementGraph )
    {
    for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsBetweenExtents() )
      {
      List<FlowElement> flowElements = Graphs.getPathVertexList( path );
      List<Pipe> tapInsertions = new ArrayList<Pipe>();

      // set when a Group is seen; cleared by a Tap, an inserted Checkpoint tap, or a non-Group Splice insertion
      boolean foundGroup = false;

      for( int i = 0; i < flowElements.size(); i++ )
        {
        FlowElement flowElement = flowElements.get( i );

        if( flowElement instanceof ElementGraph.Extent ) // is an extent: head or tail
          continue;
        else if( flowElement instanceof Tap && flowElements.get( i - 1 ) instanceof ElementGraph.Extent )  // is a source tap
          continue;

        if( flowElement instanceof Group && !foundGroup )
          {
          foundGroup = true;
          }
        else if( flowElement instanceof Splice && foundGroup ) // add tap between groups, push joins/merge map side
          {
          // the temp tap goes after the element immediately preceding this Splice
          tapInsertions.add( (Pipe) flowElements.get( i - 1 ) );

          // a Group here starts a new group segment, so foundGroup stays set; other Splices reset it
          if( !( flowElement instanceof Group ) )
            foundGroup = false;
          }
        else if( flowElement instanceof Checkpoint ) // add tap after checkpoint
          {
          if( flowElements.get( i + 1 ) instanceof Tap ) // don't keep inserting
            continue;

          tapInsertions.add( (Pipe) flowElement );
          foundGroup = false;
          }
        else if( flowElement instanceof Tap )
          {
          // a non-source Tap on the path resets group tracking
          foundGroup = false;
          }
        }

      for( Pipe pipe : tapInsertions )
        insertTempTapAfter( elementGraph, pipe );

      // the graph changed, so the remaining paths are stale; let the caller start a new pass
      if( !tapInsertions.isEmpty() )
        return false;
      }

    return true;
    }
737    
738      /**
739       * Prevent leftmost sources from sourcing a downstream join on the rightmost side intra-task by inserting a
740       * temp tap between the left-sourced join and right-sourced join.
741       *
742       * @param elementGraph
743       */
744      protected void handleJoins( ElementGraph elementGraph )
745        {
746        while( !internalJoins( elementGraph ) )
747          ;
748        }
749    
  /**
   * Performs a single pass of HashJoin planning over {@code elementGraph}.
   * <p>
   * Every path returned by {@code elementGraph.getAllShortestPathsBetweenExtents()} is visited in reverse list order.
   * Along each path, the Merges and HashJoins seen since the last Tap or Group ({@code lastSourceElement}) form the
   * current "segment". The rules below may select one temp tap insertion point. That point is the element
   * immediately preceding the offending HashJoin or Splice on the path. As soon as a path yields an insertion point,
   * the tap is inserted via {@code insertTempTapAfter} and this pass ends.
   *
   * @param elementGraph the element graph to inspect and, if a rule fires, modify
   * @return {@code false} if a temp tap was inserted (the graph changed; {@code handleJoins} calls this again),
   *         {@code true} if no path required an insertion
   */
  private boolean internalJoins( ElementGraph elementGraph )
    {
    List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsBetweenExtents();

    // large to small: reverse the returned order (presumably the list is ordered shortest first -- confirm
    // against getAllShortestPathsBetweenExtents)
    Collections.reverse( paths );

    for( GraphPath<FlowElement, Scope> path : paths )
      {
      List<FlowElement> flowElements = Graphs.getPathVertexList( path );
      // at most one entry is ever added: every add below is followed by a break out of the element loop.
      // Entries are the element just before the join/splice on the path, cast to Pipe
      // (NOTE(review): assumes that element is never a Tap, which would fail the cast -- confirm)
      List<Pipe> tapInsertions = new ArrayList<Pipe>();
      // HashJoins and Merges seen in the current segment, in path order
      List<HashJoin> joins = new ArrayList<HashJoin>();
      List<Merge> merges = new ArrayList<Merge>();

      // last Tap or Group seen on this path; marks the start of the current segment.
      // NOTE(review): null until the first Tap/Group, so the path helpers below would receive null if a
      // HashJoin is reached before any Tap/Group -- presumably impossible for source-to-sink paths; confirm
      FlowElement lastSourceElement = null;

      for( int i = 0; i < flowElements.size(); i++ )
        {
        FlowElement flowElement = flowElements.get( i );

        if( flowElement instanceof Merge )
          {
          merges.add( (Merge) flowElement );
          }
        else if( flowElement instanceof HashJoin )
          {
          HashJoin join = (HashJoin) flowElement;

          Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true );

          // the path entering the join at position 0 is the streamed side; any other position is accumulated
          int pathPosition = pathPositionInto( path, join );
          boolean thisPathIsStreamed = pathPosition == 0;

          boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths
          int pathCount = countPaths( pathCounts );

          // presumably the number of HashJoins between the segment start and this join; the two rules
          // below only apply when this is the first join of the segment
          int priorJoins = countTypesBetween( elementGraph, lastSourceElement, join, HashJoin.class );

          if( priorJoins == 0 )
            {
            // if same source is leading into the hashjoin, insert tap on the accumulated side
            if( pathCount == 2 && isAccumulatedAndStreamed && !thisPathIsStreamed )
              {
              tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
              break;
              }

            // if more than one path into streamed and accumulated branches, insert tap on streamed side
            if( pathCount > 2 && isAccumulatedAndStreamed && thisPathIsStreamed )
              {
              tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
              break;
              }
            }

          if( !merges.isEmpty() )
            {
            // if a Merge is prior to a HashJoin, and it's an accumulated path, force Merge results to disk
            int joinPos = flowElements.indexOf( join );
            int mergePos = nearest( flowElements, joinPos, merges );

            if( mergePos != -1 && joinPos > mergePos )
              {
              // insert when this path is accumulated, or when it is streamed but the join is reached on both
              // streamed and accumulated paths (equivalent to: isAccumulatedAndStreamed || !thisPathIsStreamed)
              if( ( isAccumulatedAndStreamed && thisPathIsStreamed ) || !thisPathIsStreamed )
                {
                tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
                break;
                }
              }
            }

          joins.add( (HashJoin) flowElement );
          }
        else if( flowElement instanceof Tap || flowElement instanceof Group )
          {
          // a Tap or Group ends the current segment: evaluate what was collected since lastSourceElement,
          // then (if nothing was inserted) start a new segment at this element

          // added for JoinFieldedPipesPlatformTest.testJoinMergeGroupBy, where a Merge hides the streamed nature
          // of a path: check the Group first, then the preceding Merges nearest-first, and insert before the
          // first splice reached on both streamed and accumulated paths
          if( flowElement instanceof Group && !joins.isEmpty() )
            {
            List<Splice> splices = new ArrayList<Splice>();

            splices.addAll( merges );
            splices.add( (Splice) flowElement );

            Collections.reverse( splices );

            for( Splice splice : splices )
              {
              Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, splice, true );

              if( isBothAccumulatedAndStreamedPath( pathCounts ) )
                {
                tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( splice ) - 1 ) );
                break;
                }
              }

            if( !tapInsertions.isEmpty() )
              break;
            }

          // walk this segment's joins in path order
          for( int j = 0; j < joins.size(); j++ )
            {
            HashJoin join = joins.get( j );

            int pathPosition = pathPositionInto( path, join );
            boolean thisPathIsStreamed = pathPosition == 0;

            Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true );

            boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths
            int pathCount = countPaths( pathCounts );

            if( pathCount >= 2 && isAccumulatedAndStreamed && thisPathIsStreamed )
              {
              tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
              break;
              }

            if( thisPathIsStreamed )
              continue;

            // this path is accumulated by the segment's first join: nothing to insert for this segment
            if( j == 0 ) // is accumulated on first join
              break;

            // reaching here means this path streamed through every earlier join in the segment;
            // prevent a streamed path from being accumulated by injecting a tap before the
            // current HashJoin
            tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
            break;
            }

          if( !tapInsertions.isEmpty() )
            break;

          // start a new segment at this Tap/Group
          lastSourceElement = flowElement;
          merges.clear();
          joins.clear();
          }
        }

      for( Pipe pipe : tapInsertions )
        insertTempTapAfter( elementGraph, pipe );

      // the graph changed; stop so the caller can recompute paths on the modified graph
      if( !tapInsertions.isEmpty() )
        return false;
      }

    return true;
    }
901    
902      private int nearest( List<FlowElement> flowElements, int index, List<Merge> merges )
903        {
904        List<Merge> reversed = new ArrayList<Merge>( merges );
905        Collections.reverse( reversed );
906    
907        for( Merge merge : reversed )
908          {
909          int pos = flowElements.indexOf( merge );
910          if( pos < index )
911            return pos;
912          }
913    
914        return -1;
915        }
916      }