001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.hadoop.util;
022
023import java.io.IOException;
024import java.net.URI;
025import java.util.HashMap;
026import java.util.Map;
027import java.util.concurrent.atomic.AtomicInteger;
028
029import cascading.flow.hadoop.util.HadoopUtil;
030import cascading.tap.Tap;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.mapred.FileOutputFormat;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance;
040
041public class Hadoop18TapUtil
042  {
043  /** Field LOG */
044  private static final Logger LOG = LoggerFactory.getLogger( Hadoop18TapUtil.class );
045
046  /** The Hadoop temporary path used to prevent collisions */
047  public static final String TEMPORARY_PATH = "_temporary";
048
049  private static final Map<String, AtomicInteger> pathCounts = new HashMap<String, AtomicInteger>();
050
051  /**
052   * should only be called if not in a Flow
053   *
054   * @param conf
055   * @throws IOException
056   */
057  public static void setupJob( Configuration conf ) throws IOException
058    {
059    Path outputPath = FileOutputFormat.getOutputPath( asJobConfInstance( conf ) );
060
061    if( outputPath == null )
062      return;
063
064    if( getFSSafe( conf, outputPath ) == null )
065      return;
066
067    String taskID = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );
068
069    if( taskID == null ) // need to stuff a fake id
070      {
071      String mapper = conf.getBoolean( "mapred.task.is.map", conf.getBoolean( "mapreduce.task.is.map", true ) ) ? "m" : "r";
072      String value = String.format( "attempt_%012d_0000_%s_000000_0", (int) Math.rint( System.currentTimeMillis() ), mapper );
073      conf.set( "mapred.task.id", value );
074      conf.set( "mapreduce.task.id", value );
075      }
076
077    makeTempPath( conf );
078
079    if( writeDirectlyToWorkingPath( conf, outputPath ) )
080      {
081      LOG.info( "writing directly to output path: {}", outputPath );
082      setWorkOutputPath( conf, outputPath );
083      return;
084      }
085
086    // "mapred.work.output.dir"
087    Path taskOutputPath = getTaskOutputPath( conf );
088    setWorkOutputPath( conf, taskOutputPath );
089    }
090
091  public static synchronized void setupTask( Configuration conf ) throws IOException
092    {
093    String workpath = conf.get( "mapred.work.output.dir" );
094
095    if( workpath == null )
096      return;
097
098    FileSystem fs = getFSSafe( conf, new Path( workpath ) );
099
100    if( fs == null )
101      return;
102
103    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );
104
105    LOG.info( "setting up task: '{}' - {}", taskId, workpath );
106
107    AtomicInteger integer = pathCounts.get( workpath );
108
109    if( integer == null )
110      {
111      integer = new AtomicInteger();
112      pathCounts.put( workpath, integer );
113      }
114
115    integer.incrementAndGet();
116    }
117
118  public static boolean needsTaskCommit( Configuration conf ) throws IOException
119    {
120    String workpath = conf.get( "mapred.work.output.dir" );
121
122    if( workpath == null )
123      return false;
124
125    Path taskOutputPath = new Path( workpath );
126
127    if( taskOutputPath != null )
128      {
129      FileSystem fs = getFSSafe( conf, taskOutputPath );
130
131      if( fs == null )
132        return false;
133
134      if( fs.exists( taskOutputPath ) )
135        return true;
136      }
137
138    return false;
139    }
140
141  /**
142   * copies all files from the taskoutputpath to the outputpath
143   *
144   * @param conf
145   */
146  public static void commitTask( Configuration conf ) throws IOException
147    {
148    Path taskOutputPath = new Path( conf.get( "mapred.work.output.dir" ) );
149
150    FileSystem fs = getFSSafe( conf, taskOutputPath );
151
152    if( fs == null )
153      return;
154
155    AtomicInteger integer = pathCounts.get( taskOutputPath.toString() );
156
157    if( integer.decrementAndGet() != 0 )
158      return;
159
160    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );
161
162    LOG.info( "committing task: '{}' - {}", taskId, taskOutputPath );
163
164    if( taskOutputPath != null )
165      {
166      if( writeDirectlyToWorkingPath( conf, taskOutputPath ) )
167        return;
168
169      if( fs.exists( taskOutputPath ) )
170        {
171        Path jobOutputPath = taskOutputPath.getParent().getParent();
172        // Move the task outputs to their final place
173        moveTaskOutputs( conf, fs, jobOutputPath, taskOutputPath );
174
175        // Delete the temporary task-specific output directory
176        if( !fs.delete( taskOutputPath, true ) )
177          LOG.info( "failed to delete the temporary output directory of task: '{}' - {}", taskId, taskOutputPath );
178
179        LOG.info( "saved output of task '{}' to {}", taskId, jobOutputPath );
180        }
181      }
182    }
183
184  /**
185   * Called from flow step to remove temp dirs
186   *
187   * @param conf
188   * @throws IOException
189   */
190  public static void cleanupTapMetaData( Configuration conf, Tap tap ) throws IOException
191    {
192    cleanTempPath( conf, new Path( tap.getIdentifier() ) );
193    }
194
195  /**
196   * May only be called once. should only be called if not in a flow
197   *
198   * @param conf
199   */
200  public static void cleanupJob( Configuration conf ) throws IOException
201    {
202    if( HadoopUtil.isInflow( conf ) )
203      return;
204
205    Path outputPath = FileOutputFormat.getOutputPath( asJobConfInstance( conf ) );
206
207    cleanTempPath( conf, outputPath );
208    }
209
210  private static synchronized void cleanTempPath( Configuration conf, Path outputPath ) throws IOException
211    {
212    // do the clean up of temporary directory
213
214    if( outputPath != null )
215      {
216      FileSystem fileSys = getFSSafe( conf, outputPath );
217
218      if( fileSys == null )
219        return;
220
221      if( !fileSys.exists( outputPath ) )
222        return;
223
224      Path tmpDir = new Path( outputPath, TEMPORARY_PATH );
225
226      LOG.info( "deleting temp path {}", tmpDir );
227
228      if( fileSys.exists( tmpDir ) )
229        fileSys.delete( tmpDir, true );
230      }
231    }
232
233  private static FileSystem getFSSafe( Configuration conf, Path tmpDir )
234    {
235    try
236      {
237      return tmpDir.getFileSystem( conf );
238      }
239    catch( IOException e )
240      {
241      // ignore
242      }
243
244    return null;
245    }
246
247  private static Path getTaskOutputPath( Configuration conf )
248    {
249    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );
250
251    Path p = new Path( FileOutputFormat.getOutputPath( asJobConfInstance( conf ) ), TEMPORARY_PATH + Path.SEPARATOR + "_" + taskId );
252
253    try
254      {
255      FileSystem fs = p.getFileSystem( conf );
256      return p.makeQualified( fs );
257      }
258    catch( IOException ie )
259      {
260      return p;
261      }
262    }
263
264  static void setWorkOutputPath( Configuration conf, Path outputDir )
265    {
266    outputDir = new Path( asJobConfInstance( conf ).getWorkingDirectory(), outputDir );
267    conf.set( "mapred.work.output.dir", outputDir.toString() );
268    }
269
270  public static void makeTempPath( Configuration conf ) throws IOException
271    {
272    // create job specific temporary directory in output path
273    Path outputPath = FileOutputFormat.getOutputPath( asJobConfInstance( conf ) );
274
275    if( outputPath != null )
276      {
277      Path tmpDir = new Path( outputPath, TEMPORARY_PATH );
278      FileSystem fileSys = tmpDir.getFileSystem( conf );
279
280      if( !fileSys.exists( tmpDir ) && !fileSys.mkdirs( tmpDir ) )
281        LOG.error( "mkdirs failed to create {}", tmpDir );
282      }
283    }
284
285  private static void moveTaskOutputs( Configuration conf, FileSystem fs, Path jobOutputDir, Path taskOutput ) throws IOException
286    {
287    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );
288
289    if( fs.isFile( taskOutput ) )
290      {
291      Path finalOutputPath = getFinalPath( jobOutputDir, taskOutput, getTaskOutputPath( conf ) );
292      if( !fs.rename( taskOutput, finalOutputPath ) )
293        {
294        if( !fs.delete( finalOutputPath, true ) )
295          throw new IOException( "Failed to delete earlier output of task: " + taskId );
296
297        if( !fs.rename( taskOutput, finalOutputPath ) )
298          throw new IOException( "Failed to save output of task: " + taskId );
299        }
300
301      LOG.debug( "Moved {} to {}", taskOutput, finalOutputPath );
302      }
303    else if( fs.getFileStatus( taskOutput ).isDir() )
304      {
305      FileStatus[] paths = fs.listStatus( taskOutput );
306      Path finalOutputPath = getFinalPath( jobOutputDir, taskOutput, getTaskOutputPath( conf ) );
307      fs.mkdirs( finalOutputPath );
308      if( paths != null )
309        {
310        for( FileStatus path : paths )
311          moveTaskOutputs( conf, fs, jobOutputDir, path.getPath() );
312        }
313      }
314    }
315
316  private static Path getFinalPath( Path jobOutputDir, Path taskOutput, Path taskOutputPath ) throws IOException
317    {
318    URI taskOutputUri = taskOutput.toUri();
319    URI relativePath = taskOutputPath.toUri().relativize( taskOutputUri );
320    if( taskOutputUri == relativePath )
321      {//taskOutputPath is not a parent of taskOutput
322      throw new IOException( "Can not get the relative path: base = " + taskOutputPath + " child = " + taskOutput );
323      }
324    if( relativePath.getPath().length() > 0 )
325      {
326      return new Path( jobOutputDir, relativePath.getPath() );
327      }
328    else
329      {
330      return jobOutputDir;
331      }
332    }
333
334  /** used in AWS EMR to disable temp paths on some file systems, s3. */
335  private static boolean writeDirectlyToWorkingPath( Configuration conf, Path path )
336    {
337    FileSystem fs = getFSSafe( conf, path );
338
339    if( fs == null )
340      return false;
341
342    boolean result = conf.getBoolean( "mapred.output.direct." + fs.getClass().getSimpleName(), false );
343
344    if( result )
345      LOG.info( "output direct is enabled for this fs: " + fs.getName() );
346
347    return result;
348    }
349  }