/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineFileRecordReaderWrapper;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.type.FileType;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.util.Util;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hfs is the base class for all Hadoop file system access. Hfs may only be used with the
 * Hadoop {@link cascading.flow.FlowConnector} sub-classes when creating Hadoop executable {@link cascading.flow.Flow}
 * instances.
 * <p/>
 * Paths should typically point to a directory, where in turn all the "part" files immediately in that directory will
 * be included. This is the practice Hadoop expects. Sub-directories are not included and typically result in a failure.
 * <p/>
 * To include sub-directories, Hadoop supports "globing". Globing is a frustrating feature and is supported more
 * robustly by {@link GlobHfs} and less so by Hfs.
 * <p/>
 * Hfs will accept {@code /*} (wildcard) paths, but not all convenience methods like
 * {@code jobConf.getSize} will behave properly or reliably. Nor can an Hfs instance
 * with a wildcard path be used as a sink to write data.
 * <p/>
 * In those cases use GlobHfs since it is a sub-class of {@link cascading.tap.MultiSourceTap}.
 * <p/>
 * Optionally use {@link Dfs} or {@link Lfs} for resources specific to the Hadoop Distributed file system or
 * the Local file system, respectively. Using Hfs is the best practice when possible; Lfs and Dfs are conveniences.
 * <p/>
 * Use the Hfs class if the 'kind' of resource is unknown at design time. To use, prefix a scheme to the 'stringPath',
 * where <code>hdfs://...</code> will denote Dfs and <code>file://...</code> will denote Lfs.
 * <p/>
 * Call {@link HfsProps#setTemporaryDirectory(java.util.Map, String)} to use a temporary file directory path
 * other than the current Hadoop default path.
 * <p/>
 * By default Cascading on Hadoop will assume any source or sink Tap using the {@code file://} URI scheme
 * intends to read files from the local client filesystem (for example when using the {@code Lfs} Tap) where the Hadoop
 * job jar is started. Consequently Cascading will force any MapReduce jobs reading or writing to {@code file://} resources
 * to run in Hadoop "standalone mode" so that the file can be read.
 * <p/>
 * To change this behavior, use {@link HfsProps#setLocalModeScheme(java.util.Map, String)} to set a different scheme value,
 * or set it to "none" to disable local mode entirely, for the case where the file to be read is available at the exact
 * same path on every Hadoop processing node.
 * <p/>
 * When using a MapReduce planner, Hfs can optionally combine multiple small files (or a series of small "blocks") into
 * larger "splits". This reduces the number of resulting map tasks created by Hadoop and can improve application
 * performance.
 * <p/>
 * This is enabled by setting {@link HfsProps#setUseCombinedInput(boolean)} to {@code true}. By default, merging
 * or combining splits into larger ones is disabled.
 * <p/>
 * The Apache Tez planner does not require this setting; split combining is supported by default and enabled by the application manager.
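 * <p/>
 * For example, a minimal sketch of creating source and sink taps (the {@code TextLine} scheme, the host name, and
 * the paths shown here are illustrative):
 * <pre>{@code
 * Scheme scheme = new TextLine( new Fields( "line" ) );
 *
 * Tap source = new Hfs( scheme, "hdfs://namenode:8020/logs/input" );         // behaves as Dfs
 * Tap sink = new Hfs( scheme, "file:///tmp/logs/output", SinkMode.REPLACE ); // behaves as Lfs
 * }</pre>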
 */
public class Hfs extends Tap<Configuration, RecordReader, OutputCollector> implements FileType<Configuration>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( Hfs.class );

  /** Field stringPath */
  protected String stringPath;
  /** Field uriScheme */
  transient URI uriScheme;
  /** Field path */
  transient Path path;
  /** Field statuses */
  private transient FileStatus[] statuses; // only used by getModifiedTime

  private transient String cachedPath = null;

  private static final PathFilter HIDDEN_FILES_FILTER = new PathFilter()
  {
  public boolean accept( Path path )
    {
    String name = path.getName();

    if( name.isEmpty() ) // should never happen
      return true;

    char first = name.charAt( 0 );

    return first != '_' && first != '.';
    }
  };

  protected static String getLocalModeScheme( Configuration conf, String defaultValue )
    {
    return conf.get( HfsProps.LOCAL_MODE_SCHEME, defaultValue );
    }

  protected static boolean getUseCombinedInput( Configuration conf )
    {
    String platform = conf.get( "cascading.flow.platform", "" );
    boolean combineEnabled = conf.getBoolean( "cascading.hadoop.hfs.combine.files", false );

    // only supported by these platforms
    if( platform.equals( "hadoop" ) || platform.equals( "hadoop2-mr1" ) )
      return combineEnabled;

    if( combineEnabled && !Boolean.getBoolean( "cascading.hadoop.hfs.combine.files.warned" ) )
      {
      LOG.warn( "'cascading.hadoop.hfs.combine.files' has been set to true, but is unsupported by this platform: {}, will be ignored to prevent failures", platform );
      System.setProperty( "cascading.hadoop.hfs.combine.files.warned", "true" );
      }

    return false;
    }

  protected static boolean getCombinedInputSafeMode( Configuration conf )
    {
    return conf.getBoolean( "cascading.hadoop.hfs.combine.safemode", true );
    }

  protected Hfs()
    {
    }

  @ConstructorProperties({"scheme"})
  protected Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme )
    {
    super( scheme );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   */
  @ConstructorProperties({"scheme", "stringPath"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath )
    {
    super( scheme );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   * @param sinkMode   of type SinkMode
   */
  @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    setStringPath( stringPath );
    }

  protected void setStringPath( String stringPath )
    {
    this.stringPath = Util.normalizeUrl( stringPath );
    }

  protected void setUriScheme( URI uriScheme )
    {
    this.uriScheme = uriScheme;
    }

  public URI getURIScheme( Configuration jobConf )
    {
    if( uriScheme != null )
      return uriScheme;

    uriScheme = makeURIScheme( jobConf );

    return uriScheme;
    }

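  /**
   * Method makeURIScheme derives the URI scheme and authority for this tap from the current {@code stringPath}.
   * <p/>
   * For example (illustrative values only), {@code hdfs://namenode:8020/logs/input} yields the scheme URI
   * {@code hdfs://namenode:8020}, {@code file:///tmp/output} yields {@code file:///}, and a path without a scheme,
   * such as {@code /logs/input}, falls back to the default file system URI of the given configuration.
   *
   * @param configuration of type Configuration
   * @return URI
   */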
  protected URI makeURIScheme( Configuration configuration )
    {
    try
      {
      URI uriScheme;

      LOG.debug( "handling path: {}", stringPath );

      URI uri = new Path( stringPath ).toUri(); // safer URI parsing
      String schemeString = uri.getScheme();
      String authority = uri.getAuthority();

      LOG.debug( "found scheme: {}, authority: {}", schemeString, authority );

      if( schemeString != null && authority != null )
        uriScheme = new URI( schemeString + "://" + uri.getAuthority() );
      else if( schemeString != null )
        uriScheme = new URI( schemeString + ":///" );
      else
        uriScheme = getDefaultFileSystemURIScheme( configuration );

      LOG.debug( "using uri scheme: {}", uriScheme );

      return uriScheme;
      }
    catch( URISyntaxException exception )
      {
      throw new TapException( "could not determine scheme from path: " + getPath(), exception );
      }
    }

  /**
   * Method getDefaultFileSystemURIScheme returns the URI scheme for the default Hadoop FileSystem.
   *
   * @param configuration of type Configuration
   * @return URI
   */
  public URI getDefaultFileSystemURIScheme( Configuration configuration )
    {
    return getDefaultFileSystem( configuration ).getUri();
    }

  protected FileSystem getDefaultFileSystem( Configuration configuration )
    {
    try
      {
      return FileSystem.get( configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to underlying filesystem", exception );
      }
    }

  protected FileSystem getFileSystem( Configuration configuration )
    {
    URI scheme = getURIScheme( configuration );

    try
      {
      return FileSystem.get( scheme, configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to filesystem for: " + scheme.getScheme(), exception );
      }
    }

  @Override
  public String getIdentifier()
    {
    if( cachedPath == null )
      cachedPath = getPath().toString();

    return cachedPath;
    }

  public Path getPath()
    {
    if( path != null )
      return path;

    if( stringPath == null )
      throw new IllegalStateException( "path not initialized" );

    path = new Path( stringPath );

    return path;
    }

  @Override
  public String getFullIdentifier( Configuration conf )
    {
    return getPath().makeQualified( getFileSystem( conf ) ).toString();
    }

  @Override
  public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    String fullIdentifier = getFullIdentifier( conf );

    applySourceConfInitIdentifiers( process, conf, fullIdentifier );

    verifyNoDuplicates( conf );
    }

  protected static void verifyNoDuplicates( Configuration conf )
    {
    Path[] inputPaths = FileInputFormat.getInputPaths( HadoopUtil.asJobConfInstance( conf ) );
    Set<Path> paths = new HashSet<Path>( (int) ( inputPaths.length / .75f ) );

    for( Path inputPath : inputPaths )
      {
      if( !paths.add( inputPath ) )
        throw new TapException( "may not add duplicate paths, found: " + inputPath );
      }
    }

  protected void applySourceConfInitIdentifiers( FlowProcess<? extends Configuration> process, Configuration conf, String... fullIdentifiers )
    {
    for( String fullIdentifier : fullIdentifiers )
      sourceConfInitAddInputPath( conf, new Path( fullIdentifier ) );

    sourceConfInitComplete( process, conf );
    }

  protected void sourceConfInitAddInputPath( Configuration conf, Path qualifiedPath )
    {
    HadoopUtil.addInputPath( conf, qualifiedPath );

    makeLocal( conf, qualifiedPath, "forcing job to local mode, via source: " );
    }

  protected void sourceConfInitComplete( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    super.sourceConfInit( process, conf );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow

    // use CombineFileInputFormat if that is enabled
    handleCombineFileInputFormat( conf );
    }

  /**
   * Based on the configuration, handles and sets {@link CombineFileInputFormat} as the input
   * format.
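   * <p/>
   * When combining is enabled, the configured {@code mapred.input.format.class} value is preserved under the
   * {@link CombineFileRecordReaderWrapper#INDIVIDUAL_INPUT_FORMAT} key, and the input format class is then
   * overridden with {@link CombinedInputFormat}, which delegates record reading back to the original format.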
   */
  private void handleCombineFileInputFormat( Configuration conf )
    {
    // if combining files, override the configuration to use CombineFileInputFormat
    if( !getUseCombinedInput( conf ) )
      return;

    // get the prescribed individual input format from the underlying scheme so it can be used by CombinedInputFormat
    String individualInputFormat = conf.get( "mapred.input.format.class" );

    if( individualInputFormat == null )
      throw new TapException( "input format is missing from the underlying scheme" );

    if( individualInputFormat.equals( CombinedInputFormat.class.getName() ) &&
      conf.get( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT ) == null )
      throw new TapException( "the input format class is already the combined input format but the underlying input format is missing" );

    // if safe mode is on (default) throw an exception if the InputFormat is not a FileInputFormat, otherwise log a
    // warning and don't use the CombineFileInputFormat
    boolean safeMode = getCombinedInputSafeMode( conf );

    if( !FileInputFormat.class.isAssignableFrom( conf.getClass( "mapred.input.format.class", null ) ) )
      {
      if( safeMode )
        throw new TapException( "input format must be of type org.apache.hadoop.mapred.FileInputFormat, got: " + individualInputFormat );
      else
        LOG.warn( "not combining input splits with CombineFileInputFormat, {} is not of type org.apache.hadoop.mapred.FileInputFormat.", individualInputFormat );
      }
    else
      {
      // set the underlying individual input format
      conf.set( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT, individualInputFormat );

      // override the input format class
      conf.setClass( "mapred.input.format.class", CombinedInputFormat.class, InputFormat.class );
      }
    }

  @Override
  public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    Path qualifiedPath = new Path( getFullIdentifier( conf ) );

    HadoopUtil.setOutputPath( conf, qualifiedPath );
    super.sinkConfInit( process, conf );

    makeLocal( conf, qualifiedPath, "forcing job to local mode, via sink: " );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
    }

  private void makeLocal( Configuration conf, Path qualifiedPath, String infoMessage )
    {
    String scheme = getLocalModeScheme( conf, "file" );

    if( !HadoopUtil.isLocal( conf ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( scheme ) )
      {
      if( LOG.isInfoEnabled() )
        LOG.info( infoMessage + toString() );

      HadoopUtil.setLocal( conf ); // force job to run locally
      }
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
    {
    // input may be null when this method is called on the client side or cluster side when accumulating
    // for a HashJoin
    return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
    }

  @Override
  public TupleEntryCollector openForWrite( FlowProcess<? extends Configuration> flowProcess, OutputCollector output ) throws IOException
    {
    // output may be null when this method is called on the client side or cluster side when creating
    // side files with the PartitionTap
    return new HadoopTupleEntrySchemeCollector( flowProcess, this, output );
    }

  @Override
  public boolean createResource( Configuration conf ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "making dirs: {}", getFullIdentifier( conf ) );

    return getFileSystem( conf ).mkdirs( getPath() );
    }

  @Override
  public boolean deleteResource( Configuration conf ) throws IOException
    {
    String fullIdentifier = getFullIdentifier( conf );

    return deleteFullIdentifier( conf, fullIdentifier );
    }

  private boolean deleteFullIdentifier( Configuration conf, String fullIdentifier ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "deleting: {}", fullIdentifier );

    Path fullPath = new Path( fullIdentifier );

    // do not delete the root directory
    if( fullPath.depth() == 0 )
      return true;

    FileSystem fileSystem = getFileSystem( conf );

    try
      {
      return fileSystem.delete( fullPath, true );
      }
    catch( NullPointerException exception )
      {
      // hack to get around npe thrown when fs reaches root directory
      // removes coupling to the new aws hadoop artifacts that may not be deployed
      if( !( fileSystem.getClass().getSimpleName().equals( "NativeS3FileSystem" ) ) )
        throw exception;
      }

    return true;
    }

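  /**
   * Method deleteChildResource deletes the given child identifier, but only when it resolves to a path beneath this
   * tap's fully qualified identifier; otherwise nothing is deleted and {@code false} is returned.
   *
   * @param flowProcess of type FlowProcess
   * @param childIdentifier of type String
   * @return boolean
   * @throws IOException on failure
   */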
  public boolean deleteChildResource( FlowProcess<? extends Configuration> flowProcess, String childIdentifier ) throws IOException
    {
    return deleteChildResource( flowProcess.getConfig(), childIdentifier );
    }

  public boolean deleteChildResource( Configuration conf, String childIdentifier ) throws IOException
    {
    Path childPath = new Path( childIdentifier ).makeQualified( getFileSystem( conf ) );

    if( !childPath.toString().startsWith( getFullIdentifier( conf ) ) )
      return false;

    return deleteFullIdentifier( conf, childPath.toString() );
    }

  @Override
  public boolean resourceExists( Configuration conf ) throws IOException
    {
    // unfortunately getFileSystem( conf ).exists( getPath() ); does not account for "/*" etc
    // nor is there a more efficient means to test for existence
    FileStatus[] fileStatuses = getFileSystem( conf ).globStatus( getPath() );

    return fileStatuses != null && fileStatuses.length > 0;
    }

  @Override
  public boolean isDirectory( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return isDirectory( flowProcess.getConfig() );
    }

  @Override
  public boolean isDirectory( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return false;

    return getFileSystem( conf ).getFileStatus( getPath() ).isDir();
    }

  @Override
  public long getSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getSize( flowProcess.getConfig() );
    }

  @Override
  public long getSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getLen();
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return long
   * @throws IOException when unable to access the file system
   */
  public long getBlockSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getBlockSize( flowProcess.getConfig() );
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param conf of type Configuration
   * @return long
   * @throws IOException when unable to access the file system
   */
  public long getBlockSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getBlockSize();
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return int
   * @throws IOException when unable to access the file system
   */
  public int getReplication( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getReplication( flowProcess.getConfig() );
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param conf of type Configuration
   * @return int
   * @throws IOException when unable to access the file system
   */
  public int getReplication( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getReplication();
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), 1, false );
    }

  @Override
  public String[] getChildIdentifiers( Configuration conf ) throws IOException
    {
    return getChildIdentifiers( conf, 1, false );
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess, int depth, boolean fullyQualified ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified );
    }

  @Override
  public String[] getChildIdentifiers( Configuration conf, int depth, boolean fullyQualified ) throws IOException
    {
    if( !resourceExists( conf ) )
      return new String[ 0 ];

    if( depth == 0 && !fullyQualified )
      return new String[]{getIdentifier()};

    String fullIdentifier = getFullIdentifier( conf );

    int trim = fullyQualified ? 0 : fullIdentifier.length() + 1;

    Set<String> results = new LinkedHashSet<String>();

    getChildPaths( conf, results, trim, new Path( fullIdentifier ), depth );

    return results.toArray( new String[ results.size() ] );
    }

  private void getChildPaths( Configuration conf, Set<String> results, int trim, Path path, int depth ) throws IOException
    {
    if( depth == 0 )
      {
      String substring = path.toString().substring( trim );
      String identifier = getIdentifier();

      if( identifier == null || identifier.isEmpty() )
        results.add( new Path( substring ).toString() );
      else
        results.add( new Path( identifier, substring ).toString() );

      return;
      }

    FileStatus[] statuses = getFileSystem( conf ).listStatus( path, HIDDEN_FILES_FILTER );

    if( statuses == null )
      return;

    for( FileStatus fileStatus : statuses )
      getChildPaths( conf, results, trim, fileStatus.getPath(), depth - 1 );
    }

  @Override
  public long getModifiedTime( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( !fileStatus.isDir() )
      return fileStatus.getModificationTime();

    // todo: this should ignore the _temporary path, or not cache if found in the array
    makeStatuses( conf );

    // if statuses is empty, return 0
    if( statuses == null || statuses.length == 0 )
      return 0;

    long date = 0;

    // filter out directories as we don't recurse into sub dirs
    for( FileStatus status : statuses )
      {
      if( !status.isDir() )
        date = Math.max( date, status.getModificationTime() );
      }

    return date;
    }

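  /**
   * Method getTempPath returns the temporary directory configured via {@link HfsProps#TEMPORARY_DIRECTORY}, falling
   * back to the Hadoop {@code hadoop.tmp.dir} value when unset.
   *
   * @param conf of type Configuration
   * @return Path
   */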
  public static Path getTempPath( Configuration conf )
    {
    String tempDir = conf.get( HfsProps.TEMPORARY_DIRECTORY );

    if( tempDir == null )
      tempDir = conf.get( "hadoop.tmp.dir" );

    return new Path( tempDir );
    }

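  /**
   * Method makeTemporaryPathDirString returns a sanitized directory name, suitable for use under the temporary path,
   * derived from the given name.
   * <p/>
   * Leading underscores and non-word characters are stripped (otherwise the result would be treated as a hidden
   * file), remaining runs of non-word characters are replaced with {@code _}, and a unique ID is appended. For
   * example (illustrative), {@code "_temp dir!"} becomes {@code "temp_dir_"} followed by the unique ID.
   *
   * @param name of type String
   * @return String
   */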
  protected String makeTemporaryPathDirString( String name )
    {
    // _ is treated as a hidden file, so wipe them out
    name = name.replaceAll( "^[_\\W\\s]+", "" );

    if( name.isEmpty() )
      name = "temp-path";

    return name.replaceAll( "[\\W\\s]+", "_" ) + Util.createUniqueID();
    }

  /**
   * Method makeStatuses caches the array of {@link FileStatus} objects for this tap's path, if not already cached.
   *
   * @param conf of type Configuration
   * @throws IOException on failure
   */
  private void makeStatuses( Configuration conf ) throws IOException
    {
    if( statuses != null )
      return;

    statuses = getFileSystem( conf ).listStatus( getPath() );
    }

  /**
   * Method resetFileStatuses removes the status cache, if any.
   */
  public void resetFileStatuses()
    {
    statuses = null;
    }

  /** Combined input format that uses the underlying individual input format to combine multiple files into a single split. */
  static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
    {
    private Configuration conf;

    public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
      {
      return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
      }

    @Override
    public void setConf( Configuration conf )
      {
      this.conf = conf;

      // set the aliased property value; if zero, the super class will look up the hadoop property
      setMaxSplitSize( conf.getLong( "cascading.hadoop.hfs.combine.max.size", 0 ) );
      }

    @Override
    public Configuration getConf()
      {
      return conf;
      }
    }
  }