/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.platform.hadoop;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.platform.TestPlatform;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.scheme.util.DelimitedParser;
import cascading.scheme.util.FieldTypeResolver;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.PartitionTap;
import cascading.tap.hadoop.TemplateTap;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.partition.Partition;
import cascading.tuple.Fields;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Hadoop backed {@link TestPlatform} implementations.
 * <p/>
 * Holds the shared {@link FileSystem} and {@link Configuration} state for the test run and
 * provides factory methods for Hadoop specific taps and schemes, plus helpers for moving
 * test data between the local file system and the (possibly remote) cluster file system.
 */
public abstract class BaseHadoopPlatform extends TestPlatform
  {
  private static final Logger LOG = LoggerFactory.getLogger( BaseHadoopPlatform.class );

  // shared across all tests in this JVM; expected to be initialized by the concrete platform's setUp
  public transient static FileSystem fileSys;
  public transient static Configuration jobConf;
  public transient static Map<Object, Object> properties = new HashMap<Object, Object>();

  public int numMapTasks = 4;
  public int numReduceTasks = 1;
  protected String logger; // optional log4j logger override, read from the "log4j.logger" system property

  public BaseHadoopPlatform()
    {
    logger = System.getProperty( "log4j.logger" );
    }

  @Override
  public boolean isMapReduce()
    {
    return true;
    }

  /** Sets the number of map tasks to request; non-positive values are ignored. */
  public void setNumMapTasks( int numMapTasks )
    {
    if( numMapTasks > 0 )
      this.numMapTasks = numMapTasks;
    }

  /** Sets the number of reduce tasks to request; non-positive values are ignored. */
  public void setNumReduceTasks( int numReduceTasks )
    {
    if( numReduceTasks > 0 )
      this.numReduceTasks = numReduceTasks;
    }

  @Override
  public Map<Object, Object> getProperties()
    {
    // defensive copy so callers cannot mutate the shared static map
    return new HashMap<Object, Object>( properties );
    }

  @Override
  public void tearDown()
    {
    }

  /** Returns a fresh, mutable copy of the shared job configuration. */
  public JobConf getJobConf()
    {
    return new JobConf( jobConf );
    }

  /**
   * Probes whether an HDFS file system is reachable with the current configuration.
   *
   * @return true if a file system for the {@code hdfs:} scheme could be instantiated
   */
  public boolean isHDFSAvailable()
    {
    try
      {
      FileSystem fileSystem = FileSystem.get( new URI( "hdfs:", null, null ), getJobConf() );

      return fileSystem != null;
      }
    catch( IOException exception ) // if no hdfs, a no filesystem for scheme io exception will be caught
      {
      LOG.warn( "unable to get hdfs filesystem", exception );
      }
    catch( URISyntaxException exception )
      {
      throw new RuntimeException( "internal failure", exception );
      }

    return false;
    }

  @Override
  public FlowProcess getFlowProcess()
    {
    return new HadoopFlowProcess( FlowSession.NULL, getJobConf(), true );
    }

  /**
   * Copies the given local file into the cluster file system, unless it already exists there.
   *
   * @param inputFile path to a local data file
   * @throws FileNotFoundException if the local file does not exist
   * @throws IOException           if the copy fails
   */
  @Override
  public void copyFromLocal( String inputFile ) throws IOException
    {
    if( !new File( inputFile ).exists() )
      throw new FileNotFoundException( "data file not found: " + inputFile );

    if( !isUseCluster() )
      return; // local mode reads the file in place, nothing to copy

    Path path = new Path( safeFileName( inputFile ) );

    if( !fileSys.exists( path ) )
      FileUtil.copy( new File( inputFile ), fileSys, path, false, jobConf );
    }

  /**
   * Copies the given file or directory from the cluster file system to the local file system,
   * replacing any existing local copy. Entries starting with "_" (Hadoop temp/log dirs) are skipped.
   *
   * @param outputFile path of the remote file or directory to fetch
   * @throws FileNotFoundException if the remote path does not exist
   * @throws IOException           if a local delete/mkdir or the copy fails
   */
  @Override
  public void copyToLocal( String outputFile ) throws IOException
    {
    if( !isUseCluster() )
      return; // local mode wrote the file in place, nothing to copy

    Path path = new Path( safeFileName( outputFile ) );

    if( !fileSys.exists( path ) )
      throw new FileNotFoundException( "data file not found: " + outputFile );

    File file = new File( outputFile );

    // fail fast if the stale local copy cannot be removed, rather than mis-copying over it
    if( file.exists() && !file.delete() )
      throw new IOException( "unable to delete local file: " + outputFile );

    if( fileSys.isFile( path ) )
      {
      // its a file, so just copy it over
      FileUtil.copy( fileSys, path, file, false, jobConf );
      return;
      }

    // it's a directory
    if( !file.mkdirs() && !file.isDirectory() )
      throw new IOException( "unable to create local directory: " + outputFile );

    FileStatus[] contents = fileSys.listStatus( path );

    for( FileStatus fileStatus : contents )
      {
      Path currentPath = fileStatus.getPath();

      if( currentPath.getName().startsWith( "_" ) ) // filter out temp and log dirs
        continue;

      FileUtil.copy( fileSys, currentPath, new File( file, currentPath.getName() ), false, jobConf );
      }
    }

  @Override
  public boolean remoteExists( String outputFile ) throws IOException
    {
    return fileSys.exists( new Path( safeFileName( outputFile ) ) );
    }

  @Override
  public boolean remoteRemove( String outputFile, boolean recursive ) throws IOException
    {
    return fileSys.delete( new Path( safeFileName( outputFile ) ), recursive );
    }

  @Override
  public Tap getTap( Scheme scheme, String filename, SinkMode mode )
    {
    return new Hfs( scheme, safeFileName( filename ), mode );
    }

  /**
   * Returns a text line tap; when sourceFields is null a default {@link TextLine} scheme is used.
   */
  @Override
  public Tap getTextFile( Fields sourceFields, Fields sinkFields, String filename, SinkMode mode )
    {
    if( sourceFields == null )
      return new Hfs( new TextLine(), safeFileName( filename ), mode );

    return new Hfs( new TextLine( sourceFields, sinkFields ), safeFileName( filename ), mode );
    }

  @Override
  public Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode )
    {
    return new Hfs( new TextDelimited( fields, hasHeader, delimiter, quote, types ), safeFileName( filename ), mode );
    }

  @Override
  public Tap getDelimitedFile( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode )
    {
    return new Hfs( new TextDelimited( fields, skipHeader, writeHeader, delimiter, quote, types ), safeFileName( filename ), mode );
    }

  @Override
  public Tap getDelimitedFile( String delimiter, String quote, FieldTypeResolver fieldTypeResolver, String filename, SinkMode mode )
    {
    return new Hfs( new TextDelimited( true, new DelimitedParser( delimiter, quote, fieldTypeResolver ) ), safeFileName( filename ), mode );
    }

  @Override
  public Tap getTemplateTap( Tap sink, String pathTemplate, int openThreshold )
    {
    return new TemplateTap( (Hfs) sink, pathTemplate, openThreshold );
    }

  @Override
  public Tap getTemplateTap( Tap sink, String pathTemplate, Fields fields, int openThreshold )
    {
    return new TemplateTap( (Hfs) sink, pathTemplate, fields, openThreshold );
    }

  @Override
  public Tap getPartitionTap( Tap sink, Partition partition, int openThreshold )
    {
    return new PartitionTap( (Hfs) sink, partition, openThreshold );
    }

  @Override
  public Scheme getTestConfigDefScheme()
    {
    return new HadoopConfigDefScheme( new Fields( "line" ) );
    }

  @Override
  public Scheme getTestFailScheme()
    {
    return new HadoopFailScheme( new Fields( "line" ) );
    }

  @Override
  public Comparator getLongComparator( boolean reverseSort )
    {
    return new TestLongComparator( reverseSort );
    }

  @Override
  public Comparator getStringComparator( boolean reverseSort )
    {
    return new TestStringComparator( reverseSort );
    }

  @Override
  public String getHiddenTemporaryPath()
    {
    return Hadoop18TapUtil.TEMPORARY_PATH;
    }

  /**
   * Replaces characters, that are not allowed by HDFS with an "_".
   *
   * @param filename The filename to make safe
   * @return The filename with all non-supported characters removed.
   */
  protected String safeFileName( String filename )
    {
    return filename.replace( ":", "_" ); // not using Util.cleansePathName as it removes /
    }
  }