/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple.hadoop;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Comparator;

import cascading.tuple.Comparison;
import cascading.tuple.hadoop.util.BytesComparator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;

/**
 * Class BytesSerialization is an implementation of Hadoop's {@link Serialization} interface for use
 * by {@code byte} arrays ({@code byte[]}).
 * <p>
 * To use, call<br>
 * {@code TupleSerializationProps.addSerialization(properties, BytesSerialization.class.getName() );}
 * <p>
 * This class also implements {@link Comparison} so it is not required to set a {@link cascading.tuple.hadoop.util.BytesComparator}
 * when attempting to group on a byte array via GroupBy or CoGroup.
 * <p>
 * Each array is written as a 4-byte length (see {@link DataOutputStream#writeInt(int)}) followed by
 * exactly that many raw bytes.
 *
 * @see TupleSerialization#addSerialization(java.util.Map, String)
 * @see cascading.tuple.hadoop.util.BytesComparator
 * @see Comparison
 */
// "[B" is the JVM binary name of byte[], i.e. byte[].class.getName().
@SerializationToken(tokens = {126}, classNames = {"[B"})
public class BytesSerialization extends Configured implements Comparison<byte[]>, Serialization<byte[]>
{

  /**
   * Reads {@code byte[]} values that were written as a 4-byte length followed by the raw bytes.
   */
  public static class RawBytesDeserializer implements Deserializer<byte[]>
  {
    /** Stream to read from; assigned by {@link #open(InputStream)}, {@code null} until then. */
    private DataInputStream in;

    /**
     * Sets the stream subsequent {@link #deserialize(byte[])} calls read from.
     *
     * @param in the source stream; used directly when it already is a {@link DataInputStream},
     *           otherwise wrapped in a new one
     */
    @Override
    public void open( InputStream in ) throws IOException
    {
      if( in instanceof DataInputStream )
      {
        this.in = (DataInputStream) in;
      }
      else
      {
        this.in = new DataInputStream( in );
      }
    }

    /**
     * Reads one length-prefixed byte array from the stream.
     *
     * @param existing a candidate array to reuse, may be {@code null}; it is filled and returned only
     *                 when its length equals the length read from the stream
     * @return {@code existing} when it could be reused, otherwise a newly allocated array
     * @throws IOException if the length prefix is negative, or if the stream fails or ends before
     *                     the prefix and all announced bytes were read
     */
    @Override
    public byte[] deserialize( byte[] existing ) throws IOException
    {
      int len = in.readInt();

      // A negative prefix can only come from a corrupt or misaligned stream; report it as an I/O
      // failure instead of letting 'new byte[ len ]' throw NegativeArraySizeException.
      if( len < 0 )
      {
        throw new IOException( "invalid byte array length read from stream: " + len );
      }

      byte[] bytes = existing != null && existing.length == len ? existing : new byte[ len ];

      in.readFully( bytes );

      return bytes;
    }

    /**
     * Closes the stream set by {@link #open(InputStream)}; does nothing if this instance was never opened.
     */
    @Override
    public void close() throws IOException
    {
      if( in != null )
      {
        in.close();
      }
    }
  }

  /**
   * Writes {@code byte[]} values as a 4-byte length followed by the raw bytes.
   */
  public static class RawBytesSerializer implements Serializer<byte[]>
  {
    /** Stream to write to; assigned by {@link #open(OutputStream)}, {@code null} until then. */
    private DataOutputStream out;

    /**
     * Sets the stream subsequent {@link #serialize(byte[])} calls write to.
     *
     * @param out the target stream; used directly when it already is a {@link DataOutputStream},
     *            otherwise wrapped in a new one
     */
    @Override
    public void open( OutputStream out ) throws IOException
    {
      if( out instanceof DataOutputStream )
      {
        this.out = (DataOutputStream) out;
      }
      else
      {
        this.out = new DataOutputStream( out );
      }
    }

    /**
     * Writes the array length as an {@code int}, then the array contents.
     *
     * @param bytes the array to write; must not be {@code null}
     */
    @Override
    public void serialize( byte[] bytes ) throws IOException
    {
      out.writeInt( bytes.length );
      out.write( bytes );
    }

    /**
     * Closes the stream set by {@link #open(OutputStream)}; does nothing if this instance was never opened.
     */
    @Override
    public void close() throws IOException
    {
      if( out != null )
      {
        out.close();
      }
    }
  }

  /** Creates a new BytesSerialization instance. */
  public BytesSerialization()
  {
  }

  /**
   * Reports whether this serialization handles the given type.
   *
   * @param c the candidate type
   * @return {@code true} only when {@code c} is exactly {@code byte[].class}
   */
  @Override
  public boolean accept( Class<?> c )
  {
    return byte[].class == c;
  }

  /**
   * @param c ignored
   * @return a new {@link RawBytesSerializer}
   */
  @Override
  public Serializer<byte[]> getSerializer( Class<byte[]> c )
  {
    return new RawBytesSerializer();
  }

  /**
   * @param c ignored
   * @return a new {@link RawBytesDeserializer}
   */
  @Override
  public Deserializer<byte[]> getDeserializer( Class<byte[]> c )
  {
    return new RawBytesDeserializer();
  }

  /**
   * @param type ignored
   * @return a new {@link BytesComparator}
   */
  @Override
  public Comparator<byte[]> getComparator( Class<byte[]> type )
  {
    return new BytesComparator();
  }
}