/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
/**
 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via the {@link JobConf} object.
 * <p/>
 * Use this class to allow custom MapReduce jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * according to their dependencies (topologically).
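 * <p/>
 * For example, a minimal sketch, where the mapper, reducer, and paths are illustrative and
 * {@code otherFlow} is assumed to be defined elsewhere:
 * <pre>{@code
 * JobConf jobConf = new JobConf();
 * jobConf.setJobName( "custom" );
 * jobConf.setMapperClass( CustomMapper.class );   // hypothetical mapper
 * jobConf.setReducerClass( CustomReducer.class ); // hypothetical reducer
 * FileInputFormat.setInputPaths( jobConf, new Path( "input/path" ) );
 * FileOutputFormat.setOutputPath( jobConf, new Path( "output/path" ) );
 *
 * // deleteSinkOnInit == true removes "output/path" before the job runs
 * Flow flow = new MapReduceFlow( jobConf, true );
 *
 * new CascadeConnector().connect( flow, otherFlow ).complete();
 * }</pre>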
 * <p/>
 * Set the parameter {@code deleteSinkOnInit} to {@code true} if the output path in the JobConf should be deleted before executing the MapReduce job.
 * <p/>
 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
 * <p/>
 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, you should override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to properly resolve the configured identifiers into
 * usable {@link Tap} instances. By default, createTraps returns an empty Map and rarely needs to be overridden.
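 * <p/>
 * For instance, a sketch of such an override, where the {@code custom.input.table} property and the
 * {@code TableTap} class are hypothetical:
 * <pre>{@code
 * public class CustomTapFlow extends MapReduceFlow
 *   {
 *   public CustomTapFlow( String name, JobConf jobConf )
 *     {
 *     super( name, jobConf );
 *     }
 *
 *   protected Map<String, Tap> createSources( JobConf jobConf )
 *     {
 *     Map<String, Tap> taps = new HashMap<String, Tap>();
 *     String identifier = jobConf.get( "custom.input.table" );
 *
 *     taps.put( identifier, new TableTap( identifier ) );
 *
 *     return taps;
 *     }
 *   }
 * }</pre>
 * Note these methods are invoked from the MapReduceFlow constructor, so overrides should not depend on
 * subclass instance state.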
 * <p/>
 * MapReduceFlow supports both org.apache.hadoop.mapred.* and org.apache.hadoop.mapreduce.* API Jobs.
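 * <p/>
 * For a job configured through the newer org.apache.hadoop.mapreduce API, a sketch, assuming a
 * hypothetical mapper class and illustrative paths:
 * <pre>{@code
 * Job job = Job.getInstance();
 * job.setJobName( "custom" );
 * job.setMapperClass( CustomNewApiMapper.class );
 * org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath( job, new Path( "input/path" ) );
 * org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath( job, new Path( "output/path" ) );
 *
 * Flow flow = new MapReduceFlow( new JobConf( job.getConfiguration() ) );
 * }</pre>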
 */
public class MapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit */
  protected boolean deleteSinkOnInit = false;

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"jobConf"})
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"name", "jobConf"})
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( new Properties(), name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( properties, name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map<String, String>
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit )
    {
    this( properties, name, jobConf, flowDescriptor, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map<String, String>
   * @param deleteSinkOnInit of type boolean
   * @param stopJobsOnExit   of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit", "stopJobsOnExit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, jobConf, name, flowDescriptor );
    this.deleteSinkOnInit = deleteSinkOnInit;
    this.stopJobsOnExit = stopJobsOnExit;

    // derive source, sink, and trap Taps directly from the given JobConf,
    // then wrap the pre-configured job in a single-step flow graph
    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );
    initSteps();

    initializeNewJobsMap();
    }

  private FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph flowStepGraph = new FlowStepGraph();

    // the pre-configured job maps to exactly one step, identified by the first sink Tap
    Tap sink = getSinksCollection().iterator().next();
    FlowStep<JobConf> step = new MapReduceFlowStep( getName(), sink.toString(), jobConf, sink );

    flowStepGraph.addVertex( step );

    return flowStepGraph;
    }

  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    // look for input paths set via the mapred.* API first
    Path[] paths = FileInputFormat.getInputPaths( jobConf );

    if( paths.length == 0 )
      {
      // fall back to input paths set via the mapreduce.* API
      try
        {
        paths = org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getInputPaths( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<String, Tap>();

    // wrap each input path in an Hfs Tap with a pass-through scheme
    for( Path path : paths )
      taps.put( path.toString(), new Hfs( new NullScheme(), path.toString() ) );

    return taps;
    }

  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    Map<String, Tap> taps = new HashMap<String, Tap>();

    // look for the output path set via the mapred.* API first
    Path path = FileOutputFormat.getOutputPath( jobConf );

    if( path == null )
      {
      // fall back to the output path set via the mapreduce.* API
      try
        {
        path = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    if( path == null )
      throw new CascadingException( "unable to resolve an output path from the given JobConf" );

    taps.put( path.toString(), new Hfs( new NullScheme(), path.toString(), deleteSinkOnInit ? SinkMode.REPLACE : SinkMode.KEEP ) );

    return taps;
    }

  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    // no traps by default; subclasses may override to supply trap Taps
    return new HashMap<String, Tap>();
    }
  }