001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop.io;
022
023import java.io.IOException;
024import java.io.OutputStream;
025import java.util.IdentityHashMap;
026import java.util.Map;
027
028import cascading.tuple.Tuple;
029import cascading.tuple.io.IndexTuple;
030import cascading.tuple.io.TupleOutputStream;
031import cascading.tuple.io.TuplePair;
032import org.apache.hadoop.io.WritableUtils;
033
034/**
035 *
036 */
037public class HadoopTupleOutputStream extends TupleOutputStream
038  {
039  /** Field WRITABLE_TOKEN */
040  public static final int WRITABLE_TOKEN = 32;
041
042  private static final Map<Class, TupleElementWriter> staticTupleElementWriters = new IdentityHashMap<Class, TupleElementWriter>();
043
044  static
045    {
046    staticTupleElementWriters.put( String.class, new TupleElementWriter()
047    {
048    @Override
049    public void write( TupleOutputStream stream, Object element ) throws IOException
050      {
051      WritableUtils.writeVInt( stream, 1 );
052      WritableUtils.writeString( stream, (String) element );
053      }
054    } );
055
056    staticTupleElementWriters.put( Float.class, new TupleElementWriter()
057    {
058    @Override
059    public void write( TupleOutputStream stream, Object element ) throws IOException
060      {
061      WritableUtils.writeVInt( stream, 2 );
062      stream.writeFloat( (Float) element );
063      }
064    } );
065
066    staticTupleElementWriters.put( Double.class, new TupleElementWriter()
067    {
068    @Override
069    public void write( TupleOutputStream stream, Object element ) throws IOException
070      {
071      WritableUtils.writeVInt( stream, 3 );
072      stream.writeDouble( (Double) element );
073      }
074    } );
075
076    staticTupleElementWriters.put( Integer.class, new TupleElementWriter()
077    {
078    @Override
079    public void write( TupleOutputStream stream, Object element ) throws IOException
080      {
081      WritableUtils.writeVInt( stream, 4 );
082      WritableUtils.writeVInt( stream, (Integer) element );
083      }
084    } );
085
086    staticTupleElementWriters.put( Long.class, new TupleElementWriter()
087    {
088    @Override
089    public void write( TupleOutputStream stream, Object element ) throws IOException
090      {
091      WritableUtils.writeVInt( stream, 5 );
092      WritableUtils.writeVLong( stream, (Long) element );
093      }
094    } );
095
096    staticTupleElementWriters.put( Boolean.class, new TupleElementWriter()
097    {
098    @Override
099    public void write( TupleOutputStream stream, Object element ) throws IOException
100      {
101      WritableUtils.writeVInt( stream, 6 );
102      stream.writeBoolean( (Boolean) element );
103      }
104    } );
105
106    staticTupleElementWriters.put( Short.class, new TupleElementWriter()
107    {
108    @Override
109    public void write( TupleOutputStream stream, Object element ) throws IOException
110      {
111      WritableUtils.writeVInt( stream, 7 );
112      stream.writeShort( (Short) element );
113      }
114    } );
115
116    staticTupleElementWriters.put( Tuple.class, new TupleElementWriter()
117    {
118    @Override
119    public void write( TupleOutputStream stream, Object element ) throws IOException
120      {
121      WritableUtils.writeVInt( stream, 8 );
122      stream.writeTuple( (Tuple) element );
123      }
124    } );
125
126    staticTupleElementWriters.put( TuplePair.class, new TupleElementWriter()
127    {
128    @Override
129    public void write( TupleOutputStream stream, Object element ) throws IOException
130      {
131      WritableUtils.writeVInt( stream, 9 );
132      stream.writeTuplePair( (TuplePair) element );
133      }
134    } );
135
136    staticTupleElementWriters.put( IndexTuple.class, new TupleElementWriter()
137    {
138    @Override
139    public void write( TupleOutputStream stream, Object element ) throws IOException
140      {
141      WritableUtils.writeVInt( stream, 10 );
142      stream.writeIndexTuple( (IndexTuple) element );
143      }
144    } );
145    }
146
147  public HadoopTupleOutputStream( OutputStream outputStream, ElementWriter elementWriter )
148    {
149    super( staticTupleElementWriters, outputStream, elementWriter );
150    }
151
152  @Override
153  protected void writeIntInternal( int value ) throws IOException
154    {
155    WritableUtils.writeVInt( this, value );
156    }
157
158  public void writeIndexTuple( IndexTuple indexTuple ) throws IOException
159    {
160    writeIntInternal( indexTuple.getIndex() );
161    writeTuple( indexTuple.getTuple() );
162    }
163  }