001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.platform.hadoop; 022 023import java.io.File; 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.util.Comparator; 029import java.util.HashMap; 030import java.util.Map; 031 032import cascading.platform.TestPlatform; 033import cascading.scheme.Scheme; 034import cascading.scheme.hadoop.TextDelimited; 035import cascading.scheme.hadoop.TextLine; 036import cascading.scheme.util.DelimitedParser; 037import cascading.scheme.util.FieldTypeResolver; 038import cascading.tap.SinkMode; 039import cascading.tap.Tap; 040import cascading.tap.hadoop.Hfs; 041import cascading.tap.hadoop.PartitionTap; 042import cascading.tap.hadoop.util.Hadoop18TapUtil; 043import cascading.tap.partition.Partition; 044import cascading.tuple.Fields; 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.fs.FileStatus; 047import org.apache.hadoop.fs.FileSystem; 048import org.apache.hadoop.fs.FileUtil; 049import org.apache.hadoop.fs.Path; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * 055 */ 056public abstract class BaseHadoopPlatform<Config extends Configuration> extends TestPlatform 057 { 058 private static final Logger LOG = LoggerFactory.getLogger( BaseHadoopPlatform.class ); 059 060 public transient static FileSystem fileSys; 061 public transient static Configuration configuration; 062 public transient static Map<Object, Object> properties = new HashMap<Object, Object>(); 063 064 protected String logger; 065 066 public BaseHadoopPlatform() 067 { 068 this.logger = System.getProperty( "log4j.logger" ); 069 this.numMappers = 4; 070 this.numReducers = 1; 071 } 072 073 @Override 074 public boolean isMapReduce() 075 { 076 return true; 077 } 078 079 @Override 080 public void setNumMappers( int numMapTasks ) 081 { 082 if( numMapTasks > 0 ) 083 this.numMappers = numMapTasks; 084 } 085 086 @Override 087 public void setNumReducers( int numReduceTasks ) 088 { 089 if( numReduceTasks > 0 ) 090 this.numReducers = numReduceTasks; 091 } 092 093 @Override 094 public void setNumGatherPartitions( int numGatherPartitions ) 095 { 096 if( numGatherPartitions > 0 ) 097 this.numGatherPartitions = numGatherPartitions; 098 } 099 100 @Override 101 public Map<Object, Object> getProperties() 102 { 103 return new HashMap<Object, Object>( properties ); 104 } 105 106 @Override 107 public void tearDown() 108 { 109 } 110 111 public abstract Config getConfiguration(); 112 113 public boolean isHDFSAvailable() 114 { 115 try 116 { 117 FileSystem fileSystem = FileSystem.get( new URI( "hdfs:", null, null ), configuration ); 118 119 return fileSystem != null; 120 } 121 catch( IOException exception ) // if no hdfs, a no filesystem for scheme io exception will be caught 122 { 123 LOG.warn( "unable to get hdfs filesystem", exception ); 124 } 125 catch( URISyntaxException exception ) 126 { 127 throw new RuntimeException( "internal failure", exception ); 128 } 129 130 return false; 131 } 132 133 @Override 134 public void copyFromLocal( String inputFile ) throws IOException 135 { 136 if( !new File( inputFile ).exists() ) 137 throw new FileNotFoundException( "data file not found: " + inputFile ); 138 139 if( !isUseCluster() ) 140 return; 141 142 Path path = new Path( safeFileName( inputFile ) ); 143 144 if( !fileSys.exists( path ) ) 145 FileUtil.copy( new File( inputFile ), fileSys, path, false, configuration ); 146 } 147 148 @Override 149 public void copyToLocal( String outputFile ) throws IOException 150 { 151 if( !isUseCluster() ) 152 return; 153 154 Path path = new Path( safeFileName( outputFile ) ); 155 156 if( !fileSys.exists( path ) ) 157 throw new FileNotFoundException( "data file not found: " + outputFile ); 158 159 File file = new File( outputFile ); 160 161 if( file.exists() ) 162 file.delete(); 163 164 if( fileSys.isFile( path ) ) 165 { 166 // its a file, so just copy it over 167 FileUtil.copy( fileSys, path, file, false, configuration ); 168 return; 169 } 170 171 // it's a directory 172 file.mkdirs(); 173 174 FileStatus contents[] = fileSys.listStatus( path ); 175 176 for( FileStatus fileStatus : contents ) 177 { 178 Path currentPath = fileStatus.getPath(); 179 180 if( currentPath.getName().startsWith( "_" ) ) // filter out temp and log dirs 181 continue; 182 183 FileUtil.copy( fileSys, currentPath, new File( file, currentPath.getName() ), false, configuration ); 184 } 185 } 186 187 @Override 188 public boolean remoteExists( String outputFile ) throws IOException 189 { 190 return fileSys.exists( new Path( safeFileName( outputFile ) ) ); 191 } 192 193 @Override 194 public boolean remoteRemove( String outputFile, boolean recursive ) throws IOException 195 { 196 return fileSys.delete( new Path( safeFileName( outputFile ) ), recursive ); 197 } 198 199 @Override 200 public Tap getTap( Scheme scheme, String filename, SinkMode mode ) 201 { 202 return new Hfs( scheme, safeFileName( filename ), mode ); 203 } 204 205 @Override 206 public Tap getTextFile( Fields sourceFields, Fields sinkFields, String filename, SinkMode mode ) 207 { 208 if( sourceFields == null ) 209 return new Hfs( new TextLine(), safeFileName( filename ), mode ); 210 211 return new Hfs( new TextLine( sourceFields, sinkFields ), safeFileName( filename ), mode ); 212 } 213 214 @Override 215 public Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode ) 216 { 217 return new Hfs( new TextDelimited( fields, hasHeader, delimiter, quote, types ), safeFileName( filename ), mode ); 218 } 219 220 @Override 221 public Tap getDelimitedFile( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode ) 222 { 223 return new Hfs( new TextDelimited( fields, skipHeader, writeHeader, delimiter, quote, types ), safeFileName( filename ), mode ); 224 } 225 226 @Override 227 public Tap getDelimitedFile( String delimiter, String quote, FieldTypeResolver fieldTypeResolver, String filename, SinkMode mode ) 228 { 229 return new Hfs( new TextDelimited( true, new DelimitedParser( delimiter, quote, fieldTypeResolver ) ), safeFileName( filename ), mode ); 230 } 231 232 @Override 233 public Tap getPartitionTap( Tap sink, Partition partition, int openThreshold ) 234 { 235 return new PartitionTap( (Hfs) sink, partition, openThreshold ); 236 } 237 238 @Override 239 public Scheme getTestConfigDefScheme() 240 { 241 return new HadoopConfigDefScheme( new Fields( "line" ), isDAG() ); 242 } 243 244 @Override 245 public Scheme getTestFailScheme() 246 { 247 return new HadoopFailScheme( new Fields( "line" ) ); 248 } 249 250 @Override 251 public Comparator getLongComparator( boolean reverseSort ) 252 { 253 return new TestLongComparator( reverseSort ); 254 } 255 256 @Override 257 public Comparator getStringComparator( boolean reverseSort ) 258 { 259 return new TestStringComparator( reverseSort ); 260 } 261 262 @Override 263 public String getHiddenTemporaryPath() 264 { 265 return Hadoop18TapUtil.TEMPORARY_PATH; 266 } 267 268 /** 269 * Replaces characters, that are not allowed by HDFS with an "_". 270 * 271 * @param filename The filename to make safe 272 * @return The filename with all non-supported characters removed. 273 */ 274 protected String safeFileName( String filename ) 275 { 276 return filename.replace( ":", "_" ); // not using Util.cleansePathName as it removes / 277 } 278 }