001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop;
022    
023    import java.io.DataInputStream;
024    import java.io.DataOutputStream;
025    import java.io.IOException;
026    import java.io.InputStream;
027    import java.io.OutputStream;
028    import java.util.Comparator;
029    
030    import cascading.tuple.Comparison;
031    import cascading.tuple.hadoop.util.BytesComparator;
032    import org.apache.hadoop.conf.Configured;
033    import org.apache.hadoop.io.serializer.Deserializer;
034    import org.apache.hadoop.io.serializer.Serialization;
035    import org.apache.hadoop.io.serializer.Serializer;
036    
037    /**
038     * Class BytesSerialization is an implementation of Hadoop's {@link Serialization} interface for use
039     * by {@code byte} arrays ({@code byte[]}).
040     * <p/>
041     * To use, call<br/>
042     * {@code TupleSerializationProps.addSerialization(properties, BytesSerialization.class.getName() );}
043     * <p/>
044     * This class also implements {@link Comparison} so it is not required to set a {@link cascading.tuple.hadoop.util.BytesComparator}
045     * when attempting to group on a byte array via GroupBy or CoGroup.
046     *
047     * @see TupleSerialization#addSerialization(java.util.Map, String)
048     * @see cascading.tuple.hadoop.util.BytesComparator
049     * @see Comparison
050     */
051    @SerializationToken(tokens = {126}, classNames = {"[B"})
052    public class BytesSerialization extends Configured implements Comparison<byte[]>, Serialization<byte[]>
053      {
054    
055      public static class RawBytesDeserializer implements Deserializer<byte[]>
056        {
057        private DataInputStream in;
058    
059        @Override
060        public void open( InputStream in ) throws IOException
061          {
062          if( in instanceof DataInputStream )
063            this.in = (DataInputStream) in;
064          else
065            this.in = new DataInputStream( in );
066          }
067    
068        @Override
069        public byte[] deserialize( byte[] existing ) throws IOException
070          {
071          int len = in.readInt();
072    
073          byte[] bytes = existing != null && existing.length == len ? existing : new byte[ len ];
074    
075          in.readFully( bytes );
076    
077          return bytes;
078          }
079    
080        @Override
081        public void close() throws IOException
082          {
083          in.close();
084          }
085        }
086    
087      public static class RawBytesSerializer implements Serializer<byte[]>
088        {
089        private DataOutputStream out;
090    
091        @Override
092        public void open( OutputStream out ) throws IOException
093          {
094          if( out instanceof DataOutputStream )
095            this.out = (DataOutputStream) out;
096          else
097            this.out = new DataOutputStream( out );
098          }
099    
100        @Override
101        public void serialize( byte[] bytes ) throws IOException
102          {
103          out.writeInt( bytes.length );
104          out.write( bytes );
105          }
106    
107        @Override
108        public void close() throws IOException
109          {
110          out.close();
111          }
112        }
113    
114    
115      public BytesSerialization()
116        {
117        }
118    
119      @Override
120      public boolean accept( Class<?> c )
121        {
122        return byte[].class == c;
123        }
124    
125      @Override
126      public Serializer<byte[]> getSerializer( Class<byte[]> c )
127        {
128        return new RawBytesSerializer();
129        }
130    
131      @Override
132      public Deserializer<byte[]> getDeserializer( Class<byte[]> c )
133        {
134        return new RawBytesDeserializer();
135        }
136    
137      @Override
138      public Comparator<byte[]> getComparator( Class<byte[]> type )
139        {
140        return new BytesComparator();
141        }
142      }