001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop.io;
022    
023    import java.io.IOException;
024    import java.io.InputStream;
025    
026    import cascading.tuple.io.IndexTuple;
027    import cascading.tuple.io.TupleInputStream;
028    import org.apache.hadoop.io.WritableUtils;
029    
030    /**
031     *
032     */
033    public class HadoopTupleInputStream extends TupleInputStream
034      {
035      public HadoopTupleInputStream( InputStream inputStream, ElementReader elementReader )
036        {
037        super( inputStream, elementReader );
038        }
039    
040      public int getNumElements() throws IOException
041        {
042        return readVInt();
043        }
044    
045      public int readToken() throws IOException
046        {
047        return readVInt();
048        }
049    
050      public Object getNextElement() throws IOException
051        {
052        return readType( readToken() );
053        }
054    
055      public IndexTuple readIndexTuple( IndexTuple indexTuple ) throws IOException
056        {
057        indexTuple.setIndex( readVInt() );
058        indexTuple.setTuple( readTuple() );
059    
060        return indexTuple;
061        }
062    
063      public long readVLong() throws IOException
064        {
065        return WritableUtils.readVLong( this );
066        }
067    
068      public int readVInt() throws IOException
069        {
070        return WritableUtils.readVInt( this );
071        }
072    
073      public String readString() throws IOException
074        {
075        return WritableUtils.readString( this );
076        }
077    
078      protected final Object readType( int type ) throws IOException
079        {
080        switch( type )
081          {
082          case 0:
083            return null;
084          case 1:
085            return readString();
086          case 2:
087            return readFloat();
088          case 3:
089            return readDouble();
090          case 4:
091            return readVInt();
092          case 5:
093            return readVLong();
094          case 6:
095            return readBoolean();
096          case 7:
097            return readShort();
098          case 8:
099            return readTuple();
100          case 9:
101            return readTuplePair();
102          case 10:
103            return readIndexTuple();
104          default:
105            return elementReader.read( type, this );
106          }
107        }
108      }