/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineInputPartitionTupleEntryIterator;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.hadoop.io.MultiInputSplit;
import cascading.tap.hadoop.io.TapOutputCollector;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.partition.BasePartitionTap;
import cascading.tap.partition.Partition;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryIterableChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

/**
 * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the
 * current {@link cascading.tuple.Tuple} instance.
 * <p/>
 * The constructor takes a {@link cascading.tap.hadoop.Hfs} {@link cascading.tap.Tap} and a {@link Partition}
 * implementation. This allows Tuple values at given positions to be used as directory names during write
 * operations, and directory names as data during read operations.
 * <p/>
 * The key benefit here is that there is no need to duplicate data values in both the directory names and the
 * data files.
 * <p/>
 * So only values declared in the parent Tap will be read or written to the underlying file system files. But
 * fields declared by the {@link Partition} will only be read or written to the directory names. That is, the
 * PartitionTap instance will sink or source the partition fields, plus the parent Tap fields. The partition
 * fields and parent Tap fields do not need to have common field names.
 * <p/>
 * Note that Hadoop can only sink to directories, and all files in those directories are "part-xxxxx" files.
 * <p/>
 * {@code openWritesThreshold} limits the number of open files to be output to. This value defaults to 300 files.
 * Each time the threshold is exceeded, 10% of the least recently used open files will be closed.
 * <p/>
 * PartitionTap will populate a given {@code partition} without regard to the case of the values being used. Thus
 * the resulting paths {@code 2012/June/} and {@code 2012/june/} will likely result in two open files writing into
 * the same location. Forcing the case to be consistent with a custom Partition implementation or an upstream
 * {@link cascading.operation.Function} is recommended, see {@link cascading.operation.expression.ExpressionFunction}.
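 * <p/>
 * For example, the partition value can be normalized upstream before the tap is written to. The following is a
 * minimal sketch; the field names {@code "entry"} and {@code "month"}, the {@code pipe} assembly, and the output
 * path are illustrative assumptions, not part of this class:
 * <pre>
 * // lower-case the partition value so "June" and "june" land in the same partition directory
 * pipe = new Each( pipe, new Fields( "month" ),
 *   new ExpressionFunction( new Fields( "month" ), "month.toLowerCase()", String.class ), Fields.REPLACE );
 *
 * // the parent Tap supplies the Scheme and base path, the Partition supplies the sub-directory layout
 * Hfs parentTap = new Hfs( new TextDelimited( new Fields( "entry" ) ), "output/path" );
 * Tap monthsTap = new PartitionTap( parentTap, new DelimitedPartition( new Fields( "month" ) ) );
 * </pre>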
 * <p/>
 * Though Hadoop has no mechanism to prevent simultaneous writes to a directory from multiple jobs, that does not
 * mean it is safe to do so. The same is true with the PartitionTap. Interleaving writes to a common parent (root)
 * directory across multiple flows will very likely lead to data loss.
 */
public class PartitionTap extends BasePartitionTap<Configuration, RecordReader, OutputCollector>
  {
  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the given partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   */
  @ConstructorProperties({"parent", "partition"})
  public PartitionTap( Hfs parent, Partition partition )
    {
    this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the given partition.
   * <p/>
   * {@code openWritesThreshold} limits the number of open files to be output to.
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, int openWritesThreshold )
    {
    super( parent, partition, openWritesThreshold );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the given partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   * @param sinkMode  of type SinkMode
   */
  @ConstructorProperties({"parent", "partition", "sinkMode"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode )
    {
    super( parent, partition, sinkMode );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the given partition.
   * <p/>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
   *
   * @param parent             of type Hfs
   * @param partition          of type Partition
   * @param sinkMode           of type SinkMode
   * @param keepParentOnDelete of type boolean
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete )
    {
    this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the given partition.
   * <p/>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
   * <p/>
   * {@code openWritesThreshold} limits the number of open files to be output to.
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param sinkMode            of type SinkMode
   * @param keepParentOnDelete  of type boolean
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
    {
    super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold );
    }

  @Override
  protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Configuration> flowProcess, Tap parent, String path, long sequence ) throws IOException
    {
    TapOutputCollector outputCollector = new TapOutputCollector( flowProcess, parent, path, sequence );

    return new TupleEntrySchemeCollector<Configuration, OutputCollector>( flowProcess, parent, outputCollector );
    }

  @Override
  protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Configuration> flowProcess, Tap parent, String path, RecordReader recordReader ) throws IOException
    {
    return new HadoopTupleEntrySchemeIterator( flowProcess, new Hfs( parent.getScheme(), path ), recordReader );
    }

  @Override
  protected String getCurrentIdentifier( FlowProcess<? extends Configuration> flowProcess )
    {
    String identifier = flowProcess.getStringProperty( MultiInputSplit.CASCADING_SOURCE_PATH ); // set on the current split

    if( identifier == null )
      {
      if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
        throw new TapException( "combined input format support, via '" + HfsProps.COMBINE_INPUT_FILES + "', may not be enabled for use with the PartitionTap" );

      throw new TapException( "unable to retrieve the current file being processed, '" + MultiInputSplit.CASCADING_SOURCE_PATH + "' was lost or not set" );
      }

    return new Path( identifier ).getParent().toString(); // drop the part-xxxxx file name, leaving the partition directory
    }
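
  // when sourcing, the child partition directories beneath the parent Tap are resolved and applied to the given
  // Configuration as input paths, so every partition directory is read by the flow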
  @Override
  public void sourceConfInit( FlowProcess<? extends Configuration> flowProcess, Configuration conf )
    {
    try
      {
      String[] childPartitions = getChildPartitionIdentifiers( flowProcess, true );

      ( (Hfs) getParent() ).applySourceConfInitIdentifiers( flowProcess, conf, childPartitions );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to retrieve child partitions", exception );
      }
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
    {
    // a combined split may span multiple partition directories, so read through an iterator that re-derives
    // the partition values for each underlying file
    if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
      return new CombinePartitionIterator( flowProcess, input );

    return super.openForRead( flowProcess, input );
    }

  private class CombinePartitionIterator extends TupleEntryIterableChainIterator
    {
    public CombinePartitionIterator( final FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
      {
      super( getSourceFields() );

      List<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>();

      if( input == null )
        throw new IOException( "input cannot be null" );

      String identifier = parent.getFullIdentifier( flowProcess );

      iterators.add( createPartitionEntryIterator( flowProcess, input, identifier ) );

      reset( iterators );
      }

    private CombineInputPartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<? extends Configuration> flowProcess, RecordReader input, String parentIdentifier ) throws IOException
      {
      TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, null, input );

      return new CombineInputPartitionTupleEntryIterator( flowProcess, getSourceFields(), partition, parentIdentifier, schemeIterator );
      }
    }

  @Override
  public boolean commitResource( Configuration conf ) throws IOException
    {
    Hadoop18TapUtil.writeSuccessMarker( conf, ( (Hfs) parent ).getPath() ); // mark the parent path as successfully completed

    return super.commitResource( conf );
    }
  }