001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.hadoop.io; 022 023import java.io.IOException; 024import java.io.InputStream; 025import java.util.IdentityHashMap; 026import java.util.Map; 027 028import cascading.tuple.Tuple; 029import cascading.tuple.io.IndexTuple; 030import cascading.tuple.io.TupleInputStream; 031import cascading.tuple.io.TuplePair; 032import org.apache.hadoop.io.WritableUtils; 033 034/** 035 * 036 */ 037public class HadoopTupleInputStream extends TupleInputStream 038 { 039 private static final Map<Class, TupleElementReader> staticTupleUnTypedElementReaders = new IdentityHashMap<>(); 040 private static final Map<Class, TupleElementReader> staticTupleTypedElementReaders = new IdentityHashMap<>(); 041 042 static 043 { 044 // typed 045 046 staticTupleTypedElementReaders.put( Void.class, new TupleElementReader<HadoopTupleInputStream>() 047 { 048 @Override 049 public Object read( HadoopTupleInputStream stream ) throws IOException 050 { 051 return null; 052 } 053 } ); 054 055 staticTupleTypedElementReaders.put( String.class, new TupleElementReader<HadoopTupleInputStream>() 056 { 057 @Override 058 public Object read( HadoopTupleInputStream stream ) throws IOException 059 { 060 return stream.readString(); 061 } 062 } ); 063 064 staticTupleTypedElementReaders.put( Float.class, new TupleElementReader<HadoopTupleInputStream>() 065 { 066 @Override 067 public Object read( HadoopTupleInputStream stream ) throws IOException 068 { 069 return stream.readNullFloat(); 070 } 071 } ); 072 073 staticTupleTypedElementReaders.put( Double.class, new TupleElementReader<HadoopTupleInputStream>() 074 { 075 @Override 076 public Object read( HadoopTupleInputStream stream ) throws IOException 077 { 078 return stream.readNullDouble(); 079 } 080 } ); 081 082 staticTupleTypedElementReaders.put( Integer.class, new TupleElementReader<HadoopTupleInputStream>() 083 { 084 @Override 085 public Object read( HadoopTupleInputStream stream ) throws IOException 086 { 087 return stream.readNullVInt(); 088 } 089 } ); 090 091 staticTupleTypedElementReaders.put( Long.class, new TupleElementReader<HadoopTupleInputStream>() 092 { 093 @Override 094 public Object read( HadoopTupleInputStream stream ) throws IOException 095 { 096 return stream.readNullVLong(); 097 } 098 } ); 099 100 staticTupleTypedElementReaders.put( Boolean.class, new TupleElementReader<HadoopTupleInputStream>() 101 { 102 @Override 103 public Object read( HadoopTupleInputStream stream ) throws IOException 104 { 105 return stream.readNullBoolean(); 106 } 107 } ); 108 109 staticTupleTypedElementReaders.put( Short.class, new TupleElementReader<HadoopTupleInputStream>() 110 { 111 @Override 112 public Object read( HadoopTupleInputStream stream ) throws IOException 113 { 114 return stream.readNullShort(); 115 } 116 } ); 117 118 staticTupleTypedElementReaders.put( Float.TYPE, new TupleElementReader<HadoopTupleInputStream>() 119 { 120 @Override 121 public Object read( HadoopTupleInputStream stream ) throws IOException 122 { 123 return stream.readFloat(); 124 } 125 } ); 126 127 staticTupleTypedElementReaders.put( Double.TYPE, new TupleElementReader<HadoopTupleInputStream>() 128 { 129 @Override 130 public Object read( HadoopTupleInputStream stream ) throws IOException 131 { 132 return stream.readDouble(); 133 } 134 } ); 135 136 staticTupleTypedElementReaders.put( Integer.TYPE, new TupleElementReader<HadoopTupleInputStream>() 137 { 138 @Override 139 public Object read( HadoopTupleInputStream stream ) throws IOException 140 { 141 return stream.readVInt(); 142 } 143 } ); 144 145 staticTupleTypedElementReaders.put( Long.TYPE, new TupleElementReader<HadoopTupleInputStream>() 146 { 147 @Override 148 public Object read( HadoopTupleInputStream stream ) throws IOException 149 { 150 return stream.readVLong(); 151 } 152 } ); 153 154 staticTupleTypedElementReaders.put( Boolean.TYPE, new TupleElementReader<HadoopTupleInputStream>() 155 { 156 @Override 157 public Object read( HadoopTupleInputStream stream ) throws IOException 158 { 159 return stream.readBoolean(); 160 } 161 } ); 162 163 staticTupleTypedElementReaders.put( Short.TYPE, new TupleElementReader<HadoopTupleInputStream>() 164 { 165 @Override 166 public Object read( HadoopTupleInputStream stream ) throws IOException 167 { 168 return stream.readShort(); 169 } 170 } ); 171 172 staticTupleTypedElementReaders.put( Tuple.class, new TupleElementReader<HadoopTupleInputStream>() 173 { 174 @Override 175 public Object read( HadoopTupleInputStream stream ) throws IOException 176 { 177 return stream.readTuple(); 178 } 179 } ); 180 181 staticTupleTypedElementReaders.put( TuplePair.class, new TupleElementReader<HadoopTupleInputStream>() 182 { 183 @Override 184 public Object read( HadoopTupleInputStream stream ) throws IOException 185 { 186 return stream.readTuplePair(); 187 } 188 } ); 189 190 staticTupleTypedElementReaders.put( IndexTuple.class, new TupleElementReader<HadoopTupleInputStream>() 191 { 192 @Override 193 public Object read( HadoopTupleInputStream stream ) throws IOException 194 { 195 return stream.readIndexTuple(); 196 } 197 } ); 198 } 199 200 public static TupleElementReader[] getReadersFor( final ElementReader elementReader, final Class[] classes ) 201 { 202 if( classes == null || classes.length == 0 ) 203 return null; 204 205 TupleElementReader[] readers = new TupleElementReader[ classes.length ]; 206 207 for( int i = 0; i < classes.length; i++ ) 208 { 209 TupleElementReader reader = staticTupleTypedElementReaders.get( classes[ i ] ); 210 211 if( reader != null ) 212 { 213 readers[ i ] = reader; 214 } 215 else 216 { 217 final int index = i; 218 readers[ i ] = new TupleElementReader() 219 { 220 @Override 221 public Object read( TupleInputStream stream ) throws IOException 222 { 223 return elementReader.read( classes[ index ], stream ); 224 } 225 }; 226 } 227 } 228 229 return readers; 230 } 231 232 public HadoopTupleInputStream( InputStream inputStream, ElementReader elementReader ) 233 { 234 super( inputStream, elementReader ); 235 } 236 237 public int getNumElements() throws IOException 238 { 239 return readVInt(); 240 } 241 242 public int readToken() throws IOException 243 { 244 return readVInt(); 245 } 246 247 public Object getNextElement() throws IOException 248 { 249 return readType( readToken() ); 250 } 251 252 public IndexTuple readIndexTuple( IndexTuple tuple ) throws IOException 253 { 254 tuple.setIndex( readVInt() ); 255 tuple.setTuple( readTuple() ); 256 257 return tuple; 258 } 259 260 public Long readNullVLong() throws IOException 261 { 262 byte b = this.readByte(); 263 264 if( b == 0 ) 265 return null; 266 267 return WritableUtils.readVLong( this ); 268 } 269 270 public long readVLong() throws IOException 271 { 272 return WritableUtils.readVLong( this ); 273 } 274 275 public Integer readNullVInt() throws IOException 276 { 277 byte b = this.readByte(); 278 279 if( b == 0 ) 280 return null; 281 282 return WritableUtils.readVInt( this ); 283 } 284 285 public int readVInt() throws IOException 286 { 287 return WritableUtils.readVInt( this ); 288 } 289 290 public String readString() throws IOException 291 { 292 return WritableUtils.readString( this ); 293 } 294 295 private Short readNullShort() throws IOException 296 { 297 byte b = this.readByte(); 298 299 if( b == 0 ) 300 return null; 301 302 return readShort(); 303 } 304 305 private Object readNullBoolean() throws IOException 306 { 307 byte b = this.readByte(); 308 309 if( b == 0 ) 310 return null; 311 312 return readBoolean(); 313 } 314 315 private Object readNullDouble() throws IOException 316 { 317 byte b = this.readByte(); 318 319 if( b == 0 ) 320 return null; 321 322 return readDouble(); 323 } 324 325 private Object readNullFloat() throws IOException 326 { 327 byte b = this.readByte(); 328 329 if( b == 0 ) 330 return null; 331 332 return readFloat(); 333 } 334 335 protected final Object readType( int type ) throws IOException 336 { 337 switch( type ) 338 { 339 case 0: 340 return null; 341 case 1: 342 return readString(); 343 case 2: 344 return readFloat(); 345 case 3: 346 return readDouble(); 347 case 4: 348 return readVInt(); 349 case 5: 350 return readVLong(); 351 case 6: 352 return readBoolean(); 353 case 7: 354 return readShort(); 355 case 8: 356 return readTuple(); 357 case 9: 358 return readTuplePair(); 359 case 10: 360 return readIndexTuple(); 361 default: 362 return elementReader.read( type, this ); 363 } 364 } 365 366 public final Object readType( Class type ) throws IOException 367 { 368 if( type == Void.class ) 369 return null; 370 371 if( type == String.class ) 372 return readString(); 373 374 if( type == Float.class ) 375 return readNullFloat(); 376 if( type == Double.class ) 377 return readNullDouble(); 378 if( type == Integer.class ) 379 return readNullVInt(); 380 if( type == Long.class ) 381 return readNullVLong(); 382 if( type == Boolean.class ) 383 return readNullBoolean(); 384 if( type == Short.class ) 385 return readNullShort(); 386 387 if( type == Float.TYPE ) 388 return readFloat(); 389 if( type == Double.TYPE ) 390 return readDouble(); 391 if( type == Integer.TYPE ) 392 return readVInt(); 393 if( type == Long.TYPE ) 394 return readVLong(); 395 if( type == Boolean.TYPE ) 396 return readBoolean(); 397 if( type == Short.TYPE ) 398 return readShort(); 399 400 if( type == Tuple.class ) 401 return readTuple(); 402 if( type == TuplePair.class ) 403 return readTuplePair(); 404 if( type == IndexTuple.class ) 405 return readIndexTuple(); 406 else 407 return elementReader.read( type, this ); 408 } 409 }