/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.util;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance;

/**
 * Static helpers that manage the {@code _temporary} working directory of an output path: creating it,
 * pointing {@code mapred.work.output.dir} at a task specific directory below it, moving task output to its
 * final location on commit, and removing the temporary directory afterwards.
 */
public class Hadoop18TapUtil
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( Hadoop18TapUtil.class );

  /** The Hadoop temporary path used to prevent collisions */
  public static final String TEMPORARY_PATH = "_temporary";

  /**
   * Number of {@link #setupTask(Configuration)} calls not yet matched by a {@link #commitTask(Configuration)}
   * call, keyed by work output path. Only accessed while holding the class lock.
   */
  private static final Map<String, AtomicInteger> pathCounts = new HashMap<String, AtomicInteger>();

  /**
   * Prepares the given configuration for writing output. Should only be called if not in a Flow.
   * <p>
   * Does nothing if no output path is configured or its file system cannot be resolved. Otherwise a task
   * attempt id is stuffed into the configuration when none is present, the temporary directory is created,
   * and {@code mapred.work.output.dir} is set either to the output path itself (when direct output is enabled
   * for the file system) or to a task specific directory below the temporary directory.
   *
   * @param conf the configuration to read the output path from and to update
   * @throws IOException if the temporary directory cannot be checked or created
   */
  public static void setupJob( Configuration conf ) throws IOException
    {
    Path outputPath = FileOutputFormat.getOutputPath( asJobConfInstance( conf ) );

    if( outputPath == null )
      return;

    if( getFSSafe( conf, outputPath ) == null )
      return;

    String taskID = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );

    if( taskID == null ) // need to stuff a fake id
      {
      String mapper = conf.getBoolean( "mapred.task.is.map", conf.getBoolean( "mapreduce.task.is.map", true ) ) ? "m" : "r";

      // format the timestamp as a long: narrowing it to an int saturates at Integer.MAX_VALUE,
      // which made every generated id identical
      String value = String.format( "attempt_%012d_0000_%s_000000_0", System.currentTimeMillis(), mapper );

      conf.set( "mapred.task.id", value );
      conf.set( "mapreduce.task.id", value );
      }

    makeTempPath( conf );

    if( writeDirectlyToWorkingPath( conf, outputPath ) )
      {
      LOG.info( "writing directly to output path: {}", outputPath );
      setWorkOutputPath( conf, outputPath );
      return;
      }

    // "mapred.work.output.dir"
    Path taskOutputPath = getTaskOutputPath( conf );
    setWorkOutputPath( conf, taskOutputPath );
    }

  /**
   * Registers one more user of the work output path found in {@code mapred.work.output.dir}.
   * <p>
   * Does nothing if the property is not set or the file system of the path cannot be resolved.
   *
   * @param conf the configuration holding the work output path
   * @throws IOException declared for callers, not thrown by the current implementation
   */
  public static synchronized void setupTask( Configuration conf ) throws IOException
    {
    String workpath = conf.get( "mapred.work.output.dir" );

    if( workpath == null )
      return;

    Path taskOutputPath = new Path( workpath );
    FileSystem fs = getFSSafe( conf, taskOutputPath );

    if( fs == null )
      return;

    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );

    LOG.info( "setting up task: '{}' - {}", taskId, workpath );

    // key on the Path string form, the same key commitTask looks up
    String key = taskOutputPath.toString();
    AtomicInteger count = pathCounts.get( key );

    if( count == null )
      {
      count = new AtomicInteger();
      pathCounts.put( key, count );
      }

    count.incrementAndGet();
    }

  /**
   * Tests whether there is task output waiting to be committed.
   *
   * @param conf the configuration holding the work output path
   * @return true if {@code mapred.work.output.dir} is set, its file system can be resolved and the path exists
   * @throws IOException if the existence check fails
   */
  public static boolean needsTaskCommit( Configuration conf ) throws IOException
    {
    String workpath = conf.get( "mapred.work.output.dir" );

    if( workpath == null )
      return false;

    Path taskOutputPath = new Path( workpath );
    FileSystem fs = getFSSafe( conf, taskOutputPath );

    return fs != null && fs.exists( taskOutputPath );
    }

  /**
   * Copies all files from the task output path to the output path.
   * <p>
   * The move only happens for the call that releases the last registration made through
   * {@link #setupTask(Configuration)} for the path. Nothing is moved when {@code mapred.work.output.dir} is
   * not set, its file system cannot be resolved, or direct output is enabled for the file system.
   *
   * @param conf the configuration holding the work output path
   * @throws IOException if moving the task output fails
   */
  public static void commitTask( Configuration conf ) throws IOException
    {
    String workpath = conf.get( "mapred.work.output.dir" );

    if( workpath == null ) // nothing to commit, same handling as setupTask and needsTaskCommit
      return;

    Path taskOutputPath = new Path( workpath );
    FileSystem fs = getFSSafe( conf, taskOutputPath );

    if( fs == null )
      return;

    if( !releaseTaskPath( taskOutputPath.toString() ) )
      return;

    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );

    LOG.info( "committing task: '{}' - {}", taskId, taskOutputPath );

    if( writeDirectlyToWorkingPath( conf, taskOutputPath ) )
      return;

    if( !fs.exists( taskOutputPath ) )
      return;

    // the task path is <output>/_temporary/_<task id>, so the job output is two levels up
    Path jobOutputPath = taskOutputPath.getParent().getParent();

    // Move the task outputs to their final place
    moveTaskOutputs( conf, fs, jobOutputPath, taskOutputPath );

    // Delete the temporary task-specific output directory
    if( !fs.delete( taskOutputPath, true ) )
      LOG.info( "failed to delete the temporary output directory of task: '{}' - {}", taskId, taskOutputPath );

    LOG.info( "saved output of task '{}' to {}", taskId, jobOutputPath );
    }

  /**
   * Releases one registration of the given work path.
   *
   * @param workpath the string form of the work output path
   * @return true if no other registration remains and the caller should go on with the commit
   */
  private static synchronized boolean releaseTaskPath( String workpath )
    {
    AtomicInteger count = pathCounts.get( workpath );

    if( count == null ) // never registered through setupTask, so there is no other user to wait for
      {
      LOG.warn( "no task was set up for path: {}, committing anyway", workpath );
      return true;
      }

    if( count.decrementAndGet() != 0 )
      return false;

    pathCounts.remove( workpath ); // keep the static map from growing for the life of the jvm

    return true;
    }

  /**
   * Called from flow step to remove temp dirs
   *
   * @param conf the configuration used to resolve the file system
   * @param tap  the tap whose identifier is used as the output path
   * @throws IOException if the temporary directory cannot be checked or deleted
   */
  public static void cleanupTapMetaData( Configuration conf, Tap tap ) throws IOException
    {
    cleanTempPath( conf, new Path( tap.getIdentifier() ) );
    }

  /**
   * May only be called once. Should only be called if not in a flow; returns immediately when
   * {@link HadoopUtil#isInflow(Configuration)} reports true.
   *
   * @param conf the configuration to read the output path from
   * @throws IOException if the temporary directory cannot be checked or deleted
   */
  public static void cleanupJob( Configuration conf ) throws IOException
    {
    if( HadoopUtil.isInflow( conf ) )
      return;

    Path outputPath = FileOutputFormat.getOutputPath( asJobConfInstance( conf ) );

    cleanTempPath( conf, outputPath );
    }

  /**
   * Deletes the temporary directory below the given output path, if both exist.
   *
   * @param conf       the configuration used to resolve the file system
   * @param outputPath the output path, may be null in which case nothing happens
   * @throws IOException if the existence checks or the delete fail
   */
  private static synchronized void cleanTempPath( Configuration conf, Path outputPath ) throws IOException
    {
    // do the clean up of temporary directory

    if( outputPath == null )
      return;

    FileSystem fileSys = getFSSafe( conf, outputPath );

    if( fileSys == null )
      return;

    if( !fileSys.exists( outputPath ) )
      return;

    Path tmpDir = new Path( outputPath, TEMPORARY_PATH );

    LOG.info( "deleting temp path {}", tmpDir );

    if( fileSys.exists( tmpDir ) )
      fileSys.delete( tmpDir, true );
    }

  /**
   * Resolves the file system of the given path without propagating failures.
   *
   * @param conf   the configuration used to resolve the file system
   * @param tmpDir the path to resolve the file system for
   * @return the file system, or null if resolving it raised an IOException
   */
  private static FileSystem getFSSafe( Configuration conf, Path tmpDir )
    {
    try
      {
      return tmpDir.getFileSystem( conf );
      }
    catch( IOException exception )
      {
      // callers treat null as "skip", leave a trace so skipped work can be diagnosed
      LOG.warn( "unable to resolve file system for path: " + tmpDir, exception );
      }

    return null;
    }

  /**
   * Builds the task specific directory {@code <output path>/_temporary/_<task id>}.
   *
   * @param conf the configuration holding the output path and the task id
   * @return the path qualified against its file system, or the unqualified path if the file system cannot be resolved
   */
  private static Path getTaskOutputPath( Configuration conf )
    {
    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );

    Path p = new Path( FileOutputFormat.getOutputPath( asJobConfInstance( conf ) ), TEMPORARY_PATH + Path.SEPARATOR + "_" + taskId );

    try
      {
      FileSystem fs = p.getFileSystem( conf );
      return p.makeQualified( fs );
      }
    catch( IOException ie )
      {
      return p;
      }
    }

  /**
   * Stores the given directory, resolved against the working directory of the job, in
   * {@code mapred.work.output.dir}.
   *
   * @param conf      the configuration to update
   * @param outputDir the directory to store
   */
  static void setWorkOutputPath( Configuration conf, Path outputDir )
    {
    outputDir = new Path( asJobConfInstance( conf ).getWorkingDirectory(), outputDir );
    conf.set( "mapred.work.output.dir", outputDir.toString() );
    }

  /**
   * Creates the temporary directory below the configured output path if it does not exist yet. A failed
   * {@code mkdirs} is logged, not thrown. Does nothing if no output path is configured.
   *
   * @param conf the configuration to read the output path from
   * @throws IOException if the file system cannot be resolved or accessed
   */
  public static void makeTempPath( Configuration conf ) throws IOException
    {
    // create job specific temporary directory in output path
    Path outputPath = FileOutputFormat.getOutputPath( asJobConfInstance( conf ) );

    if( outputPath == null )
      return;

    Path tmpDir = new Path( outputPath, TEMPORARY_PATH );
    FileSystem fileSys = tmpDir.getFileSystem( conf );

    if( !fileSys.exists( tmpDir ) && !fileSys.mkdirs( tmpDir ) )
      LOG.error( "mkdirs failed to create {}", tmpDir );
    }

  /**
   * Recursively moves the given task output to its final place below the job output directory.
   * <p>
   * A file is renamed to its final path; if the rename fails the final path is deleted and the rename is
   * tried once more. For a directory the final directory is created and every child is moved in turn.
   *
   * @param conf         the configuration holding the task id and output path
   * @param fs           the file system both paths live on
   * @param jobOutputDir the job output directory
   * @param taskOutput   the file or directory to move
   * @throws IOException if the earlier output cannot be deleted or the task output cannot be renamed
   */
  private static void moveTaskOutputs( Configuration conf, FileSystem fs, Path jobOutputDir, Path taskOutput ) throws IOException
    {
    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );

    if( fs.isFile( taskOutput ) )
      {
      Path finalOutputPath = getFinalPath( jobOutputDir, taskOutput, getTaskOutputPath( conf ) );

      if( !fs.rename( taskOutput, finalOutputPath ) )
        {
        if( !fs.delete( finalOutputPath, true ) )
          throw new IOException( "Failed to delete earlier output of task: " + taskId );

        if( !fs.rename( taskOutput, finalOutputPath ) )
          throw new IOException( "Failed to save output of task: " + taskId );
        }

      LOG.debug( "Moved {} to {}", taskOutput, finalOutputPath );
      }
    else if( fs.getFileStatus( taskOutput ).isDir() )
      {
      FileStatus[] paths = fs.listStatus( taskOutput );
      Path finalOutputPath = getFinalPath( jobOutputDir, taskOutput, getTaskOutputPath( conf ) );

      fs.mkdirs( finalOutputPath );

      if( paths != null )
        {
        for( FileStatus path : paths )
          moveTaskOutputs( conf, fs, jobOutputDir, path.getPath() );
        }
      }
    }

  /**
   * Maps a path below the task output directory to the matching path below the job output directory.
   *
   * @param jobOutputDir   the job output directory
   * @param taskOutput     the path to map
   * @param taskOutputPath the task output directory {@code taskOutput} is expected to live under
   * @return the job output directory itself if the relative path is empty, the resolved child path otherwise
   * @throws IOException if {@code taskOutput} cannot be made relative to {@code taskOutputPath}
   */
  private static Path getFinalPath( Path jobOutputDir, Path taskOutput, Path taskOutputPath ) throws IOException
    {
    URI taskOutputUri = taskOutput.toUri();
    URI relativePath = taskOutputPath.toUri().relativize( taskOutputUri );

    // reference comparison is intended: URI.relativize hands back its argument when it cannot relativize
    if( taskOutputUri == relativePath )
      {//taskOutputPath is not a parent of taskOutput
      throw new IOException( "Can not get the relative path: base = " + taskOutputPath + " child = " + taskOutput );
      }

    if( relativePath.getPath().length() > 0 )
      return new Path( jobOutputDir, relativePath.getPath() );

    return jobOutputDir;
    }

  /**
   * Used in AWS EMR to disable temp paths on some file systems, s3.
   * <p>
   * Reads the boolean property {@code mapred.output.direct.<simple class name of the file system>}.
   *
   * @param conf the configuration to read the property from
   * @param path the path whose file system is inspected
   * @return the property value, false if it is not set or the file system cannot be resolved
   */
  private static boolean writeDirectlyToWorkingPath( Configuration conf, Path path )
    {
    FileSystem fs = getFSSafe( conf, path );

    if( fs == null )
      return false;

    boolean result = conf.getBoolean( "mapred.output.direct." + fs.getClass().getSimpleName(), false );

    if( result )
      LOG.info( "output direct is enabled for this fs: {}", fs.getName() );

    return result;
    }
  }