001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.io;
022    
023    import java.io.DataOutputStream;
024    import java.io.IOException;
025    import java.io.OutputStream;
026    import java.util.List;
027    import java.util.Map;
028    
029    import cascading.tuple.Tuple;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /** Class TupleOutputStream is used internally to write Tuples to storage. */
034    public abstract class TupleOutputStream extends DataOutputStream
035      {
036      /** Field LOG */
037      private static final Logger LOG = LoggerFactory.getLogger( TupleOutputStream.class );
038    
039      protected interface TupleElementWriter
040        {
041        void write( TupleOutputStream stream, Object element ) throws IOException;
042        }
043    
044      private final Map<Class, TupleElementWriter> tupleElementWriters;
045      /** Field elementWriter */
046      final ElementWriter elementWriter;
047    
048      public interface ElementWriter
049        {
050        void write( DataOutputStream outputStream, Object object ) throws IOException;
051    
052        void close();
053        }
054    
055      public TupleOutputStream( Map<Class, TupleElementWriter> tupleElementWriters, OutputStream outputStream, ElementWriter elementWriter )
056        {
057        super( outputStream );
058        this.tupleElementWriters = tupleElementWriters;
059        this.elementWriter = elementWriter;
060        }
061    
062      public void writeTuple( Tuple tuple ) throws IOException
063        {
064        write( tuple );
065        }
066    
067      public void writeTuplePair( TuplePair tuplePair ) throws IOException
068        {
069        Tuple[] tuples = TuplePair.tuples( tuplePair );
070    
071        write( tuples[ 0 ] );
072        write( tuples[ 1 ] );
073        }
074    
075      public abstract void writeIndexTuple( IndexTuple indexTuple ) throws IOException;
076    
077      /**
078       * Method write is used by Hadoop to write this Tuple instance out to a file.
079       *
080       * @throws java.io.IOException when
081       */
082      private void write( Tuple tuple ) throws IOException
083        {
084        List<Object> elements = Tuple.elements( tuple );
085    
086        writeIntInternal( elements.size() );
087    
088        for( Object element : elements )
089          writeElement( element );
090        }
091    
092      public void writeElementArray( Object[] elements ) throws IOException
093        {
094        writeIntInternal( elements.length );
095    
096        for( Object element : elements )
097          writeElement( element );
098        }
099    
100      public final void writeElement( Object element ) throws IOException
101        {
102        if( element == null )
103          {
104          writeIntInternal( 0 );
105          return;
106          }
107    
108        Class type = element.getClass();
109        TupleElementWriter tupleElementWriter = tupleElementWriters.get( type );
110    
111        if( tupleElementWriter != null )
112          tupleElementWriter.write( this, element );
113        else
114          elementWriter.write( this, element );
115        }
116    
117      protected abstract void writeIntInternal( int value ) throws IOException;
118    
119      @Override
120      public void close() throws IOException
121        {
122        LOG.debug( "closing tuple output stream" );
123    
124        try
125          {
126          super.close();
127          }
128        finally
129          {
130          if( elementWriter != null )
131            elementWriter.close();
132          }
133        }
134      }