001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.scheme.hadoop; 022 023import java.beans.ConstructorProperties; 024import java.io.IOException; 025import java.nio.charset.Charset; 026 027import cascading.flow.FlowProcess; 028import cascading.management.annotation.Property; 029import cascading.management.annotation.PropertyDescription; 030import cascading.management.annotation.Visibility; 031import cascading.scheme.SinkCall; 032import cascading.scheme.SourceCall; 033import cascading.scheme.util.DelimitedParser; 034import cascading.tap.CompositeTap; 035import cascading.tap.Tap; 036import cascading.tap.TapException; 037import cascading.tap.hadoop.Hfs; 038import cascading.tuple.Fields; 039import cascading.tuple.Tuple; 040import cascading.tuple.TupleEntry; 041import cascading.tuple.util.TupleViews; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.io.LongWritable; 044import org.apache.hadoop.io.Text; 045import org.apache.hadoop.mapred.OutputCollector; 046import org.apache.hadoop.mapred.RecordReader; 047 048/** 049 * Class TextDelimited is a sub-class of {@link TextLine}. It provides direct support for delimited text files, like 050 * TAB (\t) or COMMA (,) delimited files. 
It also optionally allows for quoted values. 051 * <p/> 052 * TextDelimited may also be used to skip the "header" in a file, where the header is defined as the very first line 053 * in every input file. That is, if the byte offset of the current line from the input is zero (0), that line will 054 * be skipped. 055 * <p/> 056 * It is assumed if sink/source {@code fields} is set to either {@link Fields#ALL} or {@link Fields#UNKNOWN} and 057 * {@code skipHeader} or {@code hasHeader} is {@code true}, the field names will be retrieved from the header of the 058 * file and used during planning. The header will be parsed with the same rules as the body of the file. 059 * <p/> 060 * By default headers are not skipped. 061 * <p/> 062 * TextDelimited may also be used to write a "header" in a file. The field names for the header are taken directly 063 * from the declared fields. Or if the declared fields are {@link Fields#ALL} or {@link Fields#UNKNOWN}, the 064 * resolved field names will be used, if any. 065 * <p/> 066 * By default headers are not written. 067 * <p/> 068 * If {@code hasHeader} is set to {@code true} on a constructor, both {@code skipHeader} and {@code writeHeader} will 069 * be set to {@code true}. 070 * <p/> 071 * By default this {@link cascading.scheme.Scheme} is both {@code strict} and {@code safe}. 072 * <p/> 073 * Strict meaning if a line of text does not parse into the expected number of fields, this class will throw a 074 * {@link TapException}. If strict is {@code false}, then a {@link Tuple} will be returned with {@code null} values 075 * for the missing fields. 076 * <p/> 077 * Safe meaning if a field cannot be coerced into an expected type, a {@code null} will be used for the value. 078 * If safe is {@code false}, a {@link TapException} will be thrown. 079 * <p/> 080 * Also by default, {@code quote} strings are not searched for to improve processing speed. 
If a file is 081 * COMMA delimited but may have COMMAs in a value, the whole value should be surrounded by the quote string, typically 082 * double quotes ({@literal "}). 083 * <p/> 084 * Note all empty fields in a line will be returned as {@code null} unless coerced into a new type. 085 * <p/> 086 * This Scheme may source/sink {@link Fields#ALL}; when given on the constructor the new instance will automatically 087 * default to strict == false as the number of fields parsed is arbitrary or unknown. A type array may not be given 088 * either, so all values will be returned as Strings. 089 * <p/> 090 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor 091 * argument. 092 * <p/> 093 * To override field and line parsing behaviors, sub-class {@link DelimitedParser} or provide a 094 * {@link cascading.scheme.util.FieldTypeResolver} implementation. 095 * <p/> 096 * Note that there should be no expectation that TextDelimited, or specifically {@link DelimitedParser}, can handle 097 * all delimited and quoted combinations reliably. Attempting to do so would impair its performance and maintainability. 098 * <p/> 099 * Further, it can be safely said any corrupted files will not be supported for obvious reasons. Corrupted files may 100 * result in exceptions or could cause edge cases in the underlying Java regular expression engine. 101 * <p/> 102 * A large part of Cascading was designed to help users cleanse data. Thus the recommendation is to create Flows that 103 * are responsible for cleansing large data-sets when faced with the problem. 104 * <p/> 105 * DelimitedParser may be sub-classed and extended if necessary. 
106 * 107 * @see TextLine 108 */ 109public class TextDelimited extends TextLine 110 { /** Field DEFAULT_CHARSET names the default charset, UTF-8 */ 111 public static final String DEFAULT_CHARSET = "UTF-8"; 112 113 /** Field delimitedParser */ 114 protected final DelimitedParser delimitedParser; 115 /** Field skipHeader */ 116 private boolean skipHeader; /** Field writeHeader */ 117 private final boolean writeHeader; 118 119 /** 120 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 121 * {@link Fields#ALL} and using TAB as the default delimiter. 122 * <p/> 123 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 124 * with a {@link cascading.pipe.Checkpoint} Tap. 125 */ 126 public TextDelimited() 127 { 128 this( Fields.ALL, null, "\t", null, null ); 129 } 130 131 /** 132 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 133 * {@link Fields#ALL} and using the given delimiter; {@code null} is passed for sinkCompression, quote and types. 134 * <p/> 135 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 136 * with a {@link cascading.pipe.Checkpoint} Tap. 137 * 138 * @param hasHeader of type boolean 139 * @param delimiter of type String 140 */ 141 @ConstructorProperties({"hasHeader", "delimiter"}) 142 public TextDelimited( boolean hasHeader, String delimiter ) 143 { 144 this( Fields.ALL, null, hasHeader, delimiter, null, (Class[]) null ); 145 } 146 147 /** 148 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 149 * {@link Fields#ALL} and using TAB as the default delimiter. 150 * <p/> 151 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 152 * with a {@link cascading.pipe.Checkpoint} Tap. 
153 * 154 * @param hasHeader of type boolean 155 * @param delimiter of type String 156 * @param quote of type String 157 */ 158 @ConstructorProperties({"hasHeader", "delimiter", "quote"}) 159 public TextDelimited( boolean hasHeader, String delimiter, String quote ) 160 { 161 this( Fields.ALL, null, hasHeader, delimiter, quote, (Class[]) null ); 162 } 163 164 /** 165 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 166 * {@link Fields#ALL} and using the given delimitedParser instance for parsing. 167 * <p/> 168 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 169 * with a {@link cascading.pipe.Checkpoint} Tap. 170 * 171 * @param hasHeader of type boolean, used as both the skipHeader and writeHeader value 172 * @param delimitedParser of type DelimitedParser 173 */ 174 @ConstructorProperties({"hasHeader", "delimitedParser"}) 175 public TextDelimited( boolean hasHeader, DelimitedParser delimitedParser ) 176 { 177 this( Fields.ALL, null, hasHeader, hasHeader, delimitedParser ); 178 } 179 180 /** 181 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 182 * {@link Fields#ALL} and using the given delimitedParser instance for parsing. 183 * <p/> 184 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 185 * with a {@link cascading.pipe.Checkpoint} Tap. 186 * <p/> 187 * This constructor will set {@code skipHeader} and {@code writeHeader} values to true and passes {@code null} for sinkCompression. 188 * 189 * @param delimitedParser of type DelimitedParser 190 */ 191 @ConstructorProperties({"delimitedParser"}) 192 public TextDelimited( DelimitedParser delimitedParser ) 193 { 194 this( Fields.ALL, null, true, true, delimitedParser ); 195 } 196 197 /** 198 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 199 * {@link Fields#ALL} and using the given delimitedParser instance for parsing. 
200 * <p/> 201 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 202 * with a {@link cascading.pipe.Checkpoint} Tap. 203 * 204 * @param sinkCompression of type Compress 205 * @param hasHeader of type boolean 206 * @param delimitedParser of type DelimitedParser 207 */ 208 @ConstructorProperties({"sinkCompression", "hasHeader", "delimitedParser"}) 209 public TextDelimited( Compress sinkCompression, boolean hasHeader, DelimitedParser delimitedParser ) 210 { 211 this( Fields.ALL, sinkCompression, hasHeader, hasHeader, delimitedParser ); 212 } 213 214 /** 215 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 216 * {@link Fields#ALL} and using the given delimitedParser instance for parsing. 217 * <p/> 218 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 219 * with a {@link cascading.pipe.Checkpoint} Tap. 220 * <p/> 221 * This constructor will set {@code skipHeader} and {@code writeHeader} values to true. 222 * 223 * @param sinkCompression of type Compress * @param delimitedParser of type DelimitedParser 224 */ 225 @ConstructorProperties({"sinkCompression", "delimitedParser"}) 226 public TextDelimited( Compress sinkCompression, DelimitedParser delimitedParser ) 227 { 228 this( Fields.ALL, sinkCompression, true, true, delimitedParser ); 229 } 230 231 /** 232 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 233 * {@link Fields#ALL} and using TAB as the default delimiter. 234 * <p/> 235 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 236 * with a {@link cascading.pipe.Checkpoint} Tap. 
237 * 238 * @param sinkCompression of type Compress 239 * @param hasHeader of type boolean 240 * @param delimiter of type String 241 * @param quote of type String 242 */ 243 @ConstructorProperties({"sinkCompression", "hasHeader", "delimiter", "quote"}) 244 public TextDelimited( Compress sinkCompression, boolean hasHeader, String delimiter, String quote ) 245 { 246 this( Fields.ALL, sinkCompression, hasHeader, delimiter, quote, (Class[]) null ); 247 } 248 249 /** 250 * Constructor TextDelimited creates a new TextDelimited instance with TAB as the default delimiter; {@code null} is passed for sinkCompression, quote and types. 251 * 252 * @param fields of type Fields 253 */ 254 @ConstructorProperties({"fields"}) 255 public TextDelimited( Fields fields ) 256 { 257 this( fields, null, "\t", null, null ); 258 } 259 260 /** 261 * Constructor TextDelimited creates a new TextDelimited instance; {@code null} is passed for sinkCompression, quote and types. 262 * 263 * @param fields of type Fields 264 * @param delimiter of type String 265 */ 266 @ConstructorProperties({"fields", "delimiter"}) 267 public TextDelimited( Fields fields, String delimiter ) 268 { 269 this( fields, null, delimiter, null, null ); 270 } 271 272 /** 273 * Constructor TextDelimited creates a new TextDelimited instance; {@code hasHeader} is used as both the skipHeader and writeHeader value, and {@code null} is passed for sinkCompression, quote and types. 274 * 275 * @param fields of type Fields 276 * @param hasHeader of type boolean 277 * @param delimiter of type String 278 */ 279 @ConstructorProperties({"fields", "hasHeader", "delimiter"}) 280 public TextDelimited( Fields fields, boolean hasHeader, String delimiter ) 281 { 282 this( fields, null, hasHeader, hasHeader, delimiter, null, null ); 283 } 284 285 /** 286 * Constructor TextDelimited creates a new TextDelimited instance. 
287 * 288 * @param fields of type Fields 289 * @param skipHeader of type boolean 290 * @param writeHeader of type boolean 291 * @param delimiter of type String 292 */ 293 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter"}) 294 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter ) 295 { 296 this( fields, null, skipHeader, writeHeader, delimiter, null, null ); 297 } 298 299 /** 300 * Constructor TextDelimited creates a new TextDelimited instance; {@code null} is passed for sinkCompression and quote. 301 * 302 * @param fields of type Fields 303 * @param delimiter of type String 304 * @param types of type Class[] 305 */ 306 @ConstructorProperties({"fields", "delimiter", "types"}) 307 public TextDelimited( Fields fields, String delimiter, Class[] types ) 308 { 309 this( fields, null, delimiter, null, types ); 310 } 311 312 /** 313 * Constructor TextDelimited creates a new TextDelimited instance; {@code hasHeader} is used as both the skipHeader and writeHeader value, and {@code null} is passed for sinkCompression and quote. 314 * 315 * @param fields of type Fields 316 * @param hasHeader of type boolean 317 * @param delimiter of type String 318 * @param types of type Class[] 319 */ 320 @ConstructorProperties({"fields", "hasHeader", "delimiter", "types"}) 321 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, Class[] types ) 322 { 323 this( fields, null, hasHeader, hasHeader, delimiter, null, types ); 324 } 325 326 /** 327 * Constructor TextDelimited creates a new TextDelimited instance; {@code null} is passed for sinkCompression and quote. 328 * 329 * @param fields of type Fields 330 * @param skipHeader of type boolean 331 * @param writeHeader of type boolean 332 * @param delimiter of type String 333 * @param types of type Class[] 334 */ 335 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "types"}) 336 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types ) 337 { 338 this( fields, null, skipHeader, writeHeader, delimiter, null, types ); 339 } 340 341 /** 342 * Constructor TextDelimited creates a new TextDelimited instance. 
343 * 344 * @param fields of type Fields 345 * @param delimiter of type String 346 * @param quote of type String 347 * @param types of type Class[] 348 */ 349 @ConstructorProperties({"fields", "delimiter", "quote", "types"}) 350 public TextDelimited( Fields fields, String delimiter, String quote, Class[] types ) 351 { 352 this( fields, null, delimiter, quote, types ); 353 } 354 355 /** 356 * Constructor TextDelimited creates a new TextDelimited instance; {@code hasHeader} is used as both the skipHeader and writeHeader value, and {@code null} is passed for sinkCompression. 357 * 358 * @param fields of type Fields 359 * @param hasHeader of type boolean 360 * @param delimiter of type String 361 * @param quote of type String 362 * @param types of type Class[] 363 */ 364 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types"}) 365 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types ) 366 { 367 this( fields, null, hasHeader, hasHeader, delimiter, quote, types ); 368 } 369 370 /** 371 * Constructor TextDelimited creates a new TextDelimited instance; {@code null} is passed for sinkCompression. 372 * 373 * @param fields of type Fields 374 * @param skipHeader of type boolean 375 * @param writeHeader of type boolean 376 * @param delimiter of type String 377 * @param quote of type String 378 * @param types of type Class[] 379 */ 380 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types"}) 381 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types ) 382 { 383 this( fields, null, skipHeader, writeHeader, delimiter, quote, types ); 384 } 385 386 /** 387 * Constructor TextDelimited creates a new TextDelimited instance. 
388 * 389 * @param fields of type Fields 390 * @param delimiter of type String 391 * @param quote of type String 392 * @param types of type Class[] 393 * @param safe of type boolean 394 */ 395 @ConstructorProperties({"fields", "delimiter", "quote", "types", "safe"}) 396 public TextDelimited( Fields fields, String delimiter, String quote, Class[] types, boolean safe ) 397 { 398 this( fields, null, delimiter, quote, types, safe ); 399 } 400 401 /** 402 * Constructor TextDelimited creates a new TextDelimited instance; {@code hasHeader} is used as both the skipHeader and writeHeader value, and {@code null} is passed for sinkCompression. 403 * 404 * @param fields of type Fields 405 * @param hasHeader of type boolean 406 * @param delimiter of type String 407 * @param quote of type String 408 * @param types of type Class[] 409 * @param safe of type boolean 410 */ 411 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe"}) 412 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe ) 413 { 414 this( fields, null, hasHeader, hasHeader, delimiter, quote, types, safe ); 415 } 416 417 /** 418 * Constructor TextDelimited creates a new TextDelimited instance; {@code hasHeader} is used as both the skipHeader and writeHeader value, strict is {@code true}, and {@code null} is passed for sinkCompression. 419 * 420 * @param fields of type Fields 421 * @param hasHeader of type boolean 422 * @param delimiter of type String 423 * @param quote of type String 424 * @param types of type Class[] 425 * @param safe of type boolean 426 * @param charsetName of type String 427 */ 428 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe", "charsetName"}) 429 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe, String charsetName ) 430 { 431 this( fields, null, hasHeader, hasHeader, delimiter, true, quote, types, safe, charsetName ); 432 } 433 434 /** 435 * Constructor TextDelimited creates a new TextDelimited instance. 
436 * 437 * @param fields of type Fields 438 * @param skipHeader of type boolean 439 * @param writeHeader of type boolean 440 * @param delimiter of type String 441 * @param quote of type String 442 * @param types of type Class[] 443 * @param safe of type boolean 444 */ 445 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types", "safe"}) 446 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe ) 447 { 448 this( fields, null, skipHeader, writeHeader, delimiter, quote, types, safe ); 449 } 450 451 /** 452 * Constructor TextDelimited creates a new TextDelimited instance; {@code null} is passed for quote and types. 453 * 454 * @param fields of type Fields 455 * @param sinkCompression of type Compress 456 * @param delimiter of type String 457 */ 458 @ConstructorProperties({"fields", "sinkCompression", "delimiter"}) 459 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter ) 460 { 461 this( fields, sinkCompression, delimiter, null, null ); 462 } 463 464 /** 465 * Constructor TextDelimited creates a new TextDelimited instance; {@code hasHeader} is used as both the skipHeader and writeHeader value, and {@code null} is passed for quote and types. 466 * 467 * @param fields of type Fields 468 * @param sinkCompression of type Compress 469 * @param hasHeader of type boolean 470 * @param delimiter of type String 471 */ 472 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter"}) 473 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter ) 474 { 475 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, null ); 476 } 477 478 /** 479 * Constructor TextDelimited creates a new TextDelimited instance. 
480 * 481 * @param fields of type Fields 482 * @param sinkCompression of type Compress 483 * @param skipHeader of type boolean 484 * @param writeHeader of type boolean 485 * @param delimiter of type String 486 */ 487 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter"}) 488 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter ) 489 { 490 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, null ); 491 } 492 493 /** 494 * Constructor TextDelimited creates a new TextDelimited instance; {@code null} is passed for quote. 495 * 496 * @param fields of type Fields 497 * @param sinkCompression of type Compress 498 * @param delimiter of type String 499 * @param types of type Class[] 500 */ 501 @ConstructorProperties({"fields", "sinkCompression", "delimiter", "types"}) 502 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, Class[] types ) 503 { 504 this( fields, sinkCompression, delimiter, null, types ); 505 } 506 507 /** 508 * Constructor TextDelimited creates a new TextDelimited instance; {@code hasHeader} is used as both the skipHeader and writeHeader value, and {@code null} is passed for quote. 509 * 510 * @param fields of type Fields 511 * @param sinkCompression of type Compress 512 * @param hasHeader of type boolean 513 * @param delimiter of type String 514 * @param types of type Class[] 515 */ 516 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types"}) 517 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types ) 518 { 519 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, types ); 520 } 521 522 /** 523 * Constructor TextDelimited creates a new TextDelimited instance. 
524 * 525 * @param fields of type Fields 526 * @param sinkCompression of type Compress 527 * @param skipHeader of type boolean 528 * @param writeHeader of type boolean 529 * @param delimiter of type String 530 * @param types of type Class[] 531 */ 532 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "types"}) 533 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types ) 534 { 535 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, types ); 536 } 537 538 /** 539 * Constructor TextDelimited creates a new TextDelimited instance; {@code null} is passed for quote. 540 * 541 * @param fields of type Fields 542 * @param sinkCompression of type Compress 543 * @param delimiter of type String 544 * @param types of type Class[] 545 * @param safe of type boolean 546 */ 547 @ConstructorProperties({"fields", "sinkCompression", "delimiter", "types", "safe"}) 548 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, Class[] types, boolean safe ) 549 { 550 this( fields, sinkCompression, delimiter, null, types, safe ); 551 } 552 553 /** 554 * Constructor TextDelimited creates a new TextDelimited instance; {@code hasHeader} is used as both the skipHeader and writeHeader value, and {@code null} is passed for quote. 555 * 556 * @param fields of type Fields 557 * @param sinkCompression of type Compress 558 * @param hasHeader of type boolean 559 * @param delimiter of type String 560 * @param types of type Class[] 561 * @param safe of type boolean 562 */ 563 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types", "safe"}) 564 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types, boolean safe ) 565 { 566 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, types, safe ); 567 } 568 569 /** 570 * Constructor TextDelimited creates a new TextDelimited instance. 
571 * 572 * @param fields of type Fields 573 * @param sinkCompression of type Compress 574 * @param hasHeader of type boolean 575 * @param delimiter of type String 576 * @param types of type Class[] 577 * @param safe of type boolean 578 * @param charsetName of type String 579 */ 580 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types", "safe", "charsetName"}) 581 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types, boolean safe, String charsetName ) 582 { 583 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, null, types, safe, charsetName ); 584 } 585 586 /** 587 * Constructor TextDelimited creates a new TextDelimited instance; {@code null} is passed for quote. 588 * 589 * @param fields of type Fields 590 * @param sinkCompression of type Compress 591 * @param skipHeader of type boolean 592 * @param writeHeader of type boolean 593 * @param delimiter of type String 594 * @param types of type Class[] 595 * @param safe of type boolean 596 */ 597 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "types", "safe"}) 598 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types, boolean safe ) 599 { 600 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, types, safe ); 601 } 602 603 /** 604 * Constructor TextDelimited creates a new TextDelimited instance; {@code null} is passed for sinkCompression. 605 * 606 * @param fields of type Fields 607 * @param delimiter of type String 608 * @param quote of type String 609 */ 610 @ConstructorProperties({"fields", "delimiter", "quote"}) 611 public TextDelimited( Fields fields, String delimiter, String quote ) 612 { 613 this( fields, null, delimiter, quote ); 614 } 615 616 /** 617 * Constructor TextDelimited creates a new TextDelimited instance. 
618 * 619 * @param fields of type Fields 620 * @param hasHeader of type boolean 621 * @param delimiter of type String 622 * @param quote of type String 623 */ 624 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote"}) 625 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote ) 626 { 627 this( fields, null, hasHeader, hasHeader, delimiter, quote ); 628 } 629 630 /** 631 * Constructor TextDelimited creates a new TextDelimited instance; {@code null} is passed for sinkCompression. 632 * 633 * @param fields of type Fields 634 * @param skipHeader of type boolean 635 * @param writeHeader of type boolean 636 * @param delimiter of type String 637 * @param quote of type String 638 */ 639 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote"}) 640 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote ) 641 { 642 this( fields, null, skipHeader, writeHeader, delimiter, quote ); 643 } 644 645 /** 646 * Constructor TextDelimited creates a new TextDelimited instance; skipHeader and writeHeader are {@code false}, strict and safe are {@code true}, and {@code null} is passed for types. 647 * 648 * @param fields of type Fields 649 * @param sinkCompression of type Compress 650 * @param delimiter of type String 651 * @param quote of type String 652 */ 653 @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote"}) 654 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote ) 655 { 656 this( fields, sinkCompression, false, false, delimiter, true, quote, null, true ); 657 } 658 659 /** 660 * Constructor TextDelimited creates a new TextDelimited instance. 
661 * 662 * @param fields of type Fields 663 * @param sinkCompression of type Compress 664 * @param hasHeader of type boolean 665 * @param delimiter of type String 666 * @param quote of type String 667 */ 668 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote"}) 669 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote ) 670 { 671 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, null, true ); 672 } 673 674 /** 675 * Constructor TextDelimited creates a new TextDelimited instance; {@code hasHeader} is used as both the skipHeader and writeHeader value, strict and safe are {@code true}, and {@code null} is passed for types. 676 * 677 * @param fields of type Fields 678 * @param sinkCompression of type Compress 679 * @param hasHeader of type boolean 680 * @param delimiter of type String 681 * @param quote of type String 682 * @param charsetName of type String 683 */ 684 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "charsetName"}) 685 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, String charsetName ) 686 { 687 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, null, true, charsetName ); 688 } 689 690 /** 691 * Constructor TextDelimited creates a new TextDelimited instance; strict and safe are {@code true}, and {@code null} is passed for types. 692 * 693 * @param fields of type Fields 694 * @param sinkCompression of type Compress 695 * @param skipHeader of type boolean 696 * @param writeHeader of type boolean 697 * @param delimiter of type String 698 * @param quote of type String 699 */ 700 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote"}) 701 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote ) 702 { 703 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, null, true ); 704 } 705 706 /** 707 * Constructor TextDelimited creates a new TextDelimited instance. 
708 * 709 * @param fields of type Fields 710 * @param sinkCompression of type Compress 711 * @param delimiter of type String 712 * @param quote of type String 713 * @param types of type Class[] 714 */ 715 @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote", "types"}) 716 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote, Class[] types ) 717 { 718 this( fields, sinkCompression, false, false, delimiter, true, quote, types, true ); 719 } 720 721 /** 722 * Constructor TextDelimited creates a new TextDelimited instance; {@code hasHeader} is used as both the skipHeader and writeHeader value, and strict and safe are {@code true}. 723 * 724 * @param fields of type Fields 725 * @param sinkCompression of type Compress 726 * @param hasHeader of type boolean 727 * @param delimiter of type String 728 * @param quote of type String 729 * @param types of type Class[] 730 */ 731 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "types"}) 732 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, Class[] types ) 733 { 734 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, types, true ); 735 } 736 737 /** 738 * Constructor TextDelimited creates a new TextDelimited instance. 
739 * 740 * @param fields of type Fields 741 * @param sinkCompression of type Compress 742 * @param skipHeader of type boolean 743 * @param writeHeader of type boolean 744 * @param delimiter of type String 745 * @param quote of type String 746 * @param types of type Class[] 747 */ 748 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote", "types"}) 749 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types ) 750 { 751 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, types, true ); 752 } 753 754 /** 755 * Constructor TextDelimited creates a new TextDelimited instance; skipHeader and writeHeader are {@code false} and strict is {@code true}. 756 * 757 * @param fields of type Fields 758 * @param sinkCompression of type Compress 759 * @param delimiter of type String 760 * @param quote of type String 761 * @param types of type Class[] 762 * @param safe of type boolean 763 */ 764 @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote", "types", "safe"}) 765 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote, Class[] types, boolean safe ) 766 { 767 this( fields, sinkCompression, false, false, delimiter, true, quote, types, safe ); 768 } 769 770 /** 771 * Constructor TextDelimited creates a new TextDelimited instance. 
772 * 773 * @param fields of type Fields 774 * @param sinkCompression of type Compress 775 * @param hasHeader of type boolean 776 * @param delimiter of type String 777 * @param quote of type String 778 * @param types of type Class[] 779 * @param safe of type boolean 780 */ 781 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "types", "safe"}) 782 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe ) 783 { 784 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, types, safe ); 785 } 786 787 /** 788 * Constructor TextDelimited creates a new TextDelimited instance; strict is {@code true}. 789 * 790 * @param fields of type Fields 791 * @param sinkCompression of type Compress 792 * @param skipHeader of type boolean 793 * @param writeHeader of type boolean 794 * @param delimiter of type String 795 * @param quote of type String 796 * @param types of type Class[] 797 * @param safe of type boolean 798 */ 799 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote", "types", 800 "safe"}) 801 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe ) 802 { 803 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, types, safe ); 804 } 805 806 /** 807 * Constructor TextDelimited creates a new TextDelimited instance. 
*
   * <p/>
   * Text is encoded and decoded using {@code DEFAULT_CHARSET}.
   *
   * @param fields of type Fields
   * @param sinkCompression of type Compress
   * @param skipHeader of type boolean, {@code true} to skip the first line of every input file
   * @param writeHeader of type boolean, {@code true} to write a header line when sinking
   * @param delimiter of type String, the string that separates fields
   * @param strict of type boolean, {@code false} to fill missing fields with {@code null} instead of throwing
   * @param quote of type String, the string used for quoting
   * @param types of type Class[]
   * @param safe of type boolean, {@code false} to throw a {@link TapException} when a value cannot be coerced
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "strict", "quote",
                          "types", "safe"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, strict, quote, types, safe, DEFAULT_CHARSET );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * A new {@link DelimitedParser} is created from the given delimiter, quote, types, strict and safe values.
   *
   * @param fields of type Fields
   * @param sinkCompression of type Compress
   * @param skipHeader of type boolean, {@code true} to skip the first line of every input file
   * @param writeHeader of type boolean, {@code true} to write a header line when sinking
   * @param delimiter of type String, the string that separates fields
   * @param strict of type boolean, {@code false} to fill missing fields with {@code null} instead of throwing
   * @param quote of type String, the string used for quoting
   * @param types of type Class[]
   * @param safe of type boolean, {@code false} to throw a {@link TapException} when a value cannot be coerced
   * @param charsetName of type String
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "strict", "quote",
                          "types", "safe", "charsetName"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe, String charsetName )
    {
    // note the parser takes its arguments in a different order: delimiter, quote, types, strict, safe
    this( fields, sinkCompression, skipHeader, writeHeader, charsetName, new DelimitedParser( delimiter, quote, types, strict, safe ) );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
*
   * <p/>
   * Both the sink compression and the charset name are passed on as {@code null}.
   *
   * @param fields of type Fields
   * @param skipHeader of type boolean, {@code true} to skip the first line of every input file
   * @param writeHeader of type boolean, {@code true} to write a header line when sinking
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimitedParser"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
    {
    // sinkCompression = null, charsetName = null
    // NOTE(review): presumably null selects the defaults for both; confirm against the parent scheme
    this( fields, null, skipHeader, writeHeader, null, delimitedParser );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * The single {@code hasHeader} flag is used for both {@code skipHeader} and {@code writeHeader}. Both the
   * sink compression and the charset name are passed on as {@code null}.
   *
   * @param fields of type Fields
   * @param hasHeader of type boolean, {@code true} to both skip the header on read and write one on sink
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"fields", "hasHeader", "delimitedParser"})
  public TextDelimited( Fields fields, boolean hasHeader, DelimitedParser delimitedParser )
    {
    // sinkCompression = null, charsetName = null; hasHeader fills both header flags
    this( fields, null, hasHeader, hasHeader, null, delimitedParser );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * The charset name is passed on as {@code null}.
   *
   * @param fields of type Fields
   * @param sinkCompression of type Compress
   * @param skipHeader of type boolean, {@code true} to skip the first line of every input file
   * @param writeHeader of type boolean, {@code true} to write a header line when sinking
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimitedParser"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
    {
    // charsetName = null
    this( fields, sinkCompression, skipHeader, writeHeader, null, delimitedParser );
    }

  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
*
   * <p/>
   * This is the constructor every other overload visible in this class ends up delegating to.
   *
   * @param fields of type Fields, applied as both the source and the sink fields
   * @param sinkCompression of type Compress, handed to the parent constructor
   * @param skipHeader of type boolean, {@code true} to skip the first line of every input file
   * @param writeHeader of type boolean, {@code true} to write a header line when sinking
   * @param charsetName of type String
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "charsetName", "delimitedParser"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String charsetName, DelimitedParser delimitedParser )
    {
    super( sinkCompression );

    // must be assigned before the field setters below: they reset the parser with the resolved fields
    this.delimitedParser = delimitedParser;

    // normalizes ALL and UNKNOWN
    setSinkFields( fields );
    setSourceFields( fields );

    this.skipHeader = skipHeader;
    this.writeHeader = writeHeader;

    // throws an exception if not found
    setCharsetName( charsetName );
    }

  /**
   * Method getDelimiter returns the delimiter used to parse fields from the current line of text.
   * <p/>
   * The value is read from the underlying {@link DelimitedParser}.
   *
   * @return a String
   */
  @Property(name = "delimiter", visibility = Visibility.PUBLIC)
  @PropertyDescription("The delimiter used to separate fields.")
  public String getDelimiter()
    {
    return delimitedParser.getDelimiter();
    }

  /**
   * Method getQuote returns the quote string, if any, used to encapsulate each field in a line of delimited text.
926 * 927 * @return a String 928 */ 929 @Property(name = "quote", visibility = Visibility.PUBLIC) 930 @PropertyDescription("The string used for quoting.") 931 public String getQuote() 932 { 933 return delimitedParser.getQuote(); 934 } 935 936 @Override 937 public boolean isSymmetrical() 938 { 939 return super.isSymmetrical() && skipHeader == writeHeader; 940 } 941 942 @Override 943 public void setSinkFields( Fields sinkFields ) 944 { 945 super.setSourceFields( sinkFields ); 946 super.setSinkFields( sinkFields ); 947 948 if( delimitedParser != null ) 949 delimitedParser.reset( getSourceFields(), getSinkFields() ); 950 } 951 952 @Override 953 public void setSourceFields( Fields sourceFields ) 954 { 955 super.setSourceFields( sourceFields ); 956 super.setSinkFields( sourceFields ); 957 958 if( delimitedParser != null ) 959 delimitedParser.reset( getSourceFields(), getSinkFields() ); 960 } 961 962 @Override 963 public Fields retrieveSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap ) 964 { 965 if( !skipHeader || !getSourceFields().isUnknown() ) 966 return getSourceFields(); 967 968 // no need to open them all 969 if( tap instanceof CompositeTap ) 970 tap = (Tap) ( (CompositeTap) tap ).getChildTaps().next(); 971 972 // should revert to file:// (Lfs) if tap is Lfs 973 tap = new Hfs( new TextLine( new Fields( "line" ), charsetName ), tap.getFullIdentifier( flowProcess ) ); 974 975 setSourceFields( delimitedParser.parseFirstLine( flowProcess, tap ) ); 976 977 return getSourceFields(); 978 } 979 980 @Override 981 public void presentSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields ) 982 { 983 presentSourceFieldsInternal( fields ); 984 } 985 986 @Override 987 public void presentSinkFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields ) 988 { 989 presentSinkFieldsInternal( fields ); 990 } 991 992 @Override 993 public void sourcePrepare( FlowProcess<? 
extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) 994 { 995 super.sourcePrepare( flowProcess, sourceCall ); 996 997 sourceCall.getIncomingEntry().setTuple( TupleViews.createObjectArray() ); 998 } 999 1000 @Override 1001 public boolean source( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException 1002 { 1003 Object[] context = sourceCall.getContext(); 1004 1005 if( !sourceCall.getInput().next( context[ 0 ], context[ 1 ] ) ) 1006 return false; 1007 1008 if( skipHeader && ( (LongWritable) context[ 0 ] ).get() == 0 ) 1009 { 1010 if( !sourceCall.getInput().next( context[ 0 ], context[ 1 ] ) ) 1011 return false; 1012 } 1013 1014 // delegate coercion to delimitedParser for robustness 1015 Object[] split = delimitedParser.parseLine( makeEncodedString( context ) ); 1016 Tuple tuple = sourceCall.getIncomingEntry().getTuple(); 1017 1018 TupleViews.reset( tuple, split ); 1019 1020 return true; 1021 } 1022 1023 @Override 1024 public void sinkPrepare( FlowProcess<? 
extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 1025 { 1026 sinkCall.setContext( new Object[ 3 ] ); 1027 1028 sinkCall.getContext()[ 0 ] = new Text(); 1029 sinkCall.getContext()[ 1 ] = new StringBuilder( 4 * 1024 ); 1030 sinkCall.getContext()[ 2 ] = Charset.forName( charsetName ); 1031 1032 if( writeHeader ) 1033 writeHeader( sinkCall ); 1034 } 1035 1036 protected void writeHeader( SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 1037 { 1038 Fields fields = sinkCall.getOutgoingEntry().getFields(); 1039 1040 Text text = (Text) sinkCall.getContext()[ 0 ]; 1041 StringBuilder line = (StringBuilder) sinkCall.getContext()[ 1 ]; 1042 Charset charset = (Charset) sinkCall.getContext()[ 2 ]; 1043 1044 line = (StringBuilder) delimitedParser.joinFirstLine( fields, line ); 1045 1046 text.set( line.toString().getBytes( charset ) ); 1047 1048 sinkCall.getOutput().collect( null, text ); 1049 1050 line.setLength( 0 ); 1051 } 1052 1053 @Override 1054 public void sink( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 1055 { 1056 TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); 1057 1058 Text text = (Text) sinkCall.getContext()[ 0 ]; 1059 StringBuilder line = (StringBuilder) sinkCall.getContext()[ 1 ]; 1060 Charset charset = (Charset) sinkCall.getContext()[ 2 ]; 1061 1062 Iterable<String> strings = tupleEntry.asIterableOf( String.class ); 1063 1064 line = (StringBuilder) delimitedParser.joinLine( strings, line ); 1065 1066 text.set( line.toString().getBytes( charset ) ); 1067 1068 sinkCall.getOutput().collect( null, text ); 1069 1070 line.setLength( 0 ); 1071 } 1072 } 1073