/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez;

import java.io.IOException;
import java.util.Map;

import cascading.flow.BaseFlow;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.property.PropertyUtil;
import cascading.tap.hadoop.io.HttpFileSystem;
import cascading.util.ShutdownUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.TezConfiguration;
import riffle.process.ProcessConfiguration;

import static cascading.flow.FlowProps.MAX_CONCURRENT_STEPS;
import static cascading.flow.FlowProps.PRESERVE_TEMPORARY_FILES;

048/**
049 * Class HadoopFlow is the Apache Hadoop specific implementation of a {@link cascading.flow.Flow}.
050 * <p/>
051 * HadoopFlow must be created through a {@link cascading.flow.tez.Hadoop2TezFlowConnector} instance.
052 * <p/>
053 * If classpath paths are provided on the {@link cascading.flow.FlowDef}, the Hadoop distributed cache mechanism will be used
054 * to augment the remote classpath.
055 * <p/>
056 * Any path elements that are relative will be uploaded to HDFS, and the HDFS URI will be used on the JobConf. Note
057 * all paths are added as "files" to the JobConf, not archives, so they aren't needlessly uncompressed cluster side.
058 *
059 * @see cascading.flow.tez.Hadoop2TezFlowConnector
060 */
061public class Hadoop2TezFlow extends BaseFlow<TezConfiguration>
062  {
063  /** Field hdfsShutdown */
064  private static Thread hdfsShutdown = null;
065  /** Field shutdownHook */
066  private static ShutdownUtil.Hook shutdownHook;
067  /** Field jobConf */
068  private transient TezConfiguration flowConf;
069  /** Field preserveTemporaryFiles */
070  private boolean preserveTemporaryFiles = false;
071
072  private String flowStagingPath;
073
074  protected Hadoop2TezFlow()
075    {
076    }
077
078  /**
079   * Returns property preserveTemporaryFiles.
080   *
081   * @param properties of type Map
082   * @return a boolean
083   */
084  static boolean getPreserveTemporaryFiles( Map<Object, Object> properties )
085    {
086    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, PRESERVE_TEMPORARY_FILES, "false" ) );
087    }
088
089  static int getMaxConcurrentSteps( TezConfiguration jobConf )
090    {
091    return jobConf.getInt( MAX_CONCURRENT_STEPS, 0 );
092    }
093
094  public Hadoop2TezFlow( PlatformInfo platformInfo, Map<Object, Object> properties, TezConfiguration flowConf, FlowDef flowDef )
095    {
096    super( platformInfo, properties, flowConf, flowDef );
097
098    initFromProperties( properties );
099    }
100
101  @Override
102  protected void initFromProperties( Map<Object, Object> properties )
103    {
104    super.initFromProperties( properties );
105
106    preserveTemporaryFiles = getPreserveTemporaryFiles( properties );
107    }
108
109  protected void initConfig( Map<Object, Object> properties, TezConfiguration parentConfig )
110    {
111    if( properties != null )
112      parentConfig = createConfig( properties, parentConfig );
113
114    if( parentConfig == null ) // this is ok, getJobConf will pass a default parent in
115      return;
116
117    flowConf = new TezConfiguration( parentConfig ); // prevent local values from being shared
118    flowConf.set( "fs.http.impl", HttpFileSystem.class.getName() );
119    flowConf.set( "fs.https.impl", HttpFileSystem.class.getName() );
120
121    UserGroupInformation.setConfiguration( flowConf );
122
123    flowStagingPath = createStagingRoot();
124    }
125
126  public String getFlowStagingPath()
127    {
128    if( flowStagingPath == null )
129      flowStagingPath = createStagingRoot();
130
131    return flowStagingPath;
132    }
133
134  private String createStagingRoot()
135    {
136    return ".staging" + Path.SEPARATOR + getID();
137    }
138
139  @Override
140  protected void setConfigProperty( TezConfiguration config, Object key, Object value )
141    {
142    // don't let these objects pass, even though toString is called below.
143    if( value instanceof Class || value instanceof Configuration || value == null )
144      return;
145
146    config.set( key.toString(), value.toString() );
147    }
148
149  @Override
150  protected TezConfiguration newConfig( TezConfiguration defaultConfig )
151    {
152    return defaultConfig == null ? new TezConfiguration() : new TezConfiguration( defaultConfig );
153    }
154
155  @ProcessConfiguration
156  @Override
157  public TezConfiguration getConfig()
158    {
159    if( flowConf == null )
160      initConfig( null, new TezConfiguration() );
161
162    return flowConf;
163    }
164
165  @Override
166  public TezConfiguration getConfigCopy()
167    {
168    return new TezConfiguration( getConfig() );
169    }
170
171  @Override
172  public Map<Object, Object> getConfigAsProperties()
173    {
174    return HadoopUtil.createProperties( getConfig() );
175    }
176
177  /**
178   * Method getProperty returns the value associated with the given key from the underlying properties system.
179   *
180   * @param key of type String
181   * @return String
182   */
183  public String getProperty( String key )
184    {
185    return getConfig().get( key );
186    }
187
188  @Override
189  public FlowProcess<TezConfiguration> getFlowProcess()
190    {
191    return new Hadoop2TezFlowProcess( getFlowSession(), null, getConfig() );
192    }
193
194  /**
195   * Method isPreserveTemporaryFiles returns false if temporary files will be cleaned when this Flow completes.
196   *
197   * @return the preserveTemporaryFiles (type boolean) of this Flow object.
198   */
199  public boolean isPreserveTemporaryFiles()
200    {
201    return preserveTemporaryFiles;
202    }
203
204  @Override
205  protected void internalStart()
206    {
207    try
208      {
209      copyArtifactsToRemote();
210      deleteSinksIfReplace();
211      deleteTrapsIfReplace();
212      deleteCheckpointsIfReplace();
213      }
214    catch( IOException exception )
215      {
216      throw new FlowException( "unable to delete sinks", exception );
217      }
218
219    registerHadoopShutdownHook( this );
220    }
221
222  private void copyArtifactsToRemote()
223    {
224    for( FlowStep<TezConfiguration> flowStep : getFlowSteps() )
225      ( (Hadoop2TezFlowStep) flowStep ).syncArtifacts();
226    }
227
228  @Override
229  public boolean stepsAreLocal()
230    {
231    return HadoopUtil.isLocal( getConfig() );
232    }
233
234  private void cleanTemporaryFiles( boolean stop )
235    {
236    if( stop ) // unstable to call fs operations during shutdown
237      return;
238
239    // use step config so cascading.flow.step.path property is properly used
240    for( FlowStep<TezConfiguration> step : getFlowSteps() )
241      ( (BaseFlowStep<TezConfiguration>) step ).clean();
242    }
243
244  private static synchronized void registerHadoopShutdownHook( Flow flow )
245    {
246    if( !flow.isStopJobsOnExit() )
247      return;
248
249    // guaranteed singleton here
250    if( shutdownHook != null )
251      return;
252
253    getHdfsShutdownHook();
254
255    shutdownHook = new ShutdownUtil.Hook()
256    {
257    @Override
258    public Priority priority()
259      {
260      return Priority.LAST; // very last thing to happen
261      }
262
263    @Override
264    public void execute()
265      {
266      callHdfsShutdownHook();
267      }
268    };
269
270    ShutdownUtil.addHook( shutdownHook );
271    }
272
273  private synchronized static void callHdfsShutdownHook()
274    {
275    if( hdfsShutdown != null )
276      hdfsShutdown.start();
277    }
278
279  private synchronized static void getHdfsShutdownHook()
280    {
281    if( hdfsShutdown == null )
282      hdfsShutdown = HadoopUtil.getHDFSShutdownHook();
283    }
284
285  protected void internalClean( boolean stop )
286    {
287    if( !isPreserveTemporaryFiles() )
288      cleanTemporaryFiles( stop );
289    }
290
291  protected void internalShutdown()
292    {
293    }
294
295  protected int getMaxNumParallelSteps()
296    {
297    return stepsAreLocal() ? 1 : getMaxConcurrentSteps( getConfig() );
298    }
299
300  @Override
301  protected long getTotalSliceCPUMilliSeconds()
302    {
303    long counterValue = flowStats.getCounterValue( TaskCounter.CPU_MILLISECONDS );
304
305    if( counterValue == 0 )
306      return -1;
307
308    return counterValue;
309    }
310  }