/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import cascading.CascadingException;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.util.Util;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.jets3t.service.S3ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class MultiInputFormat accepts multiple InputFormat class declarations, allowing a single MR job
 * to read data from incompatible file types.
 */
public class MultiInputFormat implements InputFormat
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( MultiInputFormat.class );
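
  /*
   * Typical usage, a minimal sketch (the source JobConf names and paths below are
   * hypothetical): give each source its own JobConf carrying its InputFormat and
   * input paths, then merge them into the JobConf the job actually runs with.
   *
   *   JobConf textConf = new JobConf( job );
   *   textConf.setInputFormat( org.apache.hadoop.mapred.TextInputFormat.class );
   *   FileInputFormat.setInputPaths( textConf, new Path( "some/text/path" ) );
   *
   *   JobConf seqConf = new JobConf( job );
   *   seqConf.setInputFormat( org.apache.hadoop.mapred.SequenceFileInputFormat.class );
   *   FileInputFormat.setInputPaths( seqConf, new Path( "some/seq/path" ) );
   *
   *   MultiInputFormat.addInputFormat( job, textConf, seqConf );
   */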
  /**
   * Used to configure the given JobConf with the configurations of all sub jobs.
   *
   * @param toJob    the JobConf to receive the merged configuration
   * @param fromJobs the JobConf instances of the individual input sources
   */
  public static void addInputFormat( JobConf toJob, JobConf... fromJobs )
    {
    toJob.setInputFormat( MultiInputFormat.class );
    List<Map<String, String>> configs = new ArrayList<Map<String, String>>();
    List<Path> allPaths = new ArrayList<Path>();

    boolean isLocal = false;

    for( JobConf fromJob : fromJobs )
      {
      if( fromJob.get( "mapred.input.format.class" ) == null )
        throw new CascadingException( "mapred.input.format.class is required, should be set in source Scheme#sourceConfInit" );

      configs.add( HadoopUtil.getConfig( toJob, fromJob ) );
      Collections.addAll( allPaths, FileInputFormat.getInputPaths( fromJob ) );

      if( !isLocal )
        isLocal = HadoopUtil.isLocal( fromJob );
      }

    if( !allPaths.isEmpty() ) // it's possible there aren't any
      FileInputFormat.setInputPaths( toJob, allPaths.toArray( new Path[ allPaths.size() ] ) );

    try
      {
      toJob.set( "cascading.multiinputformats", HadoopUtil.serializeBase64( configs, toJob, true ) );
      }
    catch( IOException exception )
      {
      throw new CascadingException( "unable to pack input formats", exception );
      }

    if( isLocal )
      HadoopUtil.setLocal( toJob );
    }

  static InputFormat[] getInputFormats( JobConf[] jobConfs )
    {
    InputFormat[] inputFormats = new InputFormat[ jobConfs.length ];

    for( int i = 0; i < jobConfs.length; i++ )
      inputFormats[ i ] = jobConfs[ i ].getInputFormat();

    return inputFormats;
    }

  private List<Map<String, String>> getConfigs( JobConf job ) throws IOException
    {
    return (List<Map<String, String>>)
      HadoopUtil.deserializeBase64( job.get( "cascading.multiinputformats" ), job, ArrayList.class, true );
    }

  public void validateInput( JobConf job ) throws IOException
    {
    // do nothing, this method is deprecated
    }

  /**
   * Method getSplits delegates to the appropriate InputFormat.
   *
   * @param job       of type JobConf
   * @param numSplits of type int
   * @return InputSplit[]
   * @throws IOException when a delegated InputFormat fails to compute its splits
   */
  public InputSplit[] getSplits( JobConf job, int numSplits ) throws IOException
    {
    numSplits = numSplits == 0 ? 1 : numSplits;

    List<Map<String, String>> configs = getConfigs( job );
    JobConf[] jobConfs = HadoopUtil.getJobConfs( job, configs );
    InputFormat[] inputFormats = getInputFormats( jobConfs );

    // if only one InputFormat, just return whatever it suggests
    if( inputFormats.length == 1 )
      return collapse( getSplits( inputFormats, jobConfs, new int[]{numSplits} ), configs );

    int[] indexedSplits = new int[ inputFormats.length ];

    // if we need only a few, then return one for each
    if( numSplits <= inputFormats.length )
      {
      Arrays.fill( indexedSplits, 1 );
      return collapse( getSplits( inputFormats, jobConfs, indexedSplits ), configs );
      }

    // attempt to get splits proportionally sized per input format
    long[] inputSplitSizes = getInputSplitSizes( inputFormats, jobConfs, numSplits );
    long totalSplitSize = sum( inputSplitSizes );

    if( totalSplitSize == 0 )
      {
      Arrays.fill( indexedSplits, 1 );
      return collapse( getSplits( inputFormats, jobConfs, indexedSplits ), configs );
      }

    // each format receives a share of numSplits proportional to the splits it suggests, never zero
    for( int i = 0; i < inputSplitSizes.length; i++ )
      {
      int useSplits = (int) Math.ceil( (double) numSplits * inputSplitSizes[ i ] / (double) totalSplitSize );
      indexedSplits[ i ] = useSplits == 0 ? 1 : useSplits;
      }

    return collapse( getSplits( inputFormats, jobConfs, indexedSplits ), configs );
    }
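
  /*
   * A worked example of the proportional allocation above, with illustrative
   * numbers: for numSplits = 10 and two formats suggesting 30 and 10 splits,
   * totalSplitSize = 40, so the first format is asked for
   * ceil( 10 * 30 / 40 ) = 8 splits and the second for ceil( 10 * 10 / 40 ) = 3.
   */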
  private long sum( long[] inputSizes )
    {
    long size = 0;

    for( long inputSize : inputSizes )
      size += inputSize;

    return size;
    }

  private InputSplit[] collapse( InputSplit[][] splits, List<Map<String, String>> configs )
    {
    List<InputSplit> splitsList = new ArrayList<InputSplit>();

    for( int i = 0; i < splits.length; i++ )
      {
      Map<String, String> config = configs.get( i );

      config.remove( "mapred.input.dir" ); // this is a redundant value, will show up cluster side
      config.remove( "mapreduce.input.fileinputformat.inputdir" ); // hadoop2

      InputSplit[] split = splits[ i ];

      for( int j = 0; j < split.length; j++ )
        splitsList.add( new MultiInputSplit( split[ j ], config ) );
      }

    return splitsList.toArray( new InputSplit[ splitsList.size() ] );
    }

  private InputSplit[][] getSplits( InputFormat[] inputFormats, JobConf[] jobConfs, int[] numSplits ) throws IOException
    {
    InputSplit[][] inputSplits = new InputSplit[ inputFormats.length ][];

    for( int i = 0; i < inputFormats.length; i++ )
      {
      inputSplits[ i ] = inputFormats[ i ].getSplits( jobConfs[ i ], numSplits[ i ] );

      // it's reasonable for the split array to be empty, but it really shouldn't be null
      if( inputSplits[ i ] == null )
        inputSplits[ i ] = new InputSplit[ 0 ];

      for( int j = 0; j < inputSplits[ i ].length; j++ )
        {
        if( inputSplits[ i ][ j ] == null )
          throw new IllegalStateException( "input format: " + inputFormats[ i ].getClass().getName() + ", returned a split array with nulls" );
        }
      }

    return inputSplits;
    }

  private long[] getInputSplitSizes( InputFormat[] inputFormats, JobConf[] jobConfs, int numSplits ) throws IOException
    {
    long[] inputSizes = new long[ inputFormats.length ];

    for( int i = 0; i < inputFormats.length; i++ )
      {
      InputFormat inputFormat = inputFormats[ i ];
      InputSplit[] splits = inputFormat.getSplits( jobConfs[ i ], numSplits );

      inputSizes[ i ] = splits.length;
      }

    return inputSizes;
    }

  /**
   * Method getRecordReader delegates to the appropriate InputFormat.
   *
   * @param split    of type InputSplit
   * @param job      of type JobConf
   * @param reporter of type Reporter
   * @return RecordReader
   * @throws IOException when the delegated InputFormat fails to create a RecordReader
   */
  public RecordReader getRecordReader( InputSplit split, JobConf job, final Reporter reporter ) throws IOException
    {
    final MultiInputSplit multiSplit = (MultiInputSplit) split;
    final JobConf currentConf = HadoopUtil.mergeConf( job, multiSplit.config, true );

    try
      {
      // retry to ride out transient S3 failures; any other failure is rethrown immediately
      return Util.retry( LOG, 3, 20, "unable to get record reader", new Util.RetryOperator<RecordReader>()
        {
        @Override
        public RecordReader operate() throws Exception
          {
          return currentConf.getInputFormat().getRecordReader( multiSplit.inputSplit, currentConf, reporter );
          }

        @Override
        public boolean rethrow( Exception exception )
          {
          // only retry when the failure is rooted in an S3 service error
          return !( exception.getCause() instanceof S3ServiceException );
          }
        } );
      }
    catch( Exception exception )
      {
      if( exception instanceof RuntimeException )
        throw (RuntimeException) exception;
      else
        throw (IOException) exception;
      }
    }
  }