001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tuple.hadoop.collect; 022 023 import java.io.File; 024 import java.io.FileInputStream; 025 import java.io.FileOutputStream; 026 import java.io.IOException; 027 import java.io.InputStream; 028 import java.io.OutputStream; 029 030 import cascading.flow.FlowProcess; 031 import cascading.flow.FlowProcessWrapper; 032 import cascading.flow.hadoop.HadoopFlowProcess; 033 import cascading.tuple.TupleException; 034 import cascading.tuple.collect.SpillableTupleList; 035 import cascading.tuple.hadoop.TupleSerialization; 036 import cascading.tuple.hadoop.io.HadoopTupleInputStream; 037 import cascading.tuple.hadoop.io.HadoopTupleOutputStream; 038 import cascading.tuple.io.TupleInputStream; 039 import cascading.tuple.io.TupleOutputStream; 040 import org.apache.hadoop.io.compress.CodecPool; 041 import org.apache.hadoop.io.compress.CompressionCodec; 042 import org.apache.hadoop.io.compress.Compressor; 043 import org.apache.hadoop.io.compress.Decompressor; 044 import org.apache.hadoop.mapred.JobConf; 045 import org.apache.hadoop.util.ReflectionUtils; 046 import org.slf4j.Logger; 047 import org.slf4j.LoggerFactory; 048 049 /** 050 * SpillableTupleList is a simple {@link Iterable} object that can store an unlimited number of {@link cascading.tuple.Tuple} instances by spilling 051 * excess to a temporary disk file. 052 * <p/> 053 * Spills will automatically be compressed using the {@link #defaultCodecs} values. To disable compression or 054 * change the codecs, see {@link cascading.tuple.collect.SpillableProps#SPILL_COMPRESS} and {@link cascading.tuple.collect.SpillableProps#SPILL_CODECS}. 055 * <p/> 056 * It is recommended to add Lzo if available. 057 * {@code "org.apache.hadoop.io.compress.LzoCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec" } 058 */ 059 public class HadoopSpillableTupleList extends SpillableTupleList 060 { 061 private static final Logger LOG = LoggerFactory.getLogger( HadoopSpillableTupleList.class ); 062 063 public static final String defaultCodecs = "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec"; 064 065 /** Field codec */ 066 private final CompressionCodec codec; 067 /** Field serializationElementWriter */ 068 private final TupleSerialization tupleSerialization; 069 070 public static synchronized CompressionCodec getCodec( FlowProcess flowProcess, String defaultCodecs ) 071 { 072 Class<? extends CompressionCodec> codecClass = getCodecClass( flowProcess, defaultCodecs, CompressionCodec.class ); 073 074 if( codecClass == null ) 075 return null; 076 077 if( flowProcess instanceof FlowProcessWrapper ) 078 flowProcess = ( (FlowProcessWrapper) flowProcess ).getDelegate(); 079 080 return ReflectionUtils.newInstance( codecClass, ( (HadoopFlowProcess) flowProcess ).getJobConf() ); 081 } 082 083 /** 084 * Constructor SpillableTupleList creates a new SpillableTupleList instance using the given threshold value, and 085 * the first available compression codec, if any. 086 * 087 * @param threshold of type long 088 * @param codec of type CompressionCodec 089 */ 090 public HadoopSpillableTupleList( int threshold, CompressionCodec codec, JobConf jobConf ) 091 { 092 super( threshold ); 093 this.codec = codec; 094 095 if( jobConf == null ) 096 this.tupleSerialization = new TupleSerialization(); 097 else 098 this.tupleSerialization = new TupleSerialization( jobConf ); 099 } 100 101 public HadoopSpillableTupleList( int threshold, TupleSerialization tupleSerialization, CompressionCodec codec ) 102 { 103 super( threshold ); 104 this.tupleSerialization = tupleSerialization; 105 this.codec = codec; 106 } 107 108 @Override 109 protected TupleOutputStream createTupleOutputStream( File file ) 110 { 111 OutputStream outputStream; 112 113 try 114 { 115 outputStream = new FileOutputStream( file ); 116 117 Compressor compressor = null; 118 119 if( codec != null ) 120 { 121 compressor = getCompressor(); 122 outputStream = codec.createOutputStream( outputStream, compressor ); 123 } 124 125 final Compressor finalCompressor = compressor; 126 127 return new HadoopTupleOutputStream( outputStream, tupleSerialization.getElementWriter() ) 128 { 129 @Override 130 public void close() throws IOException 131 { 132 try 133 { 134 super.close(); 135 } 136 finally 137 { 138 if( finalCompressor != null ) 139 CodecPool.returnCompressor( finalCompressor ); 140 } 141 } 142 }; 143 } 144 catch( IOException exception ) 145 { 146 throw new TupleException( "unable to create temporary file input stream", exception ); 147 } 148 } 149 150 private Compressor getCompressor() 151 { 152 // some codecs are using direct memory, and the gc for direct memory cannot sometimes keep up 153 // so we attempt to force a gc if we see a OOME once. 154 try 155 { 156 return CodecPool.getCompressor( codec ); 157 } 158 catch( OutOfMemoryError error ) 159 { 160 System.gc(); 161 LOG.info( "received OOME when allocating compressor for codec: {}, retrying once", codec.getClass().getCanonicalName(), error ); 162 163 return CodecPool.getCompressor( codec ); 164 } 165 } 166 167 @Override 168 protected TupleInputStream createTupleInputStream( File file ) 169 { 170 try 171 { 172 InputStream inputStream; 173 174 inputStream = new FileInputStream( file ); 175 176 Decompressor decompressor = null; 177 178 if( codec != null ) 179 { 180 decompressor = getDecompressor(); 181 inputStream = codec.createInputStream( inputStream, decompressor ); 182 } 183 184 final Decompressor finalDecompressor = decompressor; 185 return new HadoopTupleInputStream( inputStream, tupleSerialization.getElementReader() ) 186 { 187 @Override 188 public void close() throws IOException 189 { 190 try 191 { 192 super.close(); 193 } 194 finally 195 { 196 if( finalDecompressor != null ) 197 CodecPool.returnDecompressor( finalDecompressor ); 198 } 199 } 200 }; 201 } 202 catch( IOException exception ) 203 { 204 throw new TupleException( "unable to create temporary file output stream", exception ); 205 } 206 } 207 208 private Decompressor getDecompressor() 209 { 210 // some codecs are using direct memory, and the gc for direct memory cannot sometimes keep up 211 // so we attempt to force a gc if we see a OOME once. 212 try 213 { 214 return CodecPool.getDecompressor( codec ); 215 } 216 catch( OutOfMemoryError error ) 217 { 218 System.gc(); 219 LOG.info( "received OOME when allocating decompressor for codec: {}, retrying once", codec.getClass().getCanonicalName(), error ); 220 221 return CodecPool.getDecompressor( codec ); 222 } 223 } 224 }