/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineFileRecordReaderWrapper;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.type.FileType;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.util.Util;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hfs is the base class for all Hadoop file system access. Hfs may only be used with the
 * Hadoop {@link cascading.flow.FlowConnector} sub-classes when creating Hadoop executable {@link cascading.flow.Flow}
 * instances.
 * <p/>
 * Paths typically should point to a directory, where in turn all the "part" files immediately in that directory will
 * be included. This is the practice Hadoop expects. Sub-directories are not included and typically result in a failure.
 * <p/>
 * To include sub-directories, Hadoop supports "globbing". Globbing is a frustrating feature and is supported more
 * robustly by {@link GlobHfs} and less so by Hfs.
 * <p/>
 * Hfs will accept {@code /*} (wildcard) paths, but not all convenience methods like
 * {@code jobConf.getSize} will behave properly or reliably. Nor can an Hfs instance
 * with a wildcard path be used as a sink to write data.
 * <p/>
 * In those cases use GlobHfs since it is a sub-class of {@link cascading.tap.MultiSourceTap}.
 * <p/>
 * Optionally use {@link Dfs} or {@link Lfs} for resources specific to the Hadoop Distributed file system or
 * the Local file system, respectively. Using Hfs is the best practice when possible; Lfs and Dfs are conveniences.
 * <p/>
 * Use the Hfs class if the 'kind' of resource is unknown at design time. To use, prefix a scheme to the 'stringPath',
 * where <code>hdfs://...</code> denotes Dfs and <code>file://...</code> denotes Lfs.
 * <p/>
 * Call {@link HfsProps#setTemporaryDirectory(java.util.Map, String)} to use a temporary file directory path
 * other than the current Hadoop default path.
 * <p/>
 * By default Cascading on Hadoop will assume any source or sink Tap using the {@code file://} URI scheme
 * intends to read files from the local client filesystem (for example when using the {@code Lfs} Tap) where the Hadoop
 * job jar is started. Subsequently Cascading will force any MapReduce jobs reading or writing to {@code file://} resources
 * to run in Hadoop "standalone mode" so that the file can be read.
 * <p/>
 * To change this behavior, call {@link HfsProps#setLocalModeScheme(java.util.Map, String)} to set a different scheme value,
 * or set it to "none" to disable the behavior entirely for the case where the file to be read is available on every
 * Hadoop processing node at the exact same path.
 * <p/>
 * When using a MapReduce planner, Hfs can optionally combine multiple small files (or a series of small "blocks") into
 * larger "splits". This reduces the number of resulting map tasks created by Hadoop and can improve application
 * performance.
 * <p/>
 * This is enabled by calling {@link HfsProps#setUseCombinedInput(boolean)} with {@code true}. By default, merging
 * or combining splits into larger ones is disabled.
 * <p/>
 * The Apache Tez planner does not require this setting; combining is supported by default and enabled by the application manager.
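 * <p/>
 * For illustration, a minimal sketch of constructing source and sink taps (the paths, field names, and
 * the {@code TextLine} scheme choice below are assumptions for the example, not requirements of this class):
 * <pre>{@code
 * Scheme scheme = new TextLine( new Fields( "offset", "line" ) );
 *
 * Tap source = new Hfs( scheme, "hdfs://namenode:8020/data/input" );       // hdfs:// denotes Dfs
 * Tap sink = new Hfs( scheme, "file:///tmp/output", SinkMode.REPLACE );    // file:// denotes Lfs
 * }</pre>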
 */
public class Hfs extends Tap<Configuration, RecordReader, OutputCollector> implements FileType<Configuration>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( Hfs.class );

  /** Field stringPath */
  protected String stringPath;
  /** Field uriScheme */
  transient URI uriScheme;
  /** Field path */
  transient Path path;
  /** Field paths */
  private transient FileStatus[] statuses; // only used by getModifiedTime

  private transient String cachedPath = null;

  private static final PathFilter HIDDEN_FILES_FILTER = new PathFilter()
    {
    public boolean accept( Path path )
      {
      String name = path.getName();

      if( name.isEmpty() ) // should never happen
        return true;

      char first = name.charAt( 0 );

      return first != '_' && first != '.';
      }
    };

  protected static String getLocalModeScheme( Configuration conf, String defaultValue )
    {
    return conf.get( HfsProps.LOCAL_MODE_SCHEME, defaultValue );
    }

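  /**
   * Method getUseCombinedInput returns true if combining splits has been both requested and is supported.
   * Combining may be requested via the {@code cascading.hadoop.hfs.combine.files} property or via
   * {@link FlowRuntimeProps#COMBINE_SPLITS}, but it is only honored on the "hadoop" and "hadoop2-mr1"
   * platforms; on other platforms the request is ignored, and a warning may be logged once.
   */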
  protected static boolean getUseCombinedInput( Configuration conf )
    {
    boolean combineEnabled = conf.getBoolean( "cascading.hadoop.hfs.combine.files", false );

    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) == null && !combineEnabled )
      return false;

    if( !combineEnabled ) // enable if set in FlowRuntimeProps
      combineEnabled = conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, false );

    String platform = conf.get( "cascading.flow.platform", "" );

    // only supported by these platforms
    if( platform.equals( "hadoop" ) || platform.equals( "hadoop2-mr1" ) )
      return combineEnabled;

    // we are on a platform that supports combining, just not through the combiner
    // do not enable it here locally
    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null )
      return false;

    if( combineEnabled && !Boolean.getBoolean( "cascading.hadoop.hfs.combine.files.warned" ) )
      {
      LOG.warn( "'cascading.hadoop.hfs.combine.files' has been set to true, but is unsupported by this platform: {}, will be ignored to prevent failures", platform );
      System.setProperty( "cascading.hadoop.hfs.combine.files.warned", "true" );
      }

    return false;
    }

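  /**
   * Method getCombinedInputSafeMode returns true (the default) if split combining should fail fast when the
   * configured input format is not a {@link FileInputFormat}; when false, combining is skipped with a warning
   * instead. Controlled by the {@code cascading.hadoop.hfs.combine.safemode} property.
   */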
  protected static boolean getCombinedInputSafeMode( Configuration conf )
    {
    return conf.getBoolean( "cascading.hadoop.hfs.combine.safemode", true );
    }

  protected Hfs()
    {
    }

  @ConstructorProperties({"scheme"})
  protected Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme )
    {
    super( scheme );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   */
  @ConstructorProperties({"scheme", "stringPath"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath )
    {
    super( scheme );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   * @param sinkMode   of type SinkMode
   */
  @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    setStringPath( stringPath );
    }

  protected void setStringPath( String stringPath )
    {
    this.stringPath = Util.normalizeUrl( stringPath );
    }

  protected void setUriScheme( URI uriScheme )
    {
    this.uriScheme = uriScheme;
    }

  public URI getURIScheme( Configuration jobConf )
    {
    if( uriScheme != null )
      return uriScheme;

    uriScheme = makeURIScheme( jobConf );

    return uriScheme;
    }

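  /**
   * Method makeURIScheme derives the URI scheme and authority for this tap's path. For illustration only
   * (the paths are hypothetical): a path such as {@code "hdfs://namenode:8020/data"} yields the scheme URI
   * {@code "hdfs://namenode:8020"}, while a bare path such as {@code "/data"} falls back to the default
   * file system URI found in the given configuration.
   */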
  protected URI makeURIScheme( Configuration configuration )
    {
    try
      {
      URI uriScheme;

      LOG.debug( "handling path: {}", stringPath );

      URI uri = new Path( stringPath ).toUri(); // safer URI parsing
      String schemeString = uri.getScheme();
      String authority = uri.getAuthority();

      LOG.debug( "found scheme: {}, authority: {}", schemeString, authority );

      if( schemeString != null && authority != null )
        uriScheme = new URI( schemeString + "://" + uri.getAuthority() );
      else if( schemeString != null )
        uriScheme = new URI( schemeString + ":///" );
      else
        uriScheme = getDefaultFileSystemURIScheme( configuration );

      LOG.debug( "using uri scheme: {}", uriScheme );

      return uriScheme;
      }
    catch( URISyntaxException exception )
      {
      throw new TapException( "could not determine scheme from path: " + getPath(), exception );
      }
    }

  /**
   * Method getDefaultFileSystemURIScheme returns the URI scheme for the default Hadoop FileSystem.
   *
   * @param configuration of type Configuration
   * @return URI
   */
  public URI getDefaultFileSystemURIScheme( Configuration configuration )
    {
    return getDefaultFileSystem( configuration ).getUri();
    }

  protected FileSystem getDefaultFileSystem( Configuration configuration )
    {
    try
      {
      return FileSystem.get( configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to underlying filesystem", exception );
      }
    }

  protected FileSystem getFileSystem( Configuration configuration )
    {
    URI scheme = getURIScheme( configuration );

    try
      {
      return FileSystem.get( scheme, configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to underlying filesystem for: " + scheme.getScheme(), exception );
      }
    }

  @Override
  public String getIdentifier()
    {
    if( cachedPath == null )
      cachedPath = getPath().toString();

    return cachedPath;
    }

  public Path getPath()
    {
    if( path != null )
      return path;

    if( stringPath == null )
      throw new IllegalStateException( "path not initialized" );

    path = new Path( stringPath );

    return path;
    }

  @Override
  public String getFullIdentifier( Configuration conf )
    {
    return getPath().makeQualified( getFileSystem( conf ) ).toString();
    }

  @Override
  public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    String fullIdentifier = getFullIdentifier( conf );

    applySourceConfInitIdentifiers( process, conf, fullIdentifier );

    verifyNoDuplicates( conf );
    }

  protected static void verifyNoDuplicates( Configuration conf )
    {
    Path[] inputPaths = FileInputFormat.getInputPaths( HadoopUtil.asJobConfInstance( conf ) );
    Set<Path> paths = new HashSet<Path>( (int) ( inputPaths.length / .75f ) );

    for( Path inputPath : inputPaths )
      {
      if( !paths.add( inputPath ) )
        throw new TapException( "may not add duplicate paths, found: " + inputPath );
      }
    }

  protected void applySourceConfInitIdentifiers( FlowProcess<? extends Configuration> process, Configuration conf, String... fullIdentifiers )
    {
    for( String fullIdentifier : fullIdentifiers )
      sourceConfInitAddInputPath( conf, new Path( fullIdentifier ) );

    sourceConfInitComplete( process, conf );
    }

  protected void sourceConfInitAddInputPath( Configuration conf, Path qualifiedPath )
    {
    HadoopUtil.addInputPath( conf, qualifiedPath );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via source: " );
    }

  protected void sourceConfInitComplete( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    super.sourceConfInit( process, conf );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow

    // use CombineFileInputFormat if that is enabled
    handleCombineFileInputFormat( conf );
    }

  /**
   * Based on the configuration, handles and sets {@link CombineFileInputFormat} as the input
   * format.
   */
  private void handleCombineFileInputFormat( Configuration conf )
    {
    // if combining files, override the configuration to use CombineFileInputFormat
    if( !getUseCombinedInput( conf ) )
      return;

    // get the prescribed individual input format from the underlying scheme so it can be used by CombinedInputFormat
    String individualInputFormat = conf.get( "mapred.input.format.class" );

    if( individualInputFormat == null )
      throw new TapException( "input format is missing from the underlying scheme" );

    if( individualInputFormat.equals( CombinedInputFormat.class.getName() ) &&
      conf.get( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT ) == null )
      throw new TapException( "the input format class is already the combined input format but the underlying input format is missing" );

    // if safe mode is on (default) throw an exception if the InputFormat is not a FileInputFormat, otherwise log a
    // warning and don't use the CombineFileInputFormat
    boolean safeMode = getCombinedInputSafeMode( conf );

    if( !FileInputFormat.class.isAssignableFrom( conf.getClass( "mapred.input.format.class", null ) ) )
      {
      if( safeMode )
        throw new TapException( "input format must be of type org.apache.hadoop.mapred.FileInputFormat, got: " + individualInputFormat );
      else
        LOG.warn( "not combining input splits with CombineFileInputFormat, {} is not of type org.apache.hadoop.mapred.FileInputFormat.", individualInputFormat );
      }
    else
      {
      // set the underlying individual input format
      conf.set( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT, individualInputFormat );

      // override the input format class
      conf.setClass( "mapred.input.format.class", CombinedInputFormat.class, InputFormat.class );
      }
    }

  @Override
  public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    Path qualifiedPath = new Path( getFullIdentifier( conf ) );

    HadoopUtil.setOutputPath( conf, qualifiedPath );
    super.sinkConfInit( process, conf );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
    }

  private void makeLocal( Configuration conf, Path qualifiedPath, String infoMessage )
    {
    // don't change the conf or log any messages if running cluster side
    if( HadoopUtil.isInflow( conf ) )
      return;

    String scheme = getLocalModeScheme( conf, "file" );

    if( !HadoopUtil.isLocal( conf ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( scheme ) )
      {
      if( LOG.isInfoEnabled() )
        LOG.info( infoMessage + toString() );

      HadoopUtil.setLocal( conf ); // force job to run locally
      }
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
    {
    // input may be null when this method is called on the client side or cluster side when accumulating
    // for a HashJoin
    return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
    }

  @Override
  public TupleEntryCollector openForWrite( FlowProcess<? extends Configuration> flowProcess, OutputCollector output ) throws IOException
    {
    // output may be null when this method is called on the client side or cluster side when creating
    // side files with the PartitionTap
    return new HadoopTupleEntrySchemeCollector( flowProcess, this, output );
    }

  @Override
  public boolean createResource( Configuration conf ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "making dirs: {}", getFullIdentifier( conf ) );

    return getFileSystem( conf ).mkdirs( getPath() );
    }

  @Override
  public boolean deleteResource( Configuration conf ) throws IOException
    {
    String fullIdentifier = getFullIdentifier( conf );

    return deleteFullIdentifier( conf, fullIdentifier );
    }

  private boolean deleteFullIdentifier( Configuration conf, String fullIdentifier ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "deleting: {}", fullIdentifier );

    Path fullPath = new Path( fullIdentifier );

    // do not delete the root directory
    if( fullPath.depth() == 0 )
      return true;

    FileSystem fileSystem = getFileSystem( conf );

    try
      {
      return fileSystem.delete( fullPath, true );
      }
    catch( NullPointerException exception )
      {
      // hack to get around npe thrown when fs reaches root directory
      // removes coupling to the new aws hadoop artifacts that may not be deployed
      if( !( fileSystem.getClass().getSimpleName().equals( "NativeS3FileSystem" ) ) )
        throw exception;
      }

    return true;
    }

  public boolean deleteChildResource( FlowProcess<? extends Configuration> flowProcess, String childIdentifier ) throws IOException
    {
    return deleteChildResource( flowProcess.getConfig(), childIdentifier );
    }

  public boolean deleteChildResource( Configuration conf, String childIdentifier ) throws IOException
    {
    Path childPath = new Path( childIdentifier ).makeQualified( getFileSystem( conf ) );

    if( !childPath.toString().startsWith( getFullIdentifier( conf ) ) )
      return false;

    return deleteFullIdentifier( conf, childPath.toString() );
    }

  @Override
  public boolean resourceExists( Configuration conf ) throws IOException
    {
    // unfortunately getFileSystem( conf ).exists( getPath() ) does not account for "/*" etc,
    // nor is there a more efficient means to test for existence
    FileStatus[] fileStatuses = getFileSystem( conf ).globStatus( getPath() );

    return fileStatuses != null && fileStatuses.length > 0;
    }

  @Override
  public boolean isDirectory( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return isDirectory( flowProcess.getConfig() );
    }

  @Override
  public boolean isDirectory( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return false;

    return getFileSystem( conf ).getFileStatus( getPath() ).isDir();
    }

  @Override
  public long getSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getSize( flowProcess.getConfig() );
    }

  @Override
  public long getSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return getFileSystem( conf ).getFileStatus( getPath() ).getLen();
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return long
   * @throws IOException when the status of the underlying resource cannot be read
   */
  public long getBlockSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getBlockSize( flowProcess.getConfig() );
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param conf of type Configuration
   * @return long
   * @throws IOException when the status of the underlying resource cannot be read
   */
  public long getBlockSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getBlockSize();
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return int
   * @throws IOException when the status of the underlying resource cannot be read
   */
  public int getReplication( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getReplication( flowProcess.getConfig() );
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param conf of type Configuration
   * @return int
   * @throws IOException when the status of the underlying resource cannot be read
   */
  public int getReplication( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getReplication();
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), 1, false );
    }

  @Override
  public String[] getChildIdentifiers( Configuration conf ) throws IOException
    {
    return getChildIdentifiers( conf, 1, false );
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess, int depth, boolean fullyQualified ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified );
    }

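  /**
   * Method getChildIdentifiers returns the identifiers of children found below this resource, walking down
   * {@code depth} levels and skipping hidden names (those beginning with '_' or '.'). When
   * {@code fullyQualified} is true the returned identifiers retain the file system scheme and authority.
   * For illustration only (the layout is hypothetical): a tap on {@code /data} containing
   * {@code part-00000} and {@code part-00001} returns those two child paths at depth 1.
   */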
  @Override
  public String[] getChildIdentifiers( Configuration conf, int depth, boolean fullyQualified ) throws IOException
    {
    if( !resourceExists( conf ) )
      return new String[ 0 ];

    if( depth == 0 && !fullyQualified )
      return new String[]{getIdentifier()};

    String fullIdentifier = getFullIdentifier( conf );

    int trim = fullyQualified ? 0 : fullIdentifier.length() + 1;

    Set<String> results = new LinkedHashSet<String>();

    getChildPaths( conf, results, trim, new Path( fullIdentifier ), depth );

    return results.toArray( new String[ results.size() ] );
    }

  private void getChildPaths( Configuration conf, Set<String> results, int trim, Path path, int depth ) throws IOException
    {
    if( depth == 0 )
      {
      String substring = path.toString().substring( trim );
      String identifier = getIdentifier();

      if( identifier == null || identifier.isEmpty() )
        results.add( new Path( substring ).toString() );
      else
        results.add( new Path( identifier, substring ).toString() );

      return;
      }

    FileStatus[] statuses = getFileSystem( conf ).listStatus( path, HIDDEN_FILES_FILTER );

    if( statuses == null )
      return;

    for( FileStatus fileStatus : statuses )
      getChildPaths( conf, results, trim, fileStatus.getPath(), depth - 1 );
    }

  @Override
  public long getModifiedTime( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( !fileStatus.isDir() )
      return fileStatus.getModificationTime();

    // todo: this should ignore the _temporary path, or not cache if found in the array
    makeStatuses( conf );

    // if statuses is null or empty, return 0
    if( statuses == null || statuses.length == 0 )
      return 0;

    long date = 0;

    // filter out directories as we don't recurse into sub-directories
    for( FileStatus status : statuses )
      {
      if( !status.isDir() )
        date = Math.max( date, status.getModificationTime() );
      }

    return date;
    }

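  /**
   * Method getTempPath returns the path to use for temporary files, either the directory set via
   * {@link HfsProps#TEMPORARY_DIRECTORY} or, if unset, the Hadoop {@code hadoop.tmp.dir} value.
   */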
  public static Path getTempPath( Configuration conf )
    {
    String tempDir = conf.get( HfsProps.TEMPORARY_DIRECTORY );

    if( tempDir == null )
      tempDir = conf.get( "hadoop.tmp.dir" );

    return new Path( tempDir );
    }

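  /**
   * Method makeTemporaryPathDirString makes a safe directory name from the given name by stripping any
   * leading underscores, non-word characters, and whitespace, replacing remaining runs of non-word characters
   * or whitespace with '_', and appending a unique ID. For illustration only (the input is hypothetical),
   * a name such as {@code "_my temp*path"} becomes {@code "my_temp_path"} followed by the unique ID.
   */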
  protected String makeTemporaryPathDirString( String name )
    {
    // names beginning with _ are treated as hidden files, so wipe out any leading underscores
    name = name.replaceAll( "^[_\\W\\s]+", "" );

    if( name.isEmpty() )
      name = "temp-path";

    return name.replaceAll( "[\\W\\s]+", "_" ) + Util.createUniqueID();
    }

  /**
   * Method makeStatuses populates the cached array of child {@link FileStatus} instances for this path, if not
   * already cached.
   *
   * @param conf of type Configuration
   * @throws IOException on failure
   */
  private void makeStatuses( Configuration conf ) throws IOException
    {
    if( statuses != null )
      return;

    statuses = getFileSystem( conf ).listStatus( getPath() );
    }

  /**
   * Method resetFileStatuses removes the status cache, if any.
   */
  public void resetFileStatuses()
    {
    statuses = null;
    }

  /** Combined input format that uses the underlying individual input format to combine multiple files into a single split. */
  static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
    {
    private Configuration conf;

    public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
      {
      return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
      }

    @Override
    public void setConf( Configuration conf )
      {
      this.conf = conf;

      // set the aliased property value, if zero, the super class will look up the hadoop property
      setMaxSplitSize( conf.getLong( "cascading.hadoop.hfs.combine.max.size", 0 ) );
      }

    @Override
    public Configuration getConf()
      {
      return conf;
      }
    }
  }