001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.util;
022    
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import cascading.tap.Tap;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
037    
038    public class Hadoop18TapUtil
039      {
040      /** Field LOG */
041      private static final Logger LOG = LoggerFactory.getLogger( Hadoop18TapUtil.class );
042    
043      /** The Hadoop temporary path used to prevent collisions */
044      public static final String TEMPORARY_PATH = "_temporary";
045    
046      private static final Map<String, AtomicInteger> pathCounts = new HashMap<String, AtomicInteger>();
047    
048      /**
049       * should only be called if not in a Flow
050       *
051       * @param conf
052       * @throws IOException
053       */
054      public static void setupJob( JobConf conf ) throws IOException
055        {
056        Path outputPath = FileOutputFormat.getOutputPath( conf );
057    
058        if( outputPath == null )
059          return;
060    
061        if( getFSSafe( conf, outputPath ) == null )
062          return;
063    
064        if( conf.get( "mapred.task.id" ) == null ) // need to stuff a fake id
065          {
066          String mapper = conf.getBoolean( "mapred.task.is.map", true ) ? "m" : "r";
067          conf.set( "mapred.task.id", String.format( "attempt_%012d_0000_%s_000000_0", (int) Math.rint( System.currentTimeMillis() ), mapper ) );
068          }
069    
070        makeTempPath( conf );
071    
072        if( writeDirectlyToWorkingPath( conf, outputPath ) )
073          {
074          LOG.info( "writing directly to output path: {}", outputPath );
075          setWorkOutputPath( conf, outputPath );
076          return;
077          }
078    
079        // "mapred.work.output.dir"
080        Path taskOutputPath = getTaskOutputPath( conf );
081        setWorkOutputPath( conf, taskOutputPath );
082        }
083    
084      public static synchronized void setupTask( JobConf conf ) throws IOException
085        {
086        String workpath = conf.get( "mapred.work.output.dir" );
087    
088        if( workpath == null )
089          return;
090    
091        FileSystem fs = getFSSafe( conf, new Path( workpath ) );
092    
093        if( fs == null )
094          return;
095    
096        String taskId = conf.get( "mapred.task.id" );
097    
098        LOG.info( "setting up task: '{}' - {}", taskId, workpath );
099    
100        AtomicInteger integer = pathCounts.get( workpath );
101    
102        if( integer == null )
103          {
104          integer = new AtomicInteger();
105          pathCounts.put( workpath, integer );
106          }
107    
108        integer.incrementAndGet();
109        }
110    
111      public static boolean needsTaskCommit( JobConf conf ) throws IOException
112        {
113        String workpath = conf.get( "mapred.work.output.dir" );
114    
115        if( workpath == null )
116          return false;
117    
118        Path taskOutputPath = new Path( workpath );
119    
120        if( taskOutputPath != null )
121          {
122          FileSystem fs = getFSSafe( conf, taskOutputPath );
123    
124          if( fs == null )
125            return false;
126    
127          if( fs.exists( taskOutputPath ) )
128            return true;
129          }
130    
131        return false;
132        }
133    
134      /**
135       * copies all files from the taskoutputpath to the outputpath
136       *
137       * @param conf
138       */
139      public static void commitTask( JobConf conf ) throws IOException
140        {
141        Path taskOutputPath = new Path( conf.get( "mapred.work.output.dir" ) );
142    
143        FileSystem fs = getFSSafe( conf, taskOutputPath );
144    
145        if( fs == null )
146          return;
147    
148        AtomicInteger integer = pathCounts.get( taskOutputPath.toString() );
149    
150        if( integer.decrementAndGet() != 0 )
151          return;
152    
153        String taskId = conf.get( "mapred.task.id" );
154    
155        LOG.info( "committing task: '{}' - {}", taskId, taskOutputPath );
156    
157        if( taskOutputPath != null )
158          {
159          if( writeDirectlyToWorkingPath( conf, taskOutputPath ) )
160            return;
161    
162          if( fs.exists( taskOutputPath ) )
163            {
164            Path jobOutputPath = taskOutputPath.getParent().getParent();
165            // Move the task outputs to their final place
166            moveTaskOutputs( conf, fs, jobOutputPath, taskOutputPath );
167    
168            // Delete the temporary task-specific output directory
169            if( !fs.delete( taskOutputPath, true ) )
170              LOG.info( "failed to delete the temporary output directory of task: '{}' - {}", taskId, taskOutputPath );
171    
172            LOG.info( "saved output of task '{}' to {}", taskId, jobOutputPath );
173            }
174          }
175        }
176    
177      /**
178       * Called from flow step to remove temp dirs
179       *
180       * @param conf
181       * @throws IOException
182       */
183      public static void cleanupTapMetaData( JobConf conf, Tap tap ) throws IOException
184        {
185        cleanTempPath( conf, new Path( tap.getIdentifier() ) );
186        }
187    
188      /**
189       * May only be called once. should only be called if not in a flow
190       *
191       * @param conf
192       */
193      public static void cleanupJob( JobConf conf ) throws IOException
194        {
195        if( isInflow( conf ) )
196          return;
197    
198        Path outputPath = FileOutputFormat.getOutputPath( conf );
199    
200        cleanTempPath( conf, outputPath );
201        }
202    
203      private static synchronized void cleanTempPath( JobConf conf, Path outputPath ) throws IOException
204        {
205        // do the clean up of temporary directory
206    
207        if( outputPath != null )
208          {
209          FileSystem fileSys = getFSSafe( conf, outputPath );
210    
211          if( fileSys == null )
212            return;
213    
214          if( !fileSys.exists( outputPath ) )
215            return;
216    
217          Path tmpDir = new Path( outputPath, TEMPORARY_PATH );
218    
219          LOG.info( "deleting temp path {}", tmpDir );
220    
221          if( fileSys.exists( tmpDir ) )
222            fileSys.delete( tmpDir, true );
223          }
224        }
225    
226      private static FileSystem getFSSafe( JobConf conf, Path tmpDir )
227        {
228        try
229          {
230          return tmpDir.getFileSystem( conf );
231          }
232        catch( IOException e )
233          {
234          // ignore
235          }
236    
237        return null;
238        }
239    
240      static boolean isInflow( JobConf conf )
241        {
242        return conf.get( "cascading.flow.step" ) != null || conf.get( "cascading.flow.step.path" ) != null;
243        }
244    
245      private static Path getTaskOutputPath( JobConf conf )
246        {
247        String taskId = conf.get( "mapred.task.id" );
248    
249        Path p = new Path( FileOutputFormat.getOutputPath( conf ), TEMPORARY_PATH + Path.SEPARATOR + "_" + taskId );
250    
251        try
252          {
253          FileSystem fs = p.getFileSystem( conf );
254          return p.makeQualified( fs );
255          }
256        catch( IOException ie )
257          {
258          return p;
259          }
260        }
261    
262      static void setWorkOutputPath( JobConf conf, Path outputDir )
263        {
264        outputDir = new Path( conf.getWorkingDirectory(), outputDir );
265        conf.set( "mapred.work.output.dir", outputDir.toString() );
266        }
267    
268      public static void makeTempPath( JobConf conf ) throws IOException
269        {
270        // create job specific temporary directory in output path
271        Path outputPath = FileOutputFormat.getOutputPath( conf );
272    
273        if( outputPath != null )
274          {
275          Path tmpDir = new Path( outputPath, TEMPORARY_PATH );
276          FileSystem fileSys = tmpDir.getFileSystem( conf );
277    
278          if( !fileSys.exists( tmpDir ) && !fileSys.mkdirs( tmpDir ) )
279            LOG.error( "mkdirs failed to create {}", tmpDir );
280          }
281        }
282    
283      private static void moveTaskOutputs( JobConf conf, FileSystem fs, Path jobOutputDir, Path taskOutput ) throws IOException
284        {
285        String taskId = conf.get( "mapred.task.id" );
286    
287        if( fs.isFile( taskOutput ) )
288          {
289          Path finalOutputPath = getFinalPath( jobOutputDir, taskOutput, getTaskOutputPath( conf ) );
290          if( !fs.rename( taskOutput, finalOutputPath ) )
291            {
292            if( !fs.delete( finalOutputPath, true ) )
293              {
294              throw new IOException( "Failed to delete earlier output of task: " + taskId );
295              }
296            if( !fs.rename( taskOutput, finalOutputPath ) )
297              {
298              throw new IOException( "Failed to save output of task: " + taskId );
299              }
300            }
301    
302          LOG.debug( "Moved {} to {}", taskOutput, finalOutputPath );
303          }
304        else if( fs.getFileStatus( taskOutput ).isDir() )
305          {
306          FileStatus[] paths = fs.listStatus( taskOutput );
307          Path finalOutputPath = getFinalPath( jobOutputDir, taskOutput, getTaskOutputPath( conf ) );
308          fs.mkdirs( finalOutputPath );
309          if( paths != null )
310            {
311            for( FileStatus path : paths )
312              moveTaskOutputs( conf, fs, jobOutputDir, path.getPath() );
313            }
314          }
315        }
316    
317      private static Path getFinalPath( Path jobOutputDir, Path taskOutput, Path taskOutputPath ) throws IOException
318        {
319        URI taskOutputUri = taskOutput.toUri();
320        URI relativePath = taskOutputPath.toUri().relativize( taskOutputUri );
321        if( taskOutputUri == relativePath )
322          {//taskOutputPath is not a parent of taskOutput
323          throw new IOException( "Can not get the relative path: base = " + taskOutputPath + " child = " + taskOutput );
324          }
325        if( relativePath.getPath().length() > 0 )
326          {
327          return new Path( jobOutputDir, relativePath.getPath() );
328          }
329        else
330          {
331          return jobOutputDir;
332          }
333        }
334    
335    
336      /** used in AWS EMR to disable temp paths on some file systems, s3. */
337      private static boolean writeDirectlyToWorkingPath( JobConf conf, Path path )
338        {
339        FileSystem fs = getFSSafe( conf, path );
340    
341        if( fs == null )
342          return false;
343    
344        boolean result = conf.getBoolean( "mapred.output.direct." + fs.getClass().getSimpleName(), false );
345    
346        if( result )
347          LOG.info( "output direct is enabled for this fs: " + fs.getName() );
348    
349        return result;
350        }
351    
352      }