/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Map;

import cascading.flow.BaseFlow;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.property.PropertyUtil;
import cascading.tap.hadoop.io.HttpFileSystem;
import cascading.util.ShutdownUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import riffle.process.ProcessConfiguration;

import static cascading.flow.FlowProps.MAX_CONCURRENT_STEPS;
import static cascading.flow.FlowProps.PRESERVE_TEMPORARY_FILES;

/**
 * Class HadoopFlow is the Apache Hadoop specific implementation of a {@link Flow}.
 * <p/>
 * HadoopFlow must be created through a {@link cascading.flow.FlowConnector} sub-class instance.
 * <p/>
 * If classpath paths are provided on the {@link FlowDef}, the Hadoop distributed cache mechanism will be used
 * to augment the remote classpath.
 * <p/>
 * Any path elements that are relative will be uploaded to HDFS, and the HDFS URI will be used on the JobConf. Note
 * all paths are added as "files" to the JobConf, not archives, so they aren't needlessly uncompressed cluster side.
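 * <p/>
 * For example, a HadoopFlow is typically obtained through a {@link cascading.flow.hadoop.HadoopFlowConnector}.
 * A minimal sketch, where the taps, pipe assembly, and jar path are placeholders assumed to be defined elsewhere:
 * <pre>{@code
 * FlowDef flowDef = FlowDef.flowDef()
 *   .setName( "wordcount" )
 *   .addSource( pipe, sourceTap )
 *   .addTailSink( pipe, sinkTap )
 *   .addToClassPath( "lib/udfs.jar" ); // uploaded to HDFS and added to the distributed cache
 *
 * Flow flow = new HadoopFlowConnector( properties ).connect( flowDef );
 *
 * flow.complete(); // blocks until the flow finishes
 * }</pre>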
 *
 * @see cascading.flow.FlowConnector
 */
public class HadoopFlow extends BaseFlow<JobConf>
  {
  /** Field hdfsShutdown */
  private static Thread hdfsShutdown = null;
  /** Field shutdownHook */
  private static ShutdownUtil.Hook shutdownHook;
  /** Field jobConf */
  private transient JobConf jobConf;
  /** Field preserveTemporaryFiles */
  private boolean preserveTemporaryFiles = false;
  /** Field syncPaths */
  private transient Map<Path, Path> syncPaths;

  protected HadoopFlow()
    {
    }

  /**
   * Returns property preserveTemporaryFiles.
   *
   * @param properties of type Map
   * @return a boolean
   */
  static boolean getPreserveTemporaryFiles( Map<Object, Object> properties )
    {
    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, PRESERVE_TEMPORARY_FILES, "false" ) );
    }

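  /**
   * Returns the maximum number of flow steps to run concurrently, read from the
   * {@code MAX_CONCURRENT_STEPS} property, or 0 when the property is unset.
   *
   * @param jobConf of type JobConf
   * @return an int
   */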
  static int getMaxConcurrentSteps( JobConf jobConf )
    {
    return jobConf.getInt( MAX_CONCURRENT_STEPS, 0 );
    }

  protected HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, String name, Map<String, String> flowDescriptor )
    {
    super( platformInfo, properties, jobConf, name, flowDescriptor );

    initFromProperties( properties );
    }

  public HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, FlowDef flowDef )
    {
    super( platformInfo, properties, jobConf, flowDef );

    initFromProperties( properties );
    }

  @Override
  protected void initFromProperties( Map<Object, Object> properties )
    {
    super.initFromProperties( properties );
    preserveTemporaryFiles = getPreserveTemporaryFiles( properties );
    }

  @Override
  protected void initConfig( Map<Object, Object> properties, JobConf parentConfig )
    {
    if( properties != null )
      parentConfig = createConfig( properties, parentConfig );

    if( parentConfig == null ) // this is ok, getJobConf will pass a default parent in
      return;

    jobConf = HadoopUtil.copyJobConf( parentConfig ); // prevent local values from being shared
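    // route http:// and https:// paths through Cascading's HttpFileSystem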
    jobConf.set( "fs.http.impl", HttpFileSystem.class.getName() );
    jobConf.set( "fs.https.impl", HttpFileSystem.class.getName() );

    syncPaths = HadoopMRUtil.addToClassPath( jobConf, getClassPath() );
    }

  @Override
  protected void setConfigProperty( JobConf config, Object key, Object value )
    {
    // don't let these objects pass, even though toString is called below.
    if( value instanceof Class || value instanceof JobConf || value == null )
      return;

    config.set( key.toString(), value.toString() );
    }

  @Override
  protected JobConf newConfig( JobConf defaultConfig )
    {
    return defaultConfig == null ? new JobConf() : HadoopUtil.copyJobConf( defaultConfig );
    }

  @ProcessConfiguration
  @Override
  public JobConf getConfig()
    {
    if( jobConf == null )
      initConfig( null, new JobConf() );

    return jobConf;
    }

  @Override
  public JobConf getConfigCopy()
    {
    return HadoopUtil.copyJobConf( getConfig() );
    }

  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

  /**
   * Method getProperty returns the value associated with the given key from the underlying properties system.
   *
   * @param key of type String
   * @return String
   */
  public String getProperty( String key )
    {
    return getConfig().get( key );
    }

  @Override
  public FlowProcess<JobConf> getFlowProcess()
    {
    return new HadoopFlowProcess( getFlowSession(), getConfig() );
    }

  /**
   * Method isPreserveTemporaryFiles returns true if temporary intermediate files will be preserved when this
   * Flow completes, and false if they will be cleaned up.
   *
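   * For example, to keep the intermediate files for debugging, set the property on the map handed to the
   * {@link cascading.flow.FlowConnector} before the Flow is connected (a sketch; {@code properties} is that map):
   * <pre>{@code
   * properties.put( FlowProps.PRESERVE_TEMPORARY_FILES, "true" );
   * }</pre>
   *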
   * @return the preserveTemporaryFiles (type boolean) of this Flow object.
   */
  public boolean isPreserveTemporaryFiles()
    {
    return preserveTemporaryFiles;
    }

  @Override
  protected void internalStart()
    {
    try
      {
      copyToDistributedCache();
      deleteSinksIfReplace();
      deleteTrapsIfReplace();
      deleteCheckpointsIfReplace();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to delete sinks, traps, or checkpoints", exception );
      }

    registerHadoopShutdownHook();
    }

  protected void registerHadoopShutdownHook()
    {
    registerHadoopShutdownHook( this );
    }

  protected void copyToDistributedCache()
    {
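    // copy any local classpath elements to HDFS so they are available cluster side via the distributed cache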
    HadoopUtil.syncPaths( jobConf, syncPaths, true );
    }

  @Override
  public boolean stepsAreLocal()
    {
    return HadoopUtil.isLocal( getConfig() );
    }

  private void cleanTemporaryFiles( boolean stop )
    {
    if( stop ) // unstable to call fs operations during shutdown
      return;

    // use step config so cascading.flow.step.path property is properly used
    for( FlowStep<JobConf> step : getFlowSteps() )
      ( (BaseFlowStep<JobConf>) step ).clean();
    }

  private static synchronized void registerHadoopShutdownHook( Flow flow )
    {
    if( !flow.isStopJobsOnExit() )
      return;

    // guaranteed singleton here
    if( shutdownHook != null )
      return;

    getHdfsShutdownHook();

    shutdownHook = new ShutdownUtil.Hook()
    {
    @Override
    public Priority priority()
      {
      return Priority.LAST; // very last thing to happen
      }

    @Override
    public void execute()
      {
      callHdfsShutdownHook();
      }
    };

    ShutdownUtil.addHook( shutdownHook );
    }

  private static synchronized void callHdfsShutdownHook()
    {
    if( hdfsShutdown != null )
      hdfsShutdown.start();
    }

  private static synchronized void getHdfsShutdownHook()
    {
    if( hdfsShutdown == null )
      hdfsShutdown = HadoopUtil.getHDFSShutdownHook();
    }

  @Override
  protected void internalClean( boolean stop )
    {
    if( !isPreserveTemporaryFiles() )
      cleanTemporaryFiles( stop );
    }

  @Override
  protected void internalShutdown()
    {
    }

  @Override
  protected int getMaxNumParallelSteps()
    {
    return stepsAreLocal() ? 1 : getMaxConcurrentSteps( getConfig() );
    }

  @Override
  protected long getTotalSliceCPUMilliSeconds()
    {
    // this is a hadoop2 MR specific counter/value
    long counterValue = flowStats.getCounterValue( "org.apache.hadoop.mapreduce.TaskCounter", "CPU_MILLISECONDS" );

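    // a value of zero means the counter was never reported, so signal 'unknown' with -1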
    if( counterValue == 0 )
      return -1;

    return counterValue;
    }
  }