001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop.planner;
022    
023    import java.net.URI;
024    import java.util.Collections;
025    import java.util.Comparator;
026    import java.util.HashMap;
027    import java.util.HashSet;
028    import java.util.Iterator;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Properties;
032    import java.util.Set;
033    import java.util.TreeSet;
034    
035    import cascading.flow.FlowConnector;
036    import cascading.flow.FlowDef;
037    import cascading.flow.FlowElement;
038    import cascading.flow.hadoop.HadoopFlow;
039    import cascading.flow.hadoop.util.HadoopUtil;
040    import cascading.flow.planner.ElementGraph;
041    import cascading.flow.planner.ElementGraphs;
042    import cascading.flow.planner.FlowPlanner;
043    import cascading.flow.planner.FlowStepGraph;
044    import cascading.flow.planner.PlatformInfo;
045    import cascading.flow.planner.Scope;
046    import cascading.pipe.CoGroup;
047    import cascading.pipe.Every;
048    import cascading.pipe.Group;
049    import cascading.pipe.Pipe;
050    import cascading.property.AppProps;
051    import cascading.property.PropertyUtil;
052    import cascading.tap.Tap;
053    import cascading.tap.hadoop.Hfs;
054    import cascading.tap.hadoop.util.TempHfs;
055    import cascading.util.Util;
056    import org.apache.hadoop.mapred.JobConf;
057    import org.jgrapht.GraphPath;
058    import org.jgrapht.Graphs;
059    import org.slf4j.Logger;
060    import org.slf4j.LoggerFactory;
061    
062    import static cascading.flow.planner.ElementGraphs.getAllShortestPathsBetween;
063    
064    /**
065     * Class HadoopPlanner is the core Hadoop MapReduce planner.
066     * <p/>
067     * Notes:
068     * <p/>
069     * <strong>Custom JobConf properties</strong><br/>
070     * A custom JobConf instance can be passed to this planner by calling {@link #copyJobConf(java.util.Map, org.apache.hadoop.mapred.JobConf)}
071     * on a map properties object before constructing a new {@link cascading.flow.hadoop.HadoopFlowConnector}.
072     * <p/>
073     * A better practice would be to set Hadoop properties directly on the map properties object handed to the FlowConnector.
074     * All values in the map will be passed to a new default JobConf instance to be used as defaults for all resulting
075     * Flow instances.
076     * <p/>
 * For example, {@code properties.set("mapred.child.java.opts","-Xmx512m");} would convince Hadoop
 * to spawn all child JVMs with a heap of 512MB.
079     */
public class HadoopPlanner extends FlowPlanner<HadoopFlow, JobConf>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( HadoopPlanner.class );

  /** Field jobConf - default Hadoop configuration seeded from the planner properties in {@link #initialize}; inherited by all planned Flows. */
  private JobConf jobConf;
  /** Field intermediateSchemeClass - Scheme class used for intermediate (temp) taps inserted between MapReduce jobs. */
  private Class intermediateSchemeClass;

  /**
   * Method copyJobConf adds the given JobConf values to the given properties object. Use this method to pass
   * custom default Hadoop JobConf properties to Hadoop.
   *
   * @param properties of type Map
   * @param jobConf    of type JobConf
   */
  public static void copyJobConf( Map<Object, Object> properties, JobConf jobConf )
    {
    // JobConf is Iterable over its key/value entries; copy every entry into the properties map
    for( Map.Entry<String, String> entry : jobConf )
      properties.put( entry.getKey(), entry.getValue() );
    }

  /**
   * Method createJobConf returns a new JobConf instance using the values in the given properties argument.
   *
   * @param properties of type Map
   * @return a JobConf instance
   */
  public static JobConf createJobConf( Map<Object, Object> properties )
    {
    JobConf conf = new JobConf();

    copyProperties( conf, properties );

    return conf;
    }

  /**
   * Method copyProperties adds the given Map values to the given JobConf object.
   * <p/>
   * When the map is a {@link Properties} instance, {@code stringPropertyNames()} is used so that
   * values from any chained default Properties are also copied. Otherwise entries with null
   * values are skipped, since JobConf does not accept null values.
   *
   * @param jobConf    of type JobConf
   * @param properties of type Map
   */
  public static void copyProperties( JobConf jobConf, Map<Object, Object> properties )
    {
    if( properties instanceof Properties )
      {
      Properties props = (Properties) properties;
      Set<String> keys = props.stringPropertyNames(); // includes keys from chained defaults

      for( String key : keys )
        jobConf.set( key, props.getProperty( key ) );
      }
    else
      {
      for( Map.Entry<Object, Object> entry : properties.entrySet() )
        {
        if( entry.getValue() != null ) // JobConf#set rejects null values
          jobConf.set( entry.getKey().toString(), entry.getValue().toString() );
        }
      }
    }

  /**
   * Method setNormalizeHeterogeneousSources adds the given doNormalize boolean to the given properties object.
   * Use this method if additional jobs should be planned in to handle incompatible InputFormat classes.
   * <p/>
   * Normalization is off by default and should only be enabled by advanced users. Typically this will decrease
   * application performance.
   *
   * @param properties  of type Map
   * @param doNormalize of type boolean
   * @deprecated retained for backwards compatibility
   */
  @Deprecated
  public static void setNormalizeHeterogeneousSources( Map<Object, Object> properties, boolean doNormalize )
    {
    properties.put( "cascading.multimapreduceplanner.normalizesources", Boolean.toString( doNormalize ) );
    }

  /**
   * Method getNormalizeHeterogeneousSources returns if this planner will normalize heterogeneous input sources.
   * Defaults to {@code false} when the property is unset.
   *
   * @param properties of type Map
   * @return a boolean
   * @deprecated retained for backwards compatibility
   */
  @Deprecated
  public static boolean getNormalizeHeterogeneousSources( Map<Object, Object> properties )
    {
    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, "cascading.multimapreduceplanner.normalizesources", "false" ) );
    }

  /**
   * Method setCollapseAdjacentTaps enables/disables an optimization that will identify if a sink tap and an intermediate tap
   * are equivalent field wise, and discard the intermediate tap for the sink tap to minimize the number of MR jobs.
   * <p/>
   * Note that some Scheme types may lose type information if the planner cannot detect field types. This could result
   * in type mismatch errors during joins.
   *
   * @param properties       of type Map
   * @param collapseAdjacent of type boolean
   */
  public static void setCollapseAdjacentTaps( Map<Object, Object> properties, boolean collapseAdjacent )
    {
    // NOTE(review): property key is missing a 't' ("...collapseadjacentaps") - kept as-is since the
    // getter below uses the same spelling; changing it would silently break existing configurations
    properties.put( "cascading.multimapreduceplanner.collapseadjacentaps", Boolean.toString( collapseAdjacent ) );
    }

  /**
   * Method getCollapseAdjacentTaps returns whether the adjacent-tap collapse optimization is enabled.
   * Defaults to {@code true} when the property is unset.
   *
   * @param properties of type Map
   * @return a boolean
   */
  public static boolean getCollapseAdjacentTaps( Map<Object, Object> properties )
    {
    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, "cascading.multimapreduceplanner.collapseadjacentaps", "true" ) );
    }

  /** @return the default JobConf built during {@link #initialize} */
  @Override
  public JobConf getConfig()
    {
    return jobConf;
    }

  /** @return platform information resolved from the Hadoop runtime */
  @Override
  public PlatformInfo getPlatformInfo()
    {
    return HadoopUtil.getPlatformInfo();
    }

  /**
   * Method initialize prepares this planner: it builds the default JobConf from the given properties,
   * captures the intermediate Scheme class, and resolves the application jar.
   * <p/>
   * Jar resolution order: an already-set jar on the JobConf wins, then the application jar class from
   * {@link AppProps}, then the application jar path, then the jar containing the detected main class.
   *
   * @param flowConnector of type FlowConnector
   * @param properties    of type Map
   */
  @Override
  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
    {
    super.initialize( flowConnector, properties );

    jobConf = HadoopUtil.createJobConf( properties, createJobConf( properties ) );
    checkPlatform( jobConf );
    intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties );

    Class type = AppProps.getApplicationJarClass( properties );
    if( jobConf.getJar() == null && type != null )
      jobConf.setJarByClass( type );

    String path = AppProps.getApplicationJarPath( properties );
    if( jobConf.getJar() == null && path != null )
      jobConf.setJar( path );

    if( jobConf.getJar() == null )
      jobConf.setJarByClass( HadoopUtil.findMainClass( HadoopPlanner.class ) );

    // publish the resolved jar path back so downstream consumers see the same value
    AppProps.setApplicationJarPath( properties, jobConf.getJar() );

    LOG.info( "using application jar: {}", jobConf.getJar() );
    }

  /**
   * Method checkPlatform warns when YARN is detected, since this planner targets Hadoop 1.x MR.
   *
   * @param jobConf of type JobConf
   */
  protected void checkPlatform( JobConf jobConf )
    {
    if( HadoopUtil.isYARN( jobConf ) )
      LOG.warn( "running YARN based flows on Hadoop 1.x may cause problems, please use the 'cascading-hadoop2-mr1' dependencies" );
    }

  /** Creates the HadoopFlow shell this planner will populate. */
  @Override
  protected HadoopFlow createFlow( FlowDef flowDef )
    {
    return new HadoopFlow( getPlatformInfo(), getProperties(), getConfig(), flowDef );
    }

  /**
   * Method buildFlow plans and returns a new HadoopFlow for the given FlowDef.
   * <p/>
   * Runs the generic verification rules first, then the MapReduce specific graph rewrites
   * (split handling, job partitioning, join handling, optional source normalization), resolves
   * fields, and finally builds the step graph. Any exception is wrapped with the partially
   * built element graph for diagnostics.
   *
   * @param flowDef of type FlowDef
   * @return a planned HadoopFlow
   */
  @Override
  public HadoopFlow buildFlow( FlowDef flowDef )
    {
    ElementGraph elementGraph = null;

    try
      {
      // generic
      verifyAllTaps( flowDef );

      HadoopFlow flow = createFlow( flowDef );

      Pipe[] tails = resolveTails( flowDef, flow );

      verifyAssembly( flowDef, tails );

      elementGraph = createElementGraph( flowDef, tails );

      // rules
      failOnLoneGroupAssertion( elementGraph );
      failOnMissingGroup( elementGraph );
      failOnMisusedBuffer( elementGraph );
      failOnGroupEverySplit( elementGraph );

      // m/r specific
      handleWarnEquivalentPaths( elementGraph );
      handleSplit( elementGraph );
      handleJobPartitioning( elementGraph );
      handleJoins( elementGraph );
      handleNonSafeOperations( elementGraph );

      if( getNormalizeHeterogeneousSources( properties ) )
        handleHeterogeneousSources( elementGraph );

      // generic
      elementGraph.removeUnnecessaryPipes(); // groups must be added before removing pipes
      elementGraph.resolveFields();

      elementGraph = flow.updateSchemes( elementGraph );

      // m/r specific
      if( getCollapseAdjacentTaps( properties ) )
        handleAdjacentTaps( elementGraph );

      FlowStepGraph flowStepGraph = new HadoopStepGraph( flowDef.getName(), elementGraph );

      flow.initialize( elementGraph, flowStepGraph );

      return flow;
      }
    catch( Exception exception )
      {
      // attach the element graph (possibly null) so planning failures can be visualized
      throw handleExceptionDuringPlanning( exception, elementGraph );
      }
    }

  /**
   * Logs a warning for every CoGroup whose incoming shortest paths are pairwise equivalent,
   * which usually indicates a redundant (duplicated) branch feeding the join.
   * Detection only - no graph mutation is performed here.
   */
  private void handleWarnEquivalentPaths( ElementGraph elementGraph )
    {
    List<CoGroup> coGroups = elementGraph.findAllCoGroups();

    for( CoGroup coGroup : coGroups )
      {
      List<GraphPath<FlowElement, Scope>> graphPaths = elementGraph.getAllShortestPathsTo( coGroup );

      List<List<FlowElement>> paths = ElementGraphs.asPathList( graphPaths );

      if( !areEquivalentPaths( elementGraph, paths ) )
        continue;

      LOG.warn( "found equivalent paths from: {} to: {}", paths.get( 0 ).get( 1 ), coGroup );

      // in order to remove dupe paths, we need to verify there isn't any branching
      }
    }

  /**
   * Returns true when all given paths have the same length and, position by position, their
   * elements are pairwise equivalent (per {@link EquivalenceComparator}).
   * <p/>
   * The TreeSet is (ab)used for deduplication: if all elements at position i collapse to a
   * single set entry, they are considered equivalent at that position.
   */
  private boolean areEquivalentPaths( ElementGraph elementGraph, List<List<FlowElement>> paths )
    {
    int length = sameLength( paths );

    if( length == -1 ) // paths differ in length, cannot be equivalent
      return false;

    Set<FlowElement> elements = new TreeSet<FlowElement>( new EquivalenceComparator( elementGraph ) );

    for( int i = 0; i < length; i++ )
      {
      elements.clear();

      for( List<FlowElement> path : paths )
        elements.add( path.get( i ) );

      if( elements.size() != 1 )
        return false;
      }

    return true;
    }

  /**
   * Comparator treating two FlowElements as equal when they are semantically equivalent and
   * have matching in/out degrees in the element graph.
   * <p/>
   * NOTE(review): the fallback of subtracting identity hash codes does not satisfy the general
   * {@link Comparator} contract (not antisymmetric/transitive, identity hashes may collide);
   * this is acceptable only for the TreeSet deduplication trick in {@link #areEquivalentPaths}.
   */
  private class EquivalenceComparator implements Comparator<FlowElement>
    {
    private final ElementGraph elementGraph;

    public EquivalenceComparator( ElementGraph elementGraph )
      {
      this.elementGraph = elementGraph;
      }

    @Override
    public int compare( FlowElement lhs, FlowElement rhs )
      {
      boolean areEquivalent = lhs.isEquivalentTo( rhs );
      boolean sameIncoming = elementGraph.inDegreeOf( lhs ) == elementGraph.inDegreeOf( rhs );
      boolean sameOutgoing = elementGraph.outDegreeOf( lhs ) == elementGraph.outDegreeOf( rhs );

      if( areEquivalent && sameIncoming && sameOutgoing )
        return 0;

      return System.identityHashCode( lhs ) - System.identityHashCode( rhs );
      }
    }

  /**
   * Returns the common size of all given paths, or -1 if any path differs in length from the first.
   * Assumes the list is non-empty (callers obtain it from shortest-path queries).
   */
  private int sameLength( List<List<FlowElement>> paths )
    {
    int lastSize = paths.get( 0 ).size();

    for( int i = 1; i < paths.size(); i++ )
      {
      if( paths.get( i ).size() != lastSize )
        return -1;
      }

    return lastSize;
    }

  /**
   * optimized for this case
   * <pre>
   *         e - t           e1 - e - t
   * t - e1 -       -- > t -
   *         e - t           e1 - e - t
   * </pre>
   * <p/>
   * this should run in two map/red jobs, not 3. needs to be a flag on e1 to prevent this
   * <p/>
   * <pre>
   *        g - t                 g - t
   * g - e -       --> g - e - t -
   *        g - t                 g - t
   * </pre>
   * <p/>
   * <pre>
   *             - e - e                            e - e
   * t - e1 - e2         - g  --> t - e1 - e2 - t -       - g
   *             - e - e                            e - e
   * </pre>
   *
   * @param elementGraph
   */
  private void handleSplit( ElementGraph elementGraph )
    {
    // if there was a graph change, iterate paths again.
    while( !internalSplit( elementGraph ) )
      ;
    }

  /**
   * Single pass of split handling: walks every head-to-tail shortest path looking for split
   * points (out-degree > 1) that need a temp tap inserted to break them into separate jobs.
   *
   * @return true when no insertion was made (fixed point reached), false to trigger another pass
   */
  private boolean internalSplit( ElementGraph elementGraph )
    {
    List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsBetweenExtents();

    for( GraphPath<FlowElement, Scope> path : paths )
      {
      List<FlowElement> flowElements = Graphs.getPathVertexList( path );
      Set<Pipe> tapInsertions = new HashSet<Pipe>();
      FlowElement lastInsertable = null;

      for( int i = 0; i < flowElements.size(); i++ )
        {
        FlowElement flowElement = flowElements.get( i );

        if( flowElement instanceof ElementGraph.Extent ) // is an extent: head or tail
          continue;

        // if Tap, Group, or Every - we insert the tap here
        if( flowElement instanceof Tap || flowElement instanceof Group || flowElement instanceof Every )
          lastInsertable = flowElement;

        // support splits on Pipe unless the previous is a Tap
        // index 0 is expected to be the head Extent, so i - 1 is safe here - TODO confirm
        if( flowElement.getClass() == Pipe.class && flowElements.get( i - 1 ) instanceof Tap )
          continue;

        if( flowElement instanceof Tap )
          continue;

        if( elementGraph.outDegreeOf( flowElement ) <= 1 ) // not a split point
          continue;

        // we are at the root of a split here

        // do any split paths converge on a single Group?
        int maxPaths = elementGraph.getMaxNumPathsBetweenElementAndGroupingMergeJoin( flowElement );
        if( maxPaths <= 1 && lastInsertable instanceof Tap )
          continue;

        tapInsertions.add( (Pipe) flowElement );
        }

      for( Pipe pipe : tapInsertions )
        insertTempTapAfter( elementGraph, pipe );

      if( !tapInsertions.isEmpty() )
        return false; // graph changed; caller must restart path iteration

      }

    return true;
    }

  /**
   * will collapse adjacent and equivalent taps.
   * equivalence is based on the tap adjacent taps using the same filesystem
   * and the sink being symmetrical, and having the same fields as the temp tap.
   * <p/>
   * <p/>
   * must be run after fields are resolved so temp taps have fully defined scheme instances.
   *
   * @param elementGraph
   */
  private void handleAdjacentTaps( ElementGraph elementGraph )
    {
    // if there was a graph change, iterate paths again.
    while( !internalAdjacentTaps( elementGraph ) )
      ;
    }

  /**
   * Single pass of adjacent-tap collapsing: replaces a temporary tap with an equivalent Hfs
   * successor when scheme symmetry, URI scheme, and source fields all match.
   *
   * @return true when no replacement was made (fixed point reached), false to trigger another pass
   */
  private boolean internalAdjacentTaps( ElementGraph elementGraph )
    {
    List<Tap> taps = elementGraph.findAllTaps();

    for( Tap tap : taps )
      {
      if( !( tap.isTemporary() ) ) // only temp taps are candidates for removal
        continue;

      for( FlowElement successor : elementGraph.getAllSuccessors( tap ) )
        {
        if( !( successor instanceof Hfs ) )
          continue;

        Hfs successorTap = (Hfs) successor;

        // does this scheme source what it sinks
        if( !successorTap.getScheme().isSymmetrical() )
          continue;

        URI tempURIScheme = getDefaultURIScheme( tap ); // temp uses default fs
        URI successorURIScheme = getURIScheme( successorTap );

        if( !tempURIScheme.equals( successorURIScheme ) ) // must live on the same filesystem
          continue;

        // safe, both are symmetrical
        // should be called after fields are resolved
        if( !tap.getSourceFields().equals( successorTap.getSourceFields() ) )
          continue;

        elementGraph.replaceElementWith( tap, successor );

        return false; // graph changed; caller must restart iteration
        }
      }

    return true;
    }

  /** Returns the default filesystem URI scheme for the given (Hfs) temp tap. */
  private URI getDefaultURIScheme( Tap tap )
    {
    return ( (Hfs) tap ).getDefaultFileSystemURIScheme( jobConf );
    }

  /** Returns the URI scheme of the given (Hfs) tap. */
  private URI getURIScheme( Tap tap )
    {
    return ( (Hfs) tap ).getURIScheme( jobConf );
    }

  /**
   * Repeatedly normalizes merge/join groups whose sources use incompatible Scheme classes,
   * inserting temp taps until a fixed point is reached.
   */
  private void handleHeterogeneousSources( ElementGraph elementGraph )
    {
    while( !internalHeterogeneousSources( elementGraph ) )
      ;
    }

  /**
   * Single pass of source normalization: finds merge/join Groups fed by taps with differing
   * Scheme classes (a single Hadoop job cannot mix InputFormats) and inserts a temp tap after
   * the offending branch so it is staged through the intermediate scheme first.
   *
   * @return true when no insertion was made (fixed point reached), false to trigger another pass
   */
  private boolean internalHeterogeneousSources( ElementGraph elementGraph )
    {
    // find all Groups
    List<Group> groups = elementGraph.findAllMergeJoinGroups();

    // compare group sources
    Map<Group, Set<Tap>> normalizeGroups = new HashMap<Group, Set<Tap>>();

    for( Group group : groups )
      {
      Set<Tap> taps = new HashSet<Tap>();

      // iterate each shortest path to current group finding each tap sourcing the merge/join
      for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( group ) )
        {
        List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is group
        Collections.reverse( flowElements ); // first element is group

        for( FlowElement previousElement : flowElements )
          {
          if( previousElement instanceof Tap )
            {
            taps.add( (Tap) previousElement );
            break; // stop finding taps in this path
            }
          }
        }

      if( taps.size() == 1 ) // single source tap, nothing to compare
        continue;

      Iterator<Tap> iterator = taps.iterator();
      Tap commonTap = iterator.next();

      while( iterator.hasNext() )
        {
        Tap tap = iterator.next();

        // making assumption hadoop can handle multiple filesytems, but not multiple inputformats
        // in the same job
        // possibly could test for common input format
        if( getSchemeClass( tap ) != getSchemeClass( commonTap ) )
          {
          normalizeGroups.put( group, taps );
          break;
          }
        }
      }

    // if incompatible, insert Tap after its join/merge pipe
    for( Group group : normalizeGroups.keySet() )
      {
      Set<Tap> taps = normalizeGroups.get( group );

      for( Tap tap : taps )
        {
        if( tap.isTemporary() || getSchemeClass( tap ).equals( intermediateSchemeClass ) ) // we normalize to TempHfs
          continue;

        // handle case where there is a split on a pipe between the tap and group
        for( GraphPath<FlowElement, Scope> path : getAllShortestPathsBetween( elementGraph, tap, group ) )
          {
          List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // shortest path tap -> group
          Collections.reverse( flowElements ); // group -> tap

          FlowElement flowElement = flowElements.get( 1 );

          if( flowElement instanceof Tap && ( (Tap) flowElement ).isTemporary() ) // already normalized
            continue;

          LOG.warn( "inserting step to normalize incompatible sources: {}", tap );

          insertTempTapAfter( elementGraph, (Pipe) flowElement );

          return false; // graph changed; caller must restart iteration
          }
        }
      }

    return normalizeGroups.isEmpty();
    }

  /**
   * Creates a temporary Hfs tap using the intermediate scheme class.
   *
   * @param prefix of type String, may be null
   * @param name   of type String
   * @return a new TempHfs tap
   */
  @Override
  protected Tap makeTempTap( String prefix, String name )
    {
    // must give Taps unique names
    return new TempHfs( jobConf, Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null );
    }

  /**
   * Returns the Scheme class backing the given tap; temp taps carry their scheme class directly.
   *
   * @param tap of type Tap
   * @return the Scheme class
   */
  private Class getSchemeClass( Tap tap )
    {
    if( tap.isTemporary() )
      return ( (TempHfs) tap ).getSchemeClass();
    else
      return tap.getScheme().getClass();
    }
  }