001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.scheme.util;
022    
023    import java.io.IOException;
024    import java.io.Serializable;
025    import java.lang.reflect.Type;
026    import java.util.ArrayList;
027    import java.util.Arrays;
028    import java.util.List;
029    import java.util.regex.Pattern;
030    
031    import cascading.flow.FlowProcess;
032    import cascading.tap.Tap;
033    import cascading.tap.TapException;
034    import cascading.tuple.Fields;
035    import cascading.tuple.Tuple;
036    import cascading.tuple.TupleEntry;
037    import cascading.tuple.TupleEntryIterator;
038    import cascading.tuple.coerce.Coercions;
039    import cascading.tuple.type.CoercibleType;
040    import cascading.util.Util;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
/**
 * Class DelimitedParser is a base class for parsing text delimited files.
 * <p/>
 * It may be sub-classed to change its behavior.
 * <p/>
 * The interface {@link FieldTypeResolver} may be used to clean and prepare field names
 * for data columns, and to infer type information from column names.
 */
052    public class DelimitedParser implements Serializable
053      {
054      /** Field LOG */
055      private static final Logger LOG = LoggerFactory.getLogger( DelimitedParser.class );
056    
057      /** Field SPECIAL_REGEX_CHARS */
058      static final String SPECIAL_REGEX_CHARS = "([\\]\\[|.*<>\\\\$^?()=!+])";
059      /** Field QUOTED_REGEX_FORMAT */
060      static final String QUOTED_REGEX_FORMAT = "%2$s(?=(?:[^%1$s]*%1$s[^%1$s]*[^%1$s%2$s]*%1$s)*(?![^%1$s]*%1$s))";
061      /** Field CLEAN_REGEX_FORMAT */
062      static final String CLEAN_REGEX_FORMAT = "^(?:%1$s)(.*)(?:%1$s)$";
063      /** Field ESCAPE_REGEX_FORMAT */
064      static final String ESCAPE_REGEX_FORMAT = "(%1$s%1$s)";
065    
066      /** Field sourceFields */
067      protected Fields sourceFields;
068    
069      /** Field splitPattern */
070      protected Pattern splitPattern;
071      /** Field cleanPattern */
072      protected Pattern cleanPattern;
073      /** Field escapePattern */
074      protected Pattern escapePattern;
075      /** Field delimiter * */
076      protected String delimiter;
077      /** Field quote */
078      protected String quote;
079      /** Field strict */
080      protected boolean strict = true; // need to cache value across resets
081      /** Field enforceStrict */
082      protected boolean enforceStrict = true;
083      /** Field numValues */
084      protected int numValues;
085      /** Field types */
086      protected Type[] types;
087      /** Fields coercibles */
088      protected CoercibleType[] coercibles;
089      /** Field safe */
090      protected boolean safe = true;
091      /** fieldTypeResolver */
092      protected FieldTypeResolver fieldTypeResolver;
093    
094      public DelimitedParser( String delimiter, String quote, Class[] types )
095        {
096        reset( delimiter, quote, types, strict, safe, null, null, null );
097        }
098    
099      public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe )
100        {
101        reset( delimiter, quote, types, strict, safe, null, null, null );
102        }
103    
104      public DelimitedParser( String delimiter, String quote, FieldTypeResolver fieldTypeResolver )
105        {
106        reset( delimiter, quote, null, strict, safe, null, null, fieldTypeResolver );
107        }
108    
109      public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, FieldTypeResolver fieldTypeResolver )
110        {
111        reset( delimiter, quote, types, strict, safe, null, null, fieldTypeResolver );
112        }
113    
114      public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields )
115        {
116        reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, null );
117        }
118    
119      public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver )
120        {
121        reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, fieldTypeResolver );
122        }
123    
124      public void reset( Fields sourceFields, Fields sinkFields )
125        {
126        reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, fieldTypeResolver );
127        }
128    
129      public void reset( String delimiter, String quote, Type[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver )
130        {
131        if( delimiter == null || delimiter.isEmpty() )
132          throw new IllegalArgumentException( "delimiter may not be null or empty" );
133    
134        if( delimiter.equals( quote ) )
135          throw new IllegalArgumentException( "delimiter and quote character may not be the same value, got: '" + delimiter + "'" );
136    
137        this.delimiter = delimiter;
138        this.strict = strict;
139        this.safe = safe;
140        this.fieldTypeResolver = fieldTypeResolver;
141    
142        if( quote != null && !quote.isEmpty() ) // if empty, leave null
143          this.quote = quote;
144    
145        if( types != null && types.length == 0 )
146          this.types = null;
147    
148        if( types != null )
149          this.types = Arrays.copyOf( types, types.length );
150    
151        if( sourceFields == null || sinkFields == null )
152          return;
153    
154        if( types == null && sourceFields.hasTypes() )
155          this.types = sourceFields.getTypes(); // gets a copy
156    
157        this.sourceFields = sourceFields;
158        this.numValues = Math.max( sourceFields.size(), sinkFields.size() ); // if asymmetrical, one is zero
159    
160        this.enforceStrict = this.strict;
161    
162        if( sourceFields.isUnknown() )
163          this.enforceStrict = false;
164    
165        if( !sinkFields.isAll() && numValues == 0 )
166          throw new IllegalArgumentException( "may not be zero declared fields, found: " + sinkFields.printVerbose() );
167    
168        splitPattern = createSplitPatternFor( this.delimiter, this.quote );
169        cleanPattern = createCleanPatternFor( this.quote );
170        escapePattern = createEscapePatternFor( this.quote );
171    
172        if( this.types != null && sinkFields.isAll() )
173          throw new IllegalArgumentException( "when using Fields.ALL, field types may not be used" );
174    
175        if( this.types != null && this.types.length != sinkFields.size() )
176          throw new IllegalArgumentException( "num of types must equal number of fields: " + sinkFields.printVerbose() + ", found: " + this.types.length );
177    
178        coercibles = Coercions.coercibleArray( this.numValues, this.types );
179        }
180    
181      public String getDelimiter()
182        {
183        return delimiter;
184        }
185    
186      public String getQuote()
187        {
188        return quote;
189        }
190    
191      /**
192       * Method createEscapePatternFor creates a regex {@link java.util.regex.Pattern} cleaning quote escapes from a String.
193       * <p/>
194       * If {@code quote} is null or empty, a null value will be returned;
195       *
196       * @param quote of type String
197       * @return Pattern
198       */
199      public Pattern createEscapePatternFor( String quote )
200        {
201        if( quote == null || quote.isEmpty() )
202          return null;
203    
204        return Pattern.compile( String.format( ESCAPE_REGEX_FORMAT, quote ) );
205        }
206    
207      /**
208       * Method createCleanPatternFor creates a regex {@link java.util.regex.Pattern} for removing quote characters from a String.
209       * <p/>
210       * If {@code quote} is null or empty, a null value will be returned;
211       *
212       * @param quote of type String
213       * @return Pattern
214       */
215      public Pattern createCleanPatternFor( String quote )
216        {
217        if( quote == null || quote.isEmpty() )
218          return null;
219    
220        return Pattern.compile( String.format( CLEAN_REGEX_FORMAT, quote ) );
221        }
222    
223      /**
224       * Method createSplitPatternFor creates a regex {@link java.util.regex.Pattern} for splitting a line of text into its component
225       * parts using the given delimiter and quote Strings. {@code quote} may be null.
226       *
227       * @param delimiter of type String
228       * @param quote     of type String
229       * @return Pattern
230       */
231      public Pattern createSplitPatternFor( String delimiter, String quote )
232        {
233        String escapedDelimiter = delimiter.replaceAll( SPECIAL_REGEX_CHARS, "\\\\$1" );
234    
235        if( quote == null || quote.isEmpty() )
236          return Pattern.compile( escapedDelimiter );
237        else
238          return Pattern.compile( String.format( QUOTED_REGEX_FORMAT, quote, escapedDelimiter ) );
239        }
240    
241      /**
242       * Method createSplit will split the given {@code value} with the given {@code splitPattern}.
243       *
244       * @param value        of type String
245       * @param splitPattern of type Pattern
246       * @param numValues    of type int
247       * @return String[]
248       */
249      public String[] createSplit( String value, Pattern splitPattern, int numValues )
250        {
251        return splitPattern.split( value, numValues );
252        }
253    
254      /**
255       * Method cleanSplit will return a quote free array of String values, the given {@code split} array
256       * will be updated in place.
257       * <p/>
258       * If {@code cleanPattern} is null, quote cleaning will not be performed, but all empty String values
259       * will be replaces with a {@code null} value.
260       *
261       * @param split         of type Object[]
262       * @param cleanPattern  of type Pattern
263       * @param escapePattern of type Pattern
264       * @param quote         of type String
265       * @return Object[] as a convenience
266       */
267      public Object[] cleanSplit( Object[] split, Pattern cleanPattern, Pattern escapePattern, String quote )
268        {
269        if( cleanPattern != null )
270          {
271          for( int i = 0; i < split.length; i++ )
272            {
273            split[ i ] = cleanPattern.matcher( (String) split[ i ] ).replaceAll( "$1" );
274            split[ i ] = escapePattern.matcher( (String) split[ i ] ).replaceAll( quote );
275            }
276          }
277    
278        for( int i = 0; i < split.length; i++ )
279          {
280          if( ( (String) split[ i ] ).isEmpty() )
281            split[ i ] = null;
282          }
283    
284        return split;
285        }
286    
287      public Fields parseFirstLine( FlowProcess flowProcess, Tap tap )
288        {
289        Fields sourceFields;
290        TupleEntryIterator iterator = null;
291    
292        try
293          {
294          if( !tap.resourceExists( flowProcess.getConfigCopy() ) )
295            throw new TapException( "unable to read fields from tap: " + tap + ", does not exist" );
296    
297          iterator = tap.openForRead( flowProcess );
298    
299          TupleEntry entry = iterator.hasNext() ? iterator.next() : null;
300    
301          if( entry == null )
302            throw new TapException( "unable to read fields from tap: " + tap + ", is empty" );
303    
304          Object[] result = onlyParseLine( entry.getTuple().getString( 0 ) ); // don't coerce if type info is avail
305    
306          result = cleanParsedLine( result );
307    
308          Type[] inferred = inferTypes( result ); // infer type from field name, after removing quotes/escapes
309    
310          result = cleanFields( result ); // clean field names to remove any meta-data or manage case
311    
312          sourceFields = new Fields( Arrays.copyOf( result, result.length, Comparable[].class ) );
313    
314          if( inferred != null )
315            sourceFields = sourceFields.applyTypes( inferred );
316          }
317        catch( IOException exception )
318          {
319          throw new TapException( "unable to read fields from tap: " + tap, exception );
320          }
321        finally
322          {
323          if( iterator != null )
324            {
325            try
326              {
327              iterator.close();
328              }
329            catch( IOException exception )
330              {
331              // do nothing
332              }
333            }
334          }
335    
336        return sourceFields;
337        }
338    
339      public Object[] parseLine( String line )
340        {
341        Object[] split = onlyParseLine( line );
342    
343        split = cleanParsedLine( split );
344    
345        return coerceParsedLine( line, split );
346        }
347    
348      protected Object[] cleanParsedLine( Object[] split )
349        {
350        return cleanSplit( split, cleanPattern, escapePattern, quote );
351        }
352    
353      protected Object[] coerceParsedLine( String line, Object[] split )
354        {
355        if( types != null ) // forced null in ctor
356          {
357          Object[] result = new Object[ split.length ];
358    
359          for( int i = 0; i < split.length; i++ )
360            {
361            try
362              {
363              result[ i ] = coercibles[ i ].canonical( split[ i ] );
364              }
365            catch( Exception exception )
366              {
367              result[ i ] = null;
368    
369              if( !safe )
370                throw new TapException( getSafeMessage( split[ i ], i ), exception, new Tuple( line ) ); // trap actual line data
371    
372              if( LOG.isDebugEnabled() )
373                LOG.debug( getSafeMessage( split[ i ], i ), exception );
374              }
375            }
376    
377          split = result;
378          }
379    
380        return split;
381        }
382    
383      private String getSafeMessage( Object object, int i )
384        {
385        try
386          {
387          return "field " + sourceFields.get( i ) + " cannot be coerced from : " + object + " to: " + Util.getTypeName( types[ i ] );
388          }
389        catch( Throwable throwable )
390          {
391          // you may get an exception while composing the message (e.g. ArrayIndexOutOfBoundsException)
392          // use a generic string
393          return "field pos " + i + " cannot be coerced from: " + object + ", pos has no corresponding field name or coercion type";
394          }
395        }
396    
397      protected Object[] onlyParseLine( String line )
398        {
399        Object[] split = createSplit( line, splitPattern, numValues == 0 ? 0 : -1 );
400    
401        if( numValues != 0 && split.length != numValues )
402          {
403          if( enforceStrict )
404            throw new TapException( getParseMessage( split ), new Tuple( line ) ); // trap actual line data
405    
406          if( LOG.isDebugEnabled() )
407            LOG.debug( getParseMessage( split ) );
408    
409          Object[] array = new Object[ numValues ];
410          Arrays.fill( array, "" );
411          System.arraycopy( split, 0, array, 0, Math.min( numValues, split.length ) );
412    
413          split = array;
414          }
415    
416        return split;
417        }
418    
419      private String getParseMessage( Object[] split )
420        {
421        return "did not parse correct number of values from input data, expected: " + numValues + ", got: " + split.length + ":" + Util.join( ",", (String[]) split );
422        }
423    
424      public Appendable joinFirstLine( Iterable iterable, Appendable buffer )
425        {
426        iterable = prepareFields( iterable );
427    
428        return joinLine( iterable, buffer );
429        }
430    
431      public Appendable joinLine( Iterable iterable, Appendable buffer )
432        {
433        try
434          {
435          if( quote != null )
436            return joinWithQuote( iterable, buffer );
437    
438          return joinNoQuote( iterable, buffer );
439          }
440        catch( IOException exception )
441          {
442          throw new TapException( "unable to append data", exception );
443          }
444        }
445    
446      protected Appendable joinWithQuote( Iterable tuple, Appendable buffer ) throws IOException
447        {
448        int count = 0;
449    
450        for( Object value : tuple )
451          {
452          if( count != 0 )
453            buffer.append( delimiter );
454    
455          if( value != null )
456            {
457            String valueString = value.toString();
458    
459            if( valueString.contains( quote ) )
460              valueString = valueString.replaceAll( quote, quote + quote );
461    
462            if( valueString.contains( delimiter ) )
463              valueString = quote + valueString + quote;
464    
465            buffer.append( valueString );
466            }
467    
468          count++;
469          }
470    
471        return buffer;
472        }
473    
474      protected Appendable joinNoQuote( Iterable tuple, Appendable buffer ) throws IOException
475        {
476        int count = 0;
477    
478        for( Object value : tuple )
479          {
480          if( count != 0 )
481            buffer.append( delimiter );
482    
483          if( value != null )
484            buffer.append( value.toString() );
485    
486          count++;
487          }
488    
489        return buffer;
490        }
491    
492      protected Type[] inferTypes( Object[] result )
493        {
494        if( fieldTypeResolver == null )
495          return null;
496    
497        Type[] inferred = new Type[ result.length ];
498    
499        for( int i = 0; i < result.length; i++ )
500          {
501          String field = (String) result[ i ];
502    
503          inferred[ i ] = fieldTypeResolver.inferTypeFrom( i, field );
504          }
505    
506        return inferred;
507        }
508    
509      protected Iterable prepareFields( Iterable fields )
510        {
511        if( fieldTypeResolver == null )
512          return fields;
513    
514        List result = new ArrayList();
515    
516        for( Object field : fields )
517          {
518          int index = result.size();
519          Type type = types != null ? types[ index ] : null;
520          String value = fieldTypeResolver.prepareField( index, (String) field, type );
521    
522          if( value != null && !value.isEmpty() )
523            field = value;
524    
525          result.add( field );
526          }
527    
528        return result;
529        }
530    
531      protected Object[] cleanFields( Object[] result )
532        {
533        if( fieldTypeResolver == null )
534          return result;
535    
536        for( int i = 0; i < result.length; i++ )
537          {
538          Type type = types != null ? types[ i ] : null;
539          String value = fieldTypeResolver.cleanField( i, (String) result[ i ], type );
540    
541          if( value != null && !value.isEmpty() )
542            result[ i ] = value;
543          }
544    
545        return result;
546        }
547      }