001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.io; 022 023import java.io.DataInputStream; 024import java.io.IOException; 025import java.io.InputStream; 026import java.util.Comparator; 027import java.util.List; 028 029import cascading.tuple.Tuple; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** Class TupleInputStream is used internally to read Tuples from storage. */ 034public abstract class TupleInputStream extends DataInputStream 035 { 036 /** Field LOG */ 037 private static final Logger LOG = LoggerFactory.getLogger( TupleInputStream.class ); 038 039 /** Field inputStream */ 040 protected final InputStream inputStream; 041 /** Field elementReader */ 042 protected final ElementReader elementReader; 043 044 public interface TupleElementReader<T extends TupleInputStream> 045 { 046 Object read( T stream ) throws IOException; 047 } 048 049 public interface ElementReader 050 { 051 Object read( int token, DataInputStream inputStream ) throws IOException; 052 053 Object read( Class type, DataInputStream inputStream ) throws IOException; 054 055 Comparator getComparatorFor( int type, DataInputStream inputStream ) throws IOException; 056 057 void close(); 058 } 059 060 public TupleInputStream( InputStream inputStream, ElementReader elementReader ) 061 { 062 super( inputStream ); 063 this.inputStream = inputStream; 064 this.elementReader = elementReader; 065 } 066 067 public InputStream getInputStream() 068 { 069 return inputStream; 070 } 071 072 public Tuple readTuple() throws IOException 073 { 074 Tuple tuple = new Tuple(); 075 076 return readTuple( tuple ); 077 } 078 079 public <T extends Tuple> T readWith( TupleElementReader[] readers, T tuple ) throws IOException 080 { 081 List<Object> elements = Tuple.elements( tuple ); 082 083 elements.clear(); 084 085 for( int i = 0; i < readers.length; i++ ) 086 { 087 Object element = readers[ i ].read( this ); 088 089 elements.add( element ); 090 } 091 092 return tuple; 093 } 094 095 public <T extends Tuple> T readTuple( T tuple ) throws IOException 096 { 097 return readUnTyped( tuple ); 098 } 099 100 public <T extends Tuple> T readTyped( Class[] classes, T tuple ) throws IOException 101 { 102 List<Object> elements = Tuple.elements( tuple ); 103 104 elements.clear(); 105 106 for( int i = 0; i < classes.length; i++ ) 107 { 108 Class type = classes[ i ]; 109 110 elements.add( readType( type ) ); 111 } 112 113 return tuple; 114 } 115 116 public <T extends Tuple> T readUnTyped( T tuple ) throws IOException 117 { 118 List<Object> elements = Tuple.elements( tuple ); 119 120 elements.clear(); 121 int len = getNumElements(); 122 123 for( int i = 0; i < len; i++ ) 124 elements.add( getNextElement() ); 125 126 return tuple; 127 } 128 129 public abstract int getNumElements() throws IOException; 130 131 public abstract int readToken() throws IOException; 132 133 public abstract Object getNextElement() throws IOException; 134 135 public TuplePair readTuplePair() throws IOException 136 { 137 return readTuplePair( new TuplePair() ); 138 } 139 140 public TuplePair readTuplePair( TuplePair tuplePair ) throws IOException 141 { 142 Tuple[] tuples = TuplePair.tuples( tuplePair ); 143 144 readTuple( tuples[ 0 ] ); // guaranteed to not be null 145 readTuple( tuples[ 1 ] ); // guaranteed to not be null 146 147 return tuplePair; 148 } 149 150 public IndexTuple readIndexTuple() throws IOException 151 { 152 return readIndexTuple( new IndexTuple() ); 153 } 154 155 public abstract IndexTuple readIndexTuple( IndexTuple tuple ) throws IOException; 156 157 protected abstract Object readType( int type ) throws IOException; 158 159 public abstract Object readType( Class type ) throws IOException; 160 161 public Comparator getComparatorFor( int type ) throws IOException 162 { 163 if( type >= 0 && type <= 10 ) 164 return null; 165 166 return elementReader.getComparatorFor( type, this ); 167 } 168 169 @Override 170 public void close() throws IOException 171 { 172 LOG.debug( "closing tuple input stream" ); 173 174 try 175 { 176 super.close(); 177 } 178 finally 179 { 180 if( elementReader != null ) 181 elementReader.close(); 182 } 183 } 184 }