001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.io;
022    
023    import java.util.Iterator;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.tap.TapException;
027    import cascading.tap.partition.Partition;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.tuple.TupleEntry;
031    import cascading.tuple.TupleEntrySchemeIterator;
032    import cascading.tuple.util.TupleViews;
033    import org.apache.hadoop.fs.Path;
034    import org.apache.hadoop.mapred.JobConf;
035    
036    public class CombineInputPartitionTupleEntryIterator implements Iterator<Tuple>
037      {
038      private final TupleEntrySchemeIterator childIterator;
039      private final FlowProcess<JobConf> flowProcess;
040      private final TupleEntry partitionEntry;
041      private final Fields sourceFields;
042      private final Partition partition;
043      private final String parentIdentifier;
044    
045      private Tuple base;
046      private Tuple view;
047      private String currentFile;
048    
049      public CombineInputPartitionTupleEntryIterator( FlowProcess<JobConf> flowProcess, Fields sourceFields,
050                                                      Partition partition, String parentIdentifier, TupleEntrySchemeIterator childIterator )
051        {
052        this.flowProcess = flowProcess;
053        this.partition = partition;
054        this.parentIdentifier = parentIdentifier;
055        this.childIterator = childIterator;
056        this.sourceFields = sourceFields;
057        this.partitionEntry = new TupleEntry( partition.getPartitionFields(), Tuple.size( partition.getPartitionFields().size() ) );
058        }
059    
060      @Override
061      public boolean hasNext()
062        {
063        return childIterator.hasNext();
064        }
065    
066      @Override
067      public Tuple next()
068        {
069        String currentFile = getCurrentFile();
070        if( this.currentFile == null || !this.currentFile.equals( currentFile ) )
071          {
072          this.currentFile = currentFile;
073    
074          try
075            {
076            String childIdentifier = new Path( currentFile ).getParent().toString(); // drop part-xxxx
077            partition.toTuple( childIdentifier.substring( parentIdentifier.length() + 1 ), partitionEntry );
078            }
079          catch( Exception exception )
080            {
081            throw new TapException( "unable to parse partition given parent: " + parentIdentifier + " and child: " + currentFile, exception );
082            }
083    
084          base = TupleViews.createOverride( sourceFields, partitionEntry.getFields() );
085    
086          TupleViews.reset( base, Tuple.size( sourceFields.size() ), partitionEntry.getTuple() );
087    
088          view = TupleViews.createOverride( sourceFields, childIterator.getFields() );
089          }
090    
091        Tuple tuple = childIterator.next().getTuple();
092        TupleViews.reset( view, base, tuple );
093    
094        return view;
095        }
096    
097      private String getCurrentFile()
098        {
099        String result = flowProcess.getStringProperty( "mapreduce.map.input.file" );
100    
101        if( result == null )
102          result = flowProcess.getStringProperty( "map.input.file" );
103    
104        return result;
105        }
106    
107      @Override
108      public void remove()
109        {
110        childIterator.remove();
111        }
112      }