/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineFileRecordReaderWrapper;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.type.FileType;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.util.LazyIterable;
import cascading.util.Util;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hfs is the base class for all Hadoop file system access. Hfs may only be used with the
 * Hadoop {@link cascading.flow.FlowConnector} sub-classes when creating Hadoop executable {@link cascading.flow.Flow}
 * instances.
 * <p/>
 * Paths should typically point to a directory, in which case all the "part" files immediately within that directory
 * will be included. This is the practice Hadoop expects. Sub-directories are not included and typically result in a failure.
 * <p/>
 * To include sub-directories, Hadoop supports "globing". Globing is a frustrating feature and is supported more
 * robustly by {@link GlobHfs} and less so by Hfs.
 * <p/>
 * Hfs will accept {@code /*} (wildcard) paths, but not all convenience methods, like
 * {@link #getSize(Configuration)}, will behave properly or reliably. Nor can an Hfs instance
 * with a wildcard path be used as a sink to write data.
 * <p/>
 * In those cases use GlobHfs, since it is a sub-class of {@link cascading.tap.MultiSourceTap}.
 * <p/>
 * Optionally use {@link Dfs} or {@link Lfs} for resources specific to the Hadoop Distributed File System or
 * the local file system, respectively. Using Hfs is the best practice when possible; Lfs and Dfs are conveniences.
 * <p/>
 * Use the Hfs class if the 'kind' of resource is unknown at design time. To use, prefix a scheme to the 'stringPath',
 * where <code>hdfs://...</code> denotes Dfs and <code>file://...</code> denotes Lfs.
 * <p/>
 * Call {@link HfsProps#setTemporaryDirectory(java.util.Map, String)} to use a temporary file directory path
 * other than the current Hadoop default path.
 * <p/>
 * By default Cascading on Hadoop will assume any source or sink Tap using the {@code file://} URI scheme
 * intends to read files from the local client filesystem (for example when using the {@code Lfs} Tap) where the Hadoop
 * job jar is started. Consequently Cascading will force any MapReduce jobs reading or writing to {@code file://} resources
 * to run in Hadoop "standalone mode" so that the files can be read.
 * <p/>
 * To change this behavior, call {@link HfsProps#setLocalModeScheme(java.util.Map, String)} to set a different scheme value,
 * or to "none" to disable it entirely for the case where the file to be read is available at the exact same path
 * on every Hadoop processing node.
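 * <p/>
 * For example, a minimal sketch, assuming the resulting properties are handed to a Hadoop
 * {@link cascading.flow.FlowConnector} implementation such as {@code HadoopFlowConnector}:
 * <pre>{@code
 * Properties properties = new Properties();
 *
 * // never force MapReduce jobs into stand-alone mode for file:// paths
 * HfsProps.setLocalModeScheme( properties, "none" );
 *
 * FlowConnector flowConnector = new HadoopFlowConnector( properties );
 * }</pre>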
 * <p/>
 * When using a MapReduce planner, Hfs can optionally combine multiple small files (or a series of small "blocks") into
 * larger "splits". This reduces the number of resulting map tasks created by Hadoop and can improve application
 * performance.
 * <p/>
 * This is enabled by calling {@link HfsProps#setUseCombinedInput(boolean)} with {@code true}. By default, merging
 * or combining splits into larger ones is disabled.
 * <p/>
 * The Apache Tez planner does not require this setting; split combining is supported by default and enabled by the
 * application manager.
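 * <p/>
 * A minimal sketch of a typical configuration follows, assuming the {@code cascading.scheme.hadoop.TextLine}
 * scheme, the fluent {@code HfsProps} builder, and a MapReduce planner are available (paths are illustrative):
 * <pre>{@code
 * Properties properties = HfsProps.hfsProps()
 *   .setUseCombinedInput( true ) // combine small files into larger splits
 *   .buildProperties();
 *
 * Tap source = new Hfs( new TextLine(), "hdfs://namenode/logs/input" );
 * Tap sink = new Hfs( new TextLine(), "hdfs://namenode/logs/output", SinkMode.REPLACE );
 *
 * FlowConnector flowConnector = new HadoopFlowConnector( properties );
 * }</pre>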
 */
public class Hfs extends Tap<Configuration, RecordReader, OutputCollector> implements FileType<Configuration>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( Hfs.class );

  /** Field stringPath */
  protected String stringPath;
  /** Field uriScheme */
  transient URI uriScheme;
  /** Field path */
  transient Path path;
  /** Field paths */
  private transient FileStatus[] statuses; // only used by getModifiedTime

  private transient String cachedPath = null;

  private static final PathFilter HIDDEN_FILES_FILTER = new PathFilter()
    {
    public boolean accept( Path path )
      {
      String name = path.getName();

      if( name.isEmpty() ) // should never happen
        return true;

      char first = name.charAt( 0 );

      return first != '_' && first != '.';
      }
    };

  protected static String getLocalModeScheme( Configuration conf, String defaultValue )
    {
    return conf.get( HfsProps.LOCAL_MODE_SCHEME, defaultValue );
    }

  protected static boolean getUseCombinedInput( Configuration conf )
    {
    boolean combineEnabled = conf.getBoolean( "cascading.hadoop.hfs.combine.files", false );

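    // neither the flow runtime property nor the hfs-specific property requests combining, so leave it disabled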
    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) == null && !combineEnabled )
      return false;

    if( !combineEnabled ) // enable if set in FlowRuntimeProps
      combineEnabled = conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, false );

    String platform = conf.get( "cascading.flow.platform", "" );

    // only supported by these platforms
    if( platform.equals( "hadoop" ) || platform.equals( "hadoop2-mr1" ) )
      return combineEnabled;

    // we are on a platform that supports combining natively, just not through CombineFileInputFormat
    // do not enable it here locally
    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null )
      return false;

    if( combineEnabled && !Boolean.getBoolean( "cascading.hadoop.hfs.combine.files.warned" ) )
      {
      LOG.warn( "'cascading.hadoop.hfs.combine.files' has been set to true, but is unsupported by this platform: {}, will be ignored to prevent failures", platform );
      System.setProperty( "cascading.hadoop.hfs.combine.files.warned", "true" );
      }

    return false;
    }

  protected static boolean getCombinedInputSafeMode( Configuration conf )
    {
    return conf.getBoolean( "cascading.hadoop.hfs.combine.safemode", true );
    }

  protected Hfs()
    {
    }

  @ConstructorProperties({"scheme"})
  protected Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme )
    {
    super( scheme );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   */
  @ConstructorProperties({"scheme", "stringPath"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath )
    {
    super( scheme );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   * @param sinkMode   of type SinkMode
   */
  @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    setStringPath( stringPath );
    }

  protected void setStringPath( String stringPath )
    {
    this.stringPath = Util.normalizeUrl( stringPath );
    }

  protected void setUriScheme( URI uriScheme )
    {
    this.uriScheme = uriScheme;
    }

  public URI getURIScheme( Configuration jobConf )
    {
    if( uriScheme != null )
      return uriScheme;

    uriScheme = makeURIScheme( jobConf );

    return uriScheme;
    }

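  /**
   * Derives the filesystem URI (scheme and authority) for the current {@code stringPath}. For example, an
   * identifier like {@code hdfs://namenode:8020/logs/input} (illustrative) yields {@code hdfs://namenode:8020},
   * a scheme with no authority yields {@code <scheme>:///}, and a path with no scheme at all falls back to the
   * default filesystem URI found in the given configuration.
   */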
  protected URI makeURIScheme( Configuration configuration )
    {
    try
      {
      URI uriScheme;

      LOG.debug( "handling path: {}", stringPath );

      URI uri = new Path( stringPath ).toUri(); // safer URI parsing
      String schemeString = uri.getScheme();
      String authority = uri.getAuthority();

      LOG.debug( "found scheme: {}, authority: {}", schemeString, authority );

      if( schemeString != null && authority != null )
        uriScheme = new URI( schemeString + "://" + uri.getAuthority() );
      else if( schemeString != null )
        uriScheme = new URI( schemeString + ":///" );
      else
        uriScheme = getDefaultFileSystemURIScheme( configuration );

      LOG.debug( "using uri scheme: {}", uriScheme );

      return uriScheme;
      }
    catch( URISyntaxException exception )
      {
      throw new TapException( "could not determine scheme from path: " + getPath(), exception );
      }
    }

  /**
   * Method getDefaultFileSystemURIScheme returns the URI scheme for the default Hadoop FileSystem.
   *
   * @param configuration of type Configuration
   * @return URI
   */
  public URI getDefaultFileSystemURIScheme( Configuration configuration )
    {
    return getDefaultFileSystem( configuration ).getUri();
    }

  protected FileSystem getDefaultFileSystem( Configuration configuration )
    {
    try
      {
      return FileSystem.get( configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to underlying filesystem", exception );
      }
    }

  protected FileSystem getFileSystem( Configuration configuration )
    {
    URI scheme = getURIScheme( configuration );

    try
      {
      return FileSystem.get( scheme, configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to filesystem for: " + scheme.getScheme(), exception );
      }
    }

  @Override
  public String getIdentifier()
    {
    if( cachedPath == null )
      cachedPath = getPath().toString();

    return cachedPath;
    }

  public Path getPath()
    {
    if( path != null )
      return path;

    if( stringPath == null )
      throw new IllegalStateException( "path not initialized" );

    path = new Path( stringPath );

    return path;
    }

  @Override
  public String getFullIdentifier( Configuration conf )
    {
    return getPath().makeQualified( getFileSystem( conf ) ).toString();
    }

  @Override
  public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    String fullIdentifier = getFullIdentifier( conf );

    applySourceConfInitIdentifiers( process, conf, fullIdentifier );

    verifyNoDuplicates( conf );
    }

  protected static void verifyNoDuplicates( Configuration conf )
    {
    Path[] inputPaths = FileInputFormat.getInputPaths( HadoopUtil.asJobConfInstance( conf ) );
    Set<Path> paths = new HashSet<Path>( (int) ( inputPaths.length / .75f ) );

    for( Path inputPath : inputPaths )
      {
      if( !paths.add( inputPath ) )
        throw new TapException( "may not add duplicate paths, found: " + inputPath );
      }
    }

  protected void applySourceConfInitIdentifiers( FlowProcess<? extends Configuration> process, Configuration conf, final String... fullIdentifiers )
    {
    sourceConfInitAddInputPaths( conf, new LazyIterable<String, Path>( fullIdentifiers )
      {
      @Override
      protected Path convert( String next )
        {
        return new Path( next );
        }
      } );

    sourceConfInitComplete( process, conf );
    }

  protected void sourceConfInitAddInputPaths( Configuration conf, Iterable<Path> qualifiedPaths )
    {
    HadoopUtil.addInputPaths( conf, qualifiedPaths );

    for( Path qualifiedPath : qualifiedPaths )
      {
      boolean stop = !makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via source: " );

      if( stop )
        break;
      }
    }

  @Deprecated
  protected void sourceConfInitAddInputPath( Configuration conf, Path qualifiedPath )
    {
    HadoopUtil.addInputPath( conf, qualifiedPath );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via source: " );
    }

  protected void sourceConfInitComplete( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    super.sourceConfInit( process, conf );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow

    // use CombineFileInputFormat if that is enabled
    handleCombineFileInputFormat( conf );
    }

  /**
   * Based on the configuration, handles and sets {@link CombineFileInputFormat} as the input
   * format.
   */
  private void handleCombineFileInputFormat( Configuration conf )
    {
    // if combining files, override the configuration to use CombineFileInputFormat
    if( !getUseCombinedInput( conf ) )
      return;

    // get the prescribed individual input format from the underlying scheme so it can be used by CombinedInputFormat
    String individualInputFormat = conf.get( "mapred.input.format.class" );

    if( individualInputFormat == null )
      throw new TapException( "input format is missing from the underlying scheme" );

    if( individualInputFormat.equals( CombinedInputFormat.class.getName() ) &&
      conf.get( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT ) == null )
      throw new TapException( "the input format class is already the combined input format but the underlying input format is missing" );

    // if safe mode is on (default) throw an exception if the InputFormat is not a FileInputFormat, otherwise log a
    // warning and don't use the CombineFileInputFormat
    boolean safeMode = getCombinedInputSafeMode( conf );

    if( !FileInputFormat.class.isAssignableFrom( conf.getClass( "mapred.input.format.class", null ) ) )
      {
      if( safeMode )
        throw new TapException( "input format must be of type org.apache.hadoop.mapred.FileInputFormat, got: " + individualInputFormat );
      else
        LOG.warn( "not combining input splits with CombineFileInputFormat, {} is not of type org.apache.hadoop.mapred.FileInputFormat.", individualInputFormat );
      }
    else
      {
      // set the underlying individual input format
      conf.set( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT, individualInputFormat );

      // override the input format class
      conf.setClass( "mapred.input.format.class", CombinedInputFormat.class, InputFormat.class );
      }
    }

  @Override
  public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    Path qualifiedPath = new Path( getFullIdentifier( conf ) );

    HadoopUtil.setOutputPath( conf, qualifiedPath );
    super.sinkConfInit( process, conf );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
    }

  private boolean makeLocal( Configuration conf, Path qualifiedPath, String infoMessage )
    {
    // don't change the conf or log any messages if running cluster side
    if( HadoopUtil.isInflow( conf ) )
      return false;

    String scheme = getLocalModeScheme( conf, "file" );

    if( !HadoopUtil.isLocal( conf ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( scheme ) )
      {
      if( LOG.isInfoEnabled() )
        LOG.info( infoMessage + toString() );

      HadoopUtil.setLocal( conf ); // force job to run locally

      return false; // only need to set local once
      }

    return true;
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
    {
    // input may be null when this method is called on the client side or cluster side when accumulating
    // for a HashJoin
    return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
    }

  @Override
  public TupleEntryCollector openForWrite( FlowProcess<? extends Configuration> flowProcess, OutputCollector output ) throws IOException
    {
    resetFileStatuses();

    // output may be null when this method is called on the client side or cluster side when creating
    // side files with the PartitionTap
    return new HadoopTupleEntrySchemeCollector( flowProcess, this, output );
    }

  @Override
  public boolean createResource( Configuration conf ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "making dirs: {}", getFullIdentifier( conf ) );

    return getFileSystem( conf ).mkdirs( getPath() );
    }

  @Override
  public boolean deleteResource( Configuration conf ) throws IOException
    {
    String fullIdentifier = getFullIdentifier( conf );

    return deleteFullIdentifier( conf, fullIdentifier );
    }

  private boolean deleteFullIdentifier( Configuration conf, String fullIdentifier ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "deleting: {}", fullIdentifier );

    resetFileStatuses();

    Path fullPath = new Path( fullIdentifier );

    // do not delete the root directory
    if( fullPath.depth() == 0 )
      return true;

    FileSystem fileSystem = getFileSystem( conf );

    try
      {
      return fileSystem.delete( fullPath, true );
      }
    catch( NullPointerException exception )
      {
      // hack to get around npe thrown when fs reaches root directory
      // removes coupling to the new aws hadoop artifacts that may not be deployed
      if( !( fileSystem.getClass().getSimpleName().equals( "NativeS3FileSystem" ) ) )
        throw exception;
      }

    return true;
    }

  public boolean deleteChildResource( FlowProcess<? extends Configuration> flowProcess, String childIdentifier ) throws IOException
    {
    return deleteChildResource( flowProcess.getConfig(), childIdentifier );
    }

  public boolean deleteChildResource( Configuration conf, String childIdentifier ) throws IOException
    {
    resetFileStatuses();

    Path childPath = new Path( childIdentifier ).makeQualified( getFileSystem( conf ) );

    if( !childPath.toString().startsWith( getFullIdentifier( conf ) ) )
      return false;

    return deleteFullIdentifier( conf, childPath.toString() );
    }

  @Override
  public boolean resourceExists( Configuration conf ) throws IOException
    {
    // unfortunately getFileSystem( conf ).exists( getPath() ) does not account for "/*" etc
    // nor is there a more efficient means to test for existence
    FileStatus[] fileStatuses = getFileSystem( conf ).globStatus( getPath() );

    return fileStatuses != null && fileStatuses.length > 0;
    }

  @Override
  public boolean isDirectory( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return isDirectory( flowProcess.getConfig() );
    }

  @Override
  public boolean isDirectory( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return false;

    return getFileSystem( conf ).getFileStatus( getPath() ).isDir();
    }

  @Override
  public long getSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getSize( flowProcess.getConfig() );
    }

  @Override
  public long getSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return getFileSystem( conf ).getFileStatus( getPath() ).getLen();
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return long
   * @throws IOException when the underlying file system cannot be accessed
   */
  public long getBlockSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getBlockSize( flowProcess.getConfig() );
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param conf of type Configuration
   * @return long
   * @throws IOException when the underlying file system cannot be accessed
   */
  public long getBlockSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getBlockSize();
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return int
   * @throws IOException when the underlying file system cannot be accessed
   */
  public int getReplication( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getReplication( flowProcess.getConfig() );
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param conf of type Configuration
   * @return int
   * @throws IOException when the underlying file system cannot be accessed
   */
  public int getReplication( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getReplication();
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), 1, false );
    }

  @Override
  public String[] getChildIdentifiers( Configuration conf ) throws IOException
    {
    return getChildIdentifiers( conf, 1, false );
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess, int depth, boolean fullyQualified ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified );
    }

  @Override
  public String[] getChildIdentifiers( Configuration conf, int depth, boolean fullyQualified ) throws IOException
    {
    if( !resourceExists( conf ) )
      return new String[ 0 ];

    if( depth == 0 && !fullyQualified )
      return new String[]{getIdentifier()};

    String fullIdentifier = getFullIdentifier( conf );

    int trim = fullyQualified ? 0 : fullIdentifier.length() + 1;

    Set<String> results = new LinkedHashSet<String>();

    getChildPaths( conf, results, trim, new Path( fullIdentifier ), depth );

    return results.toArray( new String[ results.size() ] );
    }

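  /**
   * Recursively descends {@code depth} levels below the given path, adding the identifiers found at that depth
   * to {@code results}; hidden files and directories (names starting with '_' or '.') are skipped when listing
   * children.
   */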
  private void getChildPaths( Configuration conf, Set<String> results, int trim, Path path, int depth ) throws IOException
    {
    if( depth == 0 )
      {
      String substring = path.toString().substring( trim );
      String identifier = getIdentifier();

      if( identifier == null || identifier.isEmpty() )
        results.add( new Path( substring ).toString() );
      else
        results.add( new Path( identifier, substring ).toString() );

      return;
      }

    FileStatus[] statuses = getFileSystem( conf ).listStatus( path, HIDDEN_FILES_FILTER );

    if( statuses == null )
      return;

    for( FileStatus fileStatus : statuses )
      getChildPaths( conf, results, trim, fileStatus.getPath(), depth - 1 );
    }

  @Override
  public long getModifiedTime( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( !fileStatus.isDir() )
      return fileStatus.getModificationTime();

    // todo: this should ignore the _temporary path, or not cache if found in the array
    makeStatuses( conf );

    // statuses is empty, return 0
    if( statuses == null || statuses.length == 0 )
      return 0;

    long date = 0;

    // filter out directories as we don't recurse into sub-directories
    for( FileStatus status : statuses )
      {
      if( !status.isDir() )
        date = Math.max( date, status.getModificationTime() );
      }

    return date;
    }

  public FileStatus getFileStatus( Configuration conf ) throws IOException
    {
    return getFileSystem( conf ).getFileStatus( getPath() );
    }

  public static Path getTempPath( Configuration conf )
    {
    String tempDir = conf.get( HfsProps.TEMPORARY_DIRECTORY );

    if( tempDir == null )
      tempDir = conf.get( "hadoop.tmp.dir" );

    return new Path( tempDir );
    }

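  /**
   * Creates a directory name safe for use under the temporary path by stripping any leading underscores,
   * whitespace, and other non-word characters (Hadoop treats names starting with '_' as hidden), replacing the
   * remaining non-word runs with underscores, and appending a unique ID. For example (illustrative),
   * {@code "_my temp/path"} becomes {@code "my_temp_path"} followed by the unique ID.
   */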
  protected String makeTemporaryPathDirString( String name )
    {
    // names starting with _ are treated as hidden files, so strip leading underscores and other non-word characters
    name = name.replaceAll( "^[_\\W\\s]+", "" );

    if( name.isEmpty() )
      name = "temp-path";

    return name.replaceAll( "[\\W\\s]+", "_" ) + Util.createUniqueID();
    }

  /**
   * Populates the cached array of {@link FileStatus} instances for this path, if not already populated.
   *
   * @param conf of type Configuration
   * @throws IOException on failure
   */
  private void makeStatuses( Configuration conf ) throws IOException
    {
    if( statuses != null )
      return;

    statuses = getFileSystem( conf ).listStatus( getPath() );
    }

  /**
   * Method resetFileStatuses removes the status cache, if any.
   */
  public void resetFileStatuses()
    {
    statuses = null;
    }

  /** Combined input format that uses the underlying individual input format to combine multiple files into a single split. */
  static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
    {
    private Configuration conf;

    public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
      {
      return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
      }

    @Override
    public void setConf( Configuration conf )
      {
      this.conf = conf;

      // set the aliased property value; if zero, the super class will look up the hadoop property
      setMaxSplitSize( conf.getLong( "cascading.hadoop.hfs.combine.max.size", 0 ) );
      }

    @Override
    public Configuration getConf()
      {
      return conf;
      }
    }
  }