001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.io; 022 023import java.io.DataOutputStream; 024import java.io.IOException; 025import java.io.OutputStream; 026import java.util.List; 027import java.util.Map; 028 029import cascading.tuple.Tuple; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** Class TupleOutputStream is used internally to write Tuples to storage. */ 034public abstract class TupleOutputStream extends DataOutputStream 035 { 036 /** Field LOG */ 037 private static final Logger LOG = LoggerFactory.getLogger( TupleOutputStream.class ); 038 039 public interface TupleElementWriter 040 { 041 void write( TupleOutputStream stream, Object element ) throws IOException; 042 } 043 044 public interface ElementWriter 045 { 046 void write( DataOutputStream outputStream, Object object ) throws IOException; 047 048 void write( DataOutputStream outputStream, Class<?> type, Object object ) throws IOException; 049 050 void close(); 051 } 052 053 private final Map<Class, TupleElementWriter> tupleUnTypedElementWriters; 054 private final Map<Class, TupleElementWriter> tupleTypedElementWriters; 055 056 /** Field elementWriter */ 057 final ElementWriter elementWriter; 058 059 public TupleOutputStream( Map<Class, TupleElementWriter> tupleUnTypedElementWriters, Map<Class, TupleElementWriter> tupleTypedElementWriters, OutputStream outputStream, ElementWriter elementWriter ) 060 { 061 super( outputStream ); 062 this.tupleUnTypedElementWriters = tupleUnTypedElementWriters; 063 this.tupleTypedElementWriters = tupleTypedElementWriters; 064 this.elementWriter = elementWriter; 065 } 066 067 public final TupleOutputStream.TupleElementWriter getWriterFor( final Class type ) throws IOException 068 { 069 TupleOutputStream.TupleElementWriter tupleElementWriter = tupleTypedElementWriters.get( type ); 070 071 if( tupleElementWriter != null ) 072 return tupleElementWriter; 073 074 return new TupleOutputStream.TupleElementWriter() 075 { 076 @Override 077 public void write( TupleOutputStream stream, Object element ) throws IOException 078 { 079 elementWriter.write( stream, type, element ); 080 } 081 }; 082 } 083 084 public void writeTuple( Tuple tuple ) throws IOException 085 { 086 write( tuple ); 087 } 088 089 public void writeTuplePair( TuplePair tuplePair ) throws IOException 090 { 091 Tuple[] tuples = TuplePair.tuples( tuplePair ); 092 093 write( tuples[ 0 ] ); 094 write( tuples[ 1 ] ); 095 } 096 097 public abstract void writeIndexTuple( IndexTuple indexTuple ) throws IOException; 098 099 /** 100 * Method write is used by Hadoop to write this Tuple instance out to a file. 101 * 102 * @throws java.io.IOException when 103 */ 104 private void write( Tuple tuple ) throws IOException 105 { 106 writeUnTyped( tuple ); 107 } 108 109 public void writeWith( TupleElementWriter[] writers, Tuple tuple ) throws IOException 110 { 111 List<Object> elements = Tuple.elements( tuple ); 112 113 for( int i = 0; i < writers.length; i++ ) 114 writers[ i ].write( this, elements.get( i ) ); 115 } 116 117 public void writeTyped( Class[] classes, Tuple tuple ) throws IOException 118 { 119 List<Object> elements = Tuple.elements( tuple ); 120 121 for( int i = 0; i < classes.length; i++ ) 122 { 123 Class type = classes[ i ]; 124 writeTypedElement( type, elements.get( i ) ); 125 } 126 } 127 128 public void writeUnTyped( Tuple tuple ) throws IOException 129 { 130 List<Object> elements = Tuple.elements( tuple ); 131 132 writeIntInternal( elements.size() ); 133 134 for( Object element : elements ) 135 writeElement( element ); 136 } 137 138 public void writeElementArray( Object[] elements ) throws IOException 139 { 140 writeIntInternal( elements.length ); 141 142 for( Object element : elements ) 143 writeElement( element ); 144 } 145 146 public final void writeTypedElement( Class type, Object element ) throws IOException 147 { 148 TupleElementWriter tupleElementWriter = tupleTypedElementWriters.get( type ); 149 150 if( tupleElementWriter != null ) 151 tupleElementWriter.write( this, element ); 152 else 153 elementWriter.write( this, type, element ); 154 } 155 156 public final void writeElement( Object element ) throws IOException 157 { 158 if( element == null ) 159 { 160 writeIntInternal( 0 ); 161 return; 162 } 163 164 Class type = element.getClass(); 165 TupleElementWriter tupleElementWriter = tupleUnTypedElementWriters.get( type ); 166 167 if( tupleElementWriter != null ) 168 tupleElementWriter.write( this, element ); 169 else 170 elementWriter.write( this, element ); 171 } 172 173 protected abstract void writeIntInternal( int value ) throws IOException; 174 175 @Override 176 public void close() throws IOException 177 { 178 LOG.debug( "closing tuple output stream" ); 179 180 try 181 { 182 super.close(); 183 } 184 finally 185 { 186 if( elementWriter != null ) 187 elementWriter.close(); 188 } 189 } 190 }