001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tuple.hadoop.io; 022 023 import java.io.IOException; 024 import java.io.InputStream; 025 026 import cascading.tuple.io.IndexTuple; 027 import cascading.tuple.io.TupleInputStream; 028 import org.apache.hadoop.io.WritableUtils; 029 030 /** 031 * 032 */ 033 public class HadoopTupleInputStream extends TupleInputStream 034 { 035 public HadoopTupleInputStream( InputStream inputStream, ElementReader elementReader ) 036 { 037 super( inputStream, elementReader ); 038 } 039 040 public int getNumElements() throws IOException 041 { 042 return readVInt(); 043 } 044 045 public int readToken() throws IOException 046 { 047 return readVInt(); 048 } 049 050 public Object getNextElement() throws IOException 051 { 052 return readType( readToken() ); 053 } 054 055 public IndexTuple readIndexTuple( IndexTuple indexTuple ) throws IOException 056 { 057 indexTuple.setIndex( readVInt() ); 058 indexTuple.setTuple( readTuple() ); 059 060 return indexTuple; 061 } 062 063 public long readVLong() throws IOException 064 { 065 return WritableUtils.readVLong( this ); 066 } 067 068 public int readVInt() throws IOException 069 { 070 return WritableUtils.readVInt( this ); 071 } 072 073 public String readString() throws IOException 074 { 075 return WritableUtils.readString( this ); 076 } 077 078 protected final Object readType( int type ) throws IOException 079 { 080 switch( type ) 081 { 082 case 0: 083 return null; 084 case 1: 085 return readString(); 086 case 2: 087 return readFloat(); 088 case 3: 089 return readDouble(); 090 case 4: 091 return readVInt(); 092 case 5: 093 return readVLong(); 094 case 6: 095 return readBoolean(); 096 case 7: 097 return readShort(); 098 case 8: 099 return readTuple(); 100 case 9: 101 return readTuplePair(); 102 case 10: 103 return readIndexTuple(); 104 default: 105 return elementReader.read( type, this ); 106 } 107 } 108 }