001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.scheme.hadoop; 022 023import java.beans.ConstructorProperties; 024import java.io.IOException; 025import java.nio.charset.Charset; 026import java.util.Arrays; 027 028import cascading.flow.FlowProcess; 029import cascading.flow.hadoop.util.HadoopUtil; 030import cascading.management.annotation.Property; 031import cascading.management.annotation.PropertyDescription; 032import cascading.management.annotation.Visibility; 033import cascading.scheme.Scheme; 034import cascading.scheme.SinkCall; 035import cascading.scheme.SourceCall; 036import cascading.tap.Tap; 037import cascading.tuple.Fields; 038import cascading.tuple.Tuple; 039import cascading.tuple.TupleEntry; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.io.LongWritable; 043import org.apache.hadoop.io.Text; 044import org.apache.hadoop.mapred.FileInputFormat; 045import org.apache.hadoop.mapred.InputFormat; 046import org.apache.hadoop.mapred.OutputCollector; 047import org.apache.hadoop.mapred.OutputFormat; 048import org.apache.hadoop.mapred.RecordReader; 049import org.apache.hadoop.mapred.TextInputFormat; 050import org.apache.hadoop.mapred.TextOutputFormat; 051 052import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance; 053 054/** 055 * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into 056 * lines. Either line-feed or carriage-return are used to signal end of line. 057 * <p/> 058 * By default, this scheme returns a {@link Tuple} with two fields, "offset" and "line". 059 * <p/> 060 * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names 061 * to be used instead of the names "offset" and "line". sinkFields is a selector and is by default {@link Fields#ALL}. 062 * Any available field names can be given if only a subset of the incoming fields should be used. 063 * <p/> 064 * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples 065 * will simply be the "line" value using the given field name. 066 * <p/> 067 * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before 068 * writing out the line. 069 * <p/> 070 * Note sink compression is {@link Compress#DISABLE} by default. If {@code null} is passed to the constructor 071 * for the compression value, it will remain disabled. 072 * <p/> 073 * If any of the input files end with ".zip", an error will be thrown. 074 * * <p/> 075 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor 076 * argument. 077 */ 078public class TextLine extends Scheme<Configuration, RecordReader, OutputCollector, Object[], Object[]> 079 { 080 public enum Compress 081 { 082 DEFAULT, ENABLE, DISABLE 083 } 084 085 public static final String DEFAULT_CHARSET = "UTF-8"; 086 087 /** Field serialVersionUID */ 088 private static final long serialVersionUID = 1L; 089 /** Field DEFAULT_SOURCE_FIELDS */ 090 public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ); 091 092 /** Field sinkCompression */ 093 Compress sinkCompression = Compress.DISABLE; 094 095 String charsetName = DEFAULT_CHARSET; 096 097 /** 098 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 099 * "offset" is the byte offset in the input file. 100 */ 101 public TextLine() 102 { 103 super( DEFAULT_SOURCE_FIELDS ); 104 } 105 106 /** 107 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 108 * "offset" is the byte offset in the input file. 109 * 110 * @param numSinkParts of type int 111 */ 112 @ConstructorProperties({"numSinkParts"}) 113 public TextLine( int numSinkParts ) 114 { 115 super( DEFAULT_SOURCE_FIELDS, numSinkParts ); 116 } 117 118 /** 119 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 120 * "offset" is the byte offset in the input file. 121 * 122 * @param sinkCompression of type Compress 123 */ 124 @ConstructorProperties({"sinkCompression"}) 125 public TextLine( Compress sinkCompression ) 126 { 127 super( DEFAULT_SOURCE_FIELDS ); 128 129 setSinkCompression( sinkCompression ); 130 } 131 132 /** 133 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 134 * subsequent tuples. 135 * 136 * @param sourceFields the source fields for this scheme 137 * @param sinkFields the sink fields for this scheme 138 */ 139 @ConstructorProperties({"sourceFields", "sinkFields"}) 140 public TextLine( Fields sourceFields, Fields sinkFields ) 141 { 142 super( sourceFields, sinkFields ); 143 144 verify( sourceFields ); 145 } 146 147 /** 148 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 149 * subsequent tuples. 150 * 151 * @param sourceFields the source fields for this scheme 152 * @param sinkFields the sink fields for this scheme 153 * @param charsetName of type String 154 */ 155 @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"}) 156 public TextLine( Fields sourceFields, Fields sinkFields, String charsetName ) 157 { 158 super( sourceFields, sinkFields ); 159 160 // throws an exception if not found 161 setCharsetName( charsetName ); 162 163 verify( sourceFields ); 164 } 165 166 /** 167 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 168 * subsequent tuples. 169 * 170 * @param sourceFields the source fields for this scheme 171 * @param sinkFields the sink fields for this scheme 172 * @param numSinkParts of type int 173 */ 174 @ConstructorProperties({"sourceFields", "sinkFields", "numSinkParts"}) 175 public TextLine( Fields sourceFields, Fields sinkFields, int numSinkParts ) 176 { 177 super( sourceFields, sinkFields, numSinkParts ); 178 179 verify( sourceFields ); 180 } 181 182 /** 183 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 184 * subsequent tuples. 185 * 186 * @param sourceFields of type Fields 187 * @param sinkFields of type Fields 188 * @param sinkCompression of type Compress 189 */ 190 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression"}) 191 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression ) 192 { 193 super( sourceFields, sinkFields ); 194 195 setSinkCompression( sinkCompression ); 196 197 verify( sourceFields ); 198 } 199 200 /** 201 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 202 * subsequent tuples. 203 * 204 * @param sourceFields of type Fields 205 * @param sinkFields of type Fields 206 * @param sinkCompression of type Compress 207 * @param charsetName of type String 208 */ 209 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "charsetName"}) 210 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, String charsetName ) 211 { 212 super( sourceFields, sinkFields ); 213 214 setSinkCompression( sinkCompression ); 215 216 // throws an exception if not found 217 setCharsetName( charsetName ); 218 219 verify( sourceFields ); 220 } 221 222 /** 223 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 224 * subsequent tuples. 225 * 226 * @param sourceFields of type Fields 227 * @param sinkFields of type Fields 228 * @param sinkCompression of type Compress 229 * @param numSinkParts of type int 230 */ 231 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts"}) 232 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts ) 233 { 234 super( sourceFields, sinkFields, numSinkParts ); 235 236 setSinkCompression( sinkCompression ); 237 238 verify( sourceFields ); 239 } 240 241 /** 242 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 243 * subsequent tuples. 244 * 245 * @param sourceFields of type Fields 246 * @param sinkFields of type Fields 247 * @param sinkCompression of type Compress 248 * @param numSinkParts of type int 249 * @param charsetName of type String 250 */ 251 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts", "charsetName"}) 252 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts, String charsetName ) 253 { 254 super( sourceFields, sinkFields, numSinkParts ); 255 256 setSinkCompression( sinkCompression ); 257 258 // throws an exception if not found 259 setCharsetName( charsetName ); 260 261 verify( sourceFields ); 262 } 263 264 /** 265 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 266 * subsequent tuples. 267 * 268 * @param sourceFields the source fields for this scheme 269 */ 270 @ConstructorProperties({"sourceFields"}) 271 public TextLine( Fields sourceFields ) 272 { 273 super( sourceFields ); 274 275 verify( sourceFields ); 276 } 277 278 /** 279 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 280 * subsequent tuples. 281 * 282 * @param sourceFields the source fields for this scheme 283 * @param charsetName of type String 284 */ 285 @ConstructorProperties({"sourceFields", "charsetName"}) 286 public TextLine( Fields sourceFields, String charsetName ) 287 { 288 super( sourceFields ); 289 290 // throws an exception if not found 291 setCharsetName( charsetName ); 292 293 verify( sourceFields ); 294 } 295 296 /** 297 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 298 * subsequent tuples. The resulting data set will have numSinkParts. 299 * 300 * @param sourceFields the source fields for this scheme 301 * @param numSinkParts of type int 302 */ 303 @ConstructorProperties({"sourceFields", "numSinkParts"}) 304 public TextLine( Fields sourceFields, int numSinkParts ) 305 { 306 super( sourceFields, numSinkParts ); 307 308 verify( sourceFields ); 309 } 310 311 protected void setCharsetName( String charsetName ) 312 { 313 if( charsetName != null ) 314 this.charsetName = charsetName; 315 316 Charset.forName( this.charsetName ); 317 } 318 319 @Property(name = "charset", visibility = Visibility.PUBLIC) 320 @PropertyDescription(value = "character set used in this scheme.") 321 public String getCharsetName() 322 { 323 return charsetName; 324 } 325 326 protected void verify( Fields sourceFields ) 327 { 328 if( sourceFields.size() < 1 || sourceFields.size() > 2 ) 329 throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" ); 330 } 331 332 /** 333 * Method getSinkCompression returns the sinkCompression of this TextLine object. 334 * 335 * @return the sinkCompression (type Compress) of this TextLine object. 336 */ 337 @Property(name = "sinkCompression", visibility = Visibility.PUBLIC) 338 @PropertyDescription(value = "The compression of the scheme when used in a sink.") 339 public Compress getSinkCompression() 340 { 341 return sinkCompression; 342 } 343 344 /** 345 * Method setSinkCompression sets the sinkCompression of this TextLine object. If null, compression will remain disabled. 346 * 347 * @param sinkCompression the sinkCompression of this TextLine object. 348 */ 349 public void setSinkCompression( Compress sinkCompression ) 350 { 351 if( sinkCompression != null ) // leave disabled if null 352 this.sinkCompression = sinkCompression; 353 } 354 355 @Override 356 public void sourceConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf ) 357 { 358 if( hasZippedFiles( FileInputFormat.getInputPaths( asJobConfInstance( conf ) ) ) ) 359 throw new IllegalStateException( "cannot read zip files: " + Arrays.toString( FileInputFormat.getInputPaths( asJobConfInstance( conf ) ) ) ); 360 361 conf.setBoolean( "mapred.mapper.new-api", false ); 362 conf.setClass( "mapred.input.format.class", TextInputFormat.class, InputFormat.class ); 363 } 364 365 private boolean hasZippedFiles( Path[] paths ) 366 { 367 if( paths == null || paths.length == 0 ) 368 return false; 369 370 boolean isZipped = paths[ 0 ].getName().endsWith( ".zip" ); 371 372 for( int i = 1; i < paths.length; i++ ) 373 { 374 if( isZipped != paths[ i ].getName().endsWith( ".zip" ) ) 375 throw new IllegalStateException( "cannot mix zipped and upzipped files" ); 376 } 377 378 return isZipped; 379 } 380 381 @Override 382 public void presentSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields ) 383 { 384 // do nothing to change TextLine state 385 } 386 387 @Override 388 public void presentSinkFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields ) 389 { 390 // do nothing to change TextLine state 391 } 392 393 @Override 394 public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf ) 395 { 396 if( tap.getFullIdentifier( conf ).endsWith( ".zip" ) ) 397 throw new IllegalStateException( "cannot write zip files: " + HadoopUtil.getOutputPath( conf ) ); 398 399 conf.setBoolean( "mapred.mapper.new-api", false ); 400 401 if( getSinkCompression() == Compress.DISABLE ) 402 conf.setBoolean( "mapred.output.compress", false ); 403 else if( getSinkCompression() == Compress.ENABLE ) 404 conf.setBoolean( "mapred.output.compress", true ); 405 406 conf.setClass( "mapred.output.key.class", Text.class, Object.class ); 407 conf.setClass( "mapred.output.value.class", Text.class, Object.class ); 408 conf.setClass( "mapred.output.format.class", TextOutputFormat.class, OutputFormat.class ); 409 } 410 411 @Override 412 public void sourcePrepare( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) 413 { 414 if( sourceCall.getContext() == null ) 415 sourceCall.setContext( new Object[ 3 ] ); 416 417 sourceCall.getContext()[ 0 ] = sourceCall.getInput().createKey(); 418 sourceCall.getContext()[ 1 ] = sourceCall.getInput().createValue(); 419 sourceCall.getContext()[ 2 ] = Charset.forName( charsetName ); 420 } 421 422 @Override 423 public boolean source( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException 424 { 425 if( !sourceReadInput( sourceCall ) ) 426 return false; 427 428 sourceHandleInput( sourceCall ); 429 430 return true; 431 } 432 433 private boolean sourceReadInput( SourceCall<Object[], RecordReader> sourceCall ) throws IOException 434 { 435 Object[] context = sourceCall.getContext(); 436 437 return sourceCall.getInput().next( context[ 0 ], context[ 1 ] ); 438 } 439 440 protected void sourceHandleInput( SourceCall<Object[], RecordReader> sourceCall ) 441 { 442 TupleEntry result = sourceCall.getIncomingEntry(); 443 444 int index = 0; 445 Object[] context = sourceCall.getContext(); 446 447 // coerce into canonical forms 448 if( getSourceFields().size() == 2 ) 449 result.setLong( index++, ( (LongWritable) context[ 0 ] ).get() ); 450 451 result.setString( index, makeEncodedString( context ) ); 452 } 453 454 protected String makeEncodedString( Object[] context ) 455 { 456 Text text = (Text) context[ 1 ]; 457 return new String( text.getBytes(), 0, text.getLength(), (Charset) context[ 2 ] ); 458 } 459 460 @Override 461 public void sourceCleanup( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) 462 { 463 sourceCall.setContext( null ); 464 } 465 466 @Override 467 public void sinkPrepare( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 468 { 469 sinkCall.setContext( new Object[ 2 ] ); 470 471 sinkCall.getContext()[ 0 ] = new Text(); 472 sinkCall.getContext()[ 1 ] = Charset.forName( charsetName ); 473 } 474 475 @Override 476 public void sink( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 477 { 478 Text text = (Text) sinkCall.getContext()[ 0 ]; 479 Charset charset = (Charset) sinkCall.getContext()[ 1 ]; 480 String line = sinkCall.getOutgoingEntry().getTuple().toString(); 481 482 text.set( line.getBytes( charset ) ); 483 484 // it's ok to use NULL here so the collector does not write anything 485 sinkCall.getOutput().collect( null, text ); 486 } 487 }