001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.io;
022    
023    import java.io.IOException;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.flow.hadoop.HadoopFlowProcess;
027    import cascading.flow.hadoop.util.HadoopUtil;
028    import cascading.tap.Tap;
029    import cascading.tap.TapException;
030    import cascading.tuple.Tuple;
031    import cascading.util.CloseableIterator;
032    import org.apache.hadoop.mapred.InputFormat;
033    import org.apache.hadoop.mapred.InputSplit;
034    import org.apache.hadoop.mapred.JobConf;
035    import org.apache.hadoop.mapred.JobConfigurable;
036    import org.apache.hadoop.mapred.RecordReader;
037    import org.apache.hadoop.mapred.Reporter;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    /**
042     * Class TapIterator is an implementation of {@link cascading.util.CloseableIterator}. It is returned by {@link cascading.tap.Tap} instances when
043     * opening the taps resource for reading.
044     */
045    public class MultiRecordReaderIterator implements CloseableIterator<RecordReader>
046      {
047      /** Field LOG */
048      private static final Logger LOG = LoggerFactory.getLogger( MultiRecordReaderIterator.class );
049    
050      private final FlowProcess<JobConf> flowProcess;
051      /** Field tap */
052      private final Tap tap;
053      /** Field inputFormat */
054      private InputFormat inputFormat;
055      /** Field conf */
056      private JobConf conf;
057      /** Field splits */
058      private InputSplit[] splits;
059      /** Field reader */
060      private RecordReader reader;
061    
062      /** Field lastReader */
063      private RecordReader lastReader;
064    
065      /** Field currentSplit */
066      private int currentSplit = 0;
067      /** Field complete */
068      private boolean complete = false;
069    
070      /**
071       * Constructor TapIterator creates a new TapIterator instance.
072       *
073       * @throws IOException when
074       */
075      public MultiRecordReaderIterator( FlowProcess<JobConf> flowProcess, Tap tap ) throws IOException
076        {
077        this.flowProcess = flowProcess;
078        this.tap = tap;
079        this.conf = flowProcess.getConfigCopy();
080    
081        initialize();
082        }
083    
084      private void initialize() throws IOException
085        {
086        // prevent collisions of configuration properties set client side if now cluster side
087        String property = flowProcess.getStringProperty( "cascading.step.accumulated.source.conf." + Tap.id( tap ) );
088    
089        if( property == null )
090          {
091          // default behavior is to accumulate paths, so remove any set prior
092          conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2
093          tap.sourceConfInit( flowProcess, conf );
094          }
095    
096        inputFormat = conf.getInputFormat();
097    
098        if( inputFormat instanceof JobConfigurable )
099          ( (JobConfigurable) inputFormat ).configure( conf );
100    
101        // do not test for existence, let hadoop decide how to handle the given path
102        // this delegates globbing to the inputformat on split generation.
103        splits = inputFormat.getSplits( conf, 1 );
104    
105        if( splits.length == 0 )
106          complete = true;
107        }
108    
109      private RecordReader makeReader( int currentSplit ) throws IOException
110        {
111        LOG.debug( "reading split: {}", currentSplit );
112    
113        Reporter reporter = Reporter.NULL;
114    
115        if( flowProcess instanceof HadoopFlowProcess )
116          reporter = ( (HadoopFlowProcess) flowProcess ).getReporter(); // may return Reporter.NULL
117    
118        return inputFormat.getRecordReader( splits[ currentSplit ], conf, reporter );
119        }
120    
121      /**
122       * Method hasNext returns true if there more {@link Tuple} instances available.
123       *
124       * @return boolean
125       */
126      public boolean hasNext()
127        {
128        getNextReader();
129    
130        return !complete;
131        }
132    
133      /**
134       * Method next returns the next {@link Tuple}.
135       *
136       * @return Tuple
137       */
138      public RecordReader next()
139        {
140        if( complete )
141          throw new IllegalStateException( "no more values" );
142    
143        try
144          {
145          getNextReader();
146    
147          return reader;
148          }
149        finally
150          {
151          reader = null;
152          }
153        }
154    
155      private void getNextReader()
156        {
157        if( complete || reader != null )
158          return;
159    
160        try
161          {
162          if( currentSplit < splits.length )
163            {
164            if( lastReader != null )
165              lastReader.close();
166    
167            reader = makeReader( currentSplit++ );
168            lastReader = reader;
169            }
170          else
171            {
172            complete = true;
173            }
174          }
175        catch( IOException exception )
176          {
177          throw new TapException( "could not get next tuple", exception );
178          }
179        }
180    
181      public void remove()
182        {
183        throw new UnsupportedOperationException( "unimplemented" );
184        }
185    
186      @Override
187      public void close() throws IOException
188        {
189        if( lastReader != null )
190          lastReader.close();
191        }
192      }