001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.scheme.hadoop;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.nio.charset.Charset;
026    import java.util.Arrays;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.management.annotation.Property;
030    import cascading.management.annotation.PropertyDescription;
031    import cascading.management.annotation.Visibility;
032    import cascading.scheme.Scheme;
033    import cascading.scheme.SinkCall;
034    import cascading.scheme.SourceCall;
035    import cascading.tap.Tap;
036    import cascading.tuple.Fields;
037    import cascading.tuple.Tuple;
038    import cascading.tuple.TupleEntry;
039    import org.apache.hadoop.fs.Path;
040    import org.apache.hadoop.io.LongWritable;
041    import org.apache.hadoop.io.Text;
042    import org.apache.hadoop.mapred.FileInputFormat;
043    import org.apache.hadoop.mapred.FileOutputFormat;
044    import org.apache.hadoop.mapred.JobConf;
045    import org.apache.hadoop.mapred.OutputCollector;
046    import org.apache.hadoop.mapred.RecordReader;
047    import org.apache.hadoop.mapred.TextInputFormat;
048    import org.apache.hadoop.mapred.TextOutputFormat;
049    
050    /**
051     * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into
052     * lines. Either line-feed or carriage-return are used to signal end of line.
053     * <p/>
054     * By default, this scheme returns a {@link Tuple} with two fields, "offset" and "line".
055     * <p/>
056     * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names
057     * to be used instead of the names "offset" and "line". sinkFields is a selector and is by default {@link Fields#ALL}.
058     * Any available field names can be given if only a subset of the incoming fields should be used.
059     * <p/>
060     * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples
061     * will simply be the "line" value using the given field name.
062     * <p/>
063     * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before
064     * writing out the line.
065     * <p/>
066     * Note sink compression is {@link Compress#DISABLE} by default. If {@code null} is passed to the constructor
067     * for the compression value, it will remain disabled.
068     * <p/>
069     * If any of the input files end with ".zip", an error will be thrown.
 * <p/>
071     * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
072     * argument.
073     */
074    public class TextLine extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>
075      {
076      public enum Compress
077        {
078          DEFAULT, ENABLE, DISABLE
079        }
080    
081      public static final String DEFAULT_CHARSET = "UTF-8";
082    
083      /** Field serialVersionUID */
084      private static final long serialVersionUID = 1L;
085      /** Field DEFAULT_SOURCE_FIELDS */
086      public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "offset", "line" );
087    
088      /** Field sinkCompression */
089      Compress sinkCompression = Compress.DISABLE;
090    
091      String charsetName = DEFAULT_CHARSET;
092    
093      /**
094       * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
095       * "offset" is the byte offset in the input file.
096       */
097      public TextLine()
098        {
099        super( DEFAULT_SOURCE_FIELDS );
100        }
101    
102      /**
103       * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
104       * "offset" is the byte offset in the input file.
105       *
106       * @param numSinkParts of type int
107       */
108      @ConstructorProperties({"numSinkParts"})
109      public TextLine( int numSinkParts )
110        {
111        super( DEFAULT_SOURCE_FIELDS, numSinkParts );
112        }
113    
114      /**
115       * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
116       * "offset" is the byte offset in the input file.
117       *
118       * @param sinkCompression of type Compress
119       */
120      @ConstructorProperties({"sinkCompression"})
121      public TextLine( Compress sinkCompression )
122        {
123        super( DEFAULT_SOURCE_FIELDS );
124    
125        setSinkCompression( sinkCompression );
126        }
127    
128      /**
129       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
130       * subsequent tuples.
131       *
132       * @param sourceFields the source fields for this scheme
133       * @param sinkFields   the sink fields for this scheme
134       */
135      @ConstructorProperties({"sourceFields", "sinkFields"})
136      public TextLine( Fields sourceFields, Fields sinkFields )
137        {
138        super( sourceFields, sinkFields );
139    
140        verify( sourceFields );
141        }
142    
143      /**
144       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
145       * subsequent tuples.
146       *
147       * @param sourceFields the source fields for this scheme
148       * @param sinkFields   the sink fields for this scheme
149       * @param charsetName  of type String
150       */
151      @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"})
152      public TextLine( Fields sourceFields, Fields sinkFields, String charsetName )
153        {
154        super( sourceFields, sinkFields );
155    
156        // throws an exception if not found
157        setCharsetName( charsetName );
158    
159        verify( sourceFields );
160        }
161    
162      /**
163       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
164       * subsequent tuples.
165       *
166       * @param sourceFields the source fields for this scheme
167       * @param sinkFields   the sink fields for this scheme
168       * @param numSinkParts of type int
169       */
170      @ConstructorProperties({"sourceFields", "sinkFields", "numSinkParts"})
171      public TextLine( Fields sourceFields, Fields sinkFields, int numSinkParts )
172        {
173        super( sourceFields, sinkFields, numSinkParts );
174    
175        verify( sourceFields );
176        }
177    
178      /**
179       * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
180       * subsequent tuples.
181       *
182       * @param sourceFields    of type Fields
183       * @param sinkFields      of type Fields
184       * @param sinkCompression of type Compress
185       */
186      @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression"})
187      public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression )
188        {
189        super( sourceFields, sinkFields );
190    
191        setSinkCompression( sinkCompression );
192    
193        verify( sourceFields );
194        }
195    
196      /**
197       * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
198       * subsequent tuples.
199       *
200       * @param sourceFields    of type Fields
201       * @param sinkFields      of type Fields
202       * @param sinkCompression of type Compress
203       * @param charsetName     of type String
204       */
205      @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "charsetName"})
206      public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, String charsetName )
207        {
208        super( sourceFields, sinkFields );
209    
210        setSinkCompression( sinkCompression );
211    
212        // throws an exception if not found
213        setCharsetName( charsetName );
214    
215        verify( sourceFields );
216        }
217    
218      /**
219       * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
220       * subsequent tuples.
221       *
222       * @param sourceFields    of type Fields
223       * @param sinkFields      of type Fields
224       * @param sinkCompression of type Compress
225       * @param numSinkParts    of type int
226       */
227      @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts"})
228      public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts )
229        {
230        super( sourceFields, sinkFields, numSinkParts );
231    
232        setSinkCompression( sinkCompression );
233    
234        verify( sourceFields );
235        }
236    
237      /**
238       * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
239       * subsequent tuples.
240       *
241       * @param sourceFields    of type Fields
242       * @param sinkFields      of type Fields
243       * @param sinkCompression of type Compress
244       * @param numSinkParts    of type int
245       * @param charsetName     of type String
246       */
247      @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts", "charsetName"})
248      public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts, String charsetName )
249        {
250        super( sourceFields, sinkFields, numSinkParts );
251    
252        setSinkCompression( sinkCompression );
253    
254        // throws an exception if not found
255        setCharsetName( charsetName );
256    
257        verify( sourceFields );
258        }
259    
260      /**
261       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
262       * subsequent tuples.
263       *
264       * @param sourceFields the source fields for this scheme
265       */
266      @ConstructorProperties({"sourceFields"})
267      public TextLine( Fields sourceFields )
268        {
269        super( sourceFields );
270    
271        verify( sourceFields );
272        }
273    
274      /**
275       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
276       * subsequent tuples.
277       *
278       * @param sourceFields the source fields for this scheme
279       * @param charsetName  of type String
280       */
281      @ConstructorProperties({"sourceFields", "charsetName"})
282      public TextLine( Fields sourceFields, String charsetName )
283        {
284        super( sourceFields );
285    
286        // throws an exception if not found
287        setCharsetName( charsetName );
288    
289        verify( sourceFields );
290        }
291    
292      /**
293       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
294       * subsequent tuples. The resulting data set will have numSinkParts.
295       *
296       * @param sourceFields the source fields for this scheme
297       * @param numSinkParts of type int
298       */
299      @ConstructorProperties({"sourceFields", "numSinkParts"})
300      public TextLine( Fields sourceFields, int numSinkParts )
301        {
302        super( sourceFields, numSinkParts );
303    
304        verify( sourceFields );
305        }
306    
307      protected void setCharsetName( String charsetName )
308        {
309        if( charsetName != null )
310          this.charsetName = charsetName;
311    
312        Charset.forName( this.charsetName );
313        }
314    
315      @Property(name = "charset", visibility = Visibility.PUBLIC)
316      @PropertyDescription(value = "character set used in this scheme.")
317      public String getCharsetName()
318        {
319        return charsetName;
320        }
321    
322      protected void verify( Fields sourceFields )
323        {
324        if( sourceFields.size() < 1 || sourceFields.size() > 2 )
325          throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" );
326        }
327    
328      /**
329       * Method getSinkCompression returns the sinkCompression of this TextLine object.
330       *
331       * @return the sinkCompression (type Compress) of this TextLine object.
332       */
333      @Property(name = "sinkCompression", visibility = Visibility.PUBLIC)
334      @PropertyDescription(value = "The compression of the scheme when used in a sink.")
335      public Compress getSinkCompression()
336        {
337        return sinkCompression;
338        }
339    
340      /**
341       * Method setSinkCompression sets the sinkCompression of this TextLine object. If null, compression will remain disabled.
342       *
343       * @param sinkCompression the sinkCompression of this TextLine object.
344       */
345      public void setSinkCompression( Compress sinkCompression )
346        {
347        if( sinkCompression != null ) // leave disabled if null
348          this.sinkCompression = sinkCompression;
349        }
350    
351      @Override
352      public void sourceConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
353        {
354        if( hasZippedFiles( FileInputFormat.getInputPaths( conf ) ) )
355          throw new IllegalStateException( "cannot read zip files: " + Arrays.toString( FileInputFormat.getInputPaths( conf ) ) );
356    
357        conf.setInputFormat( TextInputFormat.class );
358        }
359    
360      private boolean hasZippedFiles( Path[] paths )
361        {
362        if( paths == null || paths.length == 0 )
363          return false;
364    
365        boolean isZipped = paths[ 0 ].getName().endsWith( ".zip" );
366    
367        for( int i = 1; i < paths.length; i++ )
368          {
369          if( isZipped != paths[ i ].getName().endsWith( ".zip" ) )
370            throw new IllegalStateException( "cannot mix zipped and upzipped files" );
371          }
372    
373        return isZipped;
374        }
375    
376      @Override
377      public void presentSourceFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields )
378        {
379        // do nothing to change TextLine state
380        }
381    
382      @Override
383      public void presentSinkFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields )
384        {
385        // do nothing to change TextLine state
386        }
387    
388      @Override
389      public void sinkConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
390        {
391        if( tap.getFullIdentifier( conf ).endsWith( ".zip" ) )
392          throw new IllegalStateException( "cannot write zip files: " + FileOutputFormat.getOutputPath( conf ) );
393    
394        if( getSinkCompression() == Compress.DISABLE )
395          conf.setBoolean( "mapred.output.compress", false );
396        else if( getSinkCompression() == Compress.ENABLE )
397          conf.setBoolean( "mapred.output.compress", true );
398    
399        conf.setOutputKeyClass( Text.class ); // be explicit
400        conf.setOutputValueClass( Text.class ); // be explicit
401        conf.setOutputFormat( TextOutputFormat.class );
402        }
403    
404      @Override
405      public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
406        {
407        if( sourceCall.getContext() == null )
408          sourceCall.setContext( new Object[ 3 ] );
409    
410        sourceCall.getContext()[ 0 ] = sourceCall.getInput().createKey();
411        sourceCall.getContext()[ 1 ] = sourceCall.getInput().createValue();
412        sourceCall.getContext()[ 2 ] = Charset.forName( charsetName );
413        }
414    
415      @Override
416      public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
417        {
418        if( !sourceReadInput( sourceCall ) )
419          return false;
420    
421        sourceHandleInput( sourceCall );
422    
423        return true;
424        }
425    
426      private boolean sourceReadInput( SourceCall<Object[], RecordReader> sourceCall ) throws IOException
427        {
428        Object[] context = sourceCall.getContext();
429    
430        return sourceCall.getInput().next( context[ 0 ], context[ 1 ] );
431        }
432    
433      protected void sourceHandleInput( SourceCall<Object[], RecordReader> sourceCall )
434        {
435        TupleEntry result = sourceCall.getIncomingEntry();
436    
437        int index = 0;
438        Object[] context = sourceCall.getContext();
439    
440        // coerce into canonical forms
441        if( getSourceFields().size() == 2 )
442          result.setLong( index++, ( (LongWritable) context[ 0 ] ).get() );
443    
444        result.setString( index, makeEncodedString( context ) );
445        }
446    
447      protected String makeEncodedString( Object[] context )
448        {
449        Text text = (Text) context[ 1 ];
450        return new String( text.getBytes(), 0, text.getLength(), (Charset) context[ 2 ] );
451        }
452    
453      @Override
454      public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
455        {
456        sourceCall.setContext( null );
457        }
458    
459      @Override
460      public void sinkPrepare( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
461        {
462        sinkCall.setContext( new Object[ 2 ] );
463    
464        sinkCall.getContext()[ 0 ] = new Text();
465        sinkCall.getContext()[ 1 ] = Charset.forName( charsetName );
466        }
467    
468      @Override
469      public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
470        {
471        Text text = (Text) sinkCall.getContext()[ 0 ];
472        Charset charset = (Charset) sinkCall.getContext()[ 1 ];
473        String line = sinkCall.getOutgoingEntry().getTuple().toString();
474    
475        text.set( line.getBytes( charset ) );
476    
477        // it's ok to use NULL here so the collector does not write anything
478        sinkCall.getOutput().collect( null, text );
479        }
480      }