/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple.hadoop.collect;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import cascading.flow.FlowProcess;
import cascading.flow.FlowProcessWrapper;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.tuple.TupleException;
import cascading.tuple.collect.SpillableTupleList;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.io.HadoopTupleInputStream;
import cascading.tuple.hadoop.io.HadoopTupleOutputStream;
import cascading.tuple.io.TupleInputStream;
import cascading.tuple.io.TupleOutputStream;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * HadoopSpillableTupleList is a simple {@link Iterable} object that can store an unlimited number of {@link cascading.tuple.Tuple} instances by spilling
 * excess to a temporary disk file.
 * <p/>
 * Spills will automatically be compressed using the {@link #defaultCodecs} values. To disable compression or
 * change the codecs, see {@link cascading.tuple.collect.SpillableProps#SPILL_COMPRESS} and {@link cascading.tuple.collect.SpillableProps#SPILL_CODECS}.
 * <p/>
 * It is recommended to add LZO, if available:
 * {@code "org.apache.hadoop.io.compress.LzoCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec" }
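 * <p/>
 * For example, to override the codecs when configuring a flow, set the relevant property before creating
 * the flow connector (a sketch; the property keys are defined on {@link cascading.tuple.collect.SpillableProps}):
 * <pre>{@code
 * Properties properties = new Properties();
 *
 * // to disable spill compression entirely, set SpillableProps.SPILL_COMPRESS to "false"
 * // properties.setProperty( SpillableProps.SPILL_COMPRESS, "false" );
 *
 * // or declare an ordered list of codecs to attempt, the first available codec wins
 * properties.setProperty( SpillableProps.SPILL_CODECS,
 *   "org.apache.hadoop.io.compress.LzoCodec,org.apache.hadoop.io.compress.GzipCodec" );
 * }</pre>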
 */
public class HadoopSpillableTupleList extends SpillableTupleList
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopSpillableTupleList.class );

  public static final String defaultCodecs = "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec";

  /** Field codec */
  private final CompressionCodec codec;
  /** Field serializationElementWriter */
  private final TupleSerialization tupleSerialization;

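  /**
   * Method getCodec returns a new {@link CompressionCodec} instance for the first available codec class
   * resolved from the given {@link FlowProcess} properties (see {@link cascading.tuple.collect.SpillableProps}),
   * falling back to the given comma separated {@code defaultCodecs} list, or {@code null} if no codec is available.
   * Note the given flowProcess must resolve to a {@link HadoopFlowProcess} so a JobConf is available when
   * instantiating the codec.
   * <p/>
   * For example (a sketch, assuming a valid {@code flowProcess} is in scope):
   * {@code CompressionCodec codec = HadoopSpillableTupleList.getCodec( flowProcess, HadoopSpillableTupleList.defaultCodecs ); }
   *
   * @param flowProcess   of type FlowProcess
   * @param defaultCodecs of type String
   * @return CompressionCodec
   */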
  public static synchronized CompressionCodec getCodec( FlowProcess flowProcess, String defaultCodecs )
    {
    Class<? extends CompressionCodec> codecClass = getCodecClass( flowProcess, defaultCodecs, CompressionCodec.class );

    if( codecClass == null )
      return null;

    if( flowProcess instanceof FlowProcessWrapper )
      flowProcess = ( (FlowProcessWrapper) flowProcess ).getDelegate();

    return ReflectionUtils.newInstance( codecClass, ( (HadoopFlowProcess) flowProcess ).getJobConf() );
    }

  /**
   * Constructor HadoopSpillableTupleList creates a new HadoopSpillableTupleList instance using the given threshold
   * value and the given compression codec, if any. The given JobConf, if any, configures tuple serialization.
   *
   * @param threshold of type int
   * @param codec     of type CompressionCodec
   * @param jobConf   of type JobConf
   */
  public HadoopSpillableTupleList( int threshold, CompressionCodec codec, JobConf jobConf )
    {
    super( threshold );
    this.codec = codec;

    if( jobConf == null )
      this.tupleSerialization = new TupleSerialization();
    else
      this.tupleSerialization = new TupleSerialization( jobConf );
    }

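  /**
   * Constructor HadoopSpillableTupleList creates a new HadoopSpillableTupleList instance using the given threshold
   * value, the given TupleSerialization instance, and the given compression codec, if any.
   *
   * @param threshold          of type int
   * @param tupleSerialization of type TupleSerialization
   * @param codec              of type CompressionCodec
   */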
  public HadoopSpillableTupleList( int threshold, TupleSerialization tupleSerialization, CompressionCodec codec )
    {
    super( threshold );
    this.tupleSerialization = tupleSerialization;
    this.codec = codec;
    }

  @Override
  protected TupleOutputStream createTupleOutputStream( File file )
    {
    OutputStream outputStream;

    try
      {
      outputStream = new FileOutputStream( file );

      Compressor compressor = null;

      if( codec != null )
        {
        compressor = getCompressor();
        outputStream = codec.createOutputStream( outputStream, compressor );
        }

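      // hold the pooled compressor in a final local so the anonymous stream below can return it on close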
      final Compressor finalCompressor = compressor;

      return new HadoopTupleOutputStream( outputStream, tupleSerialization.getElementWriter() )
      {
      @Override
      public void close() throws IOException
        {
        try
          {
          super.close();
          }
        finally
          {
          if( finalCompressor != null )
            CodecPool.returnCompressor( finalCompressor );
          }
        }
      };
      }
    catch( IOException exception )
      {
      throw new TupleException( "unable to create temporary file output stream", exception );
      }
    }

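  /**
   * Method getCompressor returns a pooled {@link Compressor} for the current codec, forcing a GC and
   * retrying the allocation once if an {@link OutOfMemoryError} is thrown.
   */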
  private Compressor getCompressor()
    {
    // some codecs use direct memory, and the GC for direct memory sometimes cannot keep up,
    // so if we see an OOME, we force a GC and retry the allocation once
    try
      {
      return CodecPool.getCompressor( codec );
      }
    catch( OutOfMemoryError error )
      {
      System.gc();
      LOG.info( "received OOME when allocating compressor for codec: {}, retrying once", codec.getClass().getCanonicalName(), error );

      return CodecPool.getCompressor( codec );
      }
    }

  @Override
  protected TupleInputStream createTupleInputStream( File file )
    {
    try
      {
      InputStream inputStream;

      inputStream = new FileInputStream( file );

      Decompressor decompressor = null;

      if( codec != null )
        {
        decompressor = getDecompressor();
        inputStream = codec.createInputStream( inputStream, decompressor );
        }

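      // hold the pooled decompressor in a final local so the anonymous stream below can return it on close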
      final Decompressor finalDecompressor = decompressor;

      return new HadoopTupleInputStream( inputStream, tupleSerialization.getElementReader() )
      {
      @Override
      public void close() throws IOException
        {
        try
          {
          super.close();
          }
        finally
          {
          if( finalDecompressor != null )
            CodecPool.returnDecompressor( finalDecompressor );
          }
        }
      };
      }
    catch( IOException exception )
      {
      throw new TupleException( "unable to create temporary file input stream", exception );
      }
    }

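  /**
   * Method getDecompressor returns a pooled {@link Decompressor} for the current codec, forcing a GC and
   * retrying the allocation once if an {@link OutOfMemoryError} is thrown.
   */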
  private Decompressor getDecompressor()
    {
    // some codecs use direct memory, and the GC for direct memory sometimes cannot keep up,
    // so if we see an OOME, we force a GC and retry the allocation once
    try
      {
      return CodecPool.getDecompressor( codec );
      }
    catch( OutOfMemoryError error )
      {
      System.gc();
      LOG.info( "received OOME when allocating decompressor for codec: {}, retrying once", codec.getClass().getCanonicalName(), error );

      return CodecPool.getDecompressor( codec );
      }
    }
  }