001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.scheme.local;
023
024import java.beans.ConstructorProperties;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.InputStreamReader;
028import java.io.LineNumberReader;
029import java.io.OutputStream;
030import java.io.OutputStreamWriter;
031import java.io.PrintWriter;
032import java.io.UnsupportedEncodingException;
033import java.nio.charset.Charset;
034import java.util.Properties;
035
036import cascading.flow.FlowProcess;
037import cascading.management.annotation.Property;
038import cascading.management.annotation.PropertyDescription;
039import cascading.management.annotation.Visibility;
040import cascading.scheme.Scheme;
041import cascading.scheme.SinkCall;
042import cascading.scheme.SourceCall;
043import cascading.scheme.util.DelimitedParser;
044import cascading.tap.CompositeTap;
045import cascading.tap.Tap;
046import cascading.tap.TapException;
047import cascading.tap.local.FileTap;
048import cascading.tuple.Fields;
049import cascading.tuple.Tuple;
050import cascading.tuple.TupleEntry;
051import cascading.tuple.util.TupleViews;
052
053/**
054 * Class TextDelimited provides direct support for delimited text files, like
055 * TAB (\t) or COMMA (,) delimited files. It also optionally allows for quoted values.
056 * <p/>
057 * TextDelimited may also be used to skip the "header" in a file, where the header is defined as the very first line
058 * in every input file. That is, if the byte offset of the current line from the input is zero (0), that line will
059 * be skipped.
060 * <p/>
061 * It is assumed if sink/source {@code fields} is set to either {@link Fields#ALL} or {@link Fields#UNKNOWN} and
062 * {@code skipHeader} or {@code hasHeader} is {@code true}, the field names will be retrieved from the header of the
 * file and used during planning. The header will be parsed with the same rules as the body of the file.
064 * <p/>
065 * By default headers are not skipped.
066 * <p/>
067 * TextDelimited may also be used to write a "header" in a file. The fields names for the header are taken directly
068 * from the declared fields. Or if the declared fields are {@link Fields#ALL} or {@link Fields#UNKNOWN}, the
069 * resolved field names will be used, if any.
070 * <p/>
071 * By default headers are not written.
072 * <p/>
 * If {@code hasHeader} is set to {@code true} on a constructor, both {@code skipHeader} and {@code writeHeader} will
074 * be set to {@code true}.
075 * <p/>
076 * By default this {@link cascading.scheme.Scheme} is both {@code strict} and {@code safe}.
077 * <p/>
078 * Strict meaning if a line of text does not parse into the expected number of fields, this class will throw a
079 * {@link TapException}. If strict is {@code false}, then {@link Tuple} will be returned with {@code null} values
080 * for the missing fields.
081 * <p/>
082 * Safe meaning if a field cannot be coerced into an expected type, a {@code null} will be used for the value.
083 * If safe is {@code false}, a {@link TapException} will be thrown.
084 * <p/>
085 * Also by default, {@code quote} strings are not searched for to improve processing speed. If a file is
086 * COMMA delimited but may have COMMA's in a value, the whole value should be surrounded by the quote string, typically
087 * double quotes ({@literal "}).
088 * <p/>
089 * Note all empty fields in a line will be returned as {@code null} unless coerced into a new type.
090 * <p/>
091 * This Scheme may source/sink {@link Fields#ALL}, when given on the constructor the new instance will automatically
092 * default to strict == false as the number of fields parsed are arbitrary or unknown. A type array may not be given
093 * either, so all values will be returned as Strings.
094 * <p/>
095 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
096 * argument.
097 * <p/>
098 * To override field and line parsing behaviors, sub-class {@link DelimitedParser} or provide a
099 * {@link cascading.scheme.util.FieldTypeResolver} implementation.
100 * <p/>
101 * Note that there should be no expectation that TextDelimited, or specifically {@link DelimitedParser}, can handle
102 * all delimited and quoted combinations reliably. Attempting to do so would impair its performance and maintainability.
103 * <p/>
104 * Further, it can be safely said any corrupted files will not be supported for obvious reasons. Corrupted files may
105 * result in exceptions or could cause edge cases in the underlying java regular expression engine.
106 * <p/>
 * A large part of Cascading was designed to help users cleanse data. Thus the recommendation is to create Flows that
 * are responsible for cleansing large data-sets when faced with the problem.
 * <p/>
 * DelimitedParser may be sub-classed and extended if necessary.
111 *
112 * @see TextLine
113 */
114public class TextDelimited extends Scheme<Properties, InputStream, OutputStream, LineNumberReader, PrintWriter>
115  {
116  public static final String DEFAULT_CHARSET = "UTF-8";
117
118  private final boolean skipHeader;
119  private final boolean writeHeader;
120  private final DelimitedParser delimitedParser;
121  private String charsetName = DEFAULT_CHARSET;
122
123  /**
124   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
125   * {@link Fields#ALL} and using TAB as the default delimiter.
126   * <p/>
127   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
128   * with a {@link cascading.pipe.Checkpoint} Tap.
129   */
130  public TextDelimited()
131    {
132    this( Fields.ALL );
133    }
134
135  /**
136   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
137   * {@link Fields#ALL} and using TAB as the default delimiter.
138   * <p/>
139   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
140   * with a {@link cascading.pipe.Checkpoint} Tap.
141   *
142   * @param hasHeader
143   * @param delimiter
144   */
145  @ConstructorProperties({"hasHeader", "delimiter"})
146  public TextDelimited( boolean hasHeader, String delimiter )
147    {
148    this( Fields.ALL, hasHeader, delimiter, null, (Class[]) null );
149    }
150
151  /**
152   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
153   * {@link Fields#ALL} and using TAB as the default delimiter.
154   * <p/>
155   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
156   * with a {@link cascading.pipe.Checkpoint} Tap.
157   *
158   * @param hasHeader
159   * @param delimiter
160   * @param quote
161   */
162  @ConstructorProperties({"hasHeader", "delimiter", "quote"})
163  public TextDelimited( boolean hasHeader, String delimiter, String quote )
164    {
165    this( Fields.ALL, hasHeader, delimiter, quote, (Class[]) null );
166    }
167
168  /**
169   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
170   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
171   * <p/>
172   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
173   * with a {@link cascading.pipe.Checkpoint} Tap.
174   *
175   * @param hasHeader
176   * @param delimitedParser
177   */
178  @ConstructorProperties({"hasHeader", "delimitedParser"})
179  public TextDelimited( boolean hasHeader, DelimitedParser delimitedParser )
180    {
181    this( Fields.ALL, hasHeader, hasHeader, delimitedParser );
182    }
183
184  /**
185   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
186   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
187   * <p/>
188   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
189   * with a {@link cascading.pipe.Checkpoint} Tap.
190   * <p/>
191   * This constructor will set {@code skipHeader} and {@code writeHeader} values to true.
192   *
193   * @param delimitedParser
194   */
195  @ConstructorProperties({"delimitedParser"})
196  public TextDelimited( DelimitedParser delimitedParser )
197    {
198    this( Fields.ALL, true, true, delimitedParser );
199    }
200
201  /**
202   * Constructor TextDelimited creates a new TextDelimited instance with TAB as the default delimiter.
203   *
204   * @param fields of type Fields
205   */
206  @ConstructorProperties({"fields"})
207  public TextDelimited( Fields fields )
208    {
209    this( fields, "\t", null, null );
210    }
211
212  /**
213   * Constructor TextDelimited creates a new TextDelimited instance.
214   *
215   * @param fields    of type Fields
216   * @param delimiter of type String
217   */
218  @ConstructorProperties({"fields", "delimiter"})
219  public TextDelimited( Fields fields, String delimiter )
220    {
221    this( fields, delimiter, null, null );
222    }
223
224  /**
225   * Constructor TextDelimited creates a new TextDelimited instance.
226   *
227   * @param fields    of type Fields
228   * @param hasHeader of type boolean
229   * @param delimiter of type String
230   */
231  @ConstructorProperties({"fields", "hasHeader", "delimiter"})
232  public TextDelimited( Fields fields, boolean hasHeader, String delimiter )
233    {
234    this( fields, hasHeader, hasHeader, delimiter, null, null );
235    }
236
237  /**
238   * Constructor TextDelimited creates a new TextDelimited instance.
239   *
240   * @param fields     of type Fields
241   * @param skipHeader of type boolean
242   * @param delimiter  of type String
243   */
244  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter"})
245  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter )
246    {
247    this( fields, skipHeader, writeHeader, delimiter, null, null );
248    }
249
250  /**
251   * Constructor TextDelimited creates a new TextDelimited instance.
252   *
253   * @param fields    of type Fields
254   * @param delimiter of type String
255   * @param types     of type Class[]
256   */
257  @ConstructorProperties({"fields", "delimiter", "types"})
258  public TextDelimited( Fields fields, String delimiter, Class[] types )
259    {
260    this( fields, delimiter, null, types );
261    }
262
263  /**
264   * Constructor TextDelimited creates a new TextDelimited instance.
265   *
266   * @param fields    of type Fields
267   * @param hasHeader of type boolean
268   * @param delimiter of type String
269   * @param types     of type Class[]
270   */
271  @ConstructorProperties({"fields", "hasHeader", "delimiter", "types"})
272  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, Class[] types )
273    {
274    this( fields, hasHeader, hasHeader, delimiter, null, types );
275    }
276
277  /**
278   * Constructor TextDelimited creates a new TextDelimited instance.
279   *
280   * @param fields      of type Fields
281   * @param skipHeader  of type boolean
282   * @param writeHeader of type boolean
283   * @param delimiter   of type String
284   * @param types       of type Class[]
285   */
286  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "types"})
287  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types )
288    {
289    this( fields, skipHeader, writeHeader, delimiter, null, types );
290    }
291
292  /**
293   * Constructor TextDelimited creates a new TextDelimited instance.
294   *
295   * @param fields    of type Fields
296   * @param delimiter of type String
297   * @param quote     of type String
298   * @param types     of type Class[]
299   */
300  @ConstructorProperties({"fields", "delimiter", "quote", "types"})
301  public TextDelimited( Fields fields, String delimiter, String quote, Class[] types )
302    {
303    this( fields, false, delimiter, quote, types );
304    }
305
306  /**
307   * Constructor TextDelimited creates a new TextDelimited instance.
308   *
309   * @param fields    of type Fields
310   * @param hasHeader of type boolean
311   * @param delimiter of type String
312   * @param quote     of type String
313   * @param types     of type Class[]
314   */
315  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types"})
316  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types )
317    {
318    this( fields, hasHeader, hasHeader, delimiter, quote, types, true );
319    }
320
321  /**
322   * Constructor TextDelimited creates a new TextDelimited instance.
323   *
324   * @param fields      of type Fields
325   * @param skipHeader  of type boolean
326   * @param writeHeader of type boolean
327   * @param delimiter   of type String
328   * @param quote       of type String
329   * @param types       of type Class[]
330   */
331  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types"})
332  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types )
333    {
334    this( fields, skipHeader, writeHeader, delimiter, quote, types, true );
335    }
336
337  /**
338   * Constructor TextDelimited creates a new TextDelimited instance.
339   *
340   * @param fields    of type Fields
341   * @param delimiter of type String
342   * @param quote     of type String
343   * @param types     of type Class[]
344   * @param safe      of type boolean
345   */
346  @ConstructorProperties({"fields", "delimiter", "quote", "types", "safe"})
347  public TextDelimited( Fields fields, String delimiter, String quote, Class[] types, boolean safe )
348    {
349    this( fields, false, delimiter, quote, types, safe );
350    }
351
352  /**
353   * Constructor TextDelimited creates a new TextDelimited instance.
354   *
355   * @param fields    of type Fields
356   * @param hasHeader of type boolean
357   * @param delimiter of type String
358   * @param quote     of type String
359   * @param types     of type Class[]
360   * @param safe      of type boolean
361   */
362  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe"})
363  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe )
364    {
365    this( fields, hasHeader, hasHeader, delimiter, true, quote, types, safe );
366    }
367
368  /**
369   * Constructor TextDelimited creates a new TextDelimited instance.
370   *
371   * @param fields      of type Fields
372   * @param hasHeader   of type boolean
373   * @param delimiter   of type String
374   * @param quote       of type String
375   * @param types       of type Class[]
376   * @param safe        of type boolean
377   * @param charsetName of type String
378   */
379  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe", "charsetName"})
380  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe, String charsetName )
381    {
382    this( fields, hasHeader, hasHeader, delimiter, true, quote, types, safe, charsetName );
383    }
384
385  /**
386   * Constructor TextDelimited creates a new TextDelimited instance.
387   *
388   * @param fields      of type Fields
389   * @param skipHeader  of type boolean
390   * @param writeHeader of type boolean
391   * @param delimiter   of type String
392   * @param quote       of type String
393   * @param types       of type Class[]
394   * @param safe        of type boolean
395   */
396  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types", "safe"})
397  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe )
398    {
399    this( fields, skipHeader, writeHeader, delimiter, true, quote, types, safe );
400    }
401
402  /**
403   * Constructor TextDelimited creates a new TextDelimited instance.
404   *
405   * @param fields    of type Fields
406   * @param delimiter of type String
407   * @param quote     of type String
408   */
409  @ConstructorProperties({"fields", "delimiter", "quote"})
410  public TextDelimited( Fields fields, String delimiter, String quote )
411    {
412    this( fields, false, delimiter, quote, null, true );
413    }
414
415  /**
416   * Constructor TextDelimited creates a new TextDelimited instance.
417   *
418   * @param fields    of type Fields
419   * @param hasHeader of type boolean
420   * @param delimiter of type String
421   * @param quote     of type String
422   */
423  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote"})
424  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote )
425    {
426    this( fields, hasHeader, delimiter, quote, null, true );
427    }
428
429  /**
430   * Constructor TextDelimited creates a new TextDelimited instance.
431   *
432   * @param fields      of type Fields
433   * @param hasHeader   of type boolean
434   * @param delimiter   of type String
435   * @param quote       of type String
436   * @param charsetName of type String
437   */
438  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "charsetName"})
439  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, String charsetName )
440    {
441    this( fields, hasHeader, delimiter, quote, null, true, charsetName );
442    }
443
444  /**
445   * Constructor TextDelimited creates a new TextDelimited instance.
446   *
447   * @param fields      of type Fields
448   * @param skipHeader  of type boolean
449   * @param writeHeader of type boolean
450   * @param delimiter   of type String
451   * @param strict      of type boolean
452   * @param quote       of type String
453   * @param types       of type Class[]
454   * @param safe        of type boolean
455   */
456  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "strict", "quote", "types", "safe"})
457  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe )
458    {
459    this( fields, skipHeader, writeHeader, delimiter, strict, quote, types, safe, DEFAULT_CHARSET );
460    }
461
462  /**
463   * Constructor TextDelimited creates a new TextDelimited instance.
464   *
465   * @param fields      of type Fields
466   * @param skipHeader  of type boolean
467   * @param writeHeader of type boolean
468   * @param delimiter   of type String
469   * @param strict      of type boolean
470   * @param quote       of type String
471   * @param types       of type Class[]
472   * @param safe        of type boolean
473   * @param charsetName of type String
474   */
475  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "strict", "quote", "types", "safe",
476                          "charsetName"})
477  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe, String charsetName )
478    {
479    this( fields, skipHeader, writeHeader, charsetName, new DelimitedParser( delimiter, quote, types, strict, safe ) );
480    }
481
482  /**
483   * Constructor TextDelimited creates a new TextDelimited instance.
484   *
485   * @param fields          of type Fields
486   * @param writeHeader     of type boolean
487   * @param delimitedParser of type DelimitedParser
488   */
489  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimitedParser"})
490  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
491    {
492    this( fields, skipHeader, writeHeader, null, delimitedParser );
493    }
494
495  /**
496   * Constructor TextDelimited creates a new TextDelimited instance.
497   *
498   * @param fields          of type Fields
499   * @param hasHeader       of type boolean
500   * @param delimitedParser of type DelimitedParser
501   */
502  @ConstructorProperties({"fields", "hasHeader", "delimitedParser"})
503  public TextDelimited( Fields fields, boolean hasHeader, DelimitedParser delimitedParser )
504    {
505    this( fields, hasHeader, hasHeader, null, delimitedParser );
506    }
507
508  /**
509   * Constructor TextDelimited creates a new TextDelimited instance.
510   *
511   * @param fields          of type Fields
512   * @param writeHeader     of type boolean
513   * @param charsetName     of type String
514   * @param delimitedParser of type DelimitedParser
515   */
516  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "charsetName", "delimitedParser"})
517  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String charsetName, DelimitedParser delimitedParser )
518    {
519    super( fields, fields );
520
521    this.delimitedParser = delimitedParser;
522
523    // normalizes ALL and UNKNOWN
524    // calls reset on delimitedParser
525    setSourceFields( fields );
526    setSinkFields( fields );
527
528    this.skipHeader = skipHeader;
529    this.writeHeader = writeHeader;
530
531    if( charsetName != null )
532      this.charsetName = charsetName;
533
534    // throws an exception if not found
535    Charset.forName( this.charsetName );
536    }
537
538  @Property(name = "charset", visibility = Visibility.PUBLIC)
539  @PropertyDescription("character set used.")
540  public String getCharsetName()
541    {
542    return charsetName;
543    }
544
545  /**
546   * Method getDelimiter returns the delimiter used to parse fields from the current line of text.
547   *
548   * @return a String
549   */
550  @Property(name = "delimiter", visibility = Visibility.PUBLIC)
551  @PropertyDescription("The delimiter used to separate fields.")
552  public String getDelimiter()
553    {
554    return delimitedParser.getDelimiter();
555    }
556
557  /**
558   * Method getQuote returns the quote string, if any, used to encapsulate each field in a line to delimited text.
559   *
560   * @return a String
561   */
562  @Property(name = "quote", visibility = Visibility.PUBLIC)
563  @PropertyDescription("The string used for quoting.")
564  public String getQuote()
565    {
566    return delimitedParser.getQuote();
567    }
568
569  public LineNumberReader createInput( InputStream inputStream )
570    {
571    try
572      {
573      return new LineNumberReader( new InputStreamReader( inputStream, charsetName ) );
574      }
575    catch( UnsupportedEncodingException exception )
576      {
577      throw new TapException( exception );
578      }
579    }
580
581  public PrintWriter createOutput( OutputStream outputStream )
582    {
583    try
584      {
585      return new PrintWriter( new OutputStreamWriter( outputStream, charsetName ) );
586      }
587    catch( UnsupportedEncodingException exception )
588      {
589      throw new TapException( exception );
590      }
591    }
592
593  @Override
594  public void setSinkFields( Fields sinkFields )
595    {
596    super.setSourceFields( sinkFields );
597    super.setSinkFields( sinkFields );
598
599    if( delimitedParser != null )
600      delimitedParser.reset( getSourceFields(), getSinkFields() );
601    }
602
603  @Override
604  public void setSourceFields( Fields sourceFields )
605    {
606    super.setSourceFields( sourceFields );
607    super.setSinkFields( sourceFields );
608
609    if( delimitedParser != null )
610      delimitedParser.reset( getSourceFields(), getSinkFields() );
611    }
612
613  @Override
614  public boolean isSymmetrical()
615    {
616    return super.isSymmetrical() && skipHeader == writeHeader;
617    }
618
619  @Override
620  public Fields retrieveSourceFields( FlowProcess<? extends Properties> process, Tap tap )
621    {
622    if( !skipHeader || !getSourceFields().isUnknown() )
623      return getSourceFields();
624
625    // no need to open them all
626    if( tap instanceof CompositeTap )
627      tap = (Tap) ( (CompositeTap) tap ).getChildTaps().next();
628
629    tap = new FileTap( new TextLine( new Fields( "line" ), charsetName ), tap.getIdentifier() );
630
631    setSourceFields( delimitedParser.parseFirstLine( process, tap ) );
632
633    return getSourceFields();
634    }
635
636  @Override
637  public void presentSourceFields( FlowProcess<? extends Properties> process, Tap tap, Fields fields )
638    {
639    // do nothing
640    }
641
642  @Override
643  public void presentSinkFields( FlowProcess<? extends Properties> flowProcess, Tap tap, Fields fields )
644    {
645    if( writeHeader )
646      presentSinkFieldsInternal( fields );
647    }
648
649  @Override
650  public void sourceConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
651    {
652    }
653
654  @Override
655  public void sourcePrepare( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
656    {
657    sourceCall.setContext( createInput( sourceCall.getInput() ) );
658
659    sourceCall.getIncomingEntry().setTuple( TupleViews.createObjectArray() );
660    }
661
662  @Override
663  public void sourceRePrepare( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
664    {
665    sourceCall.setContext( createInput( sourceCall.getInput() ) );
666    }
667
668  @Override
669  public boolean source( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
670    {
671    String line = sourceCall.getContext().readLine();
672
673    if( line == null )
674      return false;
675
676    if( skipHeader && sourceCall.getContext().getLineNumber() == 1 ) // todo: optimize this away
677      line = sourceCall.getContext().readLine();
678
679    if( line == null )
680      return false;
681
682    Object[] split = delimitedParser.parseLine( line );
683
684    // assumption it is better to re-use than to construct new
685    Tuple tuple = sourceCall.getIncomingEntry().getTuple();
686
687    TupleViews.reset( tuple, split );
688
689    return true;
690    }
691
692  @Override
693  public void sourceCleanup( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
694    {
695    sourceCall.setContext( null );
696    }
697
698  @Override
699  public void sinkConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
700    {
701    }
702
703  @Override
704  public void sinkPrepare( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall )
705    {
706    sinkCall.setContext( createOutput( sinkCall.getOutput() ) );
707
708    if( writeHeader )
709      {
710      Fields fields = sinkCall.getOutgoingEntry().getFields();
711      delimitedParser.joinFirstLine( fields, sinkCall.getContext() );
712
713      sinkCall.getContext().println();
714      }
715    }
716
717  @Override
718  public void sink( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
719    {
720    TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
721
722    Iterable<String> strings = tupleEntry.asIterableOf( String.class );
723
724    delimitedParser.joinLine( strings, sinkCall.getContext() );
725
726    sinkCall.getContext().println();
727    }
728
729  @Override
730  public void sinkCleanup( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall )
731    {
732    sinkCall.getContext().flush();
733    sinkCall.setContext( null );
734    }
735  }