001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop;
022
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.HashMap;
031import java.util.LinkedList;
032import java.util.Map;
033
034import cascading.CascadingException;
035import cascading.flow.FlowProcess;
036import cascading.flow.FlowProps;
037import cascading.tuple.Comparison;
038import cascading.tuple.Tuple;
039import cascading.tuple.TupleException;
040import cascading.tuple.hadoop.io.HadoopTupleOutputStream;
041import cascading.tuple.hadoop.io.IndexTupleDeserializer;
042import cascading.tuple.hadoop.io.IndexTupleSerializer;
043import cascading.tuple.hadoop.io.TupleDeserializer;
044import cascading.tuple.hadoop.io.TuplePairDeserializer;
045import cascading.tuple.hadoop.io.TuplePairSerializer;
046import cascading.tuple.hadoop.io.TupleSerializer;
047import cascading.tuple.io.IndexTuple;
048import cascading.tuple.io.TupleInputStream;
049import cascading.tuple.io.TupleOutputStream;
050import cascading.tuple.io.TuplePair;
051import cascading.util.Util;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.conf.Configured;
054import org.apache.hadoop.io.WritableUtils;
055import org.apache.hadoop.io.serializer.Deserializer;
056import org.apache.hadoop.io.serializer.Serialization;
057import org.apache.hadoop.io.serializer.SerializationFactory;
058import org.apache.hadoop.io.serializer.Serializer;
059import org.apache.hadoop.io.serializer.WritableSerialization;
060import org.apache.hadoop.util.ReflectionUtils;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import static cascading.tuple.hadoop.TupleSerializationProps.HADOOP_IO_SERIALIZATIONS;
065
066/**
067 * Class TupleSerialization is an implementation of Hadoop's {@link Serialization} interface.
068 * <p/>
069 * Typically developers will not use this implementation directly as it is automatically added
070 * to any relevant MapReduce jobs via the {@link org.apache.hadoop.conf.Configuration}.
071 * <p/>
072 * By default, all primitive types are natively handled, and {@link org.apache.hadoop.io.BytesWritable}
073 * has a pre-configured serialization token since byte arrays are not handled natively by {@link Tuple}.
074 * <p/>
 * To add or manipulate Hadoop serializations or Cascading serialization tokens, see
076 * {@link TupleSerializationProps} for a fluent property builder class.
077 * <p/>
 * By default this Serialization interface registers the class {@link org.apache.hadoop.io.BytesWritable} as
079 * token 127.
080 */
081@SerializationToken(
082  tokens = {127},
083  classNames = {"org.apache.hadoop.io.BytesWritable"})
084public class TupleSerialization extends Configured implements Serialization
085  {
086  /** Field LOG */
087  private static final Logger LOG = LoggerFactory.getLogger( TupleSerialization.class );
088
089  /** Field defaultComparator * */
090  private Comparator defaultComparator;
091  /** Field classCache */
092  private final Map<String, Class> classCache = new HashMap<String, Class>();
093  /** Field serializationFactory */
094  private SerializationFactory serializationFactory;
095
096  /** Field tokenClassesMap */
097  private HashMap<Integer, String> tokenClassesMap;
098  /** Field classesTokensMap */
099  private HashMap<String, Integer> classesTokensMap;
  /** Field tokensSize */
101  private long tokensSize = 0;
102
103  static String getSerializationTokens( Configuration jobConf )
104    {
105    return jobConf.get( TupleSerializationProps.SERIALIZATION_TOKENS );
106    }
107
108  /**
109   * Adds this class as a Hadoop Serialization class. This method is safe to call redundantly.
110   * <p/>
111   * This method will guarantee {@link TupleSerialization} and {@link WritableSerialization} are
112   * first in the list, as both are required.
113   *
114   * @param jobConf of type JobConf
115   */
116  public static void setSerializations( Configuration jobConf )
117    {
118    String serializations = getSerializations( jobConf );
119
120    LinkedList<String> list = new LinkedList<String>();
121
122    if( serializations != null && !serializations.isEmpty() )
123      Collections.addAll( list, serializations.split( "," ) );
124
125    // required by MultiInputSplit
126    String writable = WritableSerialization.class.getName();
127    String tuple = TupleSerialization.class.getName();
128
129    list.remove( writable );
130    list.remove( tuple );
131
132    list.addFirst( writable );
133    list.addFirst( tuple );
134
135    // make writable last
136    jobConf.set( HADOOP_IO_SERIALIZATIONS, Util.join( list, "," ) );
137    }
138
139  static String getSerializations( Configuration jobConf )
140    {
141    return jobConf.get( HADOOP_IO_SERIALIZATIONS, null );
142    }
143
144  public static Comparator getDefaultComparator( Comparator comparator, Configuration jobConf )
145    {
146    String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR );
147
148    if( Util.isEmpty( typeName ) )
149      return null;
150
151    if( comparator == null )
152      return createComparator( jobConf, typeName );
153
154    if( comparator.getClass().getName().equals( typeName ) && !( comparator instanceof Configured ) )
155      return comparator;
156
157    return createComparator( jobConf, typeName );
158    }
159
160  public static Comparator getDefaultComparator( Configuration jobConf )
161    {
162    String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR );
163
164    if( Util.isEmpty( typeName ) )
165      return null;
166
167    return createComparator( jobConf, typeName );
168    }
169
170  private static Comparator createComparator( Configuration jobConf, String typeName )
171    {
172    LOG.debug( "using default comparator: {}", typeName );
173
174    try
175      {
176      Class<Comparator> type = (Class<Comparator>) TupleSerialization.class.getClassLoader().loadClass( typeName );
177
178      return ReflectionUtils.newInstance( type, jobConf );
179      }
180    catch( ClassNotFoundException exception )
181      {
182      throw new CascadingException( "unable to load class: " + typeName, exception );
183      }
184    }
185
186  /** Constructor TupleSerialization creates a new TupleSerialization instance. */
187  public TupleSerialization()
188    {
189    }
190
191  public TupleSerialization( final FlowProcess<? extends Configuration> flowProcess )
192    {
193    super( new Configuration()
194    {
195    @Override
196    public String get( String name )
197      {
198      return get( name, null );
199      }
200
201    @Override
202    public String get( String name, String defaultValue )
203      {
204      Object value = flowProcess.getProperty( name );
205      return value == null ? defaultValue : String.valueOf( value );
206      }
207    } );
208    }
209
210  /**
211   * Constructor TupleSerialization creates a new TupleSerialization instance.
212   *
213   * @param conf of type Configuration
214   */
215  public TupleSerialization( Configuration conf )
216    {
217    super( conf );
218    }
219
220  @Override
221  public void setConf( Configuration conf )
222    {
223    super.setConf( conf );
224
225    if( conf != null )
226      defaultComparator = getDefaultComparator( conf );
227    }
228
229  @Override
230  public Configuration getConf()
231    {
232    if( super.getConf() == null )
233      setConf( new Configuration() );
234
235    return super.getConf();
236    }
237
238  SerializationFactory getSerializationFactory()
239    {
240    if( serializationFactory == null )
241      serializationFactory = new SerializationFactory( getConf() );
242
243    return serializationFactory;
244    }
245
246  /** Must be called before {@link #getClassNameFor(int)} and {@link #getTokenFor(String)} methods. */
247  void initTokenMaps()
248    {
249    if( tokenClassesMap != null )
250      return;
251
252    tokenClassesMap = new HashMap<Integer, String>();
253    classesTokensMap = new HashMap<String, Integer>();
254
255    String tokenProperty = getSerializationTokens( getConf() );
256
257    if( tokenProperty != null )
258      {
259      tokenProperty = tokenProperty.replaceAll( "\\s", "" ); // allow for whitespace in token set
260
261      for( String pair : tokenProperty.split( "," ) )
262        {
263        String[] elements = pair.split( "=" );
264        addToken( null, Integer.parseInt( elements[ 0 ] ), elements[ 1 ] );
265        }
266      }
267
268    String serializationsString = getSerializations( getConf() );
269
270    LOG.debug( "using hadoop serializations from the job conf: {} ", serializationsString );
271
272    if( serializationsString == null )
273      return;
274
275    String[] serializations = serializationsString.split( "," );
276
277    for( String serializationName : serializations )
278      {
279      try
280        {
281        Class type = getConf().getClassByName( serializationName );
282
283        SerializationToken tokenAnnotation = (SerializationToken) type.getAnnotation( SerializationToken.class );
284
285        if( tokenAnnotation == null )
286          continue;
287
288        if( tokenAnnotation.tokens().length != tokenAnnotation.classNames().length )
289          throw new CascadingException( "serialization annotation tokens and classNames must be the same length" );
290
291        int[] tokens = tokenAnnotation.tokens();
292
293        for( int i = 0; i < tokens.length; i++ )
294          addToken( type, tokens[ i ], tokenAnnotation.classNames()[ i ] );
295        }
296      catch( ClassNotFoundException exception )
297        {
298        LOG.warn( "unable to load serialization class: {}", serializationName, exception );
299        }
300      }
301
302    tokensSize = tokenClassesMap.size();
303    }
304
305  private void addToken( Class type, int token, String className )
306    {
307    if( type != null && !type.getName().startsWith( "cascading." ) && token < 128 )
308      throw new CascadingException( "serialization annotation tokens may not be less than 128, was: " + token );
309
310    if( tokenClassesMap.containsKey( token ) )
311      {
312      if( type == null )
313        throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " found in properties" );
314
315      throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " on serialization: " + type.getName() );
316      }
317
318    if( classesTokensMap.containsKey( className ) )
319      {
320      if( type == null )
321        throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " found in properties " );
322
323      throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " on serialization: " + type.getName() );
324      }
325
326    LOG.debug( "adding serialization token: {}, for classname: {}", token, className );
327
328    tokenClassesMap.put( token, className );
329    classesTokensMap.put( className, token );
330    }
331
332  /**
333   * Returns the className for the given token.
334   *
335   * @param token of type int
336   * @return a String
337   */
338  final String getClassNameFor( int token )
339    {
340    if( tokensSize == 0 )
341      return null;
342
343    return tokenClassesMap.get( token );
344    }
345
346  final long getTokensMapSize()
347    {
348    return tokensSize;
349    }
350
351  /**
352   * Returns the token for the given className.
353   *
354   * @param className of type String
355   * @return an Integer
356   */
357  final Integer getTokenFor( String className )
358    {
359    if( tokensSize == 0 )
360      return null;
361
362    return classesTokensMap.get( className );
363    }
364
365  public Comparator getDefaultComparator()
366    {
367    return defaultComparator;
368    }
369
370  public Comparator getComparator( Class type )
371    {
372    Serialization serialization = getSerialization( type );
373
374    Comparator comparator = null;
375
376    if( serialization instanceof Comparison )
377      comparator = ( (Comparison) serialization ).getComparator( type );
378
379    if( comparator != null )
380      return comparator;
381
382    return defaultComparator;
383    }
384
385  Serialization getSerialization( String className )
386    {
387    return getSerialization( getClass( className ) );
388    }
389
390  Serialization getSerialization( Class type )
391    {
392    return getSerializationFactory().getSerialization( type );
393    }
394
395  Serializer getNewSerializer( Class type )
396    {
397    try
398      {
399      Serializer serializer = getSerializationFactory().getSerializer( type );
400
401      if( serializer == null )
402        throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() );
403
404      return serializer;
405      }
406    catch( NullPointerException exception )
407      {
408      throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() );
409      }
410    }
411
412  Deserializer getNewDeserializer( String className )
413    {
414    try
415      {
416      Deserializer deserializer = getSerializationFactory().getDeserializer( getClass( className ) );
417
418      if( deserializer == null )
419        throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() );
420
421      return deserializer;
422      }
423    catch( NullPointerException exception )
424      {
425      throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() );
426      }
427    }
428
429  TuplePairDeserializer getTuplePairDeserializer()
430    {
431    return new TuplePairDeserializer( getElementReader() );
432    }
433
434  /**
435   * Method getElementReader returns the elementReader of this TupleSerialization object.
436   *
437   * @return the elementReader (type SerializationElementReader) of this TupleSerialization object.
438   */
439  public SerializationElementReader getElementReader()
440    {
441    return new SerializationElementReader( this );
442    }
443
444  TupleDeserializer getTupleDeserializer()
445    {
446    return new TupleDeserializer( getElementReader() );
447    }
448
449  private TuplePairSerializer getTuplePairSerializer()
450    {
451    return new TuplePairSerializer( getElementWriter() );
452    }
453
454  IndexTupleDeserializer getIndexTupleDeserializer()
455    {
456    return new IndexTupleDeserializer( getElementReader() );
457    }
458
459  /**
460   * Method getElementWriter returns the elementWriter of this TupleSerialization object.
461   *
462   * @return the elementWriter (type SerializationElementWriter) of this TupleSerialization object.
463   */
464  public SerializationElementWriter getElementWriter()
465    {
466    return new SerializationElementWriter( this );
467    }
468
469  private TupleSerializer getTupleSerializer()
470    {
471    return new TupleSerializer( getElementWriter() );
472    }
473
474  private IndexTupleSerializer getIndexTupleSerializer()
475    {
476    return new IndexTupleSerializer( getElementWriter() );
477    }
478
479  /**
480   * Method accept implements {@link Serialization#accept(Class)}.
481   *
482   * @param c of type Class
483   * @return boolean
484   */
485  public boolean accept( Class c )
486    {
487    return Tuple.class == c || TuplePair.class == c || IndexTuple.class == c;
488    }
489
490  /**
491   * Method getDeserializer implements {@link Serialization#getDeserializer(Class)}.
492   *
493   * @param c of type Class
494   * @return Deserializer
495   */
496  public Deserializer getDeserializer( Class c )
497    {
498    if( c == Tuple.class )
499      return getTupleDeserializer();
500    else if( c == TuplePair.class )
501      return getTuplePairDeserializer();
502    else if( c == IndexTuple.class )
503      return getIndexTupleDeserializer();
504
505    throw new IllegalArgumentException( "unknown class, cannot deserialize: " + c.getName() );
506    }
507
508  /**
509   * Method getSerializer implements {@link Serialization#getSerializer(Class)}.
510   *
511   * @param c of type Class
512   * @return Serializer
513   */
514  public Serializer getSerializer( Class c )
515    {
516    if( c == Tuple.class )
517      return getTupleSerializer();
518    else if( c == TuplePair.class )
519      return getTuplePairSerializer();
520    else if( c == IndexTuple.class )
521      return getIndexTupleSerializer();
522
523    throw new IllegalArgumentException( "unknown class, cannot serialize: " + c.getName() );
524    }
525
526  public Class getClass( String className )
527    {
528    Class type = classCache.get( className );
529
530    if( type != null )
531      return type;
532
533    try
534      {
535      if( className.charAt( 0 ) == '[' )
536        type = Class.forName( className, true, Thread.currentThread().getContextClassLoader() );
537      else
538        type = Thread.currentThread().getContextClassLoader().loadClass( className );
539      }
540    catch( ClassNotFoundException exception )
541      {
542      throw new TupleException( "unable to load class named: " + className, exception );
543      }
544
545    classCache.put( className, type );
546
547    return type;
548    }
549
550  public static class SerializationElementReader implements TupleInputStream.ElementReader
551    {
552    /** Field LOG */
553    private static final Logger LOG = LoggerFactory.getLogger( SerializationElementReader.class );
554
555    /** Field tupleSerialization */
556    private final TupleSerialization tupleSerialization;
557
558    /** Field deserializers */
559    final Map<String, Deserializer> deserializers = new HashMap<String, Deserializer>();
560
561    /**
562     * Constructor SerializationElementReader creates a new SerializationElementReader instance.
563     *
564     * @param tupleSerialization of type TupleSerialization
565     */
566    public SerializationElementReader( TupleSerialization tupleSerialization )
567      {
568      this.tupleSerialization = tupleSerialization;
569
570      tupleSerialization.initTokenMaps();
571      }
572
573    public Object read( int token, DataInputStream inputStream ) throws IOException
574      {
575      String className = getClassNameFor( token, inputStream );
576      Deserializer deserializer = getDeserializerFor( inputStream, className );
577
578      Object foundObject = null;
579      Object object;
580
581      try
582        {
583        object = deserializer.deserialize( foundObject );
584        }
585      catch( IOException exception )
586        {
587        LOG.error( "failed deserializing token: " + token + " with classname: " + className, exception );
588
589        throw exception;
590        }
591
592      return object;
593      }
594
595    @Override
596    public Comparator getComparatorFor( int token, DataInputStream inputStream ) throws IOException
597      {
598      Class type = tupleSerialization.getClass( getClassNameFor( token, inputStream ) );
599
600      return tupleSerialization.getComparator( type );
601      }
602
603    private Deserializer getDeserializerFor( DataInputStream inputStream, String className ) throws IOException
604      {
605      Deserializer deserializer = deserializers.get( className );
606
607      if( deserializer == null )
608        {
609        deserializer = tupleSerialization.getNewDeserializer( className );
610        deserializer.open( inputStream );
611        deserializers.put( className, deserializer );
612        }
613
614      return deserializer;
615      }
616
617    public String getClassNameFor( int token, DataInputStream inputStream ) throws IOException
618      {
619      String className = tupleSerialization.getClassNameFor( token );
620
621      try
622        {
623        if( className == null )
624          className = WritableUtils.readString( inputStream );
625        }
626      catch( IOException exception )
627        {
628        LOG.error( "unable to resolve token: {}, to a valid classname, with token map of size: {}, rethrowing IOException", token, tupleSerialization.getTokensMapSize() );
629        throw exception;
630        }
631
632      return className;
633      }
634
635    public void close()
636      {
637      if( deserializers.size() == 0 )
638        return;
639
640      Collection<Deserializer> clone = new ArrayList<Deserializer>( deserializers.values() );
641
642      deserializers.clear();
643
644      for( Deserializer deserializer : clone )
645        {
646        try
647          {
648          deserializer.close();
649          }
650        catch( IOException exception )
651          {
652          // do nothing
653          }
654        }
655      }
656    }
657
658  public static class SerializationElementWriter implements TupleOutputStream.ElementWriter
659    {
660    /** Field LOG */
661    private static final Logger LOG = LoggerFactory.getLogger( SerializationElementWriter.class );
662
663    /** Field tupleSerialization */
664    private final TupleSerialization tupleSerialization;
665
666    /** Field serializers */
667    final Map<Class, Serializer> serializers = new HashMap<Class, Serializer>();
668
669    public SerializationElementWriter( TupleSerialization tupleSerialization )
670      {
671      this.tupleSerialization = tupleSerialization;
672
673      tupleSerialization.initTokenMaps();
674      }
675
676    public void write( DataOutputStream outputStream, Object object ) throws IOException
677      {
678      Class<?> type = object.getClass();
679      String className = type.getName();
680      Integer token = tupleSerialization.getTokenFor( className );
681
682      if( token == null )
683        {
684        LOG.debug( "no serialization token found for classname: {}", className );
685
686        WritableUtils.writeVInt( outputStream, HadoopTupleOutputStream.WRITABLE_TOKEN ); // denotes to punt to hadoop serialization
687        WritableUtils.writeString( outputStream, className );
688        }
689      else
690        {
691        WritableUtils.writeVInt( outputStream, token );
692        }
693
694      Serializer serializer = serializers.get( type );
695
696      if( serializer == null )
697        {
698        serializer = tupleSerialization.getNewSerializer( type );
699        serializer.open( outputStream );
700        serializers.put( type, serializer );
701        }
702
703      try
704        {
705        serializer.serialize( object );
706        }
707      catch( IOException exception )
708        {
709        LOG.error( "failed serializing token: " + token + " with classname: " + className, exception );
710
711        throw exception;
712        }
713      }
714
715    public void close()
716      {
717      if( serializers.size() == 0 )
718        return;
719
720      Collection<Serializer> clone = new ArrayList<Serializer>( serializers.values() );
721
722      serializers.clear();
723
724      for( Serializer serializer : clone )
725        {
726        try
727          {
728          serializer.close();
729          }
730        catch( IOException exception )
731          {
732          // do nothing
733          }
734        }
735      }
736    }
737  }