001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.scheme.local;
023
024import java.beans.ConstructorProperties;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.InputStreamReader;
028import java.io.LineNumberReader;
029import java.io.OutputStream;
030import java.io.OutputStreamWriter;
031import java.io.PrintWriter;
032import java.io.UnsupportedEncodingException;
033import java.nio.charset.Charset;
034import java.util.Properties;
035
036import cascading.flow.FlowProcess;
037import cascading.management.annotation.Property;
038import cascading.management.annotation.PropertyDescription;
039import cascading.management.annotation.Visibility;
040import cascading.scheme.Scheme;
041import cascading.scheme.SinkCall;
042import cascading.scheme.SourceCall;
043import cascading.tap.Tap;
044import cascading.tap.TapException;
045import cascading.tuple.Fields;
046import cascading.tuple.TupleEntry;
047
048/**
049 * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into
050 * lines. Either line-feed or carriage-return are used to signal end of line.
051 * <p/>
052 * By default, this scheme returns a {@link cascading.tuple.Tuple} with two fields, "num" and "line". Where "num"
053 * is the line number for "line".
054 * <p/>
055 * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names
056 * to be used instead of the names "num" and "line". sinkFields is a selector and is by default {@link Fields#ALL}.
057 * Any available field names can be given if only a subset of the incoming fields should be used.
058 * <p/>
059 * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples
060 * will simply be the "line" value using the given field name.
061 * <p/>
062 * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before
063 * writing out the line.
064 * <p/>
065 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
066 * argument.
067 */
068public class TextLine extends Scheme<Properties, InputStream, OutputStream, LineNumberReader, PrintWriter>
069  {
070  public static final String DEFAULT_CHARSET = "UTF-8";
071  public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "num", "line" ).applyTypes( Integer.TYPE, String.class );
072
073  private String charsetName = DEFAULT_CHARSET;
074
075  /**
076   * Creates a new TextLine instance that sources "num" and "line" fields, and sinks all incoming fields, where
077   * "num" is the line number of the line in the input file.
078   */
079  public TextLine()
080    {
081    super( DEFAULT_SOURCE_FIELDS, Fields.ALL );
082    }
083
084  /**
085   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
086   * subsequent tuples.
087   *
088   * @param sourceFields of Fields
089   */
090  @ConstructorProperties({"sourceFields"})
091  public TextLine( Fields sourceFields )
092    {
093    super( sourceFields );
094
095    verify( sourceFields );
096    }
097
098  /**
099   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
100   * subsequent tuples.
101   *
102   * @param sourceFields of Fields
103   * @param charsetName  of type String
104   */
105  @ConstructorProperties({"sourceFields", "charsetName"})
106  public TextLine( Fields sourceFields, String charsetName )
107    {
108    super( sourceFields );
109
110    // throws an exception if not found
111    setCharsetName( charsetName );
112
113    verify( sourceFields );
114    }
115
116  /**
117   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
118   * subsequent tuples.
119   *
120   * @param sourceFields of Fields
121   * @param sinkFields   of Fields
122   */
123  @ConstructorProperties({"sourceFields", "sinkFields"})
124  public TextLine( Fields sourceFields, Fields sinkFields )
125    {
126    super( sourceFields, sinkFields );
127
128    verify( sourceFields );
129    }
130
131  /**
132   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
133   * subsequent tuples.
134   *
135   * @param sourceFields of Fields
136   * @param sinkFields   of Fields
137   * @param charsetName  of type String
138   */
139  @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"})
140  public TextLine( Fields sourceFields, Fields sinkFields, String charsetName )
141    {
142    super( sourceFields, sinkFields );
143
144    // throws an exception if not found
145    setCharsetName( charsetName );
146
147    verify( sourceFields );
148    }
149
150  private void setCharsetName( String charsetName )
151    {
152    if( charsetName != null )
153      this.charsetName = charsetName;
154
155    Charset.forName( this.charsetName );
156    }
157
158  @Property(name = "charset", visibility = Visibility.PUBLIC)
159  @PropertyDescription("character set used.")
160  public String getCharsetName()
161    {
162    return charsetName;
163    }
164
165  protected void verify( Fields sourceFields )
166    {
167    if( sourceFields.size() < 1 || sourceFields.size() > 2 )
168      throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" );
169    }
170
171  public LineNumberReader createInput( InputStream inputStream )
172    {
173    try
174      {
175      return new LineNumberReader( new InputStreamReader( inputStream, charsetName ) );
176      }
177    catch( UnsupportedEncodingException exception )
178      {
179      throw new TapException( exception );
180      }
181    }
182
183  public PrintWriter createOutput( OutputStream outputStream )
184    {
185    try
186      {
187      return new PrintWriter( new OutputStreamWriter( outputStream, charsetName ) );
188      }
189    catch( UnsupportedEncodingException exception )
190      {
191      throw new TapException( exception );
192      }
193    }
194
195  @Override
196  public void presentSourceFields( FlowProcess<? extends Properties> process, Tap tap, Fields fields )
197    {
198    // do nothing
199    }
200
201  @Override
202  public void presentSinkFields( FlowProcess<? extends Properties> process, Tap tap, Fields fields )
203    {
204    // do nothing
205    }
206
207  @Override
208  public void sourceConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
209    {
210    }
211
212  @Override
213  public void sinkConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
214    {
215    }
216
217  @Override
218  public void sourcePrepare( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
219    {
220    sourceCall.setContext( createInput( sourceCall.getInput() ) );
221    }
222
223  @Override
224  public void sourceRePrepare( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
225    {
226    sourceCall.setContext( createInput( sourceCall.getInput() ) );
227    }
228
229  @Override
230  public boolean source( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
231    {
232    // first line is 0, this matches offset being zero, so when throwing out the first line for comments
233    int lineNumber = sourceCall.getContext().getLineNumber();
234    String line = sourceCall.getContext().readLine();
235
236    if( line == null )
237      return false;
238
239    TupleEntry incomingEntry = sourceCall.getIncomingEntry();
240
241    if( getSourceFields().size() == 1 )
242      {
243      incomingEntry.setObject( 0, line );
244      }
245    else
246      {
247      incomingEntry.setInteger( 0, lineNumber );
248      incomingEntry.setString( 1, line );
249      }
250
251    return true;
252    }
253
254  @Override
255  public void sourceCleanup( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
256    {
257    sourceCall.setContext( null );
258    }
259
260  @Override
261  public void sinkPrepare( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
262    {
263    sinkCall.setContext( createOutput( sinkCall.getOutput() ) );
264    }
265
266  @Override
267  public void sink( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
268    {
269    sinkCall.getContext().println( sinkCall.getOutgoingEntry().getTuple().toString() );
270    }
271
272  @Override
273  public void sinkCleanup( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
274    {
275    sinkCall.getContext().flush();
276    sinkCall.setContext( null );
277    }
278  }