001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tap.hadoop; 022 023 import java.beans.ConstructorProperties; 024 import java.io.IOException; 025 import java.util.ArrayList; 026 import java.util.List; 027 028 import cascading.flow.FlowProcess; 029 import cascading.scheme.Scheme; 030 import cascading.tap.MultiSourceTap; 031 import cascading.tap.TapException; 032 import org.apache.hadoop.fs.FileStatus; 033 import org.apache.hadoop.fs.FileSystem; 034 import org.apache.hadoop.fs.Path; 035 import org.apache.hadoop.fs.PathFilter; 036 import org.apache.hadoop.mapred.JobConf; 037 import org.apache.hadoop.mapred.RecordReader; 038 039 /** 040 * Class GlobHfs is a type of {@link cascading.tap.MultiSourceTap} that accepts Hadoop style 'file globing' expressions so 041 * multiple files that match the given pattern may be used as the input sources for a given {@link cascading.flow.Flow}. 042 * <p/> 043 * See {@link FileSystem#globStatus(org.apache.hadoop.fs.Path)} for details on the globing syntax. But in short 044 * it is similar to standard regular expressions except alternation is done via {foo,bar} instead of (foo|bar). 045 * <p/> 046 * Note that a {@link cascading.flow.Flow} sourcing from GlobHfs is not currently compatible with the {@link cascading.cascade.Cascade} 047 * scheduler. GlobHfs expects the files and paths to exist so the wildcards can be resolved into concrete values so 048 * that the scheduler can order the Flows properly. 049 * <p/> 050 * Note that globing can match files or directories. It may consume less resources to match directories and let 051 * Hadoop include all sub-files immediately contained in the directory instead of enumerating every individual file. 052 * Ending the glob path with a {@code /} should match only directories. 053 * 054 * @see Hfs 055 * @see cascading.tap.MultiSourceTap 056 * @see FileSystem 057 */ 058 public class GlobHfs extends MultiSourceTap<Hfs, JobConf, RecordReader> 059 { 060 /** Field pathPattern */ 061 private final String pathPattern; 062 /** Field pathFilter */ 063 private final PathFilter pathFilter; 064 065 /** 066 * Constructor GlobHfs creates a new GlobHfs instance. 067 * 068 * @param scheme of type Scheme 069 * @param pathPattern of type String 070 */ 071 @ConstructorProperties({"scheme", "pathPattern"}) 072 public GlobHfs( Scheme<JobConf, RecordReader, ?, ?, ?> scheme, String pathPattern ) 073 { 074 this( scheme, pathPattern, null ); 075 } 076 077 /** 078 * Constructor GlobHfs creates a new GlobHfs instance. 079 * 080 * @param scheme of type Scheme 081 * @param pathPattern of type String 082 * @param pathFilter of type PathFilter 083 */ 084 @ConstructorProperties({"scheme", "pathPattern", "pathFilter"}) 085 public GlobHfs( Scheme<JobConf, RecordReader, ?, ?, ?> scheme, String pathPattern, PathFilter pathFilter ) 086 { 087 super( scheme ); 088 this.pathPattern = pathPattern; 089 this.pathFilter = pathFilter; 090 } 091 092 @Override 093 public String getIdentifier() 094 { 095 return pathPattern; 096 } 097 098 @Override 099 protected Hfs[] getTaps() 100 { 101 return initTapsInternal( new JobConf() ); 102 } 103 104 private Hfs[] initTapsInternal( JobConf conf ) 105 { 106 if( taps != null ) 107 return taps; 108 109 try 110 { 111 taps = makeTaps( conf ); 112 } 113 catch( IOException exception ) 114 { 115 throw new TapException( "unable to resolve taps for globing path: " + pathPattern ); 116 } 117 118 return taps; 119 } 120 121 private Hfs[] makeTaps( JobConf conf ) throws IOException 122 { 123 FileStatus[] statusList; 124 125 Path path = new Path( pathPattern ); 126 127 FileSystem fileSystem = path.getFileSystem( conf ); 128 129 if( pathFilter == null ) 130 statusList = fileSystem.globStatus( path ); 131 else 132 statusList = fileSystem.globStatus( path, pathFilter ); 133 134 if( statusList == null || statusList.length == 0 ) 135 throw new TapException( "unable to find paths matching path pattern: " + pathPattern ); 136 137 List<Hfs> notEmpty = new ArrayList<Hfs>(); 138 139 for( int i = 0; i < statusList.length; i++ ) 140 { 141 // remove empty files. some hadoop versions return non-zero for dirs 142 // so this jives with the expectations set in the above javadoc 143 if( statusList[ i ].isDir() || statusList[ i ].getLen() != 0 ) 144 notEmpty.add( new Hfs( getScheme(), statusList[ i ].getPath().toString() ) ); 145 } 146 147 if( notEmpty.isEmpty() ) 148 throw new TapException( "all paths matching path pattern are zero length and not directories: " + pathPattern ); 149 150 return notEmpty.toArray( new Hfs[ notEmpty.size() ] ); 151 } 152 153 @Override 154 public void sourceConfInit( FlowProcess<JobConf> process, JobConf conf ) 155 { 156 Hfs[] taps = initTapsInternal( conf ); 157 158 for( Hfs tap : taps ) 159 taps[ 0 ].sourceConfInitAddInputPath( conf, tap.getPath() ); // we are building fully qualified paths above 160 161 taps[ 0 ].sourceConfInitComplete( process, conf ); 162 } 163 164 @Override 165 public boolean equals( Object object ) 166 { 167 if( this == object ) 168 return true; 169 if( object == null || getClass() != object.getClass() ) 170 return false; 171 172 GlobHfs globHfs = (GlobHfs) object; 173 174 // do not compare tap arrays, these values should be sufficient to show identity 175 if( getScheme() != null ? !getScheme().equals( globHfs.getScheme() ) : globHfs.getScheme() != null ) 176 return false; 177 if( pathFilter != null ? !pathFilter.equals( globHfs.pathFilter ) : globHfs.pathFilter != null ) 178 return false; 179 if( pathPattern != null ? !pathPattern.equals( globHfs.pathPattern ) : globHfs.pathPattern != null ) 180 return false; 181 182 return true; 183 } 184 185 @Override 186 public int hashCode() 187 { 188 int result = pathPattern != null ? pathPattern.hashCode() : 0; 189 result = 31 * result + ( pathFilter != null ? pathFilter.hashCode() : 0 ); 190 return result; 191 } 192 193 @Override 194 public String toString() 195 { 196 return "GlobHfs[" + pathPattern + ']'; 197 } 198 }