/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.planner.HadoopStepGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

/**
 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via the {@link JobConf} object.
 * <p/>
 * Use this class to allow custom MapReduce jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * according to their dependencies (topologically).
 * <p/>
 * Set the parameter {@code deleteSinkOnInit} to {@code true} if the output path in the JobConf should be deleted
 * before executing the MapReduce job.
 * <p/>
 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
 * <p/>
 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, you should override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to properly resolve the configured paths into
 * usable {@link Tap} instances. By default createTraps returns an empty collection and should probably be left alone.
 * <p/>
 * MapReduceFlow supports both org.apache.hadoop.mapred.* and org.apache.hadoop.mapreduce.* API jobs.
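 * <p/>
 * For example, a pre-configured {@code mapred} API job might be wrapped as follows. This is a minimal sketch only;
 * {@code WordCountMapper}, {@code WordCountReducer}, and {@code otherFlow} are hypothetical stand-ins for the user's
 * own Mapper and Reducer classes and other Flow instances in the Cascade.
 * <pre>{@code
 * JobConf jobConf = new JobConf();
 * jobConf.setJobName( "wordcount" );
 *
 * jobConf.setMapperClass( WordCountMapper.class );
 * jobConf.setReducerClass( WordCountReducer.class );
 * jobConf.setOutputKeyClass( Text.class );
 * jobConf.setOutputValueClass( IntWritable.class );
 *
 * FileInputFormat.setInputPaths( jobConf, new Path( "input/path" ) );
 * FileOutputFormat.setOutputPath( jobConf, new Path( "output/path" ) );
 *
 * // deleteSinkOnInit = true, so re-runs do not fail on an existing output path
 * Flow flow = new MapReduceFlow( "wordcount", jobConf, true );
 *
 * // let the Cascade scheduler order this job against other Flows sharing its resources
 * new CascadeConnector().connect( flow, otherFlow ).complete();
 * }</pre>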
 */
public class MapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit */
  protected boolean deleteSinkOnInit = false;

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties( {"jobConf"} )
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties( {"jobConf", "deleteSinkOnInit"} )
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties( {"name", "jobConf"} )
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties( {"name", "jobConf", "deleteSinkOnInit"} )
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( name, jobConf, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   * @param stopJobsOnExit   of type boolean
   */
  @ConstructorProperties( {"name", "jobConf", "deleteSinkOnInit", "stopJobsOnExit"} )
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo(), new Properties(), jobConf, name, null );
    this.deleteSinkOnInit = deleteSinkOnInit;
    this.stopJobsOnExit = stopJobsOnExit;

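    // derive source and sink Taps and the single-step graph directly from the given JobConf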
    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );
    initSteps();

    initializeNewJobsMap();
    }

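  /**
   * Method makeStepGraph wraps the entire pre-configured job in a single {@link MapReduceFlowStep} vertex, named
   * after the sole sink Tap.
   */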
  private FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph<JobConf> flowStepGraph = new HadoopStepGraph();

    Tap sink = getSinksCollection().iterator().next();
    FlowStep<JobConf> step = new MapReduceFlowStep( getName(), sink.toString(), jobConf, sink );

    flowStepGraph.addVertex( step );

    return flowStepGraph;
    }

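  /**
   * Method createSources resolves the input paths in the given JobConf into {@link Hfs} Taps, consulting the
   * mapred API first and falling back to the mapreduce API. Override this method if the job does not identify
   * its inputs by Hadoop FS paths.
   *
   * @param jobConf of type JobConf
   * @return Map<String, Tap>
   */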
  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    Path[] paths = FileInputFormat.getInputPaths( jobConf );

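    // no input paths set via the mapred API, fall back to the mapreduce API configuration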
    if( paths.length == 0 )
      {
      try
        {
        paths = org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getInputPaths( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<String, Tap>();

    for( Path path : paths )
      taps.put( path.toString(), new Hfs( new NullScheme(), path.toString() ) );

    return taps;
    }

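  /**
   * Method createSinks resolves the output path in the given JobConf into an {@link Hfs} Tap, consulting the
   * mapred API first and falling back to the mapreduce API. Override this method if the job does not identify
   * its output by a Hadoop FS path.
   *
   * @param jobConf of type JobConf
   * @return Map<String, Tap>
   */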
  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    Map<String, Tap> taps = new HashMap<String, Tap>();

    Path path = FileOutputFormat.getOutputPath( jobConf );

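    // no output path set via the mapred API, fall back to the mapreduce API configuration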
    if( path == null )
      {
      try
        {
        path = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    // fail fast with a descriptive error, rather than an NPE below, if neither API has an output path configured
    if( path == null )
      throw new CascadingException( "unable to find an output path in the given JobConf" );

    taps.put( path.toString(), new Hfs( new NullScheme(), path.toString(), deleteSinkOnInit ) );

    return taps;
    }

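  /**
   * Method createTraps returns an empty Map by default. Override this method to supply any trap Taps the job
   * expects.
   *
   * @param jobConf of type JobConf
   * @return Map<String, Tap>
   */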
  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    return new HashMap<String, Tap>();
    }
  }