/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineFileRecordReaderWrapper;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.type.FileType;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.util.Util;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hfs is the base class for all Hadoop file system access. Hfs may only be used with the
 * Hadoop {@link cascading.flow.FlowConnector} sub-classes when creating Hadoop executable {@link cascading.flow.Flow}
 * instances.
 * <p/>
 * Paths typically should point to a directory, where in turn all the "part" files immediately in that directory will
 * be included. This is the practice Hadoop expects. Sub-directories are not included and typically result in a failure.
 * <p/>
 * To include sub-directories, Hadoop supports "globbing". Globbing is a frustrating feature and is supported more
 * robustly by {@link GlobHfs} and less so by Hfs.
 * <p/>
 * Hfs will accept {@code /*} (wildcard) paths, but not all convenience methods like
 * {@code jobConf.getSize} will behave properly or reliably. Nor can an Hfs instance
 * with a wildcard path be used as a sink to write data.
 * <p/>
 * In those cases use GlobHfs, since it is a sub-class of {@link cascading.tap.MultiSourceTap}.
 * <p/>
 * Optionally use {@link Dfs} or {@link Lfs} for resources specific to the Hadoop Distributed file system or
 * the local file system, respectively. Using Hfs is the best practice when possible; Lfs and Dfs are conveniences.
 * <p/>
 * Use the Hfs class if the 'kind' of resource is unknown at design time. To use, prefix a scheme to the 'stringPath', where
 * <code>hdfs://...</code> will denote Dfs, and <code>file://...</code> will denote Lfs.
 * <p/>
 * Call {@link HfsProps#setTemporaryDirectory(java.util.Map, String)} to use a temporary file directory path
 * other than the current Hadoop default path.
 * <p/>
 * By default Cascading on Hadoop will assume any source or sink Tap using the {@code file://} URI scheme
 * intends to read files from the local client filesystem (for example when using the {@code Lfs} Tap) where the Hadoop
 * job jar is started. Subsequently Cascading will force any MapReduce jobs reading or writing to {@code file://} resources
 * to run in Hadoop "standalone mode" so that the file can be read.
 * <p/>
 * To change this behavior, call {@link HfsProps#setLocalModeScheme(java.util.Map, String)} to set a different scheme value,
 * or set it to "none" to disable the behavior entirely for the case where the file to be read is available at the exact
 * same path on every Hadoop processing node.
 * <p/>
 * When using a MapReduce planner, Hfs can optionally combine multiple small files (or a series of small "blocks") into
 * larger "splits". This reduces the number of resulting map tasks created by Hadoop and can improve application
 * performance.
 * <p/>
 * This is enabled by calling {@link HfsProps#setUseCombinedInput(boolean)} with {@code true}. By default, merging
 * or combining splits into larger ones is disabled.
 * <p/>
 * The Apache Tez planner does not require this setting; combining splits is supported by default and enabled by the
 * application manager.
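 * <p/>
 * For example, a minimal sketch of constructing an Hfs source (the {@code TextLine} scheme, path, and
 * property values below are illustrative, not prescriptive):
 * <pre>{@code
 * Properties properties = new Properties();
 *
 * // optional: combine small files into larger splits on the MapReduce planners
 * properties.setProperty( "cascading.hadoop.hfs.combine.files", "true" );
 *
 * Scheme scheme = new TextLine( new Fields( "offset", "line" ) );
 * Tap source = new Hfs( scheme, "hdfs://namenode:8020/logs/", SinkMode.KEEP );
 * }</pre>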
 */
public class Hfs extends Tap<Configuration, RecordReader, OutputCollector> implements FileType<Configuration>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( Hfs.class );

  /** Field stringPath */
  protected String stringPath;
  /** Field uriScheme */
  transient URI uriScheme;
  /** Field path */
  transient Path path;
  /** Field statuses */
  private transient FileStatus[] statuses; // only used by getModifiedTime

  private transient String cachedPath = null;

  private static final PathFilter HIDDEN_FILES_FILTER = new PathFilter()
    {
    public boolean accept( Path path )
      {
      String name = path.getName();

      if( name.isEmpty() ) // should never happen
        return true;

      char first = name.charAt( 0 );

      return first != '_' && first != '.';
      }
    };

  protected static String getLocalModeScheme( Configuration conf, String defaultValue )
    {
    return conf.get( HfsProps.LOCAL_MODE_SCHEME, defaultValue );
    }

  protected static boolean getUseCombinedInput( Configuration conf )
    {
    boolean combineEnabled = conf.getBoolean( "cascading.hadoop.hfs.combine.files", false );

    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) == null && !combineEnabled )
      return false;

    if( !combineEnabled ) // enable if set in FlowRuntimeProps
      combineEnabled = conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, false );

    String platform = conf.get( "cascading.flow.platform", "" );

    // combining is only supported by these platforms
    if( platform.equals( "hadoop" ) || platform.equals( "hadoop2-mr1" ) )
      return combineEnabled;

    // we are on a platform that supports combining, just not through the combiner
    // do not enable it here locally
    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null )
      return false;

    if( combineEnabled && !Boolean.getBoolean( "cascading.hadoop.hfs.combine.files.warned" ) )
      {
      LOG.warn( "'cascading.hadoop.hfs.combine.files' has been set to true, but is unsupported by this platform: {}, will be ignored to prevent failures", platform );
      System.setProperty( "cascading.hadoop.hfs.combine.files.warned", "true" );
      }

    return false;
    }

  protected static boolean getCombinedInputSafeMode( Configuration conf )
    {
    return conf.getBoolean( "cascading.hadoop.hfs.combine.safemode", true );
    }

  protected Hfs()
    {
    }

  @ConstructorProperties({"scheme"})
  protected Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme )
    {
    super( scheme );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   */
  @ConstructorProperties({"scheme", "stringPath"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath )
    {
    super( scheme );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
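   * <p/>
   * For example, a sketch of creating a replaceable sink (the path is illustrative; the scheme
   * instance is assumed to be constructed elsewhere):
   * <pre>{@code
   * Tap sink = new Hfs( scheme, "hdfs://namenode:8020/data/output", SinkMode.REPLACE );
   * }</pre>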
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   * @param sinkMode   of type SinkMode
   */
  @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    setStringPath( stringPath );
    }

  protected void setStringPath( String stringPath )
    {
    this.stringPath = Util.normalizeUrl( stringPath );
    }

  protected void setUriScheme( URI uriScheme )
    {
    this.uriScheme = uriScheme;
    }

  public URI getURIScheme( Configuration jobConf )
    {
    if( uriScheme != null )
      return uriScheme;

    uriScheme = makeURIScheme( jobConf );

    return uriScheme;
    }

  protected URI makeURIScheme( Configuration configuration )
    {
    try
      {
      URI uriScheme;

      LOG.debug( "handling path: {}", stringPath );

      URI uri = new Path( stringPath ).toUri(); // safer URI parsing
      String schemeString = uri.getScheme();
      String authority = uri.getAuthority();

      LOG.debug( "found scheme: {}, authority: {}", schemeString, authority );

      if( schemeString != null && authority != null )
        uriScheme = new URI( schemeString + "://" + uri.getAuthority() );
      else if( schemeString != null )
        uriScheme = new URI( schemeString + ":///" );
      else
        uriScheme = getDefaultFileSystemURIScheme( configuration );

      LOG.debug( "using uri scheme: {}", uriScheme );

      return uriScheme;
      }
    catch( URISyntaxException exception )
      {
      throw new TapException( "could not determine scheme from path: " + getPath(), exception );
      }
    }

  /**
   * Method getDefaultFileSystemURIScheme returns the URI scheme for the default Hadoop FileSystem.
   *
   * @param configuration of type Configuration
   * @return URI
   */
  public URI getDefaultFileSystemURIScheme( Configuration configuration )
    {
    return getDefaultFileSystem( configuration ).getUri();
    }

  protected FileSystem getDefaultFileSystem( Configuration configuration )
    {
    try
      {
      return FileSystem.get( configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to underlying filesystem", exception );
      }
    }

  protected FileSystem getFileSystem( Configuration configuration )
    {
    URI scheme = getURIScheme( configuration );

    try
      {
      return FileSystem.get( scheme, configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to filesystem for: " + scheme.getScheme(), exception );
      }
    }

  @Override
  public String getIdentifier()
    {
    if( cachedPath == null )
      cachedPath = getPath().toString();

    return cachedPath;
    }

  public Path getPath()
    {
    if( path != null )
      return path;

    if( stringPath == null )
      throw new IllegalStateException( "path not initialized" );

    path = new Path( stringPath );

    return path;
    }

  @Override
  public String getFullIdentifier( Configuration conf )
    {
    return getPath().makeQualified( getFileSystem( conf ) ).toString();
    }

  @Override
  public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    String fullIdentifier = getFullIdentifier( conf );

    applySourceConfInitIdentifiers( process, conf, fullIdentifier );

    verifyNoDuplicates( conf );
    }

  protected static void verifyNoDuplicates( Configuration conf )
    {
    Path[] inputPaths = FileInputFormat.getInputPaths( HadoopUtil.asJobConfInstance( conf ) );
    Set<Path> paths = new HashSet<Path>( (int) ( inputPaths.length / .75f ) );

    for( Path inputPath : inputPaths )
      {
      if( !paths.add( inputPath ) )
        throw new TapException( "may not add duplicate paths, found: " + inputPath );
      }
    }

  protected void applySourceConfInitIdentifiers( FlowProcess<? extends Configuration> process, Configuration conf, String... fullIdentifiers )
    {
    for( String fullIdentifier : fullIdentifiers )
      sourceConfInitAddInputPath( conf, new Path( fullIdentifier ) );

    sourceConfInitComplete( process, conf );
    }

  protected void sourceConfInitAddInputPath( Configuration conf, Path qualifiedPath )
    {
    HadoopUtil.addInputPath( conf, qualifiedPath );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via source: " );
    }

  protected void sourceConfInitComplete( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    super.sourceConfInit( process, conf );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow

    // use CombineFileInputFormat if that is enabled
    handleCombineFileInputFormat( conf );
    }

  /**
   * Based on the configuration, handles and sets {@link CombineFileInputFormat} as the input
   * format.
   */
  private void handleCombineFileInputFormat( Configuration conf )
    {
    // if combining files, override the configuration to use CombineFileInputFormat
    if( !getUseCombinedInput( conf ) )
      return;

    // get the prescribed individual input format from the underlying scheme so it can be used by CombinedInputFormat
    String individualInputFormat = conf.get( "mapred.input.format.class" );

    if( individualInputFormat == null )
      throw new TapException( "input format is missing from the underlying scheme" );

    if( individualInputFormat.equals( CombinedInputFormat.class.getName() ) &&
      conf.get( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT ) == null )
      throw new TapException( "the input format class is already the combined input format but the underlying input format is missing" );

    // if safe mode is on (default), throw an exception if the InputFormat is not a FileInputFormat; otherwise log a
    // warning and don't use the CombineFileInputFormat
    boolean safeMode = getCombinedInputSafeMode( conf );

    if( !FileInputFormat.class.isAssignableFrom( conf.getClass( "mapred.input.format.class", null ) ) )
      {
      if( safeMode )
        throw new TapException( "input format must be of type org.apache.hadoop.mapred.FileInputFormat, got: " + individualInputFormat );
      else
        LOG.warn( "not combining input splits with CombineFileInputFormat, {} is not of type org.apache.hadoop.mapred.FileInputFormat.", individualInputFormat );
      }
    else
      {
      // set the underlying individual input format
      conf.set( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT, individualInputFormat );

      // override the input format class
      conf.setClass( "mapred.input.format.class", CombinedInputFormat.class, InputFormat.class );
      }
    }

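  /*
   * A minimal sketch of enabling split combining from client code. The property names below are the
   * ones read by getUseCombinedInput() above and CombinedInputFormat.setConf() at the bottom of this
   * file; the size value is illustrative only:
   *
   *   Properties properties = new Properties();
   *   properties.setProperty( "cascading.hadoop.hfs.combine.files", "true" );
   *   properties.setProperty( "cascading.hadoop.hfs.combine.max.size", Long.toString( 134217728L ) ); // 128 MB per combined split
   */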
  @Override
  public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    Path qualifiedPath = new Path( getFullIdentifier( conf ) );

    HadoopUtil.setOutputPath( conf, qualifiedPath );
    super.sinkConfInit( process, conf );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
    }

  private void makeLocal( Configuration conf, Path qualifiedPath, String infoMessage )
    {
    // don't change the conf or log any messages if running cluster side
    if( HadoopUtil.isInflow( conf ) )
      return;

    String scheme = getLocalModeScheme( conf, "file" );

    if( !HadoopUtil.isLocal( conf ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( scheme ) )
      {
      if( LOG.isInfoEnabled() )
        LOG.info( infoMessage + toString() );

      HadoopUtil.setLocal( conf ); // force job to run locally
      }
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
    {
    // input may be null when this method is called on the client side or cluster side when accumulating
    // for a HashJoin
    return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
    }

  @Override
  public TupleEntryCollector openForWrite( FlowProcess<? extends Configuration> flowProcess, OutputCollector output ) throws IOException
    {
    // output may be null when this method is called on the client side or cluster side when creating
    // side files with the PartitionTap
    return new HadoopTupleEntrySchemeCollector( flowProcess, this, output );
    }

  @Override
  public boolean createResource( Configuration conf ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "making dirs: {}", getFullIdentifier( conf ) );

    return getFileSystem( conf ).mkdirs( getPath() );
    }

  @Override
  public boolean deleteResource( Configuration conf ) throws IOException
    {
    String fullIdentifier = getFullIdentifier( conf );

    return deleteFullIdentifier( conf, fullIdentifier );
    }

  private boolean deleteFullIdentifier( Configuration conf, String fullIdentifier ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "deleting: {}", fullIdentifier );

    Path fullPath = new Path( fullIdentifier );

    // do not delete the root directory
    if( fullPath.depth() == 0 )
      return true;

    FileSystem fileSystem = getFileSystem( conf );

    try
      {
      return fileSystem.delete( fullPath, true );
      }
    catch( NullPointerException exception )
      {
      // hack to get around the NPE thrown when the fs reaches the root directory
      // removes coupling to the new aws hadoop artifacts that may not be deployed
      if( !( fileSystem.getClass().getSimpleName().equals( "NativeS3FileSystem" ) ) )
        throw exception;
      }

    return true;
    }

  public boolean deleteChildResource( FlowProcess<? extends Configuration> flowProcess, String childIdentifier ) throws IOException
    {
    return deleteChildResource( flowProcess.getConfig(), childIdentifier );
    }

  public boolean deleteChildResource( Configuration conf, String childIdentifier ) throws IOException
    {
    Path childPath = new Path( childIdentifier ).makeQualified( getFileSystem( conf ) );

    if( !childPath.toString().startsWith( getFullIdentifier( conf ) ) )
      return false;

    return deleteFullIdentifier( conf, childPath.toString() );
    }

  @Override
  public boolean resourceExists( Configuration conf ) throws IOException
    {
    // unfortunately getFileSystem( conf ).exists( getPath() ) does not account for "/*" etc.,
    // nor is there a more efficient means to test for existence
    FileStatus[] fileStatuses = getFileSystem( conf ).globStatus( getPath() );

    return fileStatuses != null && fileStatuses.length > 0;
    }

  @Override
  public boolean isDirectory( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return isDirectory( flowProcess.getConfig() );
    }

  @Override
  public boolean isDirectory( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return false;

    return getFileSystem( conf ).getFileStatus( getPath() ).isDir();
    }

  @Override
  public long getSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getSize( flowProcess.getConfig() );
    }

  @Override
  public long getSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getLen();
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return long
   * @throws IOException on failure accessing the underlying file system
   */
  public long getBlockSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getBlockSize( flowProcess.getConfig() );
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param conf of type Configuration
   * @return long
   * @throws IOException on failure accessing the underlying file system
   */
  public long getBlockSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getBlockSize();
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return int
   * @throws IOException on failure accessing the underlying file system
   */
  public int getReplication( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getReplication( flowProcess.getConfig() );
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param conf of type Configuration
   * @return int
   * @throws IOException on failure accessing the underlying file system
   */
  public int getReplication( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getReplication();
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), 1, false );
    }

  @Override
  public String[] getChildIdentifiers( Configuration conf ) throws IOException
    {
    return getChildIdentifiers( conf, 1, false );
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess, int depth, boolean fullyQualified ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified );
    }

  @Override
  public String[] getChildIdentifiers( Configuration conf, int depth, boolean fullyQualified ) throws IOException
    {
    if( !resourceExists( conf ) )
      return new String[ 0 ];

    if( depth == 0 && !fullyQualified )
      return new String[]{getIdentifier()};

    String fullIdentifier = getFullIdentifier( conf );

    int trim = fullyQualified ? 0 : fullIdentifier.length() + 1;

    Set<String> results = new LinkedHashSet<String>();

    getChildPaths( conf, results, trim, new Path( fullIdentifier ), depth );

    return results.toArray( new String[ results.size() ] );
    }

  private void getChildPaths( Configuration conf, Set<String> results, int trim, Path path, int depth ) throws IOException
    {
    if( depth == 0 )
      {
      String substring = path.toString().substring( trim );
      String identifier = getIdentifier();

      if( identifier == null || identifier.isEmpty() )
        results.add( new Path( substring ).toString() );
      else
        results.add( new Path( identifier, substring ).toString() );

      return;
      }

    FileStatus[] statuses = getFileSystem( conf ).listStatus( path, HIDDEN_FILES_FILTER );

    if( statuses == null )
      return;

    for( FileStatus fileStatus : statuses )
      getChildPaths( conf, results, trim, fileStatus.getPath(), depth - 1 );
    }

  @Override
  public long getModifiedTime( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( !fileStatus.isDir() )
      return fileStatus.getModificationTime();

    // todo: this should ignore the _temporary path, or not cache if found in the array
    makeStatuses( conf );

    // if statuses is empty, return 0
    if( statuses == null || statuses.length == 0 )
      return 0;

    long date = 0;

    // filter out directories as we don't recurse into sub-directories
    for( FileStatus status : statuses )
      {
      if( !status.isDir() )
        date = Math.max( date, status.getModificationTime() );
      }

    return date;
    }

  public static Path getTempPath( Configuration conf )
    {
    String tempDir = conf.get( HfsProps.TEMPORARY_DIRECTORY );

    if( tempDir == null )
      tempDir = conf.get( "hadoop.tmp.dir" );

    return new Path( tempDir );
    }

  protected String makeTemporaryPathDirString( String name )
    {
    // a leading _ is treated as a hidden file, so wipe out any such prefix
    name = name.replaceAll( "^[_\\W\\s]+", "" );

    if( name.isEmpty() )
      name = "temp-path";

    return name.replaceAll( "[\\W\\s]+", "_" ) + Util.createUniqueID();
    }

  /**
   * Given a file-system object, populates the cached array of file statuses for this tap's path.
   *
   * @param conf of type Configuration
   * @throws IOException on failure
   */
  private void makeStatuses( Configuration conf ) throws IOException
    {
    if( statuses != null )
      return;

    statuses = getFileSystem( conf ).listStatus( getPath() );
    }

  /**
   * Method resetFileStatuses removes the status cache, if any.
   */
  public void resetFileStatuses()
    {
    statuses = null;
    }

  /** Combined input format that uses the underlying individual input format to combine multiple files into a single split. */
  static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
    {
    private Configuration conf;

    public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
      {
      return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
      }

    @Override
    public void setConf( Configuration conf )
      {
      this.conf = conf;

      // set the aliased property value; if zero, the super class will look up the hadoop property
      setMaxSplitSize( conf.getLong( "cascading.hadoop.hfs.combine.max.size", 0 ) );
      }

    @Override
    public Configuration getConf()
      {
      return conf;
      }
    }
  }