001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tap.hadoop; 022 023import java.beans.ConstructorProperties; 024import java.io.IOException; 025import java.net.URI; 026import java.net.URISyntaxException; 027import java.util.HashSet; 028import java.util.LinkedHashSet; 029import java.util.Set; 030 031import cascading.flow.FlowProcess; 032import cascading.flow.hadoop.util.HadoopUtil; 033import cascading.scheme.Scheme; 034import cascading.tap.SinkMode; 035import cascading.tap.Tap; 036import cascading.tap.TapException; 037import cascading.tap.hadoop.io.CombineFileRecordReaderWrapper; 038import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector; 039import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; 040import cascading.tap.type.FileType; 041import cascading.tuple.TupleEntryCollector; 042import cascading.tuple.TupleEntryIterator; 043import cascading.tuple.hadoop.TupleSerialization; 044import cascading.util.Util; 045import org.apache.hadoop.conf.Configurable; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.FileStatus; 048import org.apache.hadoop.fs.FileSystem; 049import org.apache.hadoop.fs.Path; 050import org.apache.hadoop.fs.PathFilter; 051import org.apache.hadoop.mapred.FileInputFormat; 052import org.apache.hadoop.mapred.InputFormat; 053import org.apache.hadoop.mapred.InputSplit; 054import org.apache.hadoop.mapred.JobConf; 055import org.apache.hadoop.mapred.OutputCollector; 056import org.apache.hadoop.mapred.RecordReader; 057import org.apache.hadoop.mapred.Reporter; 058import org.apache.hadoop.mapred.lib.CombineFileInputFormat; 059import org.apache.hadoop.mapred.lib.CombineFileRecordReader; 060import org.apache.hadoop.mapred.lib.CombineFileSplit; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064/** 065 * Class Hfs is the base class for all Hadoop file system access. Hfs may only be used with the 066 * Hadoop {@link cascading.flow.FlowConnector} sub-classes when creating Hadoop executable {@link cascading.flow.Flow} 067 * instances. 068 * <p/> 069 * Paths typically should point to a directory, where in turn all the "part" files immediately in that directory will 070 * be included. This is the practice Hadoop expects. Sub-directories are not included and typically result in a failure. 071 * <p/> 072 * To include sub-directories, Hadoop supports "globing". Globing is a frustrating feature and is supported more 073 * robustly by {@link GlobHfs} and less so by Hfs. 074 * <p/> 075 * Hfs will accept {@code /*} (wildcard) paths, but not all convenience methods like 076 * {@code jobConf.getSize} will behave properly or reliably. Nor can the Hfs instance 077 * with a wildcard path be used as a sink to write data. 078 * <p/> 079 * In those cases use GlobHfs since it is a sub-class of {@link cascading.tap.MultiSourceTap}. 080 * <p/> 081 * Optionally use {@link Dfs} or {@link Lfs} for resources specific to Hadoop Distributed file system or 082 * the Local file system, respectively. Using Hfs is the best practice when possible, Lfs and Dfs are conveniences. 083 * <p/> 084 * Use the Hfs class if the 'kind' of resource is unknown at design time. To use, prefix a scheme to the 'stringPath'. Where 085 * <code>hdfs://...</code> will denote Dfs, and <code>file://...</code> will denote Lfs. 086 * <p/> 087 * Call {@link HfsProps#setTemporaryDirectory(java.util.Map, String)} to use a different temporary file directory path 088 * other than the current Hadoop default path. 089 * <p/> 090 * By default Cascading on Hadoop will assume any source or sink Tap using the {@code file://} URI scheme 091 * intends to read files from the local client filesystem (for example when using the {@code Lfs} Tap) where the Hadoop 092 * job jar is started. Subsequently Cascading will force any MapReduce jobs reading or writing to {@code file://} resources 093 * to run in Hadoop "standalone mode" so that the file can be read. 094 * <p/> 095 * To change this behavior, {@link HfsProps#setLocalModeScheme(java.util.Map, String)} to set a different scheme value, 096 * or to "none" to disable entirely for the case the file to be read is available on every Hadoop processing node 097 * in the exact same path. 098 * <p/> 099 * When using a MapReduce planner, Hfs can optionally combine multiple small files (or a series of small "blocks") into 100 * larger "splits". This reduces the number of resulting map tasks created by Hadoop and can improve application 101 * performance. 102 * <p/> 103 * This is enabled by calling {@link HfsProps#setUseCombinedInput(boolean)} to {@code true}. By default, merging 104 * or combining splits into large ones is disabled. 105 * <p/> 106 * Apache Tez planner does not require this setting, it is supported by default and enabled by the application manager. 107 */ 108public class Hfs extends Tap<Configuration, RecordReader, OutputCollector> implements FileType<Configuration> 109 { 110 /** Field LOG */ 111 private static final Logger LOG = LoggerFactory.getLogger( Hfs.class ); 112 113 /** Field stringPath */ 114 protected String stringPath; 115 /** Field uriScheme */ 116 transient URI uriScheme; 117 /** Field path */ 118 transient Path path; 119 /** Field paths */ 120 private transient FileStatus[] statuses; // only used by getModifiedTime 121 122 private transient String cachedPath = null; 123 124 private static final PathFilter HIDDEN_FILES_FILTER = new PathFilter() 125 { 126 public boolean accept( Path path ) 127 { 128 String name = path.getName(); 129 130 if( name.isEmpty() ) // should never happen 131 return true; 132 133 char first = name.charAt( 0 ); 134 135 return first != '_' && first != '.'; 136 } 137 }; 138 139 protected static String getLocalModeScheme( Configuration conf, String defaultValue ) 140 { 141 return conf.get( HfsProps.LOCAL_MODE_SCHEME, defaultValue ); 142 } 143 144 protected static boolean getUseCombinedInput( Configuration conf ) 145 { 146 String platform = conf.get( "cascading.flow.platform", "" ); 147 boolean combineEnabled = conf.getBoolean( "cascading.hadoop.hfs.combine.files", false ); 148 149 // only supported by these platforms 150 if( platform.equals( "hadoop" ) || platform.equals( "hadoop2-mr1" ) ) 151 return combineEnabled; 152 153 if( combineEnabled && !Boolean.getBoolean( "cascading.hadoop.hfs.combine.files.warned" ) ) 154 { 155 LOG.warn( "'cascading.hadoop.hfs.combine.files' has been set to true, but is unsupported by this platform: {}, will be ignored to prevent failures", platform ); 156 System.setProperty( "cascading.hadoop.hfs.combine.files.warned", "true" ); 157 } 158 159 return false; 160 } 161 162 protected static boolean getCombinedInputSafeMode( Configuration conf ) 163 { 164 return conf.getBoolean( "cascading.hadoop.hfs.combine.safemode", true ); 165 } 166 167 protected Hfs() 168 { 169 } 170 171 @ConstructorProperties({"scheme"}) 172 protected Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme ) 173 { 174 super( scheme ); 175 } 176 177 /** 178 * Constructor Hfs creates a new Hfs instance. 179 * 180 * @param scheme of type Scheme 181 * @param stringPath of type String 182 */ 183 @ConstructorProperties({"scheme", "stringPath"}) 184 public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath ) 185 { 186 super( scheme ); 187 setStringPath( stringPath ); 188 } 189 190 /** 191 * Constructor Hfs creates a new Hfs instance. 192 * 193 * @param scheme of type Scheme 194 * @param stringPath of type String 195 * @param sinkMode of type SinkMode 196 */ 197 @ConstructorProperties({"scheme", "stringPath", "sinkMode"}) 198 public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, SinkMode sinkMode ) 199 { 200 super( scheme, sinkMode ); 201 setStringPath( stringPath ); 202 } 203 204 protected void setStringPath( String stringPath ) 205 { 206 this.stringPath = Util.normalizeUrl( stringPath ); 207 } 208 209 protected void setUriScheme( URI uriScheme ) 210 { 211 this.uriScheme = uriScheme; 212 } 213 214 public URI getURIScheme( Configuration jobConf ) 215 { 216 if( uriScheme != null ) 217 return uriScheme; 218 219 uriScheme = makeURIScheme( jobConf ); 220 221 return uriScheme; 222 } 223 224 protected URI makeURIScheme( Configuration configuration ) 225 { 226 try 227 { 228 URI uriScheme; 229 230 LOG.debug( "handling path: {}", stringPath ); 231 232 URI uri = new Path( stringPath ).toUri(); // safer URI parsing 233 String schemeString = uri.getScheme(); 234 String authority = uri.getAuthority(); 235 236 LOG.debug( "found scheme: {}, authority: {}", schemeString, authority ); 237 238 if( schemeString != null && authority != null ) 239 uriScheme = new URI( schemeString + "://" + uri.getAuthority() ); 240 else if( schemeString != null ) 241 uriScheme = new URI( schemeString + ":///" ); 242 else 243 uriScheme = getDefaultFileSystemURIScheme( configuration ); 244 245 LOG.debug( "using uri scheme: {}", uriScheme ); 246 247 return uriScheme; 248 } 249 catch( URISyntaxException exception ) 250 { 251 throw new TapException( "could not determine scheme from path: " + getPath(), exception ); 252 } 253 } 254 255 /** 256 * Method getDefaultFileSystemURIScheme returns the URI scheme for the default Hadoop FileSystem. 257 * 258 * @param configuration of type JobConf 259 * @return URI 260 */ 261 public URI getDefaultFileSystemURIScheme( Configuration configuration ) 262 { 263 return getDefaultFileSystem( configuration ).getUri(); 264 } 265 266 protected FileSystem getDefaultFileSystem( Configuration configuration ) 267 { 268 try 269 { 270 return FileSystem.get( configuration ); 271 } 272 catch( IOException exception ) 273 { 274 throw new TapException( "unable to get handle to underlying filesystem", exception ); 275 } 276 } 277 278 protected FileSystem getFileSystem( Configuration configuration ) 279 { 280 URI scheme = getURIScheme( configuration ); 281 282 try 283 { 284 return FileSystem.get( scheme, configuration ); 285 } 286 catch( IOException exception ) 287 { 288 throw new TapException( "unable to get handle to get filesystem for: " + scheme.getScheme(), exception ); 289 } 290 } 291 292 @Override 293 public String getIdentifier() 294 { 295 if( cachedPath == null ) 296 cachedPath = getPath().toString(); 297 298 return cachedPath; 299 } 300 301 public Path getPath() 302 { 303 if( path != null ) 304 return path; 305 306 if( stringPath == null ) 307 throw new IllegalStateException( "path not initialized" ); 308 309 path = new Path( stringPath ); 310 311 return path; 312 } 313 314 @Override 315 public String getFullIdentifier( Configuration conf ) 316 { 317 return getPath().makeQualified( getFileSystem( conf ) ).toString(); 318 } 319 320 @Override 321 public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf ) 322 { 323 String fullIdentifier = getFullIdentifier( conf ); 324 325 applySourceConfInitIdentifiers( process, conf, fullIdentifier ); 326 327 verifyNoDuplicates( conf ); 328 } 329 330 protected static void verifyNoDuplicates( Configuration conf ) 331 { 332 Path[] inputPaths = FileInputFormat.getInputPaths( HadoopUtil.asJobConfInstance( conf ) ); 333 Set<Path> paths = new HashSet<Path>( (int) ( inputPaths.length / .75f ) ); 334 335 for( Path inputPath : inputPaths ) 336 { 337 if( !paths.add( inputPath ) ) 338 throw new TapException( "may not add duplicate paths, found: " + inputPath ); 339 } 340 } 341 342 protected void applySourceConfInitIdentifiers( FlowProcess<? extends Configuration> process, Configuration conf, String... fullIdentifiers ) 343 { 344 for( String fullIdentifier : fullIdentifiers ) 345 sourceConfInitAddInputPath( conf, new Path( fullIdentifier ) ); 346 347 sourceConfInitComplete( process, conf ); 348 } 349 350 protected void sourceConfInitAddInputPath( Configuration conf, Path qualifiedPath ) 351 { 352 HadoopUtil.addInputPath( conf, qualifiedPath ); 353 354 makeLocal( conf, qualifiedPath, "forcing job to local mode, via source: " ); 355 } 356 357 protected void sourceConfInitComplete( FlowProcess<? extends Configuration> process, Configuration conf ) 358 { 359 super.sourceConfInit( process, conf ); 360 361 TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow 362 363 // use CombineFileInputFormat if that is enabled 364 handleCombineFileInputFormat( conf ); 365 } 366 367 /** 368 * Based on the configuration, handles and sets {@link CombineFileInputFormat} as the input 369 * format. 370 */ 371 private void handleCombineFileInputFormat( Configuration conf ) 372 { 373 // if combining files, override the configuration to use CombineFileInputFormat 374 if( !getUseCombinedInput( conf ) ) 375 return; 376 377 // get the prescribed individual input format from the underlying scheme so it can be used by CombinedInputFormat 378 String individualInputFormat = conf.get( "mapred.input.format.class" ); 379 380 if( individualInputFormat == null ) 381 throw new TapException( "input format is missing from the underlying scheme" ); 382 383 if( individualInputFormat.equals( CombinedInputFormat.class.getName() ) && 384 conf.get( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT ) == null ) 385 throw new TapException( "the input format class is already the combined input format but the underlying input format is missing" ); 386 387 // if safe mode is on (default) throw an exception if the InputFormat is not a FileInputFormat, otherwise log a 388 // warning and don't use the CombineFileInputFormat 389 boolean safeMode = getCombinedInputSafeMode( conf ); 390 391 if( !FileInputFormat.class.isAssignableFrom( conf.getClass( "mapred.input.format.class", null ) ) ) 392 { 393 if( safeMode ) 394 throw new TapException( "input format must be of type org.apache.hadoop.mapred.FileInputFormat, got: " + individualInputFormat ); 395 else 396 LOG.warn( "not combining input splits with CombineFileInputFormat, {} is not of type org.apache.hadoop.mapred.FileInputFormat.", individualInputFormat ); 397 } 398 else 399 { 400 // set the underlying individual input format 401 conf.set( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT, individualInputFormat ); 402 403 // override the input format class 404 conf.setClass( "mapred.input.format.class", CombinedInputFormat.class, InputFormat.class ); 405 } 406 } 407 408 @Override 409 public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf ) 410 { 411 Path qualifiedPath = new Path( getFullIdentifier( conf ) ); 412 413 HadoopUtil.setOutputPath( conf, qualifiedPath ); 414 super.sinkConfInit( process, conf ); 415 416 makeLocal( conf, qualifiedPath, "forcing job to local mode, via sink: " ); 417 418 TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow 419 } 420 421 private void makeLocal( Configuration conf, Path qualifiedPath, String infoMessage ) 422 { 423 String scheme = getLocalModeScheme( conf, "file" ); 424 425 if( !HadoopUtil.isLocal( conf ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( scheme ) ) 426 { 427 if( LOG.isInfoEnabled() ) 428 LOG.info( infoMessage + toString() ); 429 430 HadoopUtil.setLocal( conf ); // force job to run locally 431 } 432 } 433 434 @Override 435 public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException 436 { 437 // input may be null when this method is called on the client side or cluster side when accumulating 438 // for a HashJoin 439 return new HadoopTupleEntrySchemeIterator( flowProcess, this, input ); 440 } 441 442 @Override 443 public TupleEntryCollector openForWrite( FlowProcess<? extends Configuration> flowProcess, OutputCollector output ) throws IOException 444 { 445 // output may be null when this method is called on the client side or cluster side when creating 446 // side files with the PartitionTap 447 return new HadoopTupleEntrySchemeCollector( flowProcess, this, output ); 448 } 449 450 @Override 451 public boolean createResource( Configuration conf ) throws IOException 452 { 453 if( LOG.isDebugEnabled() ) 454 LOG.debug( "making dirs: {}", getFullIdentifier( conf ) ); 455 456 return getFileSystem( conf ).mkdirs( getPath() ); 457 } 458 459 @Override 460 public boolean deleteResource( Configuration conf ) throws IOException 461 { 462 String fullIdentifier = getFullIdentifier( conf ); 463 464 return deleteFullIdentifier( conf, fullIdentifier ); 465 } 466 467 private boolean deleteFullIdentifier( Configuration conf, String fullIdentifier ) throws IOException 468 { 469 if( LOG.isDebugEnabled() ) 470 LOG.debug( "deleting: {}", fullIdentifier ); 471 472 Path fullPath = new Path( fullIdentifier ); 473 474 // do not delete the root directory 475 if( fullPath.depth() == 0 ) 476 return true; 477 478 FileSystem fileSystem = getFileSystem( conf ); 479 480 try 481 { 482 return fileSystem.delete( fullPath, true ); 483 } 484 catch( NullPointerException exception ) 485 { 486 // hack to get around npe thrown when fs reaches root directory 487 // removes coupling to the new aws hadoop artifacts that may not be deployed 488 if( !( fileSystem.getClass().getSimpleName().equals( "NativeS3FileSystem" ) ) ) 489 throw exception; 490 } 491 492 return true; 493 } 494 495 public boolean deleteChildResource( FlowProcess<? extends Configuration> flowProcess, String childIdentifier ) throws IOException 496 { 497 return deleteChildResource( flowProcess.getConfig(), childIdentifier ); 498 } 499 500 public boolean deleteChildResource( Configuration conf, String childIdentifier ) throws IOException 501 { 502 Path childPath = new Path( childIdentifier ).makeQualified( getFileSystem( conf ) ); 503 504 if( !childPath.toString().startsWith( getFullIdentifier( conf ) ) ) 505 return false; 506 507 return deleteFullIdentifier( conf, childPath.toString() ); 508 } 509 510 @Override 511 public boolean resourceExists( Configuration conf ) throws IOException 512 { 513 // unfortunately getFileSystem( conf ).exists( getPath() ); does not account for "/*" etc 514 // nor is there an more efficient means to test for existence 515 FileStatus[] fileStatuses = getFileSystem( conf ).globStatus( getPath() ); 516 517 return fileStatuses != null && fileStatuses.length > 0; 518 } 519 520 @Override 521 public boolean isDirectory( FlowProcess<? extends Configuration> flowProcess ) throws IOException 522 { 523 return isDirectory( flowProcess.getConfig() ); 524 } 525 526 @Override 527 public boolean isDirectory( Configuration conf ) throws IOException 528 { 529 if( !resourceExists( conf ) ) 530 return false; 531 532 return getFileSystem( conf ).getFileStatus( getPath() ).isDir(); 533 } 534 535 @Override 536 public long getSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException 537 { 538 return getSize( flowProcess.getConfig() ); 539 } 540 541 @Override 542 public long getSize( Configuration conf ) throws IOException 543 { 544 if( !resourceExists( conf ) ) 545 return 0; 546 547 FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() ); 548 549 if( fileStatus.isDir() ) 550 return 0; 551 552 return getFileSystem( conf ).getFileStatus( getPath() ).getLen(); 553 } 554 555 /** 556 * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource. 557 * 558 * @param flowProcess 559 * @return long 560 * @throws IOException when 561 */ 562 public long getBlockSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException 563 { 564 return getBlockSize( flowProcess.getConfig() ); 565 } 566 567 /** 568 * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource. 569 * 570 * @param conf of JobConf 571 * @return long 572 * @throws IOException when 573 */ 574 public long getBlockSize( Configuration conf ) throws IOException 575 { 576 if( !resourceExists( conf ) ) 577 return 0; 578 579 FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() ); 580 581 if( fileStatus.isDir() ) 582 return 0; 583 584 return fileStatus.getBlockSize(); 585 } 586 587 /** 588 * Method getReplication returns the {@code replication} specified by the underlying file system for 589 * this resource. 590 * 591 * @param flowProcess 592 * @return int 593 * @throws IOException when 594 */ 595 public int getReplication( FlowProcess<? extends Configuration> flowProcess ) throws IOException 596 { 597 return getReplication( flowProcess.getConfig() ); 598 } 599 600 /** 601 * Method getReplication returns the {@code replication} specified by the underlying file system for 602 * this resource. 603 * 604 * @param conf of JobConf 605 * @return int 606 * @throws IOException when 607 */ 608 public int getReplication( Configuration conf ) throws IOException 609 { 610 if( !resourceExists( conf ) ) 611 return 0; 612 613 FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() ); 614 615 if( fileStatus.isDir() ) 616 return 0; 617 618 return fileStatus.getReplication(); 619 } 620 621 @Override 622 public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess ) throws IOException 623 { 624 return getChildIdentifiers( flowProcess.getConfig(), 1, false ); 625 } 626 627 @Override 628 public String[] getChildIdentifiers( Configuration conf ) throws IOException 629 { 630 return getChildIdentifiers( conf, 1, false ); 631 } 632 633 @Override 634 public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess, int depth, boolean fullyQualified ) throws IOException 635 { 636 return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified ); 637 } 638 639 @Override 640 public String[] getChildIdentifiers( Configuration conf, int depth, boolean fullyQualified ) throws IOException 641 { 642 if( !resourceExists( conf ) ) 643 return new String[ 0 ]; 644 645 if( depth == 0 && !fullyQualified ) 646 return new String[]{getIdentifier()}; 647 648 String fullIdentifier = getFullIdentifier( conf ); 649 650 int trim = fullyQualified ? 0 : fullIdentifier.length() + 1; 651 652 Set<String> results = new LinkedHashSet<String>(); 653 654 getChildPaths( conf, results, trim, new Path( fullIdentifier ), depth ); 655 656 return results.toArray( new String[ results.size() ] ); 657 } 658 659 private void getChildPaths( Configuration conf, Set<String> results, int trim, Path path, int depth ) throws IOException 660 { 661 if( depth == 0 ) 662 { 663 String substring = path.toString().substring( trim ); 664 String identifier = getIdentifier(); 665 666 if( identifier == null || identifier.isEmpty() ) 667 results.add( new Path( substring ).toString() ); 668 else 669 results.add( new Path( identifier, substring ).toString() ); 670 671 return; 672 } 673 674 FileStatus[] statuses = getFileSystem( conf ).listStatus( path, HIDDEN_FILES_FILTER ); 675 676 if( statuses == null ) 677 return; 678 679 for( FileStatus fileStatus : statuses ) 680 getChildPaths( conf, results, trim, fileStatus.getPath(), depth - 1 ); 681 } 682 683 @Override 684 public long getModifiedTime( Configuration conf ) throws IOException 685 { 686 if( !resourceExists( conf ) ) 687 return 0; 688 689 FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() ); 690 691 if( !fileStatus.isDir() ) 692 return fileStatus.getModificationTime(); 693 694 // todo: this should ignore the _temporary path, or not cache if found in the array 695 makeStatuses( conf ); 696 697 // statuses is empty, return 0 698 if( statuses == null || statuses.length == 0 ) 699 return 0; 700 701 long date = 0; 702 703 // filter out directories as we don't recurs into sub dirs 704 for( FileStatus status : statuses ) 705 { 706 if( !status.isDir() ) 707 date = Math.max( date, status.getModificationTime() ); 708 } 709 710 return date; 711 } 712 713 public static Path getTempPath( Configuration conf ) 714 { 715 String tempDir = conf.get( HfsProps.TEMPORARY_DIRECTORY ); 716 717 if( tempDir == null ) 718 tempDir = conf.get( "hadoop.tmp.dir" ); 719 720 return new Path( tempDir ); 721 } 722 723 protected String makeTemporaryPathDirString( String name ) 724 { 725 // _ is treated as a hidden file, so wipe them out 726 name = name.replaceAll( "^[_\\W\\s]+", "" ); 727 728 if( name.isEmpty() ) 729 name = "temp-path"; 730 731 return name.replaceAll( "[\\W\\s]+", "_" ) + Util.createUniqueID(); 732 } 733 734 /** 735 * Given a file-system object, it makes an array of paths 736 * 737 * @param conf of type JobConf 738 * @throws IOException on failure 739 */ 740 private void makeStatuses( Configuration conf ) throws IOException 741 { 742 if( statuses != null ) 743 return; 744 745 statuses = getFileSystem( conf ).listStatus( getPath() ); 746 } 747 748 /** 749 * Method resetFileStatuses removes the status cache, if any. 750 */ 751 public void resetFileStatuses() 752 { 753 statuses = null; 754 } 755 756 /** Combined input format that uses the underlying individual input format to combine multiple files into a single split. */ 757 static class CombinedInputFormat extends CombineFileInputFormat implements Configurable 758 { 759 private Configuration conf; 760 761 public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException 762 { 763 return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class ); 764 } 765 766 @Override 767 public void setConf( Configuration conf ) 768 { 769 this.conf = conf; 770 771 // set the aliased property value, if zero, the super class will look up the hadoop property 772 setMaxSplitSize( conf.getLong( "cascading.hadoop.hfs.combine.max.size", 0 ) ); 773 } 774 775 @Override 776 public Configuration getConf() 777 { 778 return conf; 779 } 780 } 781 }