001/* 002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.tap.hadoop; 023 024import java.beans.ConstructorProperties; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.List; 028 029import cascading.flow.FlowProcess; 030import cascading.scheme.Scheme; 031import cascading.tap.MultiSourceTap; 032import cascading.tap.TapException; 033import cascading.util.LazyIterable; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileStatus; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.fs.PathFilter; 039import org.apache.hadoop.mapred.JobConf; 040import org.apache.hadoop.mapred.RecordReader; 041 042/** 043 * Class GlobHfs is a type of {@link cascading.tap.MultiSourceTap} that accepts Hadoop style 'file globing' expressions so 044 * multiple files that match the given pattern may be used as the input sources for a given {@link cascading.flow.Flow}. 045 * <p/> 046 * See {@link FileSystem#globStatus(org.apache.hadoop.fs.Path)} for details on the globing syntax. But in short 047 * it is similar to standard regular expressions except alternation is done via {foo,bar} instead of (foo|bar). 048 * <p/> 049 * Note that a {@link cascading.flow.Flow} sourcing from GlobHfs is not currently compatible with the {@link cascading.cascade.Cascade} 050 * scheduler. GlobHfs expects the files and paths to exist so the wildcards can be resolved into concrete values so 051 * that the scheduler can order the Flows properly. 052 * <p/> 053 * Note that globing can match files or directories. It may consume less resources to match directories and let 054 * Hadoop include all sub-files immediately contained in the directory instead of enumerating every individual file. 055 * Ending the glob path with a {@code /} should match only directories. 056 * 057 * @see Hfs 058 * @see cascading.tap.MultiSourceTap 059 * @see FileSystem 060 */ 061public class GlobHfs extends MultiSourceTap<Hfs, Configuration, RecordReader> 062 { 063 /** Field pathPattern */ 064 private final String pathPattern; 065 /** Field pathFilter */ 066 private final PathFilter pathFilter; 067 068 /** 069 * Constructor GlobHfs creates a new GlobHfs instance. 070 * 071 * @param scheme of type Scheme 072 * @param pathPattern of type String 073 */ 074 @ConstructorProperties({"scheme", "pathPattern"}) 075 public GlobHfs( Scheme<Configuration, RecordReader, ?, ?, ?> scheme, String pathPattern ) 076 { 077 this( scheme, pathPattern, null ); 078 } 079 080 /** 081 * Constructor GlobHfs creates a new GlobHfs instance. 082 * 083 * @param scheme of type Scheme 084 * @param pathPattern of type String 085 * @param pathFilter of type PathFilter 086 */ 087 @ConstructorProperties({"scheme", "pathPattern", "pathFilter"}) 088 public GlobHfs( Scheme<Configuration, RecordReader, ?, ?, ?> scheme, String pathPattern, PathFilter pathFilter ) 089 { 090 super( scheme ); 091 this.pathPattern = pathPattern; 092 this.pathFilter = pathFilter; 093 } 094 095 @Override 096 public String getIdentifier() 097 { 098 return pathPattern; 099 } 100 101 @Override 102 protected Hfs[] getTaps() 103 { 104 return initTapsInternal( new JobConf() ); 105 } 106 107 private Hfs[] initTapsInternal( Configuration conf ) 108 { 109 if( taps != null ) 110 return taps; 111 112 try 113 { 114 taps = makeTaps( conf ); 115 } 116 catch( IOException exception ) 117 { 118 throw new TapException( "unable to resolve taps for globing path: " + pathPattern ); 119 } 120 121 return taps; 122 } 123 124 private Hfs[] makeTaps( Configuration conf ) throws IOException 125 { 126 FileStatus[] statusList; 127 128 Path path = new Path( pathPattern ); 129 130 FileSystem fileSystem = path.getFileSystem( conf ); 131 132 if( pathFilter == null ) 133 statusList = fileSystem.globStatus( path ); 134 else 135 statusList = fileSystem.globStatus( path, pathFilter ); 136 137 if( statusList == null || statusList.length == 0 ) 138 throw new TapException( "unable to find paths matching path pattern: " + pathPattern ); 139 140 List<Hfs> notEmpty = new ArrayList<Hfs>(); 141 142 for( int i = 0; i < statusList.length; i++ ) 143 { 144 // remove empty files. some hadoop versions return non-zero for dirs 145 // so this jives with the expectations set in the above javadoc 146 if( statusList[ i ].isDir() || statusList[ i ].getLen() != 0 ) 147 notEmpty.add( new Hfs( getScheme(), statusList[ i ].getPath().toString() ) ); 148 } 149 150 if( notEmpty.isEmpty() ) 151 throw new TapException( "all paths matching path pattern are zero length and not directories: " + pathPattern ); 152 153 return notEmpty.toArray( new Hfs[ notEmpty.size() ] ); 154 } 155 156 @Override 157 public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf ) 158 { 159 Hfs[] taps = initTapsInternal( conf ); 160 161 taps[ 0 ].sourceConfInitAddInputPaths( conf, new LazyIterable<Hfs, Path>( taps ) 162 { 163 @Override 164 protected Path convert( Hfs next ) 165 { 166 return next.getPath(); // we are building fully qualified paths above 167 } 168 } ); 169 170 taps[ 0 ].sourceConfInitComplete( process, conf ); 171 } 172 173 @Override 174 public boolean equals( Object object ) 175 { 176 if( this == object ) 177 return true; 178 if( object == null || getClass() != object.getClass() ) 179 return false; 180 181 GlobHfs globHfs = (GlobHfs) object; 182 183 // do not compare tap arrays, these values should be sufficient to show identity 184 if( getScheme() != null ? !getScheme().equals( globHfs.getScheme() ) : globHfs.getScheme() != null ) 185 return false; 186 if( pathFilter != null ? !pathFilter.equals( globHfs.pathFilter ) : globHfs.pathFilter != null ) 187 return false; 188 if( pathPattern != null ? !pathPattern.equals( globHfs.pathPattern ) : globHfs.pathPattern != null ) 189 return false; 190 191 return true; 192 } 193 194 @Override 195 public int hashCode() 196 { 197 int result = pathPattern != null ? pathPattern.hashCode() : 0; 198 result = 31 * result + ( pathFilter != null ? pathFilter.hashCode() : 0 ); 199 return result; 200 } 201 202 @Override 203 public String toString() 204 { 205 return "GlobHfs[" + pathPattern + ']'; 206 } 207 }