/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineInputPartitionTupleEntryIterator;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.hadoop.io.MultiInputSplit;
import cascading.tap.hadoop.io.TapOutputCollector;
import cascading.tap.partition.BasePartitionTap;
import cascading.tap.partition.Partition;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryIterableChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

/**
 * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the
 * current {@link cascading.tuple.Tuple} instance.
 * <p/>
 * The constructor takes a {@link cascading.tap.hadoop.Hfs} {@link cascading.tap.Tap} and a {@link Partition}
 * implementation. This allows Tuple values at given positions to be used as directory names during write
 * operations, and directory names as data during read operations.
 * <p/>
 * The key benefit is that there is no need to duplicate data values in the directory names and inside
 * the data files.
 * <p/>
 * Only the values declared in the parent Tap will be read from or written to the underlying file system
 * files, while fields declared by the {@link Partition} will only be read from or written to the directory
 * names. That is, the PartitionTap instance will sink or source the partition fields plus the parent Tap
 * fields. The partition fields and parent Tap fields do not need to have common field names.
 * <p/>
 * Note that Hadoop can only sink to directories, and all files in those directories are "part-xxxxx" files.
 * <p/>
 * {@code openWritesThreshold} limits the number of files held open for writing. This value defaults to 300 files.
 * Each time the threshold is exceeded, 10% of the least recently used open files will be closed.
 * <p/>
 * PartitionTap will populate a given {@code partition} without regard to the case of the values being used. Thus
 * the paths {@code 2012/June/} and {@code 2012/june/} will likely result in two files open against the same
 * location. Forcing the case to be consistent with a custom Partition implementation or an upstream
 * {@link cascading.operation.Function} is recommended, see {@link cascading.operation.expression.ExpressionFunction}.
 * <p/>
 * Though Hadoop has no mechanism to prevent simultaneous writes to a directory from multiple jobs, that doesn't mean
 * it is safe to do so. The same is true of the PartitionTap. Interleaving writes to a common parent (root) directory
 * across multiple flows will very likely lead to data loss.
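 * <p/>
 * For example, a minimal sketch of a partitioned sink (the field names, scheme, and path below are
 * illustrative only, not prescribed by this class):
 * <pre>{@code
 * // partition on "year" and "month"; only "entry" is written into the part files
 * Partition partition = new DelimitedPartition( new Fields( "year", "month" ), "/" );
 *
 * // the parent Hfs Tap supplies the base path and the Scheme for the part files
 * Hfs parent = new Hfs( new TextDelimited( new Fields( "entry" ), "\t" ), "output/logs" );
 *
 * Tap sink = new PartitionTap( parent, partition, SinkMode.REPLACE );
 * }</pre>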
 */
public class PartitionTap extends BasePartitionTap<JobConf, RecordReader, OutputCollector>
  {
  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   */
  @ConstructorProperties({"parent", "partition"})
  public PartitionTap( Hfs parent, Partition partition )
    {
    this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p/>
   * {@code openWritesThreshold} limits the number of files held open for writing.
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, int openWritesThreshold )
    {
    super( parent, partition, openWritesThreshold );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   * @param sinkMode  of type SinkMode
   */
  @ConstructorProperties({"parent", "partition", "sinkMode"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode )
    {
    super( parent, partition, sinkMode );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p/>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
   *
   * @param parent             of type Hfs
   * @param partition          of type Partition
   * @param sinkMode           of type SinkMode
   * @param keepParentOnDelete of type boolean
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete )
    {
    this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p/>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
   * <p/>
   * {@code openWritesThreshold} limits the number of files held open for writing.
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param sinkMode            of type SinkMode
   * @param keepParentOnDelete  of type boolean
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
    {
    super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold );
    }

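  /**
   * Creates the collector used to write a single partition path, wrapping a
   * {@link TapOutputCollector} bound to the given child path and sequence.
   */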
  @Override
  protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<JobConf> flowProcess, Tap parent, String path, long sequence ) throws IOException
    {
    TapOutputCollector outputCollector = new TapOutputCollector( flowProcess, parent, path, sequence );

    return new TupleEntrySchemeCollector<JobConf, OutputCollector>( flowProcess, parent, outputCollector );
    }

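  /**
   * Creates the iterator used to read a single partition path, reusing the parent Tap's
   * {@link cascading.scheme.Scheme} against the given child path.
   */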
  @Override
  protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<JobConf> flowProcess, Tap parent, String path, RecordReader recordReader ) throws IOException
    {
    return new HadoopTupleEntrySchemeIterator( flowProcess, new Hfs( parent.getScheme(), path ), recordReader );
    }

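  /**
   * Returns the directory holding the file currently being processed, derived from the
   * {@link MultiInputSplit#CASCADING_SOURCE_PATH} property set on the current split. For example,
   * an identifier ending in {@code 2012/June/part-00000} yields {@code 2012/June}.
   */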
  @Override
  protected String getCurrentIdentifier( FlowProcess<JobConf> flowProcess )
    {
    String identifier = flowProcess.getStringProperty( MultiInputSplit.CASCADING_SOURCE_PATH ); // set on current split

    if( identifier == null )
      {
      if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
        throw new TapException( "combined input format support, via '" + HfsProps.COMBINE_INPUT_FILES + "', may not be enabled for use with the PartitionTap" );

      throw new TapException( "unable to retrieve the current file being processed, '" + MultiInputSplit.CASCADING_SOURCE_PATH + "' is not set" );
      }

    return new Path( identifier ).getParent().toString(); // drop part-xxxxx
    }

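  /**
   * Registers each child partition directory, rather than the parent root path, as an input
   * location on the given configuration, so only partitioned data is read.
   */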
  @Override
  public void sourceConfInit( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    try
      {
      String[] childPartitions = getChildPartitionIdentifiers( flowProcess, true );

      ( (Hfs) getParent() ).applySourceConfInitIdentifiers( flowProcess, conf, childPartitions );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to retrieve child partitions", exception );
      }
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException
    {
    if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
      return new CombinePartitionIterator( flowProcess, input );

    return super.openForRead( flowProcess, input );
    }

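  /**
   * Iterator used when combined input format support is enabled. A combined split may span files
   * from multiple partition directories, so partition values are recovered per record via
   * {@link CombineInputPartitionTupleEntryIterator} instead of once per split.
   */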
  private class CombinePartitionIterator extends TupleEntryIterableChainIterator
    {
    public CombinePartitionIterator( final FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException
      {
      super( getSourceFields() );

      List<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>();

      if( input == null )
        throw new IOException( "input cannot be null" );

      String identifier = parent.getFullIdentifier( flowProcess );

      iterators.add( createPartitionEntryIterator( flowProcess, input, identifier ) );

      reset( iterators );
      }

    private CombineInputPartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<JobConf> flowProcess, RecordReader input, String parentIdentifier ) throws IOException
      {
      TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, null, input );

      return new CombineInputPartitionTupleEntryIterator( flowProcess, getSourceFields(), partition, parentIdentifier, schemeIterator );
      }
    }
  }