001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop.io;
022
023import java.io.IOException;
024import java.io.OutputStream;
025import java.util.IdentityHashMap;
026import java.util.Map;
027
028import cascading.tuple.Tuple;
029import cascading.tuple.io.IndexTuple;
030import cascading.tuple.io.TupleOutputStream;
031import cascading.tuple.io.TuplePair;
032import org.apache.hadoop.io.WritableUtils;
033
034/**
035 *
036 */
037public class HadoopTupleOutputStream extends TupleOutputStream
038  {
039  /** Field WRITABLE_TOKEN */
040  public static final int WRITABLE_TOKEN = 32;
041
042  private static final Map<Class, TupleElementWriter> staticTupleUnTypedElementWriters = new IdentityHashMap<Class, TupleElementWriter>();
043  private static final Map<Class, TupleElementWriter> staticTupleTypedElementWriters = new IdentityHashMap<Class, TupleElementWriter>();
044
045  static
046    {
047    // untyped
048
049    staticTupleUnTypedElementWriters.put( String.class, new TupleElementWriter()
050    {
051    @Override
052    public void write( TupleOutputStream stream, Object element ) throws IOException
053      {
054      WritableUtils.writeVInt( stream, 1 );
055      WritableUtils.writeString( stream, (String) element );
056      }
057    } );
058
059    staticTupleUnTypedElementWriters.put( Float.class, new TupleElementWriter()
060    {
061    @Override
062    public void write( TupleOutputStream stream, Object element ) throws IOException
063      {
064      WritableUtils.writeVInt( stream, 2 );
065      stream.writeFloat( (Float) element );
066      }
067    } );
068
069    staticTupleUnTypedElementWriters.put( Double.class, new TupleElementWriter()
070    {
071    @Override
072    public void write( TupleOutputStream stream, Object element ) throws IOException
073      {
074      WritableUtils.writeVInt( stream, 3 );
075      stream.writeDouble( (Double) element );
076      }
077    } );
078
079    staticTupleUnTypedElementWriters.put( Integer.class, new TupleElementWriter()
080    {
081    @Override
082    public void write( TupleOutputStream stream, Object element ) throws IOException
083      {
084      WritableUtils.writeVInt( stream, 4 );
085      WritableUtils.writeVInt( stream, (Integer) element );
086      }
087    } );
088
089    staticTupleUnTypedElementWriters.put( Long.class, new TupleElementWriter()
090    {
091    @Override
092    public void write( TupleOutputStream stream, Object element ) throws IOException
093      {
094      WritableUtils.writeVInt( stream, 5 );
095      WritableUtils.writeVLong( stream, (Long) element );
096      }
097    } );
098
099    staticTupleUnTypedElementWriters.put( Boolean.class, new TupleElementWriter()
100    {
101    @Override
102    public void write( TupleOutputStream stream, Object element ) throws IOException
103      {
104      WritableUtils.writeVInt( stream, 6 );
105      stream.writeBoolean( (Boolean) element );
106      }
107    } );
108
109    staticTupleUnTypedElementWriters.put( Short.class, new TupleElementWriter()
110    {
111    @Override
112    public void write( TupleOutputStream stream, Object element ) throws IOException
113      {
114      WritableUtils.writeVInt( stream, 7 );
115      stream.writeShort( (Short) element );
116      }
117    } );
118
119    staticTupleUnTypedElementWriters.put( Tuple.class, new TupleElementWriter()
120    {
121    @Override
122    public void write( TupleOutputStream stream, Object element ) throws IOException
123      {
124      WritableUtils.writeVInt( stream, 8 );
125      stream.writeTuple( (Tuple) element );
126      }
127    } );
128
129    staticTupleUnTypedElementWriters.put( TuplePair.class, new TupleElementWriter()
130    {
131    @Override
132    public void write( TupleOutputStream stream, Object element ) throws IOException
133      {
134      WritableUtils.writeVInt( stream, 9 );
135      stream.writeTuplePair( (TuplePair) element );
136      }
137    } );
138
139    staticTupleUnTypedElementWriters.put( IndexTuple.class, new TupleElementWriter()
140    {
141    @Override
142    public void write( TupleOutputStream stream, Object element ) throws IOException
143      {
144      WritableUtils.writeVInt( stream, 10 );
145      stream.writeIndexTuple( (IndexTuple) element );
146      }
147    } );
148
149    // typed
150
151    staticTupleTypedElementWriters.put( Void.class, new TupleElementWriter()
152    {
153    @Override
154    public void write( TupleOutputStream stream, Object element ) throws IOException
155      {
156      // do nothing
157      }
158    } );
159
160    staticTupleTypedElementWriters.put( String.class, new TupleElementWriter()
161    {
162    @Override
163    public void write( TupleOutputStream stream, Object element ) throws IOException
164      {
165      WritableUtils.writeString( stream, (String) element );
166      }
167    } );
168
169    staticTupleTypedElementWriters.put( Float.class, new TupleElementWriter()
170    {
171    @Override
172    public void write( TupleOutputStream stream, Object element ) throws IOException
173      {
174      if( element == null )
175        {
176        stream.writeByte( 0 );
177        return;
178        }
179
180      stream.writeByte( 1 );
181      stream.writeFloat( (Float) element );
182      }
183    } );
184
185    staticTupleTypedElementWriters.put( Double.class, new TupleElementWriter()
186    {
187    @Override
188    public void write( TupleOutputStream stream, Object element ) throws IOException
189      {
190      if( element == null )
191        {
192        stream.writeByte( 0 );
193        return;
194        }
195
196      stream.writeByte( 1 );
197      stream.writeDouble( (Double) element );
198      }
199    } );
200
201    staticTupleTypedElementWriters.put( Integer.class, new TupleElementWriter()
202    {
203    @Override
204    public void write( TupleOutputStream stream, Object element ) throws IOException
205      {
206      if( element == null )
207        {
208        stream.writeByte( 0 );
209        return;
210        }
211
212      stream.writeByte( 1 );
213      WritableUtils.writeVInt( stream, (Integer) element );
214      }
215    } );
216
217    staticTupleTypedElementWriters.put( Long.class, new TupleElementWriter()
218    {
219    @Override
220    public void write( TupleOutputStream stream, Object element ) throws IOException
221      {
222      if( element == null )
223        {
224        stream.writeByte( 0 );
225        return;
226        }
227
228      stream.writeByte( 1 );
229      WritableUtils.writeVLong( stream, (Long) element );
230      }
231    } );
232
233    staticTupleTypedElementWriters.put( Boolean.class, new TupleElementWriter()
234    {
235    @Override
236    public void write( TupleOutputStream stream, Object element ) throws IOException
237      {
238      if( element == null )
239        {
240        stream.writeByte( 0 );
241        return;
242        }
243
244      stream.writeByte( 1 );
245      stream.writeBoolean( (Boolean) element );
246      }
247    } );
248
249    staticTupleTypedElementWriters.put( Short.class, new TupleElementWriter()
250    {
251    @Override
252    public void write( TupleOutputStream stream, Object element ) throws IOException
253      {
254      if( element == null )
255        {
256        stream.writeByte( 0 );
257        return;
258        }
259
260      stream.writeByte( 1 );
261      stream.writeShort( (Short) element );
262      }
263    } );
264
265    staticTupleTypedElementWriters.put( Float.TYPE, new TupleElementWriter()
266    {
267    @Override
268    public void write( TupleOutputStream stream, Object element ) throws IOException
269      {
270      if( element == null )
271        stream.writeFloat( 0 );
272      else
273        stream.writeFloat( (Float) element );
274      }
275    } );
276
277    staticTupleTypedElementWriters.put( Double.TYPE, new TupleElementWriter()
278    {
279    @Override
280    public void write( TupleOutputStream stream, Object element ) throws IOException
281      {
282      if( element == null )
283        stream.writeDouble( 0 );
284      else
285        stream.writeDouble( (Double) element );
286      }
287    } );
288
289    staticTupleTypedElementWriters.put( Integer.TYPE, new TupleElementWriter()
290    {
291    @Override
292    public void write( TupleOutputStream stream, Object element ) throws IOException
293      {
294      if( element == null )
295        WritableUtils.writeVInt( stream, 0 );
296      else
297        WritableUtils.writeVInt( stream, (Integer) element );
298      }
299    } );
300
301    staticTupleTypedElementWriters.put( Long.TYPE, new TupleElementWriter()
302    {
303    @Override
304    public void write( TupleOutputStream stream, Object element ) throws IOException
305      {
306      if( element == null )
307        WritableUtils.writeVLong( stream, 0 );
308      else
309        WritableUtils.writeVLong( stream, (Long) element );
310      }
311    } );
312
313    staticTupleTypedElementWriters.put( Boolean.TYPE, new TupleElementWriter()
314    {
315    @Override
316    public void write( TupleOutputStream stream, Object element ) throws IOException
317      {
318      if( element == null )
319        stream.writeBoolean( false );
320      else
321        stream.writeBoolean( (Boolean) element );
322      }
323    } );
324
325    staticTupleTypedElementWriters.put( Short.TYPE, new TupleElementWriter()
326    {
327    @Override
328    public void write( TupleOutputStream stream, Object element ) throws IOException
329      {
330      if( element == null )
331        stream.writeShort( 0 );
332      else
333        stream.writeShort( (Short) element );
334      }
335    } );
336
337    staticTupleTypedElementWriters.put( Tuple.class, new TupleElementWriter()
338    {
339    @Override
340    public void write( TupleOutputStream stream, Object element ) throws IOException
341      {
342      stream.writeTuple( (Tuple) element );
343      }
344    } );
345
346    staticTupleTypedElementWriters.put( TuplePair.class, new TupleElementWriter()
347    {
348    @Override
349    public void write( TupleOutputStream stream, Object element ) throws IOException
350      {
351      stream.writeTuplePair( (TuplePair) element );
352      }
353    } );
354
355    staticTupleTypedElementWriters.put( IndexTuple.class, new TupleElementWriter()
356    {
357    @Override
358    public void write( TupleOutputStream stream, Object element ) throws IOException
359      {
360      stream.writeIndexTuple( (IndexTuple) element );
361      }
362    } );
363    }
364
365  public static TupleElementWriter[] getWritersFor( final ElementWriter elementWriter, final Class[] keyClasses )
366    {
367    if( keyClasses == null || keyClasses.length == 0 )
368      return null;
369
370    TupleElementWriter[] writers = new TupleElementWriter[ keyClasses.length ];
371
372    for( int i = 0; i < keyClasses.length; i++ )
373      {
374      TupleElementWriter writer = staticTupleTypedElementWriters.get( keyClasses[ i ] );
375
376      if( writer != null )
377        {
378        writers[ i ] = writer;
379        }
380      else
381        {
382        final int index = i;
383        writers[ i ] = new TupleElementWriter()
384        {
385        @Override
386        public void write( TupleOutputStream stream, Object element ) throws IOException
387          {
388          elementWriter.write( stream, keyClasses[ index ], element );
389          }
390        };
391        }
392      }
393
394    return writers;
395    }
396
397  public HadoopTupleOutputStream( OutputStream outputStream, ElementWriter elementWriter )
398    {
399    super( staticTupleUnTypedElementWriters, staticTupleTypedElementWriters, outputStream, elementWriter );
400    }
401
402  @Override
403  protected void writeIntInternal( int value ) throws IOException
404    {
405    WritableUtils.writeVInt( this, value );
406    }
407
408  public void writeIndexTuple( IndexTuple indexTuple ) throws IOException
409    {
410    writeIntInternal( indexTuple.getIndex() );
411    writeTuple( indexTuple.getTuple() );
412    }
413  }