/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.scheme.local;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.Properties;

import cascading.flow.FlowProcess;
import cascading.management.annotation.Property;
import cascading.management.annotation.PropertyDescription;
import cascading.management.annotation.Visibility;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.scheme.util.DelimitedParser;
import cascading.tap.CompositeTap;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.util.TupleViews;

/**
 * Class TextDelimited provides direct support for delimited text files, like
 * TAB (\t) or COMMA (,) delimited files. It also optionally allows for quoted values.
 * <p/>
 * TextDelimited may also be used to skip the "header" in a file, where the header is defined as the very first line
 * in every input file. That is, if the line just read from the input is line number one (1), that line will
 * be skipped.
 * <p/>
 * It is assumed if sink/source {@code fields} is set to either {@link Fields#ALL} or {@link Fields#UNKNOWN} and
 * {@code skipHeader} or {@code hasHeader} is {@code true}, the field names will be retrieved from the header of the
 * file and used during planning. The header will be parsed with the same rules as the body of the file.
 * <p/>
 * By default headers are not skipped.
 * <p/>
 * TextDelimited may also be used to write a "header" in a file. The field names for the header are taken directly
 * from the declared fields. Or if the declared fields are {@link Fields#ALL} or {@link Fields#UNKNOWN}, the
 * resolved field names will be used, if any.
 * <p/>
 * By default headers are not written.
 * <p/>
 * If {@code hasHeader} is set to {@code true} on a constructor, both {@code skipHeader} and {@code writeHeader} will
 * be set to {@code true}.
 * <p/>
 * By default this {@link cascading.scheme.Scheme} is both {@code strict} and {@code safe}.
 * <p/>
 * Strict meaning if a line of text does not parse into the expected number of fields, this class will throw a
 * {@link TapException}. If strict is {@code false}, then {@link Tuple} will be returned with {@code null} values
 * for the missing fields.
 * <p/>
 * Safe meaning if a field cannot be coerced into an expected type, a {@code null} will be used for the value.
 * If safe is {@code false}, a {@link TapException} will be thrown.
 * <p/>
 * Also by default, {@code quote} strings are not searched for to improve processing speed. If a file is
 * COMMA delimited but may have COMMA's in a value, the whole value should be surrounded by the quote string, typically
 * double quotes ({@literal "}).
 * <p/>
 * Note all empty fields in a line will be returned as {@code null} unless coerced into a new type.
 * <p/>
 * This Scheme may source/sink {@link Fields#ALL}, when given on the constructor the new instance will automatically
 * default to strict == false as the number of fields parsed are arbitrary or unknown. A type array may not be given
 * either, so all values will be returned as Strings.
 * <p/>
 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
 * argument.
 * <p/>
 * To override field and line parsing behaviors, sub-class {@link DelimitedParser} or provide a
 * {@link cascading.scheme.util.FieldTypeResolver} implementation.
 * <p/>
 * Note that there should be no expectation that TextDelimited, or specifically {@link DelimitedParser}, can handle
 * all delimited and quoted combinations reliably. Attempting to do so would impair its performance and maintainability.
 * <p/>
 * Further, it can be safely said any corrupted files will not be supported for obvious reasons. Corrupted files may
 * result in exceptions or could cause edge cases in the underlying java regular expression engine.
 * <p/>
 * A large part of Cascading was designed to help users cleanse data. Thus the recommendation is to create Flows that
 * are responsible for cleansing large data-sets when faced with the problem.
 * <p/>
 * DelimitedParser may be sub-classed and extended if necessary.
 *
 * @see TextLine
 */
