001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tap.hadoop; 022 023import java.beans.ConstructorProperties; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027 028import cascading.flow.FlowProcess; 029import cascading.scheme.Scheme; 030import cascading.tap.MultiSourceTap; 031import cascading.tap.TapException; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.fs.PathFilter; 037import org.apache.hadoop.mapred.JobConf; 038import org.apache.hadoop.mapred.RecordReader; 039 040/** 041 * Class GlobHfs is a type of {@link cascading.tap.MultiSourceTap} that accepts Hadoop style 'file globing' expressions so 042 * multiple files that match the given pattern may be used as the input sources for a given {@link cascading.flow.Flow}. 043 * <p/> 044 * See {@link FileSystem#globStatus(org.apache.hadoop.fs.Path)} for details on the globing syntax. But in short 045 * it is similar to standard regular expressions except alternation is done via {foo,bar} instead of (foo|bar). 046 * <p/> 047 * Note that a {@link cascading.flow.Flow} sourcing from GlobHfs is not currently compatible with the {@link cascading.cascade.Cascade} 048 * scheduler. GlobHfs expects the files and paths to exist so the wildcards can be resolved into concrete values so 049 * that the scheduler can order the Flows properly. 050 * <p/> 051 * Note that globing can match files or directories. It may consume less resources to match directories and let 052 * Hadoop include all sub-files immediately contained in the directory instead of enumerating every individual file. 053 * Ending the glob path with a {@code /} should match only directories. 054 * 055 * @see Hfs 056 * @see cascading.tap.MultiSourceTap 057 * @see FileSystem 058 */ 059public class GlobHfs extends MultiSourceTap<Hfs, Configuration, RecordReader> 060 { 061 /** Field pathPattern */ 062 private final String pathPattern; 063 /** Field pathFilter */ 064 private final PathFilter pathFilter; 065 066 /** 067 * Constructor GlobHfs creates a new GlobHfs instance. 068 * 069 * @param scheme of type Scheme 070 * @param pathPattern of type String 071 */ 072 @ConstructorProperties({"scheme", "pathPattern"}) 073 public GlobHfs( Scheme<Configuration, RecordReader, ?, ?, ?> scheme, String pathPattern ) 074 { 075 this( scheme, pathPattern, null ); 076 } 077 078 /** 079 * Constructor GlobHfs creates a new GlobHfs instance. 080 * 081 * @param scheme of type Scheme 082 * @param pathPattern of type String 083 * @param pathFilter of type PathFilter 084 */ 085 @ConstructorProperties({"scheme", "pathPattern", "pathFilter"}) 086 public GlobHfs( Scheme<Configuration, RecordReader, ?, ?, ?> scheme, String pathPattern, PathFilter pathFilter ) 087 { 088 super( scheme ); 089 this.pathPattern = pathPattern; 090 this.pathFilter = pathFilter; 091 } 092 093 @Override 094 public String getIdentifier() 095 { 096 return pathPattern; 097 } 098 099 @Override 100 protected Hfs[] getTaps() 101 { 102 return initTapsInternal( new JobConf() ); 103 } 104 105 private Hfs[] initTapsInternal( Configuration conf ) 106 { 107 if( taps != null ) 108 return taps; 109 110 try 111 { 112 taps = makeTaps( conf ); 113 } 114 catch( IOException exception ) 115 { 116 throw new TapException( "unable to resolve taps for globing path: " + pathPattern ); 117 } 118 119 return taps; 120 } 121 122 private Hfs[] makeTaps( Configuration conf ) throws IOException 123 { 124 FileStatus[] statusList; 125 126 Path path = new Path( pathPattern ); 127 128 FileSystem fileSystem = path.getFileSystem( conf ); 129 130 if( pathFilter == null ) 131 statusList = fileSystem.globStatus( path ); 132 else 133 statusList = fileSystem.globStatus( path, pathFilter ); 134 135 if( statusList == null || statusList.length == 0 ) 136 throw new TapException( "unable to find paths matching path pattern: " + pathPattern ); 137 138 List<Hfs> notEmpty = new ArrayList<Hfs>(); 139 140 for( int i = 0; i < statusList.length; i++ ) 141 { 142 // remove empty files. some hadoop versions return non-zero for dirs 143 // so this jives with the expectations set in the above javadoc 144 if( statusList[ i ].isDir() || statusList[ i ].getLen() != 0 ) 145 notEmpty.add( new Hfs( getScheme(), statusList[ i ].getPath().toString() ) ); 146 } 147 148 if( notEmpty.isEmpty() ) 149 throw new TapException( "all paths matching path pattern are zero length and not directories: " + pathPattern ); 150 151 return notEmpty.toArray( new Hfs[ notEmpty.size() ] ); 152 } 153 154 @Override 155 public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf ) 156 { 157 Hfs[] taps = initTapsInternal( conf ); 158 159 for( Hfs tap : taps ) 160 taps[ 0 ].sourceConfInitAddInputPath( conf, tap.getPath() ); // we are building fully qualified paths above 161 162 taps[ 0 ].sourceConfInitComplete( process, conf ); 163 } 164 165 @Override 166 public boolean equals( Object object ) 167 { 168 if( this == object ) 169 return true; 170 if( object == null || getClass() != object.getClass() ) 171 return false; 172 173 GlobHfs globHfs = (GlobHfs) object; 174 175 // do not compare tap arrays, these values should be sufficient to show identity 176 if( getScheme() != null ? !getScheme().equals( globHfs.getScheme() ) : globHfs.getScheme() != null ) 177 return false; 178 if( pathFilter != null ? !pathFilter.equals( globHfs.pathFilter ) : globHfs.pathFilter != null ) 179 return false; 180 if( pathPattern != null ? !pathPattern.equals( globHfs.pathPattern ) : globHfs.pathPattern != null ) 181 return false; 182 183 return true; 184 } 185 186 @Override 187 public int hashCode() 188 { 189 int result = pathPattern != null ? pathPattern.hashCode() : 0; 190 result = 31 * result + ( pathFilter != null ? pathFilter.hashCode() : 0 ); 191 return result; 192 } 193 194 @Override 195 public String toString() 196 { 197 return "GlobHfs[" + pathPattern + ']'; 198 } 199 }