001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop.io;
022
023import java.io.IOException;
024import java.io.InputStream;
025
026import cascading.tuple.io.IndexTuple;
027import cascading.tuple.io.TupleInputStream;
028import org.apache.hadoop.io.WritableUtils;
029
030/**
031 *
032 */
033public class HadoopTupleInputStream extends TupleInputStream
034  {
035  public HadoopTupleInputStream( InputStream inputStream, ElementReader elementReader )
036    {
037    super( inputStream, elementReader );
038    }
039
040  public int getNumElements() throws IOException
041    {
042    return readVInt();
043    }
044
045  public int readToken() throws IOException
046    {
047    return readVInt();
048    }
049
050  public Object getNextElement() throws IOException
051    {
052    return readType( readToken() );
053    }
054
055  public IndexTuple readIndexTuple( IndexTuple indexTuple ) throws IOException
056    {
057    indexTuple.setIndex( readVInt() );
058    indexTuple.setTuple( readTuple() );
059
060    return indexTuple;
061    }
062
063  public long readVLong() throws IOException
064    {
065    return WritableUtils.readVLong( this );
066    }
067
068  public int readVInt() throws IOException
069    {
070    return WritableUtils.readVInt( this );
071    }
072
073  public String readString() throws IOException
074    {
075    return WritableUtils.readString( this );
076    }
077
078  protected final Object readType( int type ) throws IOException
079    {
080    switch( type )
081      {
082      case 0:
083        return null;
084      case 1:
085        return readString();
086      case 2:
087        return readFloat();
088      case 3:
089        return readDouble();
090      case 4:
091        return readVInt();
092      case 5:
093        return readVLong();
094      case 6:
095        return readBoolean();
096      case 7:
097        return readShort();
098      case 8:
099        return readTuple();
100      case 9:
101        return readTuplePair();
102      case 10:
103        return readIndexTuple();
104      default:
105        return elementReader.read( type, this );
106      }
107    }
108  }