/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import cascading.CascadingException;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.util.Util;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.jets3t.service.S3ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class MultiInputFormat accepts multiple InputFormat class declarations, allowing a single MapReduce job
 * to read data from incompatible file types.
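 * <p>
 * A minimal usage sketch (illustrative only; the variable names and the particular InputFormat and Path
 * choices below are examples, not part of this class):
 * <pre>{@code
 * JobConf job = new JobConf();
 *
 * JobConf textConf = new JobConf( job );
 * textConf.setInputFormat( TextInputFormat.class );
 * FileInputFormat.setInputPaths( textConf, new Path( "/data/lines" ) );
 *
 * JobConf seqConf = new JobConf( job );
 * seqConf.setInputFormat( SequenceFileInputFormat.class );
 * FileInputFormat.setInputPaths( seqConf, new Path( "/data/records" ) );
 *
 * // job now sources both paths, each read with its own InputFormat
 * MultiInputFormat.addInputFormat( job, textConf, seqConf );
 * }</pre>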
 */
public class MultiInputFormat implements InputFormat
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( MultiInputFormat.class );

  /**
   * Used to set the current JobConf with the configurations of all the source sub-jobs.
   *
   * @param toJob    the JobConf receiving the merged configuration
   * @param fromJobs the JobConf instances configured for each individual source
   */
  public static void addInputFormat( JobConf toJob, JobConf... fromJobs )
    {
    toJob.setInputFormat( MultiInputFormat.class );
    List<Map<String, String>> configs = new ArrayList<Map<String, String>>();
    List<Path> allPaths = new ArrayList<Path>();

    boolean isLocal = false;

    for( JobConf fromJob : fromJobs )
      {
      if( fromJob.get( "mapred.input.format.class" ) == null )
        throw new CascadingException( "mapred.input.format.class is required, should be set in source Scheme#sourceConfInit" );

      configs.add( HadoopUtil.getConfig( toJob, fromJob ) );
      Collections.addAll( allPaths, FileInputFormat.getInputPaths( fromJob ) );

      if( !isLocal )
        isLocal = HadoopUtil.isLocal( fromJob );
      }

    if( !allPaths.isEmpty() ) // it's possible there aren't any
      FileInputFormat.setInputPaths( toJob, allPaths.toArray( new Path[ allPaths.size() ] ) );

    try
      {
      toJob.set( "cascading.multiinputformats", HadoopUtil.serializeBase64( configs, toJob, true ) );
      }
    catch( IOException exception )
      {
      throw new CascadingException( "unable to pack input formats", exception );
      }

    if( isLocal )
      HadoopUtil.setLocal( toJob );
    }

  static InputFormat[] getInputFormats( JobConf[] jobConfs )
    {
    InputFormat[] inputFormats = new InputFormat[ jobConfs.length ];

    for( int i = 0; i < jobConfs.length; i++ )
      inputFormats[ i ] = jobConfs[ i ].getInputFormat();

    return inputFormats;
    }

  private List<Map<String, String>> getConfigs( JobConf job ) throws IOException
    {
    return (List<Map<String, String>>)
      HadoopUtil.deserializeBase64( job.get( "cascading.multiinputformats" ), job, ArrayList.class, true );
    }

  public void validateInput( JobConf job ) throws IOException
    {
    // do nothing, this method is deprecated
    }

  /**
   * Method getSplits delegates to the appropriate InputFormat.
   *
   * @param job       of type JobConf
   * @param numSplits of type int
   * @return InputSplit[]
   * @throws IOException when an underlying InputFormat fails to compute its splits
   */
  public InputSplit[] getSplits( JobConf job, int numSplits ) throws IOException
    {
    numSplits = numSplits == 0 ? 1 : numSplits;

    List<Map<String, String>> configs = getConfigs( job );
    JobConf[] jobConfs = HadoopUtil.getJobConfs( job, configs );
    InputFormat[] inputFormats = getInputFormats( jobConfs );

    // if only one InputFormat, just return whatever it suggests
    if( inputFormats.length == 1 )
      return collapse( getSplits( inputFormats, jobConfs, new int[]{numSplits} ), configs );

    int[] indexedSplits = new int[ inputFormats.length ];

    // if we need only a few, then return one for each
    if( numSplits <= inputFormats.length )
      {
      Arrays.fill( indexedSplits, 1 );
      return collapse( getSplits( inputFormats, jobConfs, indexedSplits ), configs );
      }

    // attempt to get splits proportionally sized per input format
    long[] inputSplitSizes = getInputSplitSizes( inputFormats, jobConfs, numSplits );
    long totalSplitSize = sum( inputSplitSizes );

    if( totalSplitSize == 0 )
      {
      Arrays.fill( indexedSplits, 1 );
      return collapse( getSplits( inputFormats, jobConfs, indexedSplits ), configs );
      }

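    // give each format a share of numSplits proportional to its contribution to the total,
    // rounding up and guaranteeing at least one split per format; for example (illustrative
    // values only), numSplits = 10 with sizes {3, 1} yields ceil(7.5) = 8 and ceil(2.5) = 3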
    for( int i = 0; i < inputSplitSizes.length; i++ )
      {
      int useSplits = (int) Math.ceil( (double) numSplits * inputSplitSizes[ i ] / (double) totalSplitSize );
      indexedSplits[ i ] = useSplits == 0 ? 1 : useSplits;
      }

    return collapse( getSplits( inputFormats, jobConfs, indexedSplits ), configs );
    }

  private long sum( long[] inputSizes )
    {
    long size = 0;

    for( long inputSize : inputSizes )
      size += inputSize;

    return size;
    }

  private InputSplit[] collapse( InputSplit[][] splits, List<Map<String, String>> configs )
    {
    List<InputSplit> splitsList = new ArrayList<InputSplit>();

    for( int i = 0; i < splits.length; i++ )
      {
      Map<String, String> config = configs.get( i );

      config.remove( "mapred.input.dir" ); // this is a redundant value, will show up cluster side
      config.remove( "mapreduce.input.fileinputformat.inputdir" ); // hadoop2

      InputSplit[] split = splits[ i ];

      for( int j = 0; j < split.length; j++ )
        splitsList.add( new MultiInputSplit( split[ j ], config ) );
      }

    return splitsList.toArray( new InputSplit[ splitsList.size() ] );
    }

  private InputSplit[][] getSplits( InputFormat[] inputFormats, JobConf[] jobConfs, int[] numSplits ) throws IOException
    {
    InputSplit[][] inputSplits = new InputSplit[ inputFormats.length ][];

    for( int i = 0; i < inputFormats.length; i++ )
      {
      inputSplits[ i ] = inputFormats[ i ].getSplits( jobConfs[ i ], numSplits[ i ] );

      // it's reasonable for the split array to be empty, but it really shouldn't be null
      if( inputSplits[ i ] == null )
        inputSplits[ i ] = new InputSplit[ 0 ];

      for( int j = 0; j < inputSplits[ i ].length; j++ )
        {
        if( inputSplits[ i ][ j ] == null )
          throw new IllegalStateException( "input format: " + inputFormats[ i ].getClass().getName() + ", returned a split array with nulls" );
        }
      }

    return inputSplits;
    }

  private long[] getInputSplitSizes( InputFormat[] inputFormats, JobConf[] jobConfs, int numSplits ) throws IOException
    {
    long[] inputSizes = new long[ inputFormats.length ];

    for( int i = 0; i < inputFormats.length; i++ )
      {
      InputFormat inputFormat = inputFormats[ i ];
      InputSplit[] splits = inputFormat.getSplits( jobConfs[ i ], numSplits );

      // the number of splits each format suggests is used as a proxy for its relative input size
      inputSizes[ i ] = splits.length;
      }

    return inputSizes;
    }

  /**
   * Method getRecordReader delegates to the appropriate InputFormat.
   *
   * @param split    of type InputSplit
   * @param job      of type JobConf
   * @param reporter of type Reporter
   * @return RecordReader
   * @throws IOException when a record reader cannot be obtained from the underlying InputFormat
   */
  public RecordReader getRecordReader( InputSplit split, JobConf job, final Reporter reporter ) throws IOException
    {
    final MultiInputSplit multiSplit = (MultiInputSplit) split;
    final JobConf currentConf = HadoopUtil.mergeConf( job, multiSplit.config, true );

    try
      {
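      // retry acquiring the record reader, continuing to retry only when the failure is
      // caused by an S3 service error; any other exception is rethrown immediately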
      return Util.retry( LOG, 3, 20, "unable to get record reader", new Util.RetryOperator<RecordReader>()
      {

      @Override
      public RecordReader operate() throws Exception
        {
        return currentConf.getInputFormat().getRecordReader( multiSplit.inputSplit, currentConf, reporter );
        }

      @Override
      public boolean rethrow( Exception exception )
        {
        return !( exception.getCause() instanceof S3ServiceException );
        }
      } );
      }
    catch( Exception exception )
      {
      if( exception instanceof RuntimeException )
        throw (RuntimeException) exception;
      else
        throw (IOException) exception;
      }
    }
  }