001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tap.hadoop;
023
024import java.beans.ConstructorProperties;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028
029import cascading.flow.FlowProcess;
030import cascading.scheme.Scheme;
031import cascading.tap.MultiSourceTap;
032import cascading.tap.TapException;
033import cascading.util.LazyIterable;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileStatus;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.fs.PathFilter;
039import org.apache.hadoop.mapred.JobConf;
040import org.apache.hadoop.mapred.RecordReader;
041
042/**
043 * Class GlobHfs is a type of {@link cascading.tap.MultiSourceTap} that accepts Hadoop style 'file globing' expressions so
044 * multiple files that match the given pattern may be used as the input sources for a given {@link cascading.flow.Flow}.
045 * <p/>
046 * See {@link FileSystem#globStatus(org.apache.hadoop.fs.Path)} for details on the globing syntax. But in short
047 * it is similar to standard regular expressions except alternation is done via {foo,bar} instead of (foo|bar).
048 * <p/>
049 * Note that a {@link cascading.flow.Flow} sourcing from GlobHfs is not currently compatible with the {@link cascading.cascade.Cascade}
050 * scheduler. GlobHfs expects the files and paths to exist so the wildcards can be resolved into concrete values so
051 * that the scheduler can order the Flows properly.
052 * <p/>
053 * Note that globing can match files or directories. It may consume less resources to match directories and let
054 * Hadoop include all sub-files immediately contained in the directory instead of enumerating every individual file.
055 * Ending the glob path with a {@code /} should match only directories.
056 *
057 * @see Hfs
058 * @see cascading.tap.MultiSourceTap
059 * @see FileSystem
060 */
061public class GlobHfs extends MultiSourceTap<Hfs, Configuration, RecordReader>
062  {
063  /** Field pathPattern */
064  private final String pathPattern;
065  /** Field pathFilter */
066  private final PathFilter pathFilter;
067
068  /**
069   * Constructor GlobHfs creates a new GlobHfs instance.
070   *
071   * @param scheme      of type Scheme
072   * @param pathPattern of type String
073   */
074  @ConstructorProperties({"scheme", "pathPattern"})
075  public GlobHfs( Scheme<Configuration, RecordReader, ?, ?, ?> scheme, String pathPattern )
076    {
077    this( scheme, pathPattern, null );
078    }
079
080  /**
081   * Constructor GlobHfs creates a new GlobHfs instance.
082   *
083   * @param scheme      of type Scheme
084   * @param pathPattern of type String
085   * @param pathFilter  of type PathFilter
086   */
087  @ConstructorProperties({"scheme", "pathPattern", "pathFilter"})
088  public GlobHfs( Scheme<Configuration, RecordReader, ?, ?, ?> scheme, String pathPattern, PathFilter pathFilter )
089    {
090    super( scheme );
091    this.pathPattern = pathPattern;
092    this.pathFilter = pathFilter;
093    }
094
095  @Override
096  public String getIdentifier()
097    {
098    return pathPattern;
099    }
100
101  @Override
102  protected Hfs[] getTaps()
103    {
104    return initTapsInternal( new JobConf() );
105    }
106
107  private Hfs[] initTapsInternal( Configuration conf )
108    {
109    if( taps != null )
110      return taps;
111
112    try
113      {
114      taps = makeTaps( conf );
115      }
116    catch( IOException exception )
117      {
118      throw new TapException( "unable to resolve taps for globing path: " + pathPattern );
119      }
120
121    return taps;
122    }
123
124  private Hfs[] makeTaps( Configuration conf ) throws IOException
125    {
126    FileStatus[] statusList;
127
128    Path path = new Path( pathPattern );
129
130    FileSystem fileSystem = path.getFileSystem( conf );
131
132    if( pathFilter == null )
133      statusList = fileSystem.globStatus( path );
134    else
135      statusList = fileSystem.globStatus( path, pathFilter );
136
137    if( statusList == null || statusList.length == 0 )
138      throw new TapException( "unable to find paths matching path pattern: " + pathPattern );
139
140    List<Hfs> notEmpty = new ArrayList<Hfs>();
141
142    for( int i = 0; i < statusList.length; i++ )
143      {
144      // remove empty files. some hadoop versions return non-zero for dirs
145      // so this jives with the expectations set in the above javadoc
146      if( statusList[ i ].isDir() || statusList[ i ].getLen() != 0 )
147        notEmpty.add( new Hfs( getScheme(), statusList[ i ].getPath().toString() ) );
148      }
149
150    if( notEmpty.isEmpty() )
151      throw new TapException( "all paths matching path pattern are zero length and not directories: " + pathPattern );
152
153    return notEmpty.toArray( new Hfs[ notEmpty.size() ] );
154    }
155
156  @Override
157  public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
158    {
159    Hfs[] taps = initTapsInternal( conf );
160
161    taps[ 0 ].sourceConfInitAddInputPaths( conf, new LazyIterable<Hfs, Path>( taps )
162      {
163      @Override
164      protected Path convert( Hfs next )
165        {
166        return next.getPath(); // we are building fully qualified paths above
167        }
168      } );
169
170    taps[ 0 ].sourceConfInitComplete( process, conf );
171    }
172
173  @Override
174  public boolean equals( Object object )
175    {
176    if( this == object )
177      return true;
178    if( object == null || getClass() != object.getClass() )
179      return false;
180
181    GlobHfs globHfs = (GlobHfs) object;
182
183    // do not compare tap arrays, these values should be sufficient to show identity
184    if( getScheme() != null ? !getScheme().equals( globHfs.getScheme() ) : globHfs.getScheme() != null )
185      return false;
186    if( pathFilter != null ? !pathFilter.equals( globHfs.pathFilter ) : globHfs.pathFilter != null )
187      return false;
188    if( pathPattern != null ? !pathPattern.equals( globHfs.pathPattern ) : globHfs.pathPattern != null )
189      return false;
190
191    return true;
192    }
193
194  @Override
195  public int hashCode()
196    {
197    int result = pathPattern != null ? pathPattern.hashCode() : 0;
198    result = 31 * result + ( pathFilter != null ? pathFilter.hashCode() : 0 );
199    return result;
200    }
201
202  @Override
203  public String toString()
204    {
205    return "GlobHfs[" + pathPattern + ']';
206    }
207  }