001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.io; 022 023import java.io.DataOutputStream; 024import java.io.IOException; 025import java.io.OutputStream; 026import java.util.List; 027import java.util.Map; 028 029import cascading.tuple.Tuple; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** Class TupleOutputStream is used internally to write Tuples to storage. */ 034public abstract class TupleOutputStream extends DataOutputStream 035 { 036 /** Field LOG */ 037 private static final Logger LOG = LoggerFactory.getLogger( TupleOutputStream.class ); 038 039 protected interface TupleElementWriter 040 { 041 void write( TupleOutputStream stream, Object element ) throws IOException; 042 } 043 044 private final Map<Class, TupleElementWriter> tupleElementWriters; 045 /** Field elementWriter */ 046 final ElementWriter elementWriter; 047 048 public interface ElementWriter 049 { 050 void write( DataOutputStream outputStream, Object object ) throws IOException; 051 052 void close(); 053 } 054 055 public TupleOutputStream( Map<Class, TupleElementWriter> tupleElementWriters, OutputStream outputStream, ElementWriter elementWriter ) 056 { 057 super( outputStream ); 058 this.tupleElementWriters = tupleElementWriters; 059 this.elementWriter = elementWriter; 060 } 061 062 public void writeTuple( Tuple tuple ) throws IOException 063 { 064 write( tuple ); 065 } 066 067 public void writeTuplePair( TuplePair tuplePair ) throws IOException 068 { 069 Tuple[] tuples = TuplePair.tuples( tuplePair ); 070 071 write( tuples[ 0 ] ); 072 write( tuples[ 1 ] ); 073 } 074 075 public abstract void writeIndexTuple( IndexTuple indexTuple ) throws IOException; 076 077 /** 078 * Method write is used by Hadoop to write this Tuple instance out to a file. 079 * 080 * @throws java.io.IOException when 081 */ 082 private void write( Tuple tuple ) throws IOException 083 { 084 List<Object> elements = Tuple.elements( tuple ); 085 086 writeIntInternal( elements.size() ); 087 088 for( Object element : elements ) 089 writeElement( element ); 090 } 091 092 public void writeElementArray( Object[] elements ) throws IOException 093 { 094 writeIntInternal( elements.length ); 095 096 for( Object element : elements ) 097 writeElement( element ); 098 } 099 100 public final void writeElement( Object element ) throws IOException 101 { 102 if( element == null ) 103 { 104 writeIntInternal( 0 ); 105 return; 106 } 107 108 Class type = element.getClass(); 109 TupleElementWriter tupleElementWriter = tupleElementWriters.get( type ); 110 111 if( tupleElementWriter != null ) 112 tupleElementWriter.write( this, element ); 113 else 114 elementWriter.write( this, element ); 115 } 116 117 protected abstract void writeIntInternal( int value ) throws IOException; 118 119 @Override 120 public void close() throws IOException 121 { 122 LOG.debug( "closing tuple output stream" ); 123 124 try 125 { 126 super.close(); 127 } 128 finally 129 { 130 if( elementWriter != null ) 131 elementWriter.close(); 132 } 133 } 134 }