001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.scheme.util;
022
023import java.io.IOException;
024import java.io.Serializable;
025import java.lang.reflect.Type;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.List;
029import java.util.regex.Pattern;
030
031import cascading.flow.FlowProcess;
032import cascading.tap.Tap;
033import cascading.tap.TapException;
034import cascading.tuple.Fields;
035import cascading.tuple.Tuple;
036import cascading.tuple.TupleEntry;
037import cascading.tuple.TupleEntryIterator;
038import cascading.tuple.coerce.Coercions;
039import cascading.tuple.type.CoercibleType;
040import cascading.util.Util;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Class DelimitedParser is a base class for parsing text delimited files.
046 * <p/>
047 * It may be sub-classed to change its behavior.
048 * <p/>
049 * The interface {@link FieldTypeResolver} may be used to clean and prepare field names
050 * for data columns, and to infer type information from column names.
051 */
052public class DelimitedParser implements Serializable
053  {
054  /** Field LOG */
055  private static final Logger LOG = LoggerFactory.getLogger( DelimitedParser.class );
056
057  /** Field SPECIAL_REGEX_CHARS */
058  static final String SPECIAL_REGEX_CHARS = "([\\]\\[|.*<>\\\\$^?()=!+])";
059  /** Field QUOTED_REGEX_FORMAT */
060  static final String QUOTED_REGEX_FORMAT = "%2$s(?=(?:[^%1$s]*%1$s[^%1$s]*[^%1$s%2$s]*%1$s)*(?![^%1$s]*%1$s))";
061  /** Field CLEAN_REGEX_FORMAT */
062  static final String CLEAN_REGEX_FORMAT = "^(?:%1$s)(.*)(?:%1$s)$";
063  /** Field ESCAPE_REGEX_FORMAT */
064  static final String ESCAPE_REGEX_FORMAT = "(%1$s%1$s)";
065
066  /** Field sourceFields */
067  protected Fields sourceFields;
068
069  /** Field splitPattern */
070  protected Pattern splitPattern;
071  /** Field cleanPattern */
072  protected Pattern cleanPattern;
073  /** Field escapePattern */
074  protected Pattern escapePattern;
075  /** Field delimiter * */
076  protected String delimiter;
077  /** Field quote */
078  protected String quote;
079  /** Field strict */
080  protected boolean strict = true; // need to cache value across resets
081  /** Field enforceStrict */
082  protected boolean enforceStrict = true;
083  /** Field numValues */
084  protected int numValues;
085  /** Field types */
086  protected Type[] types;
087  /** Fields coercibles */
088  protected CoercibleType[] coercibles;
089  /** Field safe */
090  protected boolean safe = true;
091  /** fieldTypeResolver */
092  protected FieldTypeResolver fieldTypeResolver;
093
094  public DelimitedParser( String delimiter, String quote, Class[] types )
095    {
096    reset( delimiter, quote, types, strict, safe, null, null, null );
097    }
098
099  public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe )
100    {
101    reset( delimiter, quote, types, strict, safe, null, null, null );
102    }
103
104  public DelimitedParser( String delimiter, String quote, FieldTypeResolver fieldTypeResolver )
105    {
106    reset( delimiter, quote, null, strict, safe, null, null, fieldTypeResolver );
107    }
108
109  public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, FieldTypeResolver fieldTypeResolver )
110    {
111    reset( delimiter, quote, types, strict, safe, null, null, fieldTypeResolver );
112    }
113
114  public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields )
115    {
116    reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, null );
117    }
118
119  public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver )
120    {
121    reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, fieldTypeResolver );
122    }
123
124  public void reset( Fields sourceFields, Fields sinkFields )
125    {
126    reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, fieldTypeResolver );
127    }
128
129  public void reset( String delimiter, String quote, Type[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver )
130    {
131    if( delimiter == null || delimiter.isEmpty() )
132      throw new IllegalArgumentException( "delimiter may not be null or empty" );
133
134    if( delimiter.equals( quote ) )
135      throw new IllegalArgumentException( "delimiter and quote character may not be the same value, got: '" + delimiter + "'" );
136
137    this.delimiter = delimiter;
138    this.strict = strict;
139    this.safe = safe;
140    this.fieldTypeResolver = fieldTypeResolver;
141
142    if( quote != null && !quote.isEmpty() ) // if empty, leave null
143      this.quote = quote;
144
145    if( types != null && types.length == 0 )
146      this.types = null;
147
148    if( types != null )
149      this.types = Arrays.copyOf( types, types.length );
150
151    if( sourceFields == null || sinkFields == null )
152      return;
153
154    if( types == null && sourceFields.hasTypes() )
155      this.types = sourceFields.getTypes(); // gets a copy
156
157    this.sourceFields = sourceFields;
158    this.numValues = Math.max( sourceFields.size(), sinkFields.size() ); // if asymmetrical, one is zero
159
160    this.enforceStrict = this.strict;
161
162    if( sourceFields.isUnknown() )
163      this.enforceStrict = false;
164
165    if( !sinkFields.isAll() && numValues == 0 )
166      throw new IllegalArgumentException( "may not be zero declared fields, found: " + sinkFields.printVerbose() );
167
168    splitPattern = createSplitPatternFor( this.delimiter, this.quote );
169    cleanPattern = createCleanPatternFor( this.quote );
170    escapePattern = createEscapePatternFor( this.quote );
171
172    if( this.types != null && sinkFields.isAll() )
173      throw new IllegalArgumentException( "when using Fields.ALL, field types may not be used" );
174
175    if( this.types != null && this.types.length != sinkFields.size() )
176      throw new IllegalArgumentException( "num of types must equal number of fields: " + sinkFields.printVerbose() + ", found: " + this.types.length );
177
178    coercibles = Coercions.coercibleArray( this.numValues, this.types );
179    }
180
181  public String getDelimiter()
182    {
183    return delimiter;
184    }
185
186  public String getQuote()
187    {
188    return quote;
189    }
190
191  /**
192   * Method createEscapePatternFor creates a regex {@link java.util.regex.Pattern} cleaning quote escapes from a String.
193   * <p/>
194   * If {@code quote} is null or empty, a null value will be returned;
195   *
196   * @param quote of type String
197   * @return Pattern
198   */
199  public Pattern createEscapePatternFor( String quote )
200    {
201    if( quote == null || quote.isEmpty() )
202      return null;
203
204    return Pattern.compile( String.format( ESCAPE_REGEX_FORMAT, quote ) );
205    }
206
207  /**
208   * Method createCleanPatternFor creates a regex {@link java.util.regex.Pattern} for removing quote characters from a String.
209   * <p/>
210   * If {@code quote} is null or empty, a null value will be returned;
211   *
212   * @param quote of type String
213   * @return Pattern
214   */
215  public Pattern createCleanPatternFor( String quote )
216    {
217    if( quote == null || quote.isEmpty() )
218      return null;
219
220    return Pattern.compile( String.format( CLEAN_REGEX_FORMAT, quote ) );
221    }
222
223  /**
224   * Method createSplitPatternFor creates a regex {@link java.util.regex.Pattern} for splitting a line of text into its component
225   * parts using the given delimiter and quote Strings. {@code quote} may be null.
226   *
227   * @param delimiter of type String
228   * @param quote     of type String
229   * @return Pattern
230   */
231  public Pattern createSplitPatternFor( String delimiter, String quote )
232    {
233    String escapedDelimiter = delimiter.replaceAll( SPECIAL_REGEX_CHARS, "\\\\$1" );
234
235    if( quote == null || quote.isEmpty() )
236      return Pattern.compile( escapedDelimiter );
237    else
238      return Pattern.compile( String.format( QUOTED_REGEX_FORMAT, quote, escapedDelimiter ) );
239    }
240
241  /**
242   * Method createSplit will split the given {@code value} with the given {@code splitPattern}.
243   *
244   * @param value        of type String
245   * @param splitPattern of type Pattern
246   * @param numValues    of type int
247   * @return String[]
248   */
249  public String[] createSplit( String value, Pattern splitPattern, int numValues )
250    {
251    return splitPattern.split( value, numValues );
252    }
253
254  /**
255   * Method cleanSplit will return a quote free array of String values, the given {@code split} array
256   * will be updated in place.
257   * <p/>
258   * If {@code cleanPattern} is null, quote cleaning will not be performed, but all empty String values
259   * will be replaces with a {@code null} value.
260   *
261   * @param split         of type Object[]
262   * @param cleanPattern  of type Pattern
263   * @param escapePattern of type Pattern
264   * @param quote         of type String
265   * @return Object[] as a convenience
266   */
267  public Object[] cleanSplit( Object[] split, Pattern cleanPattern, Pattern escapePattern, String quote )
268    {
269    if( cleanPattern != null )
270      {
271      for( int i = 0; i < split.length; i++ )
272        {
273        split[ i ] = cleanPattern.matcher( (String) split[ i ] ).replaceAll( "$1" );
274        split[ i ] = escapePattern.matcher( (String) split[ i ] ).replaceAll( quote );
275        }
276      }
277
278    for( int i = 0; i < split.length; i++ )
279      {
280      if( ( (String) split[ i ] ).isEmpty() )
281        split[ i ] = null;
282      }
283
284    return split;
285    }
286
287  public Fields parseFirstLine( FlowProcess flowProcess, Tap tap )
288    {
289    Fields sourceFields;
290    TupleEntryIterator iterator = null;
291
292    try
293      {
294      if( !tap.resourceExists( flowProcess ) )
295        throw new TapException( "unable to read fields from tap: " + tap + ", does not exist" );
296
297      iterator = tap.openForRead( flowProcess );
298
299      TupleEntry entry = iterator.hasNext() ? iterator.next() : null;
300
301      if( entry == null )
302        throw new TapException( "unable to read fields from tap: " + tap + ", is empty" );
303
304      Object[] result = onlyParseLine( entry.getTuple().getString( 0 ) ); // don't coerce if type info is avail
305
306      result = cleanParsedLine( result );
307
308      Type[] inferred = inferTypes( result ); // infer type from field name, after removing quotes/escapes
309
310      result = cleanFields( result ); // clean field names to remove any meta-data or manage case
311
312      sourceFields = new Fields( Arrays.copyOf( result, result.length, Comparable[].class ) );
313
314      if( inferred != null )
315        sourceFields = sourceFields.applyTypes( inferred );
316      }
317    catch( IOException exception )
318      {
319      throw new TapException( "unable to read fields from tap: " + tap, exception );
320      }
321    finally
322      {
323      if( iterator != null )
324        {
325        try
326          {
327          iterator.close();
328          }
329        catch( IOException exception )
330          {
331          // do nothing
332          }
333        }
334      }
335
336    return sourceFields;
337    }
338
339  public Object[] parseLine( String line )
340    {
341    Object[] split = onlyParseLine( line );
342
343    split = cleanParsedLine( split );
344
345    return coerceParsedLine( line, split );
346    }
347
348  protected Object[] cleanParsedLine( Object[] split )
349    {
350    return cleanSplit( split, cleanPattern, escapePattern, quote );
351    }
352
353  protected Object[] coerceParsedLine( String line, Object[] split )
354    {
355    if( types != null ) // forced null in ctor
356      {
357      Object[] result = new Object[ split.length ];
358
359      for( int i = 0; i < split.length; i++ )
360        {
361        try
362          {
363          result[ i ] = coercibles[ i ].canonical( split[ i ] );
364          }
365        catch( Exception exception )
366          {
367          result[ i ] = null;
368
369          if( !safe )
370            throw new TapException( getSafeMessage( split[ i ], i ), exception, new Tuple( line ) ); // trap actual line data
371
372          if( LOG.isDebugEnabled() )
373            LOG.debug( getSafeMessage( split[ i ], i ), exception );
374          }
375        }
376
377      split = result;
378      }
379
380    return split;
381    }
382
383  private String getSafeMessage( Object object, int i )
384    {
385    try
386      {
387      return "field " + sourceFields.get( i ) + " cannot be coerced from : " + object + " to: " + Util.getTypeName( types[ i ] );
388      }
389    catch( Throwable throwable )
390      {
391      // you may get an exception while composing the message (e.g. ArrayIndexOutOfBoundsException)
392      // use a generic string
393      return "field pos " + i + " cannot be coerced from: " + object + ", pos has no corresponding field name or coercion type";
394      }
395    }
396
397  protected Object[] onlyParseLine( String line )
398    {
399    Object[] split = createSplit( line, splitPattern, numValues == 0 ? 0 : -1 );
400
401    if( numValues != 0 && split.length != numValues )
402      {
403      if( enforceStrict )
404        throw new TapException( getParseMessage( split ), new Tuple( line ) ); // trap actual line data
405
406      if( LOG.isDebugEnabled() )
407        LOG.debug( getParseMessage( split ) );
408
409      Object[] array = new Object[ numValues ];
410      Arrays.fill( array, "" );
411      System.arraycopy( split, 0, array, 0, Math.min( numValues, split.length ) );
412
413      split = array;
414      }
415
416    return split;
417    }
418
419  private String getParseMessage( Object[] split )
420    {
421    return "did not parse correct number of values from input data, expected: " + numValues + ", got: " + split.length + ":" + Util.join( ",", (String[]) split );
422    }
423
424  public Appendable joinFirstLine( Iterable iterable, Appendable buffer )
425    {
426    iterable = prepareFields( iterable );
427
428    return joinLine( iterable, buffer );
429    }
430
431  public Appendable joinLine( Iterable iterable, Appendable buffer )
432    {
433    try
434      {
435      if( quote != null )
436        return joinWithQuote( iterable, buffer );
437
438      return joinNoQuote( iterable, buffer );
439      }
440    catch( IOException exception )
441      {
442      throw new TapException( "unable to append data", exception );
443      }
444    }
445
446  protected Appendable joinWithQuote( Iterable tuple, Appendable buffer ) throws IOException
447    {
448    int count = 0;
449
450    for( Object value : tuple )
451      {
452      if( count != 0 )
453        buffer.append( delimiter );
454
455      if( value != null )
456        {
457        String valueString = value.toString();
458
459        if( valueString.contains( quote ) )
460          valueString = valueString.replaceAll( quote, quote + quote );
461
462        if( valueString.contains( delimiter ) )
463          valueString = quote + valueString + quote;
464
465        buffer.append( valueString );
466        }
467
468      count++;
469      }
470
471    return buffer;
472    }
473
474  protected Appendable joinNoQuote( Iterable tuple, Appendable buffer ) throws IOException
475    {
476    int count = 0;
477
478    for( Object value : tuple )
479      {
480      if( count != 0 )
481        buffer.append( delimiter );
482
483      if( value != null )
484        buffer.append( value.toString() );
485
486      count++;
487      }
488
489    return buffer;
490    }
491
492  protected Type[] inferTypes( Object[] result )
493    {
494    if( fieldTypeResolver == null )
495      return null;
496
497    Type[] inferred = new Type[ result.length ];
498
499    for( int i = 0; i < result.length; i++ )
500      {
501      String field = (String) result[ i ];
502
503      inferred[ i ] = fieldTypeResolver.inferTypeFrom( i, field );
504      }
505
506    return inferred;
507    }
508
509  protected Iterable prepareFields( Iterable fields )
510    {
511    if( fieldTypeResolver == null )
512      return fields;
513
514    List result = new ArrayList();
515
516    for( Object field : fields )
517      {
518      int index = result.size();
519      Type type = types != null ? types[ index ] : null;
520      String value = fieldTypeResolver.prepareField( index, (String) field, type );
521
522      if( value != null && !value.isEmpty() )
523        field = value;
524
525      result.add( field );
526      }
527
528    return result;
529    }
530
531  protected Object[] cleanFields( Object[] result )
532    {
533    if( fieldTypeResolver == null )
534      return result;
535
536    for( int i = 0; i < result.length; i++ )
537      {
538      Type type = types != null ? types[ i ] : null;
539      String value = fieldTypeResolver.cleanField( i, (String) result[ i ], type );
540
541      if( value != null && !value.isEmpty() )
542        result[ i ] = value;
543      }
544
545    return result;
546    }
547  }