001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop;
022
023import java.beans.ConstructorProperties;
024import java.util.Map;
025import java.util.Properties;
026
027import cascading.flow.hadoop.util.HadoopUtil;
028import cascading.tap.Tap;
029import cascading.tap.hadoop.Hfs;
030import org.apache.hadoop.mapred.JobConf;
031
032/**
033 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
034 * pre-configured via the {@link JobConf} object.
035 * <p/>
036 * Use this class to allow custom MapReduce jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
037 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
038 * according to their dependencies (topologically).
039 * <p/>
040 * Set the parameter {@code deleteSinkOnInit} to {@code true} if the outputPath in the jobConf should be deleted before executing the MapReduce job.
041 * <p/>
042 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
043 * <p/>
044 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, you should override the
045 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
046 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to properly resolve the configured paths into
047 * usable {@link Tap} instances. By default createTraps returns an empty collection and should probably be left alone.
048 * <p/>
049 * MapReduceFlow supports both org.apache.hadoop.mapred.* and org.apache.hadoop.mapreduce.* API Jobs.
050 */
public class MapReduceFlow extends BaseMapReduceFlow
  {
  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * The flow name is taken from {@link JobConf#getJobName()}; the sink is not deleted on init.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"jobConf"})
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * The flow name is taken from {@link JobConf#getJobName()}.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean, when true the configured output path is deleted before the job runs
   */
  @ConstructorProperties({"jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * The sink is not deleted on init.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"name", "jobConf"})
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * Delegates with empty Properties, no flowDescriptor, and {@code stopJobsOnExit} defaulted to {@code true}.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean, when true the configured output path is deleted before the job runs
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( new Properties(), name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * Delegates with no flowDescriptor and {@code stopJobsOnExit} defaulted to {@code true}.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean, when true the configured output path is deleted before the job runs
   */
  @ConstructorProperties({"properties", "name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( properties, name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * Delegates with {@code stopJobsOnExit} defaulted to {@code true}.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map&lt;String, String&gt;, may be null
   * @param deleteSinkOnInit of type boolean, when true the configured output path is deleted before the job runs
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit )
    {
    this( properties, name, jobConf, flowDescriptor, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance. This is the canonical constructor all
   * other constructors delegate to.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map&lt;String, String&gt;, may be null
   * @param deleteSinkOnInit of type boolean, when true the configured output path is deleted before the job runs
   * @param stopJobsOnExit   of type boolean, when true running jobs are stopped on JVM exit
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit", "stopJobsOnExit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, jobConf, name, flowDescriptor, deleteSinkOnInit );
    this.stopJobsOnExit = stopJobsOnExit;

    initializeFrom( jobConf ); // push off initialization allowing for overrides
    }

  /**
   * Method initializeFrom resolves the given JobConf into source, sink, and trap {@link Tap} instances and
   * builds the flow's step graph, then performs the bookkeeping normally done by {@code BaseFlow#initialize()}.
   * <p>
   * This is deliberately invoked from the constructor so subclasses may override
   * {@link #createSources(JobConf)}, {@link #createSinks(JobConf)}, and {@link #createTraps(JobConf)}
   * to customize how configured paths are resolved into Taps.
   * <p>
   * NOTE(review): because this runs during construction, subclass overrides of the create* methods execute
   * before the subclass constructor body has run — overrides must not depend on subclass instance fields.
   *
   * @param jobConf of type JobConf, the pre-configured MapReduce job
   */
  protected void initializeFrom( JobConf jobConf )
    {
    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );

    // this mirrors BaseFlow#initialize() — keep the remaining calls in this order

    initSteps();

    this.flowStats = createPrepareFlowStats(); // must be last

    initializeNewJobsMap();

    initializeChildStats();
    }
  }