001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.scheme.hadoop; 022 023 import java.beans.ConstructorProperties; 024 import java.io.IOException; 025 import java.nio.charset.Charset; 026 import java.util.Arrays; 027 028 import cascading.flow.FlowProcess; 029 import cascading.management.annotation.Property; 030 import cascading.management.annotation.PropertyDescription; 031 import cascading.management.annotation.Visibility; 032 import cascading.scheme.Scheme; 033 import cascading.scheme.SinkCall; 034 import cascading.scheme.SourceCall; 035 import cascading.tap.Tap; 036 import cascading.tuple.Fields; 037 import cascading.tuple.Tuple; 038 import cascading.tuple.TupleEntry; 039 import org.apache.hadoop.fs.Path; 040 import org.apache.hadoop.io.LongWritable; 041 import org.apache.hadoop.io.Text; 042 import org.apache.hadoop.mapred.FileInputFormat; 043 import org.apache.hadoop.mapred.FileOutputFormat; 044 import org.apache.hadoop.mapred.JobConf; 045 import org.apache.hadoop.mapred.OutputCollector; 046 import org.apache.hadoop.mapred.RecordReader; 047 import org.apache.hadoop.mapred.TextInputFormat; 048 import org.apache.hadoop.mapred.TextOutputFormat; 049 050 /** 051 * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into 052 * lines. Either line-feed or carriage-return are used to signal end of line. 053 * <p/> 054 * By default, this scheme returns a {@link Tuple} with two fields, "offset" and "line". 055 * <p/> 056 * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names 057 * to be used instead of the names "offset" and "line". sinkFields is a selector and is by default {@link Fields#ALL}. 058 * Any available field names can be given if only a subset of the incoming fields should be used. 059 * <p/> 060 * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples 061 * will simply be the "line" value using the given field name. 062 * <p/> 063 * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before 064 * writing out the line. 065 * <p/> 066 * Note sink compression is {@link Compress#DISABLE} by default. If {@code null} is passed to the constructor 067 * for the compression value, it will remain disabled. 068 * <p/> 069 * If any of the input files end with ".zip", an error will be thrown. 070 * * <p/> 071 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor 072 * argument. 073 */ 074 public class TextLine extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> 075 { 076 public enum Compress 077 { 078 DEFAULT, ENABLE, DISABLE 079 } 080 081 public static final String DEFAULT_CHARSET = "UTF-8"; 082 083 /** Field serialVersionUID */ 084 private static final long serialVersionUID = 1L; 085 /** Field DEFAULT_SOURCE_FIELDS */ 086 public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "offset", "line" ); 087 088 /** Field sinkCompression */ 089 Compress sinkCompression = Compress.DISABLE; 090 091 String charsetName = DEFAULT_CHARSET; 092 093 /** 094 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 095 * "offset" is the byte offset in the input file. 096 */ 097 public TextLine() 098 { 099 super( DEFAULT_SOURCE_FIELDS ); 100 } 101 102 /** 103 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 104 * "offset" is the byte offset in the input file. 105 * 106 * @param numSinkParts of type int 107 */ 108 @ConstructorProperties({"numSinkParts"}) 109 public TextLine( int numSinkParts ) 110 { 111 super( DEFAULT_SOURCE_FIELDS, numSinkParts ); 112 } 113 114 /** 115 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 116 * "offset" is the byte offset in the input file. 117 * 118 * @param sinkCompression of type Compress 119 */ 120 @ConstructorProperties({"sinkCompression"}) 121 public TextLine( Compress sinkCompression ) 122 { 123 super( DEFAULT_SOURCE_FIELDS ); 124 125 setSinkCompression( sinkCompression ); 126 } 127 128 /** 129 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 130 * subsequent tuples. 131 * 132 * @param sourceFields the source fields for this scheme 133 * @param sinkFields the sink fields for this scheme 134 */ 135 @ConstructorProperties({"sourceFields", "sinkFields"}) 136 public TextLine( Fields sourceFields, Fields sinkFields ) 137 { 138 super( sourceFields, sinkFields ); 139 140 verify( sourceFields ); 141 } 142 143 /** 144 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 145 * subsequent tuples. 146 * 147 * @param sourceFields the source fields for this scheme 148 * @param sinkFields the sink fields for this scheme 149 * @param charsetName of type String 150 */ 151 @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"}) 152 public TextLine( Fields sourceFields, Fields sinkFields, String charsetName ) 153 { 154 super( sourceFields, sinkFields ); 155 156 // throws an exception if not found 157 setCharsetName( charsetName ); 158 159 verify( sourceFields ); 160 } 161 162 /** 163 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 164 * subsequent tuples. 165 * 166 * @param sourceFields the source fields for this scheme 167 * @param sinkFields the sink fields for this scheme 168 * @param numSinkParts of type int 169 */ 170 @ConstructorProperties({"sourceFields", "sinkFields", "numSinkParts"}) 171 public TextLine( Fields sourceFields, Fields sinkFields, int numSinkParts ) 172 { 173 super( sourceFields, sinkFields, numSinkParts ); 174 175 verify( sourceFields ); 176 } 177 178 /** 179 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 180 * subsequent tuples. 181 * 182 * @param sourceFields of type Fields 183 * @param sinkFields of type Fields 184 * @param sinkCompression of type Compress 185 */ 186 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression"}) 187 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression ) 188 { 189 super( sourceFields, sinkFields ); 190 191 setSinkCompression( sinkCompression ); 192 193 verify( sourceFields ); 194 } 195 196 /** 197 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 198 * subsequent tuples. 199 * 200 * @param sourceFields of type Fields 201 * @param sinkFields of type Fields 202 * @param sinkCompression of type Compress 203 * @param charsetName of type String 204 */ 205 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "charsetName"}) 206 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, String charsetName ) 207 { 208 super( sourceFields, sinkFields ); 209 210 setSinkCompression( sinkCompression ); 211 212 // throws an exception if not found 213 setCharsetName( charsetName ); 214 215 verify( sourceFields ); 216 } 217 218 /** 219 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 220 * subsequent tuples. 221 * 222 * @param sourceFields of type Fields 223 * @param sinkFields of type Fields 224 * @param sinkCompression of type Compress 225 * @param numSinkParts of type int 226 */ 227 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts"}) 228 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts ) 229 { 230 super( sourceFields, sinkFields, numSinkParts ); 231 232 setSinkCompression( sinkCompression ); 233 234 verify( sourceFields ); 235 } 236 237 /** 238 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 239 * subsequent tuples. 240 * 241 * @param sourceFields of type Fields 242 * @param sinkFields of type Fields 243 * @param sinkCompression of type Compress 244 * @param numSinkParts of type int 245 * @param charsetName of type String 246 */ 247 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts", "charsetName"}) 248 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts, String charsetName ) 249 { 250 super( sourceFields, sinkFields, numSinkParts ); 251 252 setSinkCompression( sinkCompression ); 253 254 // throws an exception if not found 255 setCharsetName( charsetName ); 256 257 verify( sourceFields ); 258 } 259 260 /** 261 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 262 * subsequent tuples. 263 * 264 * @param sourceFields the source fields for this scheme 265 */ 266 @ConstructorProperties({"sourceFields"}) 267 public TextLine( Fields sourceFields ) 268 { 269 super( sourceFields ); 270 271 verify( sourceFields ); 272 } 273 274 /** 275 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 276 * subsequent tuples. 277 * 278 * @param sourceFields the source fields for this scheme 279 * @param charsetName of type String 280 */ 281 @ConstructorProperties({"sourceFields", "charsetName"}) 282 public TextLine( Fields sourceFields, String charsetName ) 283 { 284 super( sourceFields ); 285 286 // throws an exception if not found 287 setCharsetName( charsetName ); 288 289 verify( sourceFields ); 290 } 291 292 /** 293 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 294 * subsequent tuples. The resulting data set will have numSinkParts. 295 * 296 * @param sourceFields the source fields for this scheme 297 * @param numSinkParts of type int 298 */ 299 @ConstructorProperties({"sourceFields", "numSinkParts"}) 300 public TextLine( Fields sourceFields, int numSinkParts ) 301 { 302 super( sourceFields, numSinkParts ); 303 304 verify( sourceFields ); 305 } 306 307 protected void setCharsetName( String charsetName ) 308 { 309 if( charsetName != null ) 310 this.charsetName = charsetName; 311 312 Charset.forName( this.charsetName ); 313 } 314 315 @Property(name = "charset", visibility = Visibility.PUBLIC) 316 @PropertyDescription(value = "character set used in this scheme.") 317 public String getCharsetName() 318 { 319 return charsetName; 320 } 321 322 protected void verify( Fields sourceFields ) 323 { 324 if( sourceFields.size() < 1 || sourceFields.size() > 2 ) 325 throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" ); 326 } 327 328 /** 329 * Method getSinkCompression returns the sinkCompression of this TextLine object. 330 * 331 * @return the sinkCompression (type Compress) of this TextLine object. 332 */ 333 @Property(name = "sinkCompression", visibility = Visibility.PUBLIC) 334 @PropertyDescription(value = "The compression of the scheme when used in a sink.") 335 public Compress getSinkCompression() 336 { 337 return sinkCompression; 338 } 339 340 /** 341 * Method setSinkCompression sets the sinkCompression of this TextLine object. If null, compression will remain disabled. 342 * 343 * @param sinkCompression the sinkCompression of this TextLine object. 344 */ 345 public void setSinkCompression( Compress sinkCompression ) 346 { 347 if( sinkCompression != null ) // leave disabled if null 348 this.sinkCompression = sinkCompression; 349 } 350 351 @Override 352 public void sourceConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf ) 353 { 354 if( hasZippedFiles( FileInputFormat.getInputPaths( conf ) ) ) 355 throw new IllegalStateException( "cannot read zip files: " + Arrays.toString( FileInputFormat.getInputPaths( conf ) ) ); 356 357 conf.setInputFormat( TextInputFormat.class ); 358 } 359 360 private boolean hasZippedFiles( Path[] paths ) 361 { 362 if( paths == null || paths.length == 0 ) 363 return false; 364 365 boolean isZipped = paths[ 0 ].getName().endsWith( ".zip" ); 366 367 for( int i = 1; i < paths.length; i++ ) 368 { 369 if( isZipped != paths[ i ].getName().endsWith( ".zip" ) ) 370 throw new IllegalStateException( "cannot mix zipped and upzipped files" ); 371 } 372 373 return isZipped; 374 } 375 376 @Override 377 public void presentSourceFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields ) 378 { 379 // do nothing to change TextLine state 380 } 381 382 @Override 383 public void presentSinkFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields ) 384 { 385 // do nothing to change TextLine state 386 } 387 388 @Override 389 public void sinkConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf ) 390 { 391 if( tap.getFullIdentifier( conf ).endsWith( ".zip" ) ) 392 throw new IllegalStateException( "cannot write zip files: " + FileOutputFormat.getOutputPath( conf ) ); 393 394 if( getSinkCompression() == Compress.DISABLE ) 395 conf.setBoolean( "mapred.output.compress", false ); 396 else if( getSinkCompression() == Compress.ENABLE ) 397 conf.setBoolean( "mapred.output.compress", true ); 398 399 conf.setOutputKeyClass( Text.class ); // be explicit 400 conf.setOutputValueClass( Text.class ); // be explicit 401 conf.setOutputFormat( TextOutputFormat.class ); 402 } 403 404 @Override 405 public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) 406 { 407 if( sourceCall.getContext() == null ) 408 sourceCall.setContext( new Object[ 3 ] ); 409 410 sourceCall.getContext()[ 0 ] = sourceCall.getInput().createKey(); 411 sourceCall.getContext()[ 1 ] = sourceCall.getInput().createValue(); 412 sourceCall.getContext()[ 2 ] = Charset.forName( charsetName ); 413 } 414 415 @Override 416 public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException 417 { 418 if( !sourceReadInput( sourceCall ) ) 419 return false; 420 421 sourceHandleInput( sourceCall ); 422 423 return true; 424 } 425 426 private boolean sourceReadInput( SourceCall<Object[], RecordReader> sourceCall ) throws IOException 427 { 428 Object[] context = sourceCall.getContext(); 429 430 return sourceCall.getInput().next( context[ 0 ], context[ 1 ] ); 431 } 432 433 protected void sourceHandleInput( SourceCall<Object[], RecordReader> sourceCall ) 434 { 435 TupleEntry result = sourceCall.getIncomingEntry(); 436 437 int index = 0; 438 Object[] context = sourceCall.getContext(); 439 440 // coerce into canonical forms 441 if( getSourceFields().size() == 2 ) 442 result.setLong( index++, ( (LongWritable) context[ 0 ] ).get() ); 443 444 result.setString( index, makeEncodedString( context ) ); 445 } 446 447 protected String makeEncodedString( Object[] context ) 448 { 449 Text text = (Text) context[ 1 ]; 450 return new String( text.getBytes(), 0, text.getLength(), (Charset) context[ 2 ] ); 451 } 452 453 @Override 454 public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) 455 { 456 sourceCall.setContext( null ); 457 } 458 459 @Override 460 public void sinkPrepare( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 461 { 462 sinkCall.setContext( new Object[ 2 ] ); 463 464 sinkCall.getContext()[ 0 ] = new Text(); 465 sinkCall.getContext()[ 1 ] = Charset.forName( charsetName ); 466 } 467 468 @Override 469 public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 470 { 471 Text text = (Text) sinkCall.getContext()[ 0 ]; 472 Charset charset = (Charset) sinkCall.getContext()[ 1 ]; 473 String line = sinkCall.getOutgoingEntry().getTuple().toString(); 474 475 text.set( line.getBytes( charset ) ); 476 477 // it's ok to use NULL here so the collector does not write anything 478 sinkCall.getOutput().collect( null, text ); 479 } 480 }