public class TextDelimited extends Scheme<Properties, InputStream, OutputStream, LineNumberReader, PrintWriter>
  {
  /** Name of the character set used when no {@code charsetName} is given on a constructor. */
  public static final String DEFAULT_CHARSET = "UTF-8";

  /** When {@code true}, the first line read from an input is discarded by {@link #source}. */
  private final boolean skipHeader;
  /** When {@code true}, a header line is written by {@link #sinkPrepare} before any tuples. */
  private final boolean writeHeader;
  /** Parser/joiner that all line splitting and line joining is delegated to. */
  private final DelimitedParser delimitedParser;
  /** Name of the character set used to decode input and encode output. */
  private String charsetName = DEFAULT_CHARSET;

  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using TAB as the default delimiter.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   */
  public TextDelimited()
    {
    this( Fields.ALL );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given delimiter.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   *
   * @param hasHeader of type boolean, used for both {@code skipHeader} and {@code writeHeader}
   * @param delimiter of type String
   */
  @ConstructorProperties({"hasHeader", "delimiter"})
  public TextDelimited( boolean hasHeader, String delimiter )
    {
    this( Fields.ALL, hasHeader, delimiter, null, (Class[]) null );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given delimiter and quote string.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   *
   * @param hasHeader of type boolean, used for both {@code skipHeader} and {@code writeHeader}
   * @param delimiter of type String
   * @param quote     of type String
   */
  @ConstructorProperties({"hasHeader", "delimiter", "quote"})
  public TextDelimited( boolean hasHeader, String delimiter, String quote )
    {
    this( Fields.ALL, hasHeader, delimiter, quote, (Class[]) null );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   *
   * @param hasHeader       of type boolean, used for both {@code skipHeader} and {@code writeHeader}
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"hasHeader", "delimitedParser"})
  public TextDelimited( boolean hasHeader, DelimitedParser delimitedParser )
    {
    this( Fields.ALL, hasHeader, hasHeader, delimitedParser );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   * <p/>
   * This constructor will set {@code skipHeader} and {@code writeHeader} values to true.
   *
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"delimitedParser"})
  public TextDelimited( DelimitedParser delimitedParser )
    {
    this( Fields.ALL, true, true, delimitedParser );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance with TAB as the default delimiter.
   *
   * @param fields of type Fields
   */
  @ConstructorProperties({"fields"})
  public TextDelimited( Fields fields )
    {
    this( fields, "\t", null, null );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields    of type Fields
   * @param delimiter of type String
   */
  @ConstructorProperties({"fields", "delimiter"})
  public TextDelimited( Fields fields, String delimiter )
    {
    this( fields, delimiter, null, null );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields    of type Fields
   * @param hasHeader of type boolean
   * @param delimiter of type String
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter )
    {
    this( fields, hasHeader, hasHeader, delimiter, null, null );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields      of type Fields
   * @param skipHeader  of type boolean
   * @param writeHeader of type boolean
   * @param delimiter   of type String
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter )
    {
    this( fields, skipHeader, writeHeader, delimiter, null, null );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields    of type Fields
   * @param delimiter of type String
   * @param types     of type Class[]
   */
  @ConstructorProperties({"fields", "delimiter", "types"})
  public TextDelimited( Fields fields, String delimiter, Class[] types )
    {
    this( fields, delimiter, null, types );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields    of type Fields
   * @param hasHeader of type boolean
   * @param delimiter of type String
   * @param types     of type Class[]
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "types"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, Class[] types )
    {
    this( fields, hasHeader, hasHeader, delimiter, null, types );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields      of type Fields
   * @param skipHeader  of type boolean
   * @param writeHeader of type boolean
   * @param delimiter   of type String
   * @param types       of type Class[]
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "types"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types )
    {
    this( fields, skipHeader, writeHeader, delimiter, null, types );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields    of type Fields
   * @param delimiter of type String
   * @param quote     of type String
   * @param types     of type Class[]
   */
  @ConstructorProperties({"fields", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, String delimiter, String quote, Class[] types )
    {
    this( fields, false, delimiter, quote, types );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields    of type Fields
   * @param hasHeader of type boolean
   * @param delimiter of type String
   * @param quote     of type String
   * @param types     of type Class[]
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types )
    {
    this( fields, hasHeader, hasHeader, delimiter, quote, types, true );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields      of type Fields
   * @param skipHeader  of type boolean
   * @param writeHeader of type boolean
   * @param delimiter   of type String
   * @param quote       of type String
   * @param types       of type Class[]
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types )
    {
    this( fields, skipHeader, writeHeader, delimiter, quote, types, true );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields    of type Fields
   * @param delimiter of type String
   * @param quote     of type String
   * @param types     of type Class[]
   * @param safe      of type boolean
   */
  @ConstructorProperties({"fields", "delimiter", "quote", "types", "safe"})
  public TextDelimited( Fields fields, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, false, delimiter, quote, types, safe );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields    of type Fields
   * @param hasHeader of type boolean
   * @param delimiter of type String
   * @param quote     of type String
   * @param types     of type Class[]
   * @param safe      of type boolean
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, hasHeader, hasHeader, delimiter, true, quote, types, safe );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields      of type Fields
   * @param hasHeader   of type boolean
   * @param delimiter   of type String
   * @param quote       of type String
   * @param types       of type Class[]
   * @param safe        of type boolean
   * @param charsetName of type String
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe", "charsetName"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe, String charsetName )
    {
    this( fields, hasHeader, hasHeader, delimiter, true, quote, types, safe, charsetName );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields      of type Fields
   * @param skipHeader  of type boolean
   * @param writeHeader of type boolean
   * @param delimiter   of type String
   * @param quote       of type String
   * @param types       of type Class[]
   * @param safe        of type boolean
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types", "safe"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, skipHeader, writeHeader, delimiter, true, quote, types, safe );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields    of type Fields
   * @param delimiter of type String
   * @param quote     of type String
   */
  @ConstructorProperties({"fields", "delimiter", "quote"})
  public TextDelimited( Fields fields, String delimiter, String quote )
    {
    this( fields, false, delimiter, quote, null, true );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields    of type Fields
   * @param hasHeader of type boolean
   * @param delimiter of type String
   * @param quote     of type String
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote )
    {
    this( fields, hasHeader, delimiter, quote, null, true );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields      of type Fields
   * @param hasHeader   of type boolean
   * @param delimiter   of type String
   * @param quote       of type String
   * @param charsetName of type String
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "charsetName"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, String charsetName )
    {
    this( fields, hasHeader, delimiter, quote, null, true, charsetName );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields      of type Fields
   * @param skipHeader  of type boolean
   * @param writeHeader of type boolean
   * @param delimiter   of type String
   * @param strict      of type boolean
   * @param quote       of type String
   * @param types       of type Class[]
   * @param safe        of type boolean
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "strict", "quote", "types", "safe"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe )
    {
    this( fields, skipHeader, writeHeader, delimiter, strict, quote, types, safe, DEFAULT_CHARSET );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * A new {@link DelimitedParser} is created from the given {@code delimiter}, {@code quote}, {@code types},
   * {@code strict} and {@code safe} values.
   *
   * @param fields      of type Fields
   * @param skipHeader  of type boolean
   * @param writeHeader of type boolean
   * @param delimiter   of type String
   * @param strict      of type boolean
   * @param quote       of type String
   * @param types       of type Class[]
   * @param safe        of type boolean
   * @param charsetName of type String
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "strict", "quote", "types", "safe",
                          "charsetName"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe, String charsetName )
    {
    this( fields, skipHeader, writeHeader, charsetName, new DelimitedParser( delimiter, quote, types, strict, safe ) );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields          of type Fields
   * @param skipHeader      of type boolean
   * @param writeHeader     of type boolean
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimitedParser"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
    {
    this( fields, skipHeader, writeHeader, null, delimitedParser );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   *
   * @param fields          of type Fields
   * @param hasHeader       of type boolean
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"fields", "hasHeader", "delimitedParser"})
  public TextDelimited( Fields fields, boolean hasHeader, DelimitedParser delimitedParser )
    {
    this( fields, hasHeader, hasHeader, null, delimitedParser );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * This is the constructor every other constructor delegates to.
   *
   * @param fields          of type Fields
   * @param skipHeader      of type boolean
   * @param writeHeader     of type boolean
   * @param charsetName     of type String, if {@code null} the {@link #DEFAULT_CHARSET} is used
   * @param delimitedParser of type DelimitedParser
   * @throws IllegalArgumentException if the effective charset name is rejected by
   *                                  {@link Charset#forName(String)} (illegal or unsupported name)
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "charsetName", "delimitedParser"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String charsetName, DelimitedParser delimitedParser )
    {
    super( fields, fields );

    this.delimitedParser = delimitedParser;

    // normalizes ALL and UNKNOWN
    // calls reset on delimitedParser
    setSourceFields( fields );
    setSinkFields( fields );

    this.skipHeader = skipHeader;
    this.writeHeader = writeHeader;

    if( charsetName != null )
      this.charsetName = charsetName;

    // throws an exception if not found
    Charset.forName( this.charsetName );
    }

  /**
   * Method getCharsetName returns the name of the character set used to decode input and encode output.
   *
   * @return a String
   */
  @Property(name = "charset", visibility = Visibility.PUBLIC)
  @PropertyDescription("character set used.")
  public String getCharsetName()
    {
    return charsetName;
    }

  /**
   * Method getDelimiter returns the delimiter used to parse fields from the current line of text.
   *
   * @return a String
   */
  @Property(name = "delimiter", visibility = Visibility.PUBLIC)
  @PropertyDescription("The delimiter used to separate fields.")
  public String getDelimiter()
    {
    return delimitedParser.getDelimiter();
    }

  /**
   * Method getQuote returns the quote string, if any, used to encapsulate each field in a line to delimited text.
   *
   * @return a String
   */
  @Property(name = "quote", visibility = Visibility.PUBLIC)
  @PropertyDescription("The string used for quoting.")
  public String getQuote()
    {
    return delimitedParser.getQuote();
    }

  /**
   * Method createInput wraps the given stream in a {@link LineNumberReader} that decodes text using the
   * configured charset.
   *
   * @param inputStream of type InputStream
   * @return a LineNumberReader
   * @throws TapException if the charset name is not supported; the name is validated by the constructor
   */
  public LineNumberReader createInput( InputStream inputStream )
    {
    try
      {
      return new LineNumberReader( new InputStreamReader( inputStream, charsetName ) );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new TapException( exception );
      }
    }

  /**
   * Method createOutput wraps the given stream in a {@link PrintWriter} that encodes text using the
   * configured charset.
   *
   * @param outputStream of type OutputStream
   * @return a PrintWriter
   * @throws TapException if the charset name is not supported; the name is validated by the constructor
   */
  public PrintWriter createOutput( OutputStream outputStream )
    {
    try
      {
      return new PrintWriter( new OutputStreamWriter( outputStream, charsetName ) );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new TapException( exception );
      }
    }

  /**
   * Method setSinkFields sets both the source and the sink fields to the given value, then resets the
   * parser, if assigned, with the resulting source and sink fields.
   *
   * @param sinkFields of type Fields
   */
  @Override
  public void setSinkFields( Fields sinkFields )
    {
    super.setSourceFields( sinkFields );
    super.setSinkFields( sinkFields );

    // NOTE(review): the null check presumably covers calls made before the field is assigned
    // (e.g. from the super-class constructor) -- confirm against Scheme
    if( delimitedParser != null )
      delimitedParser.reset( getSourceFields(), getSinkFields() );
    }

  /**
   * Method setSourceFields sets both the source and the sink fields to the given value, then resets the
   * parser, if assigned, with the resulting source and sink fields.
   *
   * @param sourceFields of type Fields
   */
  @Override
  public void setSourceFields( Fields sourceFields )
    {
    super.setSourceFields( sourceFields );
    super.setSinkFields( sourceFields );

    // NOTE(review): the null check presumably covers calls made before the field is assigned
    // (e.g. from the super-class constructor) -- confirm against Scheme
    if( delimitedParser != null )
      delimitedParser.reset( getSourceFields(), getSinkFields() );
    }

  /**
   * Method isSymmetrical returns {@code true} only if the super-class reports this scheme as symmetrical
   * and {@code skipHeader} equals {@code writeHeader}.
   *
   * @return a boolean
   */
  @Override
  public boolean isSymmetrical()
    {
    return super.isSymmetrical() && skipHeader == writeHeader;
    }

  /**
   * Method retrieveSourceFields returns the declared source fields, unless {@code skipHeader} is
   * {@code true} and the source fields are unknown. In that case the parser reads the first line through a
   * {@link TextLine} based {@link FileTap} on the given tap's identifier, and the parsed fields become the
   * new source (and sink) fields.
   * <p/>
   * If the given tap is a {@link CompositeTap}, only its first child tap is consulted.
   *
   * @param process of type FlowProcess
   * @param tap     of type Tap
   * @return the resolved source Fields
   */
  @Override
  public Fields retrieveSourceFields( FlowProcess<? extends Properties> process, Tap tap )
    {
    if( !skipHeader || !getSourceFields().isUnknown() )
      return getSourceFields();

    // no need to open them all
    if( tap instanceof CompositeTap )
      tap = (Tap) ( (CompositeTap) tap ).getChildTaps().next();

    tap = new FileTap( new TextLine( new Fields( "line" ), charsetName ), tap.getIdentifier() );

    setSourceFields( delimitedParser.parseFirstLine( process, tap ) );

    return getSourceFields();
    }

  @Override
  public void presentSourceFields( FlowProcess<? extends Properties> process, Tap tap, Fields fields )
    {
    // do nothing
    }

  /**
   * Method presentSinkFields forwards the given fields to {@code presentSinkFieldsInternal} only when
   * {@code writeHeader} is {@code true}.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param fields      of type Fields
   */
  @Override
  public void presentSinkFields( FlowProcess<? extends Properties> flowProcess, Tap tap, Fields fields )
    {
    if( writeHeader )
      presentSinkFieldsInternal( fields );
    }

  @Override
  public void sourceConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
    {
    }

  /**
   * Method sourcePrepare stores a new reader over the source input as the call context and installs a
   * re-usable object-array backed Tuple on the incoming entry.
   */
  @Override
  public void sourcePrepare( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
    {
    sourceCall.setContext( createInput( sourceCall.getInput() ) );

    sourceCall.getIncomingEntry().setTuple( TupleViews.createObjectArray() );
    }

  /**
   * Method sourceRePrepare replaces the call context with a new reader over the current source input.
   */
  @Override
  public void sourceRePrepare( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
    {
    sourceCall.setContext( createInput( sourceCall.getInput() ) );
    }

  /**
   * Method source reads the next line from the reader held in the call context, parses it, and resets the
   * incoming Tuple with the parsed values.
   * <p/>
   * If {@code skipHeader} is {@code true} and the line just read was line number one, that line is discarded
   * and the following line is read in its place.
   *
   * @return {@code false} if the end of the input was reached, {@code true} otherwise
   * @throws IOException if reading from the input fails
   */
  @Override
  public boolean source( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
    {
    String line = sourceCall.getContext().readLine();

    if( line == null )
      return false;

    if( skipHeader && sourceCall.getContext().getLineNumber() == 1 ) // todo: optimize this away
      line = sourceCall.getContext().readLine();

    // the input may have held nothing but the header
    if( line == null )
      return false;

    Object[] split = delimitedParser.parseLine( line );

    // assumption it is better to re-use than to construct new
    Tuple tuple = sourceCall.getIncomingEntry().getTuple();

    TupleViews.reset( tuple, split );

    return true;
    }

  /**
   * Method sourceCleanup clears the call context. The reader is not closed here.
   */
  @Override
  public void sourceCleanup( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
    {
    sourceCall.setContext( null );
    }

  @Override
  public void sinkConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
    {
    }

  /**
   * Method sinkPrepare stores a new writer over the sink output as the call context and, when
   * {@code writeHeader} is {@code true}, writes the outgoing entry's field names as the first line.
   */
  @Override
  public void sinkPrepare( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall )
    {
    sinkCall.setContext( createOutput( sinkCall.getOutput() ) );

    if( writeHeader )
      {
      Fields fields = sinkCall.getOutgoingEntry().getFields();
      delimitedParser.joinFirstLine( fields, sinkCall.getContext() );

      sinkCall.getContext().println();
      }
    }

  /**
   * Method sink writes the outgoing entry as one delimited line: every value is presented as a String,
   * joined by the parser, and followed by a line terminator.
   */
  @Override
  public void sink( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
    {
    TupleEntry tupleEntry = sinkCall.getOutgoingEntry();

    Iterable<String> strings = tupleEntry.asIterableOf( String.class );

    delimitedParser.joinLine( strings, sinkCall.getContext() );

    sinkCall.getContext().println();
    }

  /**
   * Method sinkCleanup flushes the writer held in the call context and then clears the context. The writer
   * is not closed here.
   */
  @Override
  public void sinkCleanup( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall )
    {
    // NOTE(review): PrintWriter suppresses IOExceptions; a failed write is only observable through
    // PrintWriter#checkError(), which is not consulted here -- consider surfacing it
    sinkCall.getContext().flush();
    sinkCall.setContext( null );
    }
  }