/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.io;

import java.io.IOException;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.MapRed;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.util.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance;

/**
 * Class MultiRecordReaderIterator is an implementation of {@link cascading.util.CloseableIterator}. It is returned by {@link cascading.tap.Tap} instances when
 * opening the tap's resource for reading.
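 * <p>
 * A minimal usage sketch, assuming already configured {@code flowProcess} and {@code tap} instances:
 * <pre>{@code
 * MultiRecordReaderIterator iterator = new MultiRecordReaderIterator( flowProcess, tap );
 *
 * try
 *   {
 *   while( iterator.hasNext() )
 *     {
 *     RecordReader reader = iterator.next(); // reader for the next input split
 *     // consume key/value pairs from the reader here
 *     }
 *   }
 * finally
 *   {
 *   iterator.close(); // closes the last opened reader
 *   }
 * }</pre>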
 */
public class MultiRecordReaderIterator implements CloseableIterator<RecordReader>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( MultiRecordReaderIterator.class );

  private final FlowProcess<? extends Configuration> flowProcess;
  /** Field tap */
  private final Tap tap;
  /** Field inputFormat */
  private InputFormat inputFormat;
  /** Field conf */
  private Configuration conf;
  /** Field splits */
  private InputSplit[] splits;
  /** Field reader */
  private RecordReader reader;

  /** Field lastReader */
  private RecordReader lastReader;

  /** Field currentSplit */
  private int currentSplit = 0;
  /** Field complete */
  private boolean complete = false;

  /**
   * Constructor MultiRecordReaderIterator creates a new MultiRecordReaderIterator instance.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @throws IOException when input splits cannot be created for the given Tap
   */
  public MultiRecordReaderIterator( FlowProcess<? extends Configuration> flowProcess, Tap tap ) throws IOException
    {
    this.flowProcess = flowProcess;
    this.tap = tap;
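    // operate on a copy of the configuration so tap specific changes do not leak into the shared process configuration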
    this.conf = flowProcess.getConfigCopy();

    initialize();
    }

  private void initialize() throws IOException
    {
    // prevent collisions with configuration properties set client side when now running cluster side
    String property = flowProcess.getStringProperty( "cascading.node.accumulated.source.conf." + Tap.id( tap ) );

    if( property == null )
      {
      // default behavior is to accumulate paths, so remove any set prior
      conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2
      tap.sourceConfInit( flowProcess, conf );
      }

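    // convert to a JobConf and resolve the configured InputFormat; it drives both split generation and record reading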
    JobConf jobConf = asJobConfInstance( conf );

    inputFormat = jobConf.getInputFormat();

    if( inputFormat instanceof JobConfigurable )
      ( (JobConfigurable) inputFormat ).configure( jobConf );

    // do not test for existence, let hadoop decide how to handle the given path
    // this delegates globbing to the inputformat on split generation.
    splits = inputFormat.getSplits( jobConf, 1 );

    if( splits.length == 0 )
      complete = true;
    }

  private RecordReader makeReader( int currentSplit ) throws IOException
    {
    LOG.debug( "reading split: {}", currentSplit );

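    // report progress through the task's Reporter when running inside a map task; otherwise use the no-op reporter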
    Reporter reporter = Reporter.NULL;

    if( flowProcess instanceof MapRed )
      reporter = ( (MapRed) flowProcess ).getReporter(); // may return Reporter.NULL

    return inputFormat.getRecordReader( splits[ currentSplit ], asJobConfInstance( conf ), reporter );
    }

  /**
   * Method hasNext returns true if there are more {@link RecordReader} instances available.
   *
   * @return boolean
   */
  public boolean hasNext()
    {
    getNextReader();

    return !complete;
    }

  /**
   * Method next returns the next {@link RecordReader}.
   *
   * @return RecordReader
   */
  public RecordReader next()
    {
    if( complete )
      throw new IllegalStateException( "no more values" );

    try
      {
      getNextReader();

      return reader;
      }
    finally
      {
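      // null the current reader so a subsequent hasNext()/next() call advances to the next split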
      reader = null;
      }
    }

  private void getNextReader()
    {
    if( complete || reader != null )
      return;

    try
      {
      if( currentSplit < splits.length )
        {
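        // release the reader for the previous split before opening the next one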
        if( lastReader != null )
          lastReader.close();

        reader = makeReader( currentSplit++ );
        lastReader = reader;
        }
      else
        {
        complete = true;
        }
      }
    catch( IOException exception )
      {
      throw new TapException( "could not get next record reader", exception );
      }
    }

  public void remove()
    {
    throw new UnsupportedOperationException( "unimplemented" );
    }

  @Override
  public void close() throws IOException
    {
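    // only the most recently opened reader can still be open; earlier readers are closed as iteration advances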
    if( lastReader != null )
      lastReader.close();
    }
  }