001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.io;
022
023import java.io.DataInputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.Comparator;
027import java.util.List;
028
029import cascading.tuple.Tuple;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/** Class TupleInputStream is used internally to read Tuples from storage. */
034public abstract class TupleInputStream extends DataInputStream
035  {
036  /** Field LOG */
037  private static final Logger LOG = LoggerFactory.getLogger( TupleInputStream.class );
038
039  /** Field inputStream */
040  protected final InputStream inputStream;
041  /** Field elementReader */
042  protected final ElementReader elementReader;
043
044  public interface TupleElementReader<T extends TupleInputStream>
045    {
046    Object read( T stream ) throws IOException;
047    }
048
049  public interface ElementReader
050    {
051    Object read( int token, DataInputStream inputStream ) throws IOException;
052
053    Object read( Class type, DataInputStream inputStream ) throws IOException;
054
055    Comparator getComparatorFor( int type, DataInputStream inputStream ) throws IOException;
056
057    void close();
058    }
059
060  public TupleInputStream( InputStream inputStream, ElementReader elementReader )
061    {
062    super( inputStream );
063    this.inputStream = inputStream;
064    this.elementReader = elementReader;
065    }
066
067  public InputStream getInputStream()
068    {
069    return inputStream;
070    }
071
072  public Tuple readTuple() throws IOException
073    {
074    Tuple tuple = new Tuple();
075
076    return readTuple( tuple );
077    }
078
079  public <T extends Tuple> T readWith( TupleElementReader[] readers, T tuple ) throws IOException
080    {
081    List<Object> elements = Tuple.elements( tuple );
082
083    elements.clear();
084
085    for( int i = 0; i < readers.length; i++ )
086      {
087      Object element = readers[ i ].read( this );
088
089      elements.add( element );
090      }
091
092    return tuple;
093    }
094
095  public <T extends Tuple> T readTuple( T tuple ) throws IOException
096    {
097    return readUnTyped( tuple );
098    }
099
100  public <T extends Tuple> T readTyped( Class[] classes, T tuple ) throws IOException
101    {
102    List<Object> elements = Tuple.elements( tuple );
103
104    elements.clear();
105
106    for( int i = 0; i < classes.length; i++ )
107      {
108      Class type = classes[ i ];
109
110      elements.add( readType( type ) );
111      }
112
113    return tuple;
114    }
115
116  public <T extends Tuple> T readUnTyped( T tuple ) throws IOException
117    {
118    List<Object> elements = Tuple.elements( tuple );
119
120    elements.clear();
121    int len = getNumElements();
122
123    for( int i = 0; i < len; i++ )
124      elements.add( getNextElement() );
125
126    return tuple;
127    }
128
129  public abstract int getNumElements() throws IOException;
130
131  public abstract int readToken() throws IOException;
132
133  public abstract Object getNextElement() throws IOException;
134
135  public TuplePair readTuplePair() throws IOException
136    {
137    return readTuplePair( new TuplePair() );
138    }
139
140  public TuplePair readTuplePair( TuplePair tuplePair ) throws IOException
141    {
142    Tuple[] tuples = TuplePair.tuples( tuplePair );
143
144    readTuple( tuples[ 0 ] ); // guaranteed to not be null
145    readTuple( tuples[ 1 ] ); // guaranteed to not be null
146
147    return tuplePair;
148    }
149
150  public IndexTuple readIndexTuple() throws IOException
151    {
152    return readIndexTuple( new IndexTuple() );
153    }
154
155  public abstract IndexTuple readIndexTuple( IndexTuple tuple ) throws IOException;
156
157  protected abstract Object readType( int type ) throws IOException;
158
159  public abstract Object readType( Class type ) throws IOException;
160
161  public Comparator getComparatorFor( int type ) throws IOException
162    {
163    if( type >= 0 && type <= 10 )
164      return null;
165
166    return elementReader.getComparatorFor( type, this );
167    }
168
169  @Override
170  public void close() throws IOException
171    {
172    LOG.debug( "closing tuple input stream" );
173
174    try
175      {
176      super.close();
177      }
178    finally
179      {
180      if( elementReader != null )
181        elementReader.close();
182      }
183    }
184  }