/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Map;

import cascading.flow.BaseFlow;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.property.PropertyUtil;
import cascading.tap.hadoop.io.HttpFileSystem;
import cascading.util.ShutdownUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import riffle.process.ProcessConfiguration;

import static cascading.flow.FlowProps.MAX_CONCURRENT_STEPS;
import static cascading.flow.FlowProps.PRESERVE_TEMPORARY_FILES;

/**
 * Class HadoopFlow is the Apache Hadoop specific implementation of a {@link Flow}.
 * <p/>
 * HadoopFlow must be created through a {@link cascading.flow.FlowConnector} sub-class instance.
 * <p/>
 * If classpath paths are provided on the {@link FlowDef}, the Hadoop distributed cache mechanism will be used
 * to augment the remote classpath.
 * <p/>
 * Any path elements that are relative will be uploaded to HDFS, and the HDFS URI will be used on the JobConf. Note
 * all paths are added as "files" to the JobConf, not archives, so they aren't needlessly uncompressed cluster side.
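 * <p/>
 * As a usage sketch, an instance is typically obtained through a {@link HadoopFlowConnector}; the taps and pipe
 * assembly referenced below are assumed to be defined elsewhere:
 * <pre>{@code
 * Properties properties = new Properties();
 *
 * FlowConnector flowConnector = new HadoopFlowConnector( properties );
 * Flow flow = flowConnector.connect( "flow-name", sourceTap, sinkTap, assemblyTail );
 *
 * flow.complete(); // blocks until all underlying Hadoop jobs complete
 * }</pre>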
 *
 * @see cascading.flow.FlowConnector
 */
public class HadoopFlow extends BaseFlow<JobConf>
  {
  /** Field hdfsShutdown */
  private static Thread hdfsShutdown = null;
  /** Field shutdownHook */
  private static ShutdownUtil.Hook shutdownHook;
  /** Field jobConf */
  private transient JobConf jobConf;
  /** Field preserveTemporaryFiles */
  private boolean preserveTemporaryFiles = false;
  /** Field syncPaths */
  private transient Map<Path, Path> syncPaths;

  protected HadoopFlow()
    {
    }

  /**
   * Returns property preserveTemporaryFiles.
   *
   * @param properties of type Map
   * @return a boolean
   */
  static boolean getPreserveTemporaryFiles( Map<Object, Object> properties )
    {
    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, PRESERVE_TEMPORARY_FILES, "false" ) );
    }

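  /**
   * Returns the value of the {@link cascading.flow.FlowProps#MAX_CONCURRENT_STEPS} property from the given
   * JobConf, defaulting to 0 when the property is not set.
   *
   * @param jobConf of type JobConf
   * @return an int
   */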
  static int getMaxConcurrentSteps( JobConf jobConf )
    {
    return jobConf.getInt( MAX_CONCURRENT_STEPS, 0 );
    }

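  /**
   * Constructor HadoopFlow creates a new HadoopFlow instance.
   *
   * @param platformInfo   of type PlatformInfo
   * @param properties     of type Map
   * @param jobConf        of type JobConf
   * @param name           of type String
   * @param flowDescriptor of type Map
   */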
  protected HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, String name, Map<String, String> flowDescriptor )
    {
    super( platformInfo, properties, jobConf, name, flowDescriptor );
    initFromProperties( properties );
    }

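  /**
   * Constructor HadoopFlow creates a new HadoopFlow instance.
   *
   * @param platformInfo of type PlatformInfo
   * @param properties   of type Map
   * @param jobConf      of type JobConf
   * @param flowDef      of type FlowDef
   */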
  public HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, FlowDef flowDef )
    {
    super( platformInfo, properties, jobConf, flowDef );

    initFromProperties( properties );
    }

  @Override
  protected void initFromProperties( Map<Object, Object> properties )
    {
    super.initFromProperties( properties );
    preserveTemporaryFiles = getPreserveTemporaryFiles( properties );
    }

  protected void initConfig( Map<Object, Object> properties, JobConf parentConfig )
    {
    if( properties != null )
      parentConfig = createConfig( properties, parentConfig );

    if( parentConfig == null ) // this is ok, getConfig will pass a default parent in
      return;

    jobConf = HadoopUtil.copyJobConf( parentConfig ); // prevent local values from being shared
    jobConf.set( "fs.http.impl", HttpFileSystem.class.getName() );
    jobConf.set( "fs.https.impl", HttpFileSystem.class.getName() );

    syncPaths = HadoopUtil.addToClassPath( jobConf, getClassPath() );
    }

  @Override
  protected void setConfigProperty( JobConf config, Object key, Object value )
    {
    // don't let these objects pass, even though toString is called below.
    if( value instanceof Class || value instanceof JobConf )
      return;

    config.set( key.toString(), value.toString() );
    }

  @Override
  protected JobConf newConfig( JobConf defaultConfig )
    {
    return defaultConfig == null ? new JobConf() : HadoopUtil.copyJobConf( defaultConfig );
    }

  @ProcessConfiguration
  @Override
  public JobConf getConfig()
    {
    if( jobConf == null )
      initConfig( null, new JobConf() );

    return jobConf;
    }

  @Override
  public JobConf getConfigCopy()
    {
    return HadoopUtil.copyJobConf( getConfig() );
    }

  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

  /**
   * Method getProperty returns the value associated with the given key from the underlying properties system.
   *
   * @param key of type String
   * @return String
   */
  public String getProperty( String key )
    {
    return getConfig().get( key );
    }

  @Override
  public FlowProcess<JobConf> getFlowProcess()
    {
    return new HadoopFlowProcess( getFlowSession(), getConfig() );
    }

  /**
   * Method isPreserveTemporaryFiles returns false if temporary files will be cleaned when this Flow completes.
   *
   * @return the preserveTemporaryFiles (type boolean) of this Flow object.
   */
  public boolean isPreserveTemporaryFiles()
    {
    return preserveTemporaryFiles;
    }

  @Override
  protected void internalStart()
    {
    try
      {
      copyToDistributedCache();
      deleteSinksIfReplace();
      deleteTrapsIfReplace();
      deleteCheckpointsIfReplace();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to delete sinks, traps, or checkpoints", exception );
      }

    registerHadoopShutdownHook( this );
    }

  private void copyToDistributedCache()
    {
    HadoopUtil.syncPaths( jobConf, syncPaths, true );
    }

  @Override
  public boolean stepsAreLocal()
    {
    return HadoopUtil.isLocal( getConfig() );
    }

  private void cleanTemporaryFiles( boolean stop )
    {
    if( stop ) // unstable to call fs operations during shutdown
      return;

    // use step config so cascading.flow.step.path property is properly used
    for( FlowStep<JobConf> step : getFlowSteps() )
      ( (BaseFlowStep<JobConf>) step ).clean();
    }

  private static synchronized void registerHadoopShutdownHook( Flow flow )
    {
    if( !flow.isStopJobsOnExit() )
      return;

    // guaranteed singleton here
    if( shutdownHook != null )
      return;

    getHdfsShutdownHook();

    shutdownHook = new ShutdownUtil.Hook()
    {
    @Override
    public Priority priority()
      {
      return Priority.LAST; // very last thing to happen
      }

    @Override
    public void execute()
      {
      callHdfsShutdownHook();
      }
    };

    ShutdownUtil.addHook( shutdownHook );
    }

  private synchronized static void callHdfsShutdownHook()
    {
    if( hdfsShutdown != null )
      hdfsShutdown.start();
    }

  private synchronized static void getHdfsShutdownHook()
    {
    if( hdfsShutdown == null )
      hdfsShutdown = HadoopUtil.getHDFSShutdownHook();
    }

  protected void internalClean( boolean stop )
    {
    if( !isPreserveTemporaryFiles() )
      cleanTemporaryFiles( stop );
    }

  protected void internalShutdown()
    {
    }

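  /**
   * Returns the maximum number of steps to submit concurrently; a single step when the steps run in Hadoop
   * local mode, otherwise the {@link cascading.flow.FlowProps#MAX_CONCURRENT_STEPS} value from the current config.
   */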
  protected int getMaxNumParallelSteps()
    {
    return stepsAreLocal() ? 1 : getMaxConcurrentSteps( getConfig() );
    }

  @Override
  protected long getTotalSliceCPUMilliSeconds()
    {
    // this is a hadoop2 MR specific counter/value
    long counterValue = flowStats.getCounterValue( "org.apache.hadoop.mapreduce.TaskCounter", "CPU_MILLISECONDS" );

    if( counterValue == 0 )
      return -1;

    return counterValue;
    }
  }