001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tap.local;
023
024import java.beans.ConstructorProperties;
025import java.io.FileInputStream;
026import java.io.FileNotFoundException;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.OutputStream;
030import java.nio.file.DirectoryStream;
031import java.nio.file.Files;
032import java.nio.file.Path;
033import java.nio.file.Paths;
034import java.util.HashSet;
035import java.util.Properties;
036import java.util.Set;
037
038import cascading.flow.FlowProcess;
039import cascading.tap.SinkMode;
040import cascading.tap.Tap;
041import cascading.tap.local.io.TapFileOutputStream;
042import cascading.tap.partition.BasePartitionTap;
043import cascading.tap.partition.Partition;
044import cascading.tuple.TupleEntrySchemeCollector;
045import cascading.tuple.TupleEntrySchemeIterator;
046
047/**
048 * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the
049 * current {@link cascading.tuple.Tuple} instance.
050 * <p/>
051 * The constructor takes a {@link cascading.tap.local.FileTap} {@link cascading.tap.Tap} and a {@link Partition}
052 * implementation. This allows Tuple values at given positions to be used as directory names during write
053 * operations, and directory names as data during read operations.
054 * <p/>
055 * The key value here is that there is no need to duplicate data values in the directory names and inside
056 * the data files.
057 * <p/>
058 * So only values declared in the parent Tap will be read or written to the underlying file system files. But
059 * fields declared by the {@link Partition} will only be read or written to the directory names. That is, the
060 * PartitionTap instance will sink or source the partition fields, plus the parent Tap fields. The partition
061 * fields and parent Tap fields do not need to have common field names.
062 * <p/>
063 * {@code openWritesThreshold} limits the number of open files to be output to. This value defaults to 300 files.
064 * Each time the threshold is exceeded, 10% of the least recently used open files will be closed.
065 * <p/>
066 * PartitionTap will populate a given {@code partition} without regard to case of the values being used. Thus
067 * the resulting paths {@code 2012/June/} and {@code 2012/june/} will likely result in two open files into the same
068 * location. Forcing the case to be consistent with a custom Partition implementation or an upstream
069 * {@link cascading.operation.Function} is recommended, see {@link cascading.operation.expression.ExpressionFunction}.
070 */
071public class PartitionTap extends BasePartitionTap<Properties, InputStream, OutputStream>
072  {
073  /**
074   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
075   * base path and default {@link cascading.scheme.Scheme}, and the partition.
076   *
077   * @param parent    of type Tap
078   * @param partition of type Partition
079   */
080  @ConstructorProperties({"parent", "partition"})
081  public PartitionTap( FileTap parent, Partition partition )
082    {
083    this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT );
084    }
085
086  /**
087   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
088   * base path and default {@link cascading.scheme.Scheme}, and the partition.
089   * <p/>
090   * {@code openWritesThreshold} limits the number of open files to be output to.
091   *
092   * @param parent              of type Hfs
093   * @param partition           of type Partition
094   * @param openWritesThreshold of type int
095   */
096  @ConstructorProperties({"parent", "partition", "openWritesThreshold"})
097  public PartitionTap( FileTap parent, Partition partition, int openWritesThreshold )
098    {
099    super( parent, partition, openWritesThreshold );
100    }
101
102  /**
103   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
104   * base path and default {@link cascading.scheme.Scheme}, and the partition.
105   *
106   * @param parent    of type Tap
107   * @param partition of type Partition
108   * @param sinkMode  of type SinkMode
109   */
110  @ConstructorProperties({"parent", "partition", "sinkMode"})
111  public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode )
112    {
113    super( parent, partition, sinkMode );
114    }
115
116  /**
117   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
118   * base path and default {@link cascading.scheme.Scheme}, and the partition.
119   * <p/>
120   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
121   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
122   *
123   * @param parent             of type Tap
124   * @param partition          of type Partition
125   * @param sinkMode           of type SinkMode
126   * @param keepParentOnDelete of type boolean
127   */
128  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"})
129  public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete )
130    {
131    this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT );
132    }
133
134  /**
135   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
136   * base path and default {@link cascading.scheme.Scheme}, and the partition.
137   * <p/>
138   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
139   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
140   * <p/>
141   * {@code openWritesThreshold} limits the number of open files to be output to.
142   *
143   * @param parent              of type Tap
144   * @param partition           of type Partition
145   * @param sinkMode            of type SinkMode
146   * @param keepParentOnDelete  of type boolean
147   * @param openWritesThreshold of type int
148   */
149  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"})
150  public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
151    {
152    super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold );
153    }
154
155  @Override
156  protected String getCurrentIdentifier( FlowProcess<? extends Properties> flowProcess )
157    {
158    return null;
159    }
160
161  @Override
162  public boolean deleteResource( Properties conf ) throws IOException
163    {
164    String[] childIdentifiers = ( (FileTap) parent ).getChildIdentifiers( conf, Integer.MAX_VALUE, false );
165
166    if( childIdentifiers.length == 0 )
167      return deleteParent( conf );
168
169    Path parentPath = Paths.get( parent.getIdentifier() );
170    Set<Path> parents = new HashSet<>();
171
172    for( String childIdentifier : childIdentifiers )
173      {
174      Path path = Paths.get( childIdentifier );
175
176      parents.add( parentPath.resolve( parentPath.relativize( path ).subpath( 0, 1 ) ) );
177      }
178
179    for( Path subParent : parents )
180      recursiveDelete( subParent );
181
182    return deleteParent( conf );
183    }
184
185  private void recursiveDelete( Path path ) throws IOException
186    {
187    if( path == null )
188      return;
189
190    if( Files.isDirectory( path ) )
191      {
192      try( DirectoryStream<Path> paths = Files.newDirectoryStream( path ) )
193        {
194        for( Path current : paths )
195          recursiveDelete( current );
196        }
197      }
198
199    Files.deleteIfExists( path );
200    }
201
202  private boolean deleteParent( Properties conf ) throws IOException
203    {
204    return keepParentOnDelete || parent.deleteResource( conf );
205    }
206
207  @Override
208  protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Properties> flowProcess, Tap parent, String path, long sequence ) throws IOException
209    {
210    TapFileOutputStream output = new TapFileOutputStream( parent, path, true ); // always append
211
212    return new TupleEntrySchemeCollector<Properties, OutputStream>( flowProcess, parent, output );
213    }
214
215  @Override
216  protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Properties> flowProcess, Tap parent, String path, InputStream input ) throws FileNotFoundException
217    {
218    if( input == null )
219      input = new FileInputStream( path );
220
221    return new TupleEntrySchemeIterator( flowProcess, parent.getScheme(), input, path );
222    }
223  }