/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineInputPartitionTupleEntryIterator;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.hadoop.io.MultiInputSplit;
import cascading.tap.hadoop.io.TapOutputCollector;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.partition.BasePartitionTap;
import cascading.tap.partition.Partition;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryIterableChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

/**
 * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the
 * current {@link cascading.tuple.Tuple} instance.
 * <p/>
 * The constructor takes a {@link cascading.tap.hadoop.Hfs} {@link cascading.tap.Tap} and a {@link Partition}
 * implementation. This allows Tuple values at given positions to be used as directory names during write
 * operations, and directory names to be used as data during read operations.
 * <p/>
 * The key benefit is that data values need not be duplicated in both the directory names and inside
 * the data files.
 * <p/>
 * Only values declared in the parent Tap will be read from or written to the underlying file system files, while
 * fields declared by the {@link Partition} will only be read from or written to the directory names. That is, the
 * PartitionTap instance will sink or source the partition fields plus the parent Tap fields. The partition
 * fields and parent Tap fields do not need to have common field names.
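 * <p/>
 * For example, a minimal sketch (the {@code TextDelimited} scheme, field names, and paths below are illustrative
 * assumptions, not requirements). Incoming tuples carry "year", "month", and "value"; only "value" is written into
 * the part files, while "year" and "month" become the directory path:
 * <pre>{@code
 * Hfs parent = new Hfs( new TextDelimited( new Fields( "value" ) ), "output/path" );
 * Partition partition = new DelimitedPartition( new Fields( "year", "month" ), "/" );
 * Tap output = new PartitionTap( parent, partition, SinkMode.REPLACE ); // writes e.g. output/path/2012/june/part-00000
 * }</pre>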
 * <p/>
 * Note that Hadoop can only sink to directories, and all files in those directories are "part-xxxxx" files.
 * <p/>
 * {@code openWritesThreshold} limits the number of files simultaneously open for output. This value defaults to 300 files.
 * Each time the threshold is exceeded, the least recently used 10% of the open files will be closed.
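 * <p/>
 * A hedged sketch of raising the threshold for a high-cardinality partition (the value of 1000 is an arbitrary
 * illustration):
 * <pre>{@code
 * Tap output = new PartitionTap( parent, partition, 1000 ); // allow up to ~1000 simultaneously open part files
 * }</pre>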
 * <p/>
 * PartitionTap will populate a given {@code partition} without regard to the case of the values being used. Thus
 * the resulting paths {@code 2012/June/} and {@code 2012/june/} will likely result in two open files writing to the same
 * location. Forcing the case to be consistent with a custom Partition implementation or an upstream
 * {@link cascading.operation.Function} is recommended; see {@link cascading.operation.expression.ExpressionFunction}.
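 * <p/>
 * For instance, a minimal sketch of normalizing case upstream (the {@code "month"} field name is an illustrative
 * assumption):
 * <pre>{@code
 * // lower-case the partition value so "June" and "june" land in the same partition
 * ExpressionFunction lower = new ExpressionFunction( new Fields( "month" ), "month.toLowerCase()", String.class );
 * pipe = new Each( pipe, new Fields( "month" ), lower, Fields.REPLACE );
 * }</pre>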
 * <p/>
 * Though Hadoop has no mechanism to prevent simultaneous writes to a directory from multiple jobs, that does not
 * mean it is safe to do so. The same is true of the PartitionTap. Interleaving writes to a common parent (root)
 * directory across multiple flows will very likely lead to data loss.
 */
public class PartitionTap extends BasePartitionTap<Configuration, RecordReader, OutputCollector>
  {
  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   */
  @ConstructorProperties({"parent", "partition"})
  public PartitionTap( Hfs parent, Partition partition )
    {
    this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p/>
   * {@code openWritesThreshold} limits the number of files simultaneously open for output.
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, int openWritesThreshold )
    {
    super( parent, partition, openWritesThreshold );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   * @param sinkMode  of type SinkMode
   */
  @ConstructorProperties({"parent", "partition", "sinkMode"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode )
    {
    super( parent, partition, sinkMode );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p/>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
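   * <p/>
   * A minimal sketch of retaining the parent directory inside a Cascade (the {@code parent} and {@code partition}
   * values are assumed to be defined as in the class-level example):
   * <pre>{@code
   * // keep the parent path when deleteResource() is invoked on this tap
   * Tap output = new PartitionTap( parent, partition, SinkMode.REPLACE, true );
   * }</pre>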
   *
   * @param parent             of type Hfs
   * @param partition          of type Partition
   * @param sinkMode           of type SinkMode
   * @param keepParentOnDelete of type boolean
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete )
    {
    this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p/>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
   * <p/>
   * {@code openWritesThreshold} limits the number of files simultaneously open for output.
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param sinkMode            of type SinkMode
   * @param keepParentOnDelete  of type boolean
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
    {
    super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold );
    }

  @Override
  protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Configuration> flowProcess, Tap parent, String path, long sequence ) throws IOException
    {
    TapOutputCollector outputCollector = new TapOutputCollector( flowProcess, parent, path, sequence );

    return new TupleEntrySchemeCollector<Configuration, OutputCollector>( flowProcess, parent, outputCollector );
    }

  @Override
  protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Configuration> flowProcess, Tap parent, String path, RecordReader recordReader ) throws IOException
    {
    return new HadoopTupleEntrySchemeIterator( flowProcess, new Hfs( parent.getScheme(), path ), recordReader );
    }

  @Override
  protected String getCurrentIdentifier( FlowProcess<? extends Configuration> flowProcess )
    {
    String identifier = flowProcess.getStringProperty( MultiInputSplit.CASCADING_SOURCE_PATH ); // set on current split

    if( identifier == null )
      {
      if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
        throw new TapException( "combined input format support, via '" + HfsProps.COMBINE_INPUT_FILES + "', may not be enabled for use with the PartitionTap" );

      throw new TapException( "unable to retrieve the current file being processed, '" + MultiInputSplit.CASCADING_SOURCE_PATH + "' was lost or not set" );
      }

    return new Path( identifier ).getParent().toString(); // drop part-xxxx
    }

  @Override
  public void sourceConfInit( FlowProcess<? extends Configuration> flowProcess, Configuration conf )
    {
    try
      {
      // resolve the child partition directories and register them as the actual input paths
      String[] childPartitions = getChildPartitionIdentifiers( flowProcess, true );

      ( (Hfs) getParent() ).applySourceConfInitIdentifiers( flowProcess, conf, childPartitions );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to retrieve child partitions", exception );
      }
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
    {
    // combined splits span multiple files, so partition values must be recovered per input file
    if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
      return new CombinePartitionIterator( flowProcess, input );

    return super.openForRead( flowProcess, input );
    }

  private class CombinePartitionIterator extends TupleEntryIterableChainIterator
    {
    public CombinePartitionIterator( final FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
      {
      super( getSourceFields() );

      List<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>();

      if( input == null )
        throw new IOException( "input cannot be null" );

      String identifier = parent.getFullIdentifier( flowProcess );

      iterators.add( createPartitionEntryIterator( flowProcess, input, identifier ) );

      reset( iterators );
      }

    private CombineInputPartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<? extends Configuration> flowProcess, RecordReader input, String parentIdentifier ) throws IOException
      {
      TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, null, input );

      return new CombineInputPartitionTupleEntryIterator( flowProcess, getSourceFields(), partition, parentIdentifier, schemeIterator );
      }
    }

  @Override
  public boolean commitResource( Configuration conf ) throws IOException
    {
    // write a _SUCCESS marker into the parent path so downstream consumers can detect completion
    Hadoop18TapUtil.writeSuccessMarker( conf, ( (Hfs) parent ).getPath() );

    return super.commitResource( conf );
    }
  }