001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.hadoop.io; 022 023import java.io.IOException; 024import java.io.OutputStream; 025import java.util.IdentityHashMap; 026import java.util.Map; 027 028import cascading.tuple.Tuple; 029import cascading.tuple.io.IndexTuple; 030import cascading.tuple.io.TupleOutputStream; 031import cascading.tuple.io.TuplePair; 032import org.apache.hadoop.io.WritableUtils; 033 034/** 035 * 036 */ 037public class HadoopTupleOutputStream extends TupleOutputStream 038 { 039 /** Field WRITABLE_TOKEN */ 040 public static final int WRITABLE_TOKEN = 32; 041 042 private static final Map<Class, TupleElementWriter> staticTupleUnTypedElementWriters = new IdentityHashMap<Class, TupleElementWriter>(); 043 private static final Map<Class, TupleElementWriter> staticTupleTypedElementWriters = new IdentityHashMap<Class, TupleElementWriter>(); 044 045 static 046 { 047 // untyped 048 049 staticTupleUnTypedElementWriters.put( String.class, new TupleElementWriter() 050 { 051 @Override 052 public void write( TupleOutputStream stream, Object element ) throws IOException 053 { 054 WritableUtils.writeVInt( stream, 1 ); 055 WritableUtils.writeString( stream, (String) element ); 056 } 057 } ); 058 059 staticTupleUnTypedElementWriters.put( Float.class, new TupleElementWriter() 060 { 061 @Override 062 public void write( TupleOutputStream stream, Object element ) throws IOException 063 { 064 WritableUtils.writeVInt( stream, 2 ); 065 stream.writeFloat( (Float) element ); 066 } 067 } ); 068 069 staticTupleUnTypedElementWriters.put( Double.class, new TupleElementWriter() 070 { 071 @Override 072 public void write( TupleOutputStream stream, Object element ) throws IOException 073 { 074 WritableUtils.writeVInt( stream, 3 ); 075 stream.writeDouble( (Double) element ); 076 } 077 } ); 078 079 staticTupleUnTypedElementWriters.put( Integer.class, new TupleElementWriter() 080 { 081 @Override 082 public void write( TupleOutputStream stream, Object element ) throws IOException 083 { 084 WritableUtils.writeVInt( stream, 4 ); 085 WritableUtils.writeVInt( stream, (Integer) element ); 086 } 087 } ); 088 089 staticTupleUnTypedElementWriters.put( Long.class, new TupleElementWriter() 090 { 091 @Override 092 public void write( TupleOutputStream stream, Object element ) throws IOException 093 { 094 WritableUtils.writeVInt( stream, 5 ); 095 WritableUtils.writeVLong( stream, (Long) element ); 096 } 097 } ); 098 099 staticTupleUnTypedElementWriters.put( Boolean.class, new TupleElementWriter() 100 { 101 @Override 102 public void write( TupleOutputStream stream, Object element ) throws IOException 103 { 104 WritableUtils.writeVInt( stream, 6 ); 105 stream.writeBoolean( (Boolean) element ); 106 } 107 } ); 108 109 staticTupleUnTypedElementWriters.put( Short.class, new TupleElementWriter() 110 { 111 @Override 112 public void write( TupleOutputStream stream, Object element ) throws IOException 113 { 114 WritableUtils.writeVInt( stream, 7 ); 115 stream.writeShort( (Short) element ); 116 } 117 } ); 118 119 staticTupleUnTypedElementWriters.put( Tuple.class, new TupleElementWriter() 120 { 121 @Override 122 public void write( TupleOutputStream stream, Object element ) throws IOException 123 { 124 WritableUtils.writeVInt( stream, 8 ); 125 stream.writeTuple( (Tuple) element ); 126 } 127 } ); 128 129 staticTupleUnTypedElementWriters.put( TuplePair.class, new TupleElementWriter() 130 { 131 @Override 132 public void write( TupleOutputStream stream, Object element ) throws IOException 133 { 134 WritableUtils.writeVInt( stream, 9 ); 135 stream.writeTuplePair( (TuplePair) element ); 136 } 137 } ); 138 139 staticTupleUnTypedElementWriters.put( IndexTuple.class, new TupleElementWriter() 140 { 141 @Override 142 public void write( TupleOutputStream stream, Object element ) throws IOException 143 { 144 WritableUtils.writeVInt( stream, 10 ); 145 stream.writeIndexTuple( (IndexTuple) element ); 146 } 147 } ); 148 149 // typed 150 151 staticTupleTypedElementWriters.put( Void.class, new TupleElementWriter() 152 { 153 @Override 154 public void write( TupleOutputStream stream, Object element ) throws IOException 155 { 156 // do nothing 157 } 158 } ); 159 160 staticTupleTypedElementWriters.put( String.class, new TupleElementWriter() 161 { 162 @Override 163 public void write( TupleOutputStream stream, Object element ) throws IOException 164 { 165 WritableUtils.writeString( stream, (String) element ); 166 } 167 } ); 168 169 staticTupleTypedElementWriters.put( Float.class, new TupleElementWriter() 170 { 171 @Override 172 public void write( TupleOutputStream stream, Object element ) throws IOException 173 { 174 if( element == null ) 175 { 176 stream.writeByte( 0 ); 177 return; 178 } 179 180 stream.writeByte( 1 ); 181 stream.writeFloat( (Float) element ); 182 } 183 } ); 184 185 staticTupleTypedElementWriters.put( Double.class, new TupleElementWriter() 186 { 187 @Override 188 public void write( TupleOutputStream stream, Object element ) throws IOException 189 { 190 if( element == null ) 191 { 192 stream.writeByte( 0 ); 193 return; 194 } 195 196 stream.writeByte( 1 ); 197 stream.writeDouble( (Double) element ); 198 } 199 } ); 200 201 staticTupleTypedElementWriters.put( Integer.class, new TupleElementWriter() 202 { 203 @Override 204 public void write( TupleOutputStream stream, Object element ) throws IOException 205 { 206 if( element == null ) 207 { 208 stream.writeByte( 0 ); 209 return; 210 } 211 212 stream.writeByte( 1 ); 213 WritableUtils.writeVInt( stream, (Integer) element ); 214 } 215 } ); 216 217 staticTupleTypedElementWriters.put( Long.class, new TupleElementWriter() 218 { 219 @Override 220 public void write( TupleOutputStream stream, Object element ) throws IOException 221 { 222 if( element == null ) 223 { 224 stream.writeByte( 0 ); 225 return; 226 } 227 228 stream.writeByte( 1 ); 229 WritableUtils.writeVLong( stream, (Long) element ); 230 } 231 } ); 232 233 staticTupleTypedElementWriters.put( Boolean.class, new TupleElementWriter() 234 { 235 @Override 236 public void write( TupleOutputStream stream, Object element ) throws IOException 237 { 238 if( element == null ) 239 { 240 stream.writeByte( 0 ); 241 return; 242 } 243 244 stream.writeByte( 1 ); 245 stream.writeBoolean( (Boolean) element ); 246 } 247 } ); 248 249 staticTupleTypedElementWriters.put( Short.class, new TupleElementWriter() 250 { 251 @Override 252 public void write( TupleOutputStream stream, Object element ) throws IOException 253 { 254 if( element == null ) 255 { 256 stream.writeByte( 0 ); 257 return; 258 } 259 260 stream.writeByte( 1 ); 261 stream.writeShort( (Short) element ); 262 } 263 } ); 264 265 staticTupleTypedElementWriters.put( Float.TYPE, new TupleElementWriter() 266 { 267 @Override 268 public void write( TupleOutputStream stream, Object element ) throws IOException 269 { 270 if( element == null ) 271 stream.writeFloat( 0 ); 272 else 273 stream.writeFloat( (Float) element ); 274 } 275 } ); 276 277 staticTupleTypedElementWriters.put( Double.TYPE, new TupleElementWriter() 278 { 279 @Override 280 public void write( TupleOutputStream stream, Object element ) throws IOException 281 { 282 if( element == null ) 283 stream.writeDouble( 0 ); 284 else 285 stream.writeDouble( (Double) element ); 286 } 287 } ); 288 289 staticTupleTypedElementWriters.put( Integer.TYPE, new TupleElementWriter() 290 { 291 @Override 292 public void write( TupleOutputStream stream, Object element ) throws IOException 293 { 294 if( element == null ) 295 WritableUtils.writeVInt( stream, 0 ); 296 else 297 WritableUtils.writeVInt( stream, (Integer) element ); 298 } 299 } ); 300 301 staticTupleTypedElementWriters.put( Long.TYPE, new TupleElementWriter() 302 { 303 @Override 304 public void write( TupleOutputStream stream, Object element ) throws IOException 305 { 306 if( element == null ) 307 WritableUtils.writeVLong( stream, 0 ); 308 else 309 WritableUtils.writeVLong( stream, (Long) element ); 310 } 311 } ); 312 313 staticTupleTypedElementWriters.put( Boolean.TYPE, new TupleElementWriter() 314 { 315 @Override 316 public void write( TupleOutputStream stream, Object element ) throws IOException 317 { 318 if( element == null ) 319 stream.writeBoolean( false ); 320 else 321 stream.writeBoolean( (Boolean) element ); 322 } 323 } ); 324 325 staticTupleTypedElementWriters.put( Short.TYPE, new TupleElementWriter() 326 { 327 @Override 328 public void write( TupleOutputStream stream, Object element ) throws IOException 329 { 330 if( element == null ) 331 stream.writeShort( 0 ); 332 else 333 stream.writeShort( (Short) element ); 334 } 335 } ); 336 337 staticTupleTypedElementWriters.put( Tuple.class, new TupleElementWriter() 338 { 339 @Override 340 public void write( TupleOutputStream stream, Object element ) throws IOException 341 { 342 stream.writeTuple( (Tuple) element ); 343 } 344 } ); 345 346 staticTupleTypedElementWriters.put( TuplePair.class, new TupleElementWriter() 347 { 348 @Override 349 public void write( TupleOutputStream stream, Object element ) throws IOException 350 { 351 stream.writeTuplePair( (TuplePair) element ); 352 } 353 } ); 354 355 staticTupleTypedElementWriters.put( IndexTuple.class, new TupleElementWriter() 356 { 357 @Override 358 public void write( TupleOutputStream stream, Object element ) throws IOException 359 { 360 stream.writeIndexTuple( (IndexTuple) element ); 361 } 362 } ); 363 } 364 365 public static TupleElementWriter[] getWritersFor( final ElementWriter elementWriter, final Class[] keyClasses ) 366 { 367 if( keyClasses == null || keyClasses.length == 0 ) 368 return null; 369 370 TupleElementWriter[] writers = new TupleElementWriter[ keyClasses.length ]; 371 372 for( int i = 0; i < keyClasses.length; i++ ) 373 { 374 TupleElementWriter writer = staticTupleTypedElementWriters.get( keyClasses[ i ] ); 375 376 if( writer != null ) 377 { 378 writers[ i ] = writer; 379 } 380 else 381 { 382 final int index = i; 383 writers[ i ] = new TupleElementWriter() 384 { 385 @Override 386 public void write( TupleOutputStream stream, Object element ) throws IOException 387 { 388 elementWriter.write( stream, keyClasses[ index ], element ); 389 } 390 }; 391 } 392 } 393 394 return writers; 395 } 396 397 public HadoopTupleOutputStream( OutputStream outputStream, ElementWriter elementWriter ) 398 { 399 super( staticTupleUnTypedElementWriters, staticTupleTypedElementWriters, outputStream, elementWriter ); 400 } 401 402 @Override 403 protected void writeIntInternal( int value ) throws IOException 404 { 405 WritableUtils.writeVInt( this, value ); 406 } 407 408 public void writeIndexTuple( IndexTuple indexTuple ) throws IOException 409 { 410 writeIntInternal( indexTuple.getIndex() ); 411 writeTuple( indexTuple.getTuple() ); 412 } 413 }