001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.hadoop; 022 023import java.io.DataInputStream; 024import java.io.DataOutputStream; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.Comparator; 030import java.util.HashMap; 031import java.util.LinkedList; 032import java.util.Map; 033 034import cascading.CascadingException; 035import cascading.flow.FlowProcess; 036import cascading.flow.FlowProps; 037import cascading.tuple.Comparison; 038import cascading.tuple.Tuple; 039import cascading.tuple.TupleException; 040import cascading.tuple.hadoop.io.HadoopTupleOutputStream; 041import cascading.tuple.hadoop.io.IndexTupleDeserializer; 042import cascading.tuple.hadoop.io.IndexTupleSerializer; 043import cascading.tuple.hadoop.io.TupleDeserializer; 044import cascading.tuple.hadoop.io.TuplePairDeserializer; 045import cascading.tuple.hadoop.io.TuplePairSerializer; 046import cascading.tuple.hadoop.io.TupleSerializer; 047import cascading.tuple.io.IndexTuple; 048import cascading.tuple.io.TupleInputStream; 049import cascading.tuple.io.TupleOutputStream; 050import cascading.tuple.io.TuplePair; 051import cascading.util.Util; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.conf.Configured; 054import org.apache.hadoop.io.WritableUtils; 055import org.apache.hadoop.io.serializer.Deserializer; 056import org.apache.hadoop.io.serializer.Serialization; 057import org.apache.hadoop.io.serializer.SerializationFactory; 058import org.apache.hadoop.io.serializer.Serializer; 059import org.apache.hadoop.io.serializer.WritableSerialization; 060import org.apache.hadoop.util.ReflectionUtils; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import static cascading.tuple.hadoop.TupleSerializationProps.HADOOP_IO_SERIALIZATIONS; 065 066/** 067 * Class TupleSerialization is an implementation of Hadoop's {@link Serialization} interface. 068 * <p/> 069 * Typically developers will not use this implementation directly as it is automatically added 070 * to any relevant MapReduce jobs via the {@link org.apache.hadoop.conf.Configuration}. 071 * <p/> 072 * By default, all primitive types are natively handled, and {@link org.apache.hadoop.io.BytesWritable} 073 * has a pre-configured serialization token since byte arrays are not handled natively by {@link Tuple}. 074 * <p/> 075 * To add or manipulate Hadoop serializations or Cascading serializations tokens, see 076 * {@link TupleSerializationProps} for a fluent property builder class. 077 * <p/> 078 * By default this Serialization interface registers the class {@link org.apache.hadoop.io.ByteWritable} as 079 * token 127. 080 */ 081@SerializationToken( 082 tokens = {127}, 083 classNames = {"org.apache.hadoop.io.BytesWritable"}) 084public class TupleSerialization extends Configured implements Serialization 085 { 086 /** Field LOG */ 087 private static final Logger LOG = LoggerFactory.getLogger( TupleSerialization.class ); 088 089 /** Field defaultComparator * */ 090 private Comparator defaultComparator; 091 /** Field classCache */ 092 private final Map<String, Class> classCache = new HashMap<String, Class>(); 093 /** Field serializationFactory */ 094 private SerializationFactory serializationFactory; 095 096 /** Field tokenClassesMap */ 097 private HashMap<Integer, String> tokenClassesMap; 098 /** Field classesTokensMap */ 099 private HashMap<String, Integer> classesTokensMap; 100 /** Field tokenMapSize */ 101 private long tokensSize = 0; 102 103 static String getSerializationTokens( Configuration jobConf ) 104 { 105 return jobConf.get( TupleSerializationProps.SERIALIZATION_TOKENS ); 106 } 107 108 /** 109 * Adds this class as a Hadoop Serialization class. This method is safe to call redundantly. 110 * <p/> 111 * This method will guarantee {@link TupleSerialization} and {@link WritableSerialization} are 112 * first in the list, as both are required. 113 * 114 * @param jobConf of type JobConf 115 */ 116 public static void setSerializations( Configuration jobConf ) 117 { 118 String serializations = getSerializations( jobConf ); 119 120 LinkedList<String> list = new LinkedList<String>(); 121 122 if( serializations != null && !serializations.isEmpty() ) 123 Collections.addAll( list, serializations.split( "," ) ); 124 125 // required by MultiInputSplit 126 String writable = WritableSerialization.class.getName(); 127 String tuple = TupleSerialization.class.getName(); 128 129 list.remove( writable ); 130 list.remove( tuple ); 131 132 list.addFirst( writable ); 133 list.addFirst( tuple ); 134 135 // make writable last 136 jobConf.set( HADOOP_IO_SERIALIZATIONS, Util.join( list, "," ) ); 137 } 138 139 static String getSerializations( Configuration jobConf ) 140 { 141 return jobConf.get( HADOOP_IO_SERIALIZATIONS, null ); 142 } 143 144 public static Comparator getDefaultComparator( Comparator comparator, Configuration jobConf ) 145 { 146 String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR ); 147 148 if( Util.isEmpty( typeName ) ) 149 return null; 150 151 if( comparator == null ) 152 return createComparator( jobConf, typeName ); 153 154 if( comparator.getClass().getName().equals( typeName ) && !( comparator instanceof Configured ) ) 155 return comparator; 156 157 return createComparator( jobConf, typeName ); 158 } 159 160 public static Comparator getDefaultComparator( Configuration jobConf ) 161 { 162 String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR ); 163 164 if( Util.isEmpty( typeName ) ) 165 return null; 166 167 return createComparator( jobConf, typeName ); 168 } 169 170 private static Comparator createComparator( Configuration jobConf, String typeName ) 171 { 172 LOG.debug( "using default comparator: {}", typeName ); 173 174 try 175 { 176 Class<Comparator> type = (Class<Comparator>) TupleSerialization.class.getClassLoader().loadClass( typeName ); 177 178 return ReflectionUtils.newInstance( type, jobConf ); 179 } 180 catch( ClassNotFoundException exception ) 181 { 182 throw new CascadingException( "unable to load class: " + typeName, exception ); 183 } 184 } 185 186 /** Constructor TupleSerialization creates a new TupleSerialization instance. */ 187 public TupleSerialization() 188 { 189 } 190 191 public TupleSerialization( final FlowProcess<? extends Configuration> flowProcess ) 192 { 193 super( new Configuration() 194 { 195 @Override 196 public String get( String name ) 197 { 198 return get( name, null ); 199 } 200 201 @Override 202 public String get( String name, String defaultValue ) 203 { 204 Object value = flowProcess.getProperty( name ); 205 return value == null ? defaultValue : String.valueOf( value ); 206 } 207 } ); 208 } 209 210 /** 211 * Constructor TupleSerialization creates a new TupleSerialization instance. 212 * 213 * @param conf of type Configuration 214 */ 215 public TupleSerialization( Configuration conf ) 216 { 217 super( conf ); 218 } 219 220 @Override 221 public void setConf( Configuration conf ) 222 { 223 super.setConf( conf ); 224 225 if( conf != null ) 226 defaultComparator = getDefaultComparator( conf ); 227 } 228 229 @Override 230 public Configuration getConf() 231 { 232 if( super.getConf() == null ) 233 setConf( new Configuration() ); 234 235 return super.getConf(); 236 } 237 238 SerializationFactory getSerializationFactory() 239 { 240 if( serializationFactory == null ) 241 serializationFactory = new SerializationFactory( getConf() ); 242 243 return serializationFactory; 244 } 245 246 /** Must be called before {@link #getClassNameFor(int)} and {@link #getTokenFor(String)} methods. */ 247 void initTokenMaps() 248 { 249 if( tokenClassesMap != null ) 250 return; 251 252 tokenClassesMap = new HashMap<Integer, String>(); 253 classesTokensMap = new HashMap<String, Integer>(); 254 255 String tokenProperty = getSerializationTokens( getConf() ); 256 257 if( tokenProperty != null ) 258 { 259 tokenProperty = tokenProperty.replaceAll( "\\s", "" ); // allow for whitespace in token set 260 261 for( String pair : tokenProperty.split( "," ) ) 262 { 263 String[] elements = pair.split( "=" ); 264 addToken( null, Integer.parseInt( elements[ 0 ] ), elements[ 1 ] ); 265 } 266 } 267 268 String serializationsString = getSerializations( getConf() ); 269 270 LOG.debug( "using hadoop serializations from the job conf: {} ", serializationsString ); 271 272 if( serializationsString == null ) 273 return; 274 275 String[] serializations = serializationsString.split( "," ); 276 277 for( String serializationName : serializations ) 278 { 279 try 280 { 281 Class type = getConf().getClassByName( serializationName ); 282 283 SerializationToken tokenAnnotation = (SerializationToken) type.getAnnotation( SerializationToken.class ); 284 285 if( tokenAnnotation == null ) 286 continue; 287 288 if( tokenAnnotation.tokens().length != tokenAnnotation.classNames().length ) 289 throw new CascadingException( "serialization annotation tokens and classNames must be the same length" ); 290 291 int[] tokens = tokenAnnotation.tokens(); 292 293 for( int i = 0; i < tokens.length; i++ ) 294 addToken( type, tokens[ i ], tokenAnnotation.classNames()[ i ] ); 295 } 296 catch( ClassNotFoundException exception ) 297 { 298 LOG.warn( "unable to load serialization class: {}", serializationName, exception ); 299 } 300 } 301 302 tokensSize = tokenClassesMap.size(); 303 } 304 305 private void addToken( Class type, int token, String className ) 306 { 307 if( type != null && !type.getName().startsWith( "cascading." ) && token < 128 ) 308 throw new CascadingException( "serialization annotation tokens may not be less than 128, was: " + token ); 309 310 if( tokenClassesMap.containsKey( token ) ) 311 { 312 if( type == null ) 313 throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " found in properties" ); 314 315 throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " on serialization: " + type.getName() ); 316 } 317 318 if( classesTokensMap.containsKey( className ) ) 319 { 320 if( type == null ) 321 throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " found in properties " ); 322 323 throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " on serialization: " + type.getName() ); 324 } 325 326 LOG.debug( "adding serialization token: {}, for classname: {}", token, className ); 327 328 tokenClassesMap.put( token, className ); 329 classesTokensMap.put( className, token ); 330 } 331 332 /** 333 * Returns the className for the given token. 334 * 335 * @param token of type int 336 * @return a String 337 */ 338 final String getClassNameFor( int token ) 339 { 340 if( tokensSize == 0 ) 341 return null; 342 343 return tokenClassesMap.get( token ); 344 } 345 346 final long getTokensMapSize() 347 { 348 return tokensSize; 349 } 350 351 /** 352 * Returns the token for the given className. 353 * 354 * @param className of type String 355 * @return an Integer 356 */ 357 final Integer getTokenFor( String className ) 358 { 359 if( tokensSize == 0 ) 360 return null; 361 362 return classesTokensMap.get( className ); 363 } 364 365 public Comparator getDefaultComparator() 366 { 367 return defaultComparator; 368 } 369 370 public Comparator getComparator( Class type ) 371 { 372 Serialization serialization = getSerialization( type ); 373 374 Comparator comparator = null; 375 376 if( serialization instanceof Comparison ) 377 comparator = ( (Comparison) serialization ).getComparator( type ); 378 379 if( comparator != null ) 380 return comparator; 381 382 return defaultComparator; 383 } 384 385 Serialization getSerialization( String className ) 386 { 387 return getSerialization( getClass( className ) ); 388 } 389 390 Serialization getSerialization( Class type ) 391 { 392 return getSerializationFactory().getSerialization( type ); 393 } 394 395 Serializer getNewSerializer( Class type ) 396 { 397 try 398 { 399 Serializer serializer = getSerializationFactory().getSerializer( type ); 400 401 if( serializer == null ) 402 throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() ); 403 404 return serializer; 405 } 406 catch( NullPointerException exception ) 407 { 408 throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() ); 409 } 410 } 411 412 Deserializer getNewDeserializer( String className ) 413 { 414 try 415 { 416 Deserializer deserializer = getSerializationFactory().getDeserializer( getClass( className ) ); 417 418 if( deserializer == null ) 419 throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() ); 420 421 return deserializer; 422 } 423 catch( NullPointerException exception ) 424 { 425 throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() ); 426 } 427 } 428 429 TuplePairDeserializer getTuplePairDeserializer() 430 { 431 return new TuplePairDeserializer( getElementReader() ); 432 } 433 434 /** 435 * Method getElementReader returns the elementReader of this TupleSerialization object. 436 * 437 * @return the elementReader (type SerializationElementReader) of this TupleSerialization object. 438 */ 439 public SerializationElementReader getElementReader() 440 { 441 return new SerializationElementReader( this ); 442 } 443 444 TupleDeserializer getTupleDeserializer() 445 { 446 return new TupleDeserializer( getElementReader() ); 447 } 448 449 private TuplePairSerializer getTuplePairSerializer() 450 { 451 return new TuplePairSerializer( getElementWriter() ); 452 } 453 454 IndexTupleDeserializer getIndexTupleDeserializer() 455 { 456 return new IndexTupleDeserializer( getElementReader() ); 457 } 458 459 /** 460 * Method getElementWriter returns the elementWriter of this TupleSerialization object. 461 * 462 * @return the elementWriter (type SerializationElementWriter) of this TupleSerialization object. 463 */ 464 public SerializationElementWriter getElementWriter() 465 { 466 return new SerializationElementWriter( this ); 467 } 468 469 private TupleSerializer getTupleSerializer() 470 { 471 return new TupleSerializer( getElementWriter() ); 472 } 473 474 private IndexTupleSerializer getIndexTupleSerializer() 475 { 476 return new IndexTupleSerializer( getElementWriter() ); 477 } 478 479 /** 480 * Method accept implements {@link Serialization#accept(Class)}. 481 * 482 * @param c of type Class 483 * @return boolean 484 */ 485 public boolean accept( Class c ) 486 { 487 return Tuple.class == c || TuplePair.class == c || IndexTuple.class == c; 488 } 489 490 /** 491 * Method getDeserializer implements {@link Serialization#getDeserializer(Class)}. 492 * 493 * @param c of type Class 494 * @return Deserializer 495 */ 496 public Deserializer getDeserializer( Class c ) 497 { 498 if( c == Tuple.class ) 499 return getTupleDeserializer(); 500 else if( c == TuplePair.class ) 501 return getTuplePairDeserializer(); 502 else if( c == IndexTuple.class ) 503 return getIndexTupleDeserializer(); 504 505 throw new IllegalArgumentException( "unknown class, cannot deserialize: " + c.getName() ); 506 } 507 508 /** 509 * Method getSerializer implements {@link Serialization#getSerializer(Class)}. 510 * 511 * @param c of type Class 512 * @return Serializer 513 */ 514 public Serializer getSerializer( Class c ) 515 { 516 if( c == Tuple.class ) 517 return getTupleSerializer(); 518 else if( c == TuplePair.class ) 519 return getTuplePairSerializer(); 520 else if( c == IndexTuple.class ) 521 return getIndexTupleSerializer(); 522 523 throw new IllegalArgumentException( "unknown class, cannot serialize: " + c.getName() ); 524 } 525 526 public Class getClass( String className ) 527 { 528 Class type = classCache.get( className ); 529 530 if( type != null ) 531 return type; 532 533 try 534 { 535 if( className.charAt( 0 ) == '[' ) 536 type = Class.forName( className, true, Thread.currentThread().getContextClassLoader() ); 537 else 538 type = Thread.currentThread().getContextClassLoader().loadClass( className ); 539 } 540 catch( ClassNotFoundException exception ) 541 { 542 throw new TupleException( "unable to load class named: " + className, exception ); 543 } 544 545 classCache.put( className, type ); 546 547 return type; 548 } 549 550 public static class SerializationElementReader implements TupleInputStream.ElementReader 551 { 552 /** Field LOG */ 553 private static final Logger LOG = LoggerFactory.getLogger( SerializationElementReader.class ); 554 555 /** Field tupleSerialization */ 556 private final TupleSerialization tupleSerialization; 557 558 /** Field deserializers */ 559 final Map<String, Deserializer> deserializers = new HashMap<String, Deserializer>(); 560 561 /** 562 * Constructor SerializationElementReader creates a new SerializationElementReader instance. 563 * 564 * @param tupleSerialization of type TupleSerialization 565 */ 566 public SerializationElementReader( TupleSerialization tupleSerialization ) 567 { 568 this.tupleSerialization = tupleSerialization; 569 570 tupleSerialization.initTokenMaps(); 571 } 572 573 public Object read( int token, DataInputStream inputStream ) throws IOException 574 { 575 String className = getClassNameFor( token, inputStream ); 576 Deserializer deserializer = getDeserializerFor( inputStream, className ); 577 578 Object foundObject = null; 579 Object object; 580 581 try 582 { 583 object = deserializer.deserialize( foundObject ); 584 } 585 catch( IOException exception ) 586 { 587 LOG.error( "failed deserializing token: " + token + " with classname: " + className, exception ); 588 589 throw exception; 590 } 591 592 return object; 593 } 594 595 @Override 596 public Comparator getComparatorFor( int token, DataInputStream inputStream ) throws IOException 597 { 598 Class type = tupleSerialization.getClass( getClassNameFor( token, inputStream ) ); 599 600 return tupleSerialization.getComparator( type ); 601 } 602 603 private Deserializer getDeserializerFor( DataInputStream inputStream, String className ) throws IOException 604 { 605 Deserializer deserializer = deserializers.get( className ); 606 607 if( deserializer == null ) 608 { 609 deserializer = tupleSerialization.getNewDeserializer( className ); 610 deserializer.open( inputStream ); 611 deserializers.put( className, deserializer ); 612 } 613 614 return deserializer; 615 } 616 617 public String getClassNameFor( int token, DataInputStream inputStream ) throws IOException 618 { 619 String className = tupleSerialization.getClassNameFor( token ); 620 621 try 622 { 623 if( className == null ) 624 className = WritableUtils.readString( inputStream ); 625 } 626 catch( IOException exception ) 627 { 628 LOG.error( "unable to resolve token: {}, to a valid classname, with token map of size: {}, rethrowing IOException", token, tupleSerialization.getTokensMapSize() ); 629 throw exception; 630 } 631 632 return className; 633 } 634 635 public void close() 636 { 637 if( deserializers.size() == 0 ) 638 return; 639 640 Collection<Deserializer> clone = new ArrayList<Deserializer>( deserializers.values() ); 641 642 deserializers.clear(); 643 644 for( Deserializer deserializer : clone ) 645 { 646 try 647 { 648 deserializer.close(); 649 } 650 catch( IOException exception ) 651 { 652 // do nothing 653 } 654 } 655 } 656 } 657 658 public static class SerializationElementWriter implements TupleOutputStream.ElementWriter 659 { 660 /** Field LOG */ 661 private static final Logger LOG = LoggerFactory.getLogger( SerializationElementWriter.class ); 662 663 /** Field tupleSerialization */ 664 private final TupleSerialization tupleSerialization; 665 666 /** Field serializers */ 667 final Map<Class, Serializer> serializers = new HashMap<Class, Serializer>(); 668 669 public SerializationElementWriter( TupleSerialization tupleSerialization ) 670 { 671 this.tupleSerialization = tupleSerialization; 672 673 tupleSerialization.initTokenMaps(); 674 } 675 676 public void write( DataOutputStream outputStream, Object object ) throws IOException 677 { 678 Class<?> type = object.getClass(); 679 String className = type.getName(); 680 Integer token = tupleSerialization.getTokenFor( className ); 681 682 if( token == null ) 683 { 684 LOG.debug( "no serialization token found for classname: {}", className ); 685 686 WritableUtils.writeVInt( outputStream, HadoopTupleOutputStream.WRITABLE_TOKEN ); // denotes to punt to hadoop serialization 687 WritableUtils.writeString( outputStream, className ); 688 } 689 else 690 { 691 WritableUtils.writeVInt( outputStream, token ); 692 } 693 694 Serializer serializer = serializers.get( type ); 695 696 if( serializer == null ) 697 { 698 serializer = tupleSerialization.getNewSerializer( type ); 699 serializer.open( outputStream ); 700 serializers.put( type, serializer ); 701 } 702 703 try 704 { 705 serializer.serialize( object ); 706 } 707 catch( IOException exception ) 708 { 709 LOG.error( "failed serializing token: " + token + " with classname: " + className, exception ); 710 711 throw exception; 712 } 713 } 714 715 public void close() 716 { 717 if( serializers.size() == 0 ) 718 return; 719 720 Collection<Serializer> clone = new ArrayList<Serializer>( serializers.values() ); 721 722 serializers.clear(); 723 724 for( Serializer serializer : clone ) 725 { 726 try 727 { 728 serializer.close(); 729 } 730 catch( IOException exception ) 731 { 732 // do nothing 733 } 734 } 735 } 736 } 737 }