001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.io;
022
023import java.io.DataOutputStream;
024import java.io.IOException;
025import java.io.OutputStream;
026import java.util.List;
027import java.util.Map;
028
029import cascading.tuple.Tuple;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/** Class TupleOutputStream is used internally to write Tuples to storage. */
034public abstract class TupleOutputStream extends DataOutputStream
035  {
036  /** Field LOG */
037  private static final Logger LOG = LoggerFactory.getLogger( TupleOutputStream.class );
038
039  protected interface TupleElementWriter
040    {
041    void write( TupleOutputStream stream, Object element ) throws IOException;
042    }
043
044  private final Map<Class, TupleElementWriter> tupleElementWriters;
045  /** Field elementWriter */
046  final ElementWriter elementWriter;
047
048  public interface ElementWriter
049    {
050    void write( DataOutputStream outputStream, Object object ) throws IOException;
051
052    void close();
053    }
054
055  public TupleOutputStream( Map<Class, TupleElementWriter> tupleElementWriters, OutputStream outputStream, ElementWriter elementWriter )
056    {
057    super( outputStream );
058    this.tupleElementWriters = tupleElementWriters;
059    this.elementWriter = elementWriter;
060    }
061
062  public void writeTuple( Tuple tuple ) throws IOException
063    {
064    write( tuple );
065    }
066
067  public void writeTuplePair( TuplePair tuplePair ) throws IOException
068    {
069    Tuple[] tuples = TuplePair.tuples( tuplePair );
070
071    write( tuples[ 0 ] );
072    write( tuples[ 1 ] );
073    }
074
075  public abstract void writeIndexTuple( IndexTuple indexTuple ) throws IOException;
076
077  /**
078   * Method write is used by Hadoop to write this Tuple instance out to a file.
079   *
080   * @throws java.io.IOException when
081   */
082  private void write( Tuple tuple ) throws IOException
083    {
084    List<Object> elements = Tuple.elements( tuple );
085
086    writeIntInternal( elements.size() );
087
088    for( Object element : elements )
089      writeElement( element );
090    }
091
092  public void writeElementArray( Object[] elements ) throws IOException
093    {
094    writeIntInternal( elements.length );
095
096    for( Object element : elements )
097      writeElement( element );
098    }
099
100  public final void writeElement( Object element ) throws IOException
101    {
102    if( element == null )
103      {
104      writeIntInternal( 0 );
105      return;
106      }
107
108    Class type = element.getClass();
109    TupleElementWriter tupleElementWriter = tupleElementWriters.get( type );
110
111    if( tupleElementWriter != null )
112      tupleElementWriter.write( this, element );
113    else
114      elementWriter.write( this, element );
115    }
116
117  protected abstract void writeIntInternal( int value ) throws IOException;
118
119  @Override
120  public void close() throws IOException
121    {
122    LOG.debug( "closing tuple output stream" );
123
124    try
125      {
126      super.close();
127      }
128    finally
129      {
130      if( elementWriter != null )
131        elementWriter.close();
132      }
133    }
134  }