001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.scheme.hadoop;
023
024import java.beans.ConstructorProperties;
025import java.io.IOException;
026import java.nio.charset.Charset;
027import java.util.regex.Pattern;
028
029import cascading.flow.FlowProcess;
030import cascading.flow.hadoop.util.HadoopUtil;
031import cascading.management.annotation.Property;
032import cascading.management.annotation.PropertyDescription;
033import cascading.management.annotation.Visibility;
034import cascading.scheme.Scheme;
035import cascading.scheme.SinkCall;
036import cascading.scheme.SourceCall;
037import cascading.tap.Tap;
038import cascading.tuple.Fields;
039import cascading.tuple.Tuple;
040import cascading.tuple.TupleEntry;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.io.LongWritable;
043import org.apache.hadoop.io.Text;
044import org.apache.hadoop.mapred.InputFormat;
045import org.apache.hadoop.mapred.JobConf;
046import org.apache.hadoop.mapred.OutputCollector;
047import org.apache.hadoop.mapred.OutputFormat;
048import org.apache.hadoop.mapred.RecordReader;
049import org.apache.hadoop.mapred.TextInputFormat;
050import org.apache.hadoop.mapred.TextOutputFormat;
051
052import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance;
053
054/**
 * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into
 * lines; either a line-feed or a carriage-return signals the end of a line.
057 * <p/>
058 * By default, this scheme returns a {@link Tuple} with two fields, "offset" and "line".
059 * <p/>
060 * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names
061 * to be used instead of the names "offset" and "line". sinkFields is a selector and is by default {@link Fields#ALL}.
062 * Any available field names can be given if only a subset of the incoming fields should be used.
063 * <p/>
064 * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples
065 * will simply be the "line" value using the given field name.
066 * <p/>
067 * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before
068 * writing out the line.
069 * <p/>
070 * Note sink compression is {@link Compress#DISABLE} by default. If {@code null} is passed to the constructor
071 * for the compression value, it will remain disabled.
072 * <p/>
 * If any of the input files end with ".zip" (in any letter case), an error will be thrown. Likewise, an error
 * is thrown if the sink path ends with ".zip".
 * <p/>
075 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
076 * argument.
077 */
078public class TextLine extends Scheme<Configuration, RecordReader, OutputCollector, Object[], Object[]>
079  {
080  public enum Compress
081    {
082      DEFAULT, ENABLE, DISABLE
083    }
084
085  public static final String DEFAULT_CHARSET = "UTF-8";
086
087  /** Field serialVersionUID */
088  private static final long serialVersionUID = 1L;
089  /** Field DEFAULT_SOURCE_FIELDS */
090  public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class );
091
092  /** Field zipPattern */
093  private static final Pattern zipPattern = Pattern.compile( "\\.[zZ][iI][pP]([ ,]|$)" );
094
095  /** Field sinkCompression */
096  Compress sinkCompression = Compress.DISABLE;
097
098  /** Field charsetName */
099  String charsetName = DEFAULT_CHARSET;
100
101  /**
102   * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
103   * "offset" is the byte offset in the input file.
104   */
105  public TextLine()
106    {
107    super( DEFAULT_SOURCE_FIELDS );
108    }
109
110  /**
111   * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
112   * "offset" is the byte offset in the input file.
113   *
114   * @param numSinkParts of type int
115   */
116  @ConstructorProperties({"numSinkParts"})
117  public TextLine( int numSinkParts )
118    {
119    super( DEFAULT_SOURCE_FIELDS, numSinkParts );
120    }
121
122  /**
123   * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
124   * "offset" is the byte offset in the input file.
125   *
126   * @param sinkCompression of type Compress
127   */
128  @ConstructorProperties({"sinkCompression"})
129  public TextLine( Compress sinkCompression )
130    {
131    super( DEFAULT_SOURCE_FIELDS );
132
133    setSinkCompression( sinkCompression );
134    }
135
136  /**
137   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
138   * subsequent tuples.
139   *
140   * @param sourceFields the source fields for this scheme
141   * @param sinkFields   the sink fields for this scheme
142   */
143  @ConstructorProperties({"sourceFields", "sinkFields"})
144  public TextLine( Fields sourceFields, Fields sinkFields )
145    {
146    super( sourceFields, sinkFields );
147
148    verify( sourceFields );
149    }
150
151  /**
152   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
153   * subsequent tuples.
154   *
155   * @param sourceFields the source fields for this scheme
156   * @param sinkFields   the sink fields for this scheme
157   * @param charsetName  of type String
158   */
159  @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"})
160  public TextLine( Fields sourceFields, Fields sinkFields, String charsetName )
161    {
162    super( sourceFields, sinkFields );
163
164    // throws an exception if not found
165    setCharsetName( charsetName );
166
167    verify( sourceFields );
168    }
169
170  /**
171   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
172   * subsequent tuples.
173   *
174   * @param sourceFields the source fields for this scheme
175   * @param sinkFields   the sink fields for this scheme
176   * @param numSinkParts of type int
177   */
178  @ConstructorProperties({"sourceFields", "sinkFields", "numSinkParts"})
179  public TextLine( Fields sourceFields, Fields sinkFields, int numSinkParts )
180    {
181    super( sourceFields, sinkFields, numSinkParts );
182
183    verify( sourceFields );
184    }
185
186  /**
187   * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
188   * subsequent tuples.
189   *
190   * @param sourceFields    of type Fields
191   * @param sinkFields      of type Fields
192   * @param sinkCompression of type Compress
193   */
194  @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression"})
195  public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression )
196    {
197    super( sourceFields, sinkFields );
198
199    setSinkCompression( sinkCompression );
200
201    verify( sourceFields );
202    }
203
204  /**
205   * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
206   * subsequent tuples.
207   *
208   * @param sourceFields    of type Fields
209   * @param sinkFields      of type Fields
210   * @param sinkCompression of type Compress
211   * @param charsetName     of type String
212   */
213  @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "charsetName"})
214  public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, String charsetName )
215    {
216    super( sourceFields, sinkFields );
217
218    setSinkCompression( sinkCompression );
219
220    // throws an exception if not found
221    setCharsetName( charsetName );
222
223    verify( sourceFields );
224    }
225
226  /**
227   * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
228   * subsequent tuples.
229   *
230   * @param sourceFields    of type Fields
231   * @param sinkFields      of type Fields
232   * @param sinkCompression of type Compress
233   * @param numSinkParts    of type int
234   */
235  @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts"})
236  public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts )
237    {
238    super( sourceFields, sinkFields, numSinkParts );
239
240    setSinkCompression( sinkCompression );
241
242    verify( sourceFields );
243    }
244
245  /**
246   * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
247   * subsequent tuples.
248   *
249   * @param sourceFields    of type Fields
250   * @param sinkFields      of type Fields
251   * @param sinkCompression of type Compress
252   * @param numSinkParts    of type int
253   * @param charsetName     of type String
254   */
255  @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts", "charsetName"})
256  public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts, String charsetName )
257    {
258    super( sourceFields, sinkFields, numSinkParts );
259
260    setSinkCompression( sinkCompression );
261
262    // throws an exception if not found
263    setCharsetName( charsetName );
264
265    verify( sourceFields );
266    }
267
268  /**
269   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
270   * subsequent tuples.
271   *
272   * @param sourceFields the source fields for this scheme
273   */
274  @ConstructorProperties({"sourceFields"})
275  public TextLine( Fields sourceFields )
276    {
277    super( sourceFields );
278
279    verify( sourceFields );
280    }
281
282  /**
283   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
284   * subsequent tuples.
285   *
286   * @param sourceFields the source fields for this scheme
287   * @param charsetName  of type String
288   */
289  @ConstructorProperties({"sourceFields", "charsetName"})
290  public TextLine( Fields sourceFields, String charsetName )
291    {
292    super( sourceFields );
293
294    // throws an exception if not found
295    setCharsetName( charsetName );
296
297    verify( sourceFields );
298    }
299
300  /**
301   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
302   * subsequent tuples. The resulting data set will have numSinkParts.
303   *
304   * @param sourceFields the source fields for this scheme
305   * @param numSinkParts of type int
306   */
307  @ConstructorProperties({"sourceFields", "numSinkParts"})
308  public TextLine( Fields sourceFields, int numSinkParts )
309    {
310    super( sourceFields, numSinkParts );
311
312    verify( sourceFields );
313    }
314
315  protected void setCharsetName( String charsetName )
316    {
317    if( charsetName != null )
318      this.charsetName = charsetName;
319
320    Charset.forName( this.charsetName );
321    }
322
323  @Property(name = "charset", visibility = Visibility.PUBLIC)
324  @PropertyDescription(value = "character set used in this scheme.")
325  public String getCharsetName()
326    {
327    return charsetName;
328    }
329
330  protected void verify( Fields sourceFields )
331    {
332    if( sourceFields.size() < 1 || sourceFields.size() > 2 )
333      throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" );
334    }
335
336  /**
337   * Method getSinkCompression returns the sinkCompression of this TextLine object.
338   *
339   * @return the sinkCompression (type Compress) of this TextLine object.
340   */
341  @Property(name = "sinkCompression", visibility = Visibility.PUBLIC)
342  @PropertyDescription(value = "The compression of the scheme when used in a sink.")
343  public Compress getSinkCompression()
344    {
345    return sinkCompression;
346    }
347
348  /**
349   * Method setSinkCompression sets the sinkCompression of this TextLine object. If null, compression will remain disabled.
350   *
351   * @param sinkCompression the sinkCompression of this TextLine object.
352   */
353  public void setSinkCompression( Compress sinkCompression )
354    {
355    if( sinkCompression != null ) // leave disabled if null
356      this.sinkCompression = sinkCompression;
357    }
358
359  @Override
360  public void sourceConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
361    {
362    JobConf jobConf = asJobConfInstance( conf );
363    String paths = jobConf.get( "mapred.input.dir", "" );
364
365    if( hasZippedFiles( paths ) )
366      throw new IllegalStateException( "cannot read zip files: " + paths );
367
368    conf.setBoolean( "mapred.mapper.new-api", false );
369    conf.setClass( "mapred.input.format.class", TextInputFormat.class, InputFormat.class );
370    }
371
372  private boolean hasZippedFiles( String paths )
373    {
374    if( paths == null || paths.length() == 0 )
375      return false;
376
377    return zipPattern.matcher( paths ).find();
378    }
379
380  @Override
381  public void presentSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields )
382    {
383    // do nothing to change TextLine state
384    }
385
386  @Override
387  public void presentSinkFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields )
388    {
389    // do nothing to change TextLine state
390    }
391
392  @Override
393  public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
394    {
395    if( tap.getFullIdentifier( conf ).endsWith( ".zip" ) )
396      throw new IllegalStateException( "cannot write zip files: " + HadoopUtil.getOutputPath( conf ) );
397
398    conf.setBoolean( "mapred.mapper.new-api", false );
399
400    if( getSinkCompression() == Compress.DISABLE )
401      conf.setBoolean( "mapred.output.compress", false );
402    else if( getSinkCompression() == Compress.ENABLE )
403      conf.setBoolean( "mapred.output.compress", true );
404
405    conf.setClass( "mapred.output.key.class", Text.class, Object.class );
406    conf.setClass( "mapred.output.value.class", Text.class, Object.class );
407    conf.setClass( "mapred.output.format.class", TextOutputFormat.class, OutputFormat.class );
408    }
409
410  @Override
411  public void sourcePrepare( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
412    {
413    if( sourceCall.getContext() == null )
414      sourceCall.setContext( new Object[ 3 ] );
415
416    sourceCall.getContext()[ 0 ] = sourceCall.getInput().createKey();
417    sourceCall.getContext()[ 1 ] = sourceCall.getInput().createValue();
418    sourceCall.getContext()[ 2 ] = Charset.forName( charsetName );
419    }
420
421  @Override
422  public boolean source( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
423    {
424    if( !sourceReadInput( sourceCall ) )
425      return false;
426
427    sourceHandleInput( sourceCall );
428
429    return true;
430    }
431
432  private boolean sourceReadInput( SourceCall<Object[], RecordReader> sourceCall ) throws IOException
433    {
434    Object[] context = sourceCall.getContext();
435
436    return sourceCall.getInput().next( context[ 0 ], context[ 1 ] );
437    }
438
439  protected void sourceHandleInput( SourceCall<Object[], RecordReader> sourceCall )
440    {
441    TupleEntry result = sourceCall.getIncomingEntry();
442
443    int index = 0;
444    Object[] context = sourceCall.getContext();
445
446    // coerce into canonical forms
447    if( getSourceFields().size() == 2 )
448      result.setLong( index++, ( (LongWritable) context[ 0 ] ).get() );
449
450    result.setString( index, makeEncodedString( context ) );
451    }
452
453  protected String makeEncodedString( Object[] context )
454    {
455    Text text = (Text) context[ 1 ];
456    return new String( text.getBytes(), 0, text.getLength(), (Charset) context[ 2 ] );
457    }
458
459  @Override
460  public void sourceCleanup( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
461    {
462    sourceCall.setContext( null );
463    }
464
465  @Override
466  public void sinkPrepare( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
467    {
468    sinkCall.setContext( new Object[ 2 ] );
469
470    sinkCall.getContext()[ 0 ] = new Text();
471    sinkCall.getContext()[ 1 ] = Charset.forName( charsetName );
472    }
473
474  @Override
475  public void sink( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
476    {
477    Text text = (Text) sinkCall.getContext()[ 0 ];
478    Charset charset = (Charset) sinkCall.getContext()[ 1 ];
479    String line = sinkCall.getOutgoingEntry().getTuple().toString();
480
481    text.set( line.getBytes( charset ) );
482
483    // it's ok to use NULL here so the collector does not write anything
484    sinkCall.getOutput().collect( null, text );
485    }
486  }