/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple.hadoop;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Comparator;

import cascading.tuple.Comparison;
import cascading.tuple.hadoop.util.BytesComparator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;

/**
 * Class BytesSerialization is an implementation of Hadoop's {@link Serialization} interface for use
 * by {@code byte} arrays ({@code byte[]}).
 * <p>
 * To use, call<br>
 * {@code TupleSerializationProps.addSerialization(properties, BytesSerialization.class.getName() );}
 * <p>
 * This class also implements {@link Comparison} so it is not required to set a {@link cascading.tuple.hadoop.util.BytesComparator}
 * when attempting to group on a byte array via GroupBy or CoGroup.
 *
 * @see TupleSerializationProps#addSerialization(java.util.Map, String)
 * @see cascading.tuple.hadoop.util.BytesComparator
 * @see Comparison
 */
// "[B" is the JVM binary class name of byte[], i.e. the value of byte[].class.getName()
@SerializationToken(tokens = {126}, classNames = {"[B"})
public class BytesSerialization extends Configured implements Comparison<byte[]>, Serialization<byte[]>
  {
  /**
   * Reads {@code byte[]} values written by {@link RawBytesSerializer}: a four byte length as read by
   * {@link DataInputStream#readInt()}, followed by exactly that many raw bytes.
   */
  public static class RawBytesDeserializer implements Deserializer<byte[]>
    {
    /** Stream values are read from, {@code null} until {@link #open(InputStream)} is called. */
    private DataInputStream in;

    /**
     * Binds this deserializer to the given stream. A stream that already is a {@link DataInputStream}
     * is used directly, any other stream is wrapped in a new one.
     *
     * @param in the stream to read serialized values from
     * @throws IOException declared by the {@link Deserializer} contract, not thrown by this implementation
     */
    @Override
    public void open( InputStream in ) throws IOException
      {
      if( in instanceof DataInputStream )
        this.in = (DataInputStream) in;
      else
        this.in = new DataInputStream( in );
      }

    /**
     * Reads the next byte array from the stream.
     *
     * @param existing a previously returned array that may be overwritten and returned again, it is only
     *                 reused when its length equals the length of the value being read; may be {@code null}
     * @return {@code existing} refilled when the lengths match, otherwise a newly allocated array
     * @throws IOException if the length prefix is negative, the stream ends prematurely or the read fails
     */
    @Override
    public byte[] deserialize( byte[] existing ) throws IOException
      {
      int length = in.readInt();

      // a negative prefix can only come from a corrupt or misaligned stream, report it through the
      // declared exception type instead of letting the allocation throw NegativeArraySizeException
      if( length < 0 )
        throw new IOException( "invalid byte array length read from stream: " + length );

      byte[] bytes = existing != null && existing.length == length ? existing : new byte[ length ];

      in.readFully( bytes );

      return bytes;
      }

    /**
     * Closes the underlying stream. Does nothing if {@link #open(InputStream)} was never called.
     *
     * @throws IOException if closing the stream fails
     */
    @Override
    public void close() throws IOException
      {
      if( in != null )
        in.close();
      }
    }

  /**
   * Writes {@code byte[]} values as a four byte length, via {@link DataOutputStream#writeInt(int)},
   * followed by the raw bytes of the array.
   */
  public static class RawBytesSerializer implements Serializer<byte[]>
    {
    /** Stream values are written to, {@code null} until {@link #open(OutputStream)} is called. */
    private DataOutputStream out;

    /**
     * Binds this serializer to the given stream. A stream that already is a {@link DataOutputStream}
     * is used directly, any other stream is wrapped in a new one.
     *
     * @param out the stream to write serialized values to
     * @throws IOException declared by the {@link Serializer} contract, not thrown by this implementation
     */
    @Override
    public void open( OutputStream out ) throws IOException
      {
      if( out instanceof DataOutputStream )
        this.out = (DataOutputStream) out;
      else
        this.out = new DataOutputStream( out );
      }

    /**
     * Writes the length of the given array followed by its content.
     *
     * @param bytes the array to write
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void serialize( byte[] bytes ) throws IOException
      {
      out.writeInt( bytes.length );
      out.write( bytes );
      }

    /**
     * Closes the underlying stream. Does nothing if {@link #open(OutputStream)} was never called.
     *
     * @throws IOException if closing the stream fails
     */
    @Override
    public void close() throws IOException
      {
      if( out != null )
        out.close();
      }
    }

  /** Creates a new BytesSerialization instance. */
  public BytesSerialization()
    {
    }

  /**
   * Reports whether this serialization handles the given type.
   *
   * @param c the candidate type
   * @return {@code true} only when {@code c} is exactly {@code byte[].class}
   */
  @Override
  public boolean accept( Class<?> c )
    {
    return byte[].class == c;
    }

  /**
   * Returns a new {@link RawBytesSerializer}.
   *
   * @param c the type to serialize, not consulted
   * @return a new, unopened serializer
   */
  @Override
  public Serializer<byte[]> getSerializer( Class<byte[]> c )
    {
    return new RawBytesSerializer();
    }

  /**
   * Returns a new {@link RawBytesDeserializer}.
   *
   * @param c the type to deserialize, not consulted
   * @return a new, unopened deserializer
   */
  @Override
  public Deserializer<byte[]> getDeserializer( Class<byte[]> c )
    {
    return new RawBytesDeserializer();
    }

  /**
   * Returns a new {@link BytesComparator} for comparing {@code byte[]} values.
   *
   * @param type the type to compare, not consulted
   * @return a new comparator instance
   */
  @Override
  public Comparator<byte[]> getComparator( Class<byte[]> type )
    {
    return new BytesComparator();
    }
  }