001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.io;
022
023import java.io.DataOutputStream;
024import java.io.IOException;
025import java.io.OutputStream;
026import java.util.List;
027import java.util.Map;
028
029import cascading.tuple.Tuple;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/** Class TupleOutputStream is used internally to write Tuples to storage. */
034public abstract class TupleOutputStream extends DataOutputStream
035  {
036  /** Field LOG */
037  private static final Logger LOG = LoggerFactory.getLogger( TupleOutputStream.class );
038
039  public interface TupleElementWriter
040    {
041    void write( TupleOutputStream stream, Object element ) throws IOException;
042    }
043
044  public interface ElementWriter
045    {
046    void write( DataOutputStream outputStream, Object object ) throws IOException;
047
048    void write( DataOutputStream outputStream, Class<?> type, Object object ) throws IOException;
049
050    void close();
051    }
052
053  private final Map<Class, TupleElementWriter> tupleUnTypedElementWriters;
054  private final Map<Class, TupleElementWriter> tupleTypedElementWriters;
055
056  /** Field elementWriter */
057  final ElementWriter elementWriter;
058
059  public TupleOutputStream( Map<Class, TupleElementWriter> tupleUnTypedElementWriters, Map<Class, TupleElementWriter> tupleTypedElementWriters, OutputStream outputStream, ElementWriter elementWriter )
060    {
061    super( outputStream );
062    this.tupleUnTypedElementWriters = tupleUnTypedElementWriters;
063    this.tupleTypedElementWriters = tupleTypedElementWriters;
064    this.elementWriter = elementWriter;
065    }
066
067  public final TupleOutputStream.TupleElementWriter getWriterFor( final Class type ) throws IOException
068    {
069    TupleOutputStream.TupleElementWriter tupleElementWriter = tupleTypedElementWriters.get( type );
070
071    if( tupleElementWriter != null )
072      return tupleElementWriter;
073
074    return new TupleOutputStream.TupleElementWriter()
075    {
076    @Override
077    public void write( TupleOutputStream stream, Object element ) throws IOException
078      {
079      elementWriter.write( stream, type, element );
080      }
081    };
082    }
083
084  public void writeTuple( Tuple tuple ) throws IOException
085    {
086    write( tuple );
087    }
088
089  public void writeTuplePair( TuplePair tuplePair ) throws IOException
090    {
091    Tuple[] tuples = TuplePair.tuples( tuplePair );
092
093    write( tuples[ 0 ] );
094    write( tuples[ 1 ] );
095    }
096
097  public abstract void writeIndexTuple( IndexTuple indexTuple ) throws IOException;
098
099  /**
100   * Method write is used by Hadoop to write this Tuple instance out to a file.
101   *
102   * @throws java.io.IOException when
103   */
104  private void write( Tuple tuple ) throws IOException
105    {
106    writeUnTyped( tuple );
107    }
108
109  public void writeWith( TupleElementWriter[] writers, Tuple tuple ) throws IOException
110    {
111    List<Object> elements = Tuple.elements( tuple );
112
113    for( int i = 0; i < writers.length; i++ )
114      writers[ i ].write( this, elements.get( i ) );
115    }
116
117  public void writeTyped( Class[] classes, Tuple tuple ) throws IOException
118    {
119    List<Object> elements = Tuple.elements( tuple );
120
121    for( int i = 0; i < classes.length; i++ )
122      {
123      Class type = classes[ i ];
124      writeTypedElement( type, elements.get( i ) );
125      }
126    }
127
128  public void writeUnTyped( Tuple tuple ) throws IOException
129    {
130    List<Object> elements = Tuple.elements( tuple );
131
132    writeIntInternal( elements.size() );
133
134    for( Object element : elements )
135      writeElement( element );
136    }
137
138  public void writeElementArray( Object[] elements ) throws IOException
139    {
140    writeIntInternal( elements.length );
141
142    for( Object element : elements )
143      writeElement( element );
144    }
145
146  public final void writeTypedElement( Class type, Object element ) throws IOException
147    {
148    TupleElementWriter tupleElementWriter = tupleTypedElementWriters.get( type );
149
150    if( tupleElementWriter != null )
151      tupleElementWriter.write( this, element );
152    else
153      elementWriter.write( this, type, element );
154    }
155
156  public final void writeElement( Object element ) throws IOException
157    {
158    if( element == null )
159      {
160      writeIntInternal( 0 );
161      return;
162      }
163
164    Class type = element.getClass();
165    TupleElementWriter tupleElementWriter = tupleUnTypedElementWriters.get( type );
166
167    if( tupleElementWriter != null )
168      tupleElementWriter.write( this, element );
169    else
170      elementWriter.write( this, element );
171    }
172
173  protected abstract void writeIntInternal( int value ) throws IOException;
174
175  @Override
176  public void close() throws IOException
177    {
178    LOG.debug( "closing tuple output stream" );
179
180    try
181      {
182      super.close();
183      }
184    finally
185      {
186      if( elementWriter != null )
187        elementWriter.close();
188      }
189    }
190  }