001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.io;
022
023import java.io.DataInputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.Comparator;
027import java.util.List;
028
029import cascading.tuple.Tuple;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/** Class TupleInputStream is used internally to read Tuples from storage. */
034public abstract class TupleInputStream extends DataInputStream
035  {
036  /** Field LOG */
037  private static final Logger LOG = LoggerFactory.getLogger( TupleInputStream.class );
038
039  /** Field inputStream */
040  protected final InputStream inputStream;
041  /** Field elementReader */
042  protected final ElementReader elementReader;
043
044  public interface ElementReader
045    {
046    Object read( int token, DataInputStream inputStream ) throws IOException;
047
048    Comparator getComparatorFor( int type, DataInputStream inputStream ) throws IOException;
049
050    void close();
051    }
052
053  public TupleInputStream( InputStream inputStream, ElementReader elementReader )
054    {
055    super( inputStream );
056    this.inputStream = inputStream;
057    this.elementReader = elementReader;
058    }
059
060  public InputStream getInputStream()
061    {
062    return inputStream;
063    }
064
065  public Tuple readTuple() throws IOException
066    {
067    return readTuple( new Tuple() );
068    }
069
070  public Tuple readTuple( Tuple tuple ) throws IOException
071    {
072    List<Object> elements = Tuple.elements( tuple );
073
074    elements.clear();
075    int len = getNumElements();
076
077    for( int i = 0; i < len; i++ )
078      elements.add( getNextElement() );
079
080    return tuple;
081    }
082
083  public abstract int getNumElements() throws IOException;
084
085  public abstract int readToken() throws IOException;
086
087  public abstract Object getNextElement() throws IOException;
088
089  public TuplePair readTuplePair() throws IOException
090    {
091    return readTuplePair( new TuplePair() );
092    }
093
094  public TuplePair readTuplePair( TuplePair tuplePair ) throws IOException
095    {
096    Tuple[] tuples = TuplePair.tuples( tuplePair );
097
098    readTuple( tuples[ 0 ] ); // guaranteed to not be null
099    readTuple( tuples[ 1 ] ); // guaranteed to not be null
100
101    return tuplePair;
102    }
103
104  public IndexTuple readIndexTuple() throws IOException
105    {
106    return readIndexTuple( new IndexTuple() );
107    }
108
109  public abstract IndexTuple readIndexTuple( IndexTuple indexTuple ) throws IOException;
110
111  protected abstract Object readType( int type ) throws IOException;
112
113  public Comparator getComparatorFor( int type ) throws IOException
114    {
115    if( type >= 0 && type <= 10 )
116      return null;
117
118    return elementReader.getComparatorFor( type, this );
119    }
120
121  @Override
122  public void close() throws IOException
123    {
124    LOG.debug( "closing tuple input stream" );
125
126    try
127      {
128      super.close();
129      }
130    finally
131      {
132      if( elementReader != null )
133        elementReader.close();
134      }
135    }
136  }