001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop;
022    
023    import java.io.DataInputStream;
024    import java.io.DataOutputStream;
025    import java.io.IOException;
026    import java.io.InputStream;
027    import java.io.OutputStream;
028    import java.math.BigDecimal;
029    import java.math.BigInteger;
030    
031    import org.apache.hadoop.conf.Configured;
032    import org.apache.hadoop.io.serializer.Deserializer;
033    import org.apache.hadoop.io.serializer.Serialization;
034    import org.apache.hadoop.io.serializer.Serializer;
035    
036    /**
037     * Class BigDecimalSerialization is an implementation of Hadoop's {@link org.apache.hadoop.io.serializer.Serialization} interface for use
038     * by {@link BigDecimal} instances.
039     * <p/>
040     * To use, call<br/>
041     * {@code TupleSerializationProps.addSerialization(properties, BigDecimalSerialization.class.getName());}
042     * <p/>
043     *
044     * @see cascading.tuple.hadoop.TupleSerializationProps#addSerialization(java.util.Map, String)
045     */
046    @SerializationToken(tokens = {125}, classNames = {"java.math.BigDecimal"})
047    public class BigDecimalSerialization extends Configured implements Serialization<BigDecimal>
048      {
049      public static class BigDecimalDeserializer implements Deserializer<BigDecimal>
050        {
051        private DataInputStream in;
052    
053        @Override
054        public void open( InputStream in ) throws IOException
055          {
056          if( in instanceof DataInputStream )
057            this.in = (DataInputStream) in;
058          else
059            this.in = new DataInputStream( in );
060          }
061    
062        @Override
063        public BigDecimal deserialize( BigDecimal existing ) throws IOException
064          {
065          int len = in.readInt();
066          byte[] valueBytes = new byte[ len ];
067    
068          in.readFully( valueBytes );
069    
070          BigInteger value = new BigInteger( valueBytes );
071    
072          return new BigDecimal( value, in.readInt() );
073          }
074    
075        @Override
076        public void close() throws IOException
077          {
078          in.close();
079          }
080        }
081    
082      public static class BigDecimalSerializer implements Serializer<BigDecimal>
083        {
084        private DataOutputStream out;
085    
086        @Override
087        public void open( OutputStream out ) throws IOException
088          {
089          if( out instanceof DataOutputStream )
090            this.out = (DataOutputStream) out;
091          else
092            this.out = new DataOutputStream( out );
093          }
094    
095        @Override
096        public void serialize( BigDecimal bigDecimal ) throws IOException
097          {
098          BigInteger value = bigDecimal.unscaledValue();
099          byte[] valueBytes = value.toByteArray();
100    
101          out.writeInt( valueBytes.length );
102          out.write( valueBytes );
103          out.writeInt( bigDecimal.scale() );
104          }
105    
106        @Override
107        public void close() throws IOException
108          {
109          out.close();
110          }
111        }
112    
113    
114      public BigDecimalSerialization()
115        {
116        }
117    
118      @Override
119      public boolean accept( Class<?> c )
120        {
121        return BigDecimal.class == c;
122        }
123    
124      @Override
125      public Serializer<BigDecimal> getSerializer( Class<BigDecimal> c )
126        {
127        return new BigDecimalSerializer();
128        }
129    
130      @Override
131      public Deserializer<BigDecimal> getDeserializer( Class<BigDecimal> c )
132        {
133        return new BigDecimalDeserializer();
134        }
135      }