001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.partition;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.LinkedHashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import cascading.flow.Flow;
033import cascading.flow.FlowProcess;
034import cascading.operation.Filter;
035import cascading.scheme.Scheme;
036import cascading.scheme.SinkCall;
037import cascading.scheme.SourceCall;
038import cascading.tap.SinkMode;
039import cascading.tap.Tap;
040import cascading.tap.TapException;
041import cascading.tap.type.FileType;
042import cascading.tuple.Fields;
043import cascading.tuple.Tuple;
044import cascading.tuple.TupleEntry;
045import cascading.tuple.TupleEntryCollector;
046import cascading.tuple.TupleEntryIterableChainIterator;
047import cascading.tuple.TupleEntryIterator;
048import cascading.tuple.TupleEntrySchemeCollector;
049import cascading.tuple.TupleEntrySchemeIterator;
050import cascading.tuple.util.TupleViews;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
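/**
 * Class BasePartitionTap is the abstract base for taps that read from and write to the child partitions of a
 * parent {@link Tap}, using a {@link Partition} to translate between partition field values and child paths.
 * <p/>
 * On write, each tuple is routed to a collector opened for the partition path derived from its partition fields.
 * On read, the child partitions of the parent are chained into a single iterator, optionally narrowed by the
 * filters registered through {@link #addSourcePartitionFilter(Fields, Filter)}.
 */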
057public abstract class BasePartitionTap<Config, Input, Output> extends Tap<Config, Input, Output>
058  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( BasePartitionTap.class );
  /** Field OPEN_WRITES_THRESHOLD_DEFAULT is the initial value of the openWritesThreshold field */
  protected static final int OPEN_WRITES_THRESHOLD_DEFAULT = 300;
063
064  private class PartitionIterator extends TupleEntryIterableChainIterator
065    {
066    public PartitionIterator( final FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
067      {
068      super( getSourceFields() );
069
070      List<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>();
071
072      if( input != null )
073        {
074        String identifier = parent.getFullIdentifier( flowProcess );
075        iterators.add( createPartitionEntryIterator( flowProcess, input, identifier, getCurrentIdentifier( flowProcess ) ) );
076        }
077      else
078        {
079        String[] childIdentifiers = getChildPartitionIdentifiers( flowProcess, false );
080
081        for( String childIdentifier : childIdentifiers )
082          iterators.add( createPartitionEntryIterator( flowProcess, null, parent.getIdentifier(), childIdentifier ) );
083        }
084
085      reset( iterators );
086      }
087
088    private PartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<? extends Config> flowProcess, Input input, String parentIdentifier, String childIdentifier ) throws IOException
089      {
090      TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, childIdentifier, input );
091
092      return new PartitionTupleEntryIterator( getSourceFields(), partition, parentIdentifier, childIdentifier, schemeIterator );
093      }
094    }
095
096  public class PartitionCollector extends TupleEntryCollector
097    {
098    private final FlowProcess<? extends Config> flowProcess;
099    private final Config conf;
100    private final Fields parentFields;
101    private final Fields partitionFields;
102    private TupleEntry partitionEntry;
103    private final Tuple partitionTuple;
104    private final Tuple parentTuple;
105
106    public PartitionCollector( FlowProcess<? extends Config> flowProcess )
107      {
108      super( Fields.asDeclaration( getSinkFields() ) );
109      this.flowProcess = flowProcess;
110      this.conf = flowProcess.getConfigCopy();
111      this.parentFields = parent.getSinkFields();
112      this.partitionFields = ( (PartitionScheme) getScheme() ).partitionFields;
113      this.partitionEntry = new TupleEntry( this.partitionFields );
114
115      this.partitionTuple = TupleViews.createNarrow( getSinkFields().getPos( this.partitionFields ) );
116      this.parentTuple = TupleViews.createNarrow( getSinkFields().getPos( this.parentFields ) );
117
118      this.partitionEntry.setTuple( partitionTuple );
119      }
120
121    TupleEntryCollector getCollector( String path )
122      {
123      TupleEntryCollector collector = collectors.get( path );
124
125      if( collector != null )
126        return collector;
127
128      try
129        {
130        LOG.debug( "creating collector for parent: {}, path: {}", parent.getFullIdentifier( conf ), path );
131
132        collector = createTupleEntrySchemeCollector( flowProcess, parent, path, openedCollectors );
133
134        openedCollectors++;
135        flowProcess.increment( Counters.Paths_Opened, 1 );
136        }
137      catch( IOException exception )
138        {
139        throw new TapException( "unable to open partition path: " + path, exception );
140        }
141
142      if( collectors.size() > openWritesThreshold )
143        purgeCollectors();
144
145      collectors.put( path, collector );
146
147      if( LOG.isInfoEnabled() && collectors.size() % 100 == 0 )
148        LOG.info( "caching {} open Taps", collectors.size() );
149
150      return collector;
151      }
152
153    private void purgeCollectors()
154      {
155      int numToClose = Math.max( 1, (int) ( openWritesThreshold * .10 ) );
156
157      if( LOG.isInfoEnabled() )
158        LOG.info( "removing {} open Taps from cache of size {}", numToClose, collectors.size() );
159
160      Set<String> removeKeys = new HashSet<String>();
161      Set<String> keys = collectors.keySet();
162
163      for( String key : keys )
164        {
165        if( numToClose-- == 0 )
166          break;
167
168        removeKeys.add( key );
169        }
170
171      for( String removeKey : removeKeys )
172        {
173        closeCollector( removeKey );
174        collectors.remove( removeKey );
175        }
176
177      flowProcess.increment( Counters.Path_Purges, 1 );
178      }
179
180    @Override
181    public void close()
182      {
183      super.close();
184
185      try
186        {
187        for( String path : new ArrayList<String>( collectors.keySet() ) )
188          closeCollector( path );
189        }
190      finally
191        {
192        collectors.clear();
193        }
194      }
195
196    public void closeCollector( String path )
197      {
198      TupleEntryCollector collector = collectors.get( path );
199      if( collector == null )
200        return;
201      try
202        {
203        collector.close();
204
205        flowProcess.increment( Counters.Paths_Closed, 1 );
206        }
207      catch( Exception exception )
208        {
209        LOG.error( "exception while closing TupleEntryCollector {}", path, exception );
210
211        boolean failOnError = false;
212        Object failProperty = flowProcess.getProperty( PartitionTapProps.FAIL_ON_CLOSE );
213
214        if( failProperty != null )
215          failOnError = Boolean.parseBoolean( failProperty.toString() );
216
217        if( failOnError )
218          throw new TapException( exception );
219        }
220      }
221
222    protected void collect( TupleEntry tupleEntry ) throws IOException
223      {
224      // reset select views
225      TupleViews.reset( partitionTuple, tupleEntry.getTuple() ); // partitionTuple is inside partitionEntry
226      TupleViews.reset( parentTuple, tupleEntry.getTuple() );
227
228      String path = partition.toPartition( partitionEntry );
229
230      getCollector( path ).add( parentTuple );
231      }
232    }
233
  /** Field parent */
  protected Tap parent;
  /** Field partition */
  protected Partition partition;
  /** Field sourcePartitionFilters holds the filters registered through addSourcePartitionFilter */
  protected final List<PartitionTapFilter> sourcePartitionFilters = new ArrayList<>();
  /** Field keepParentOnDelete, when true deleteResource reports success without deleting the parent */
  protected boolean keepParentOnDelete = false;
  /** Field openWritesThreshold is the collector cache size above which open collectors are purged */
  protected int openWritesThreshold = OPEN_WRITES_THRESHOLD_DEFAULT;

  /** Field openedCollectors counts the collectors opened so far, its value is passed as the sequence of each new collector */
  private long openedCollectors = 0;
  /** Field collectors caches open collectors by partition path, the map is access-ordered (third constructor argument) */
  private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<String, TupleEntryCollector>( 1000, .75f, true );
249
  /**
   * Method createTupleEntrySchemeCollector creates the collector used to write a single partition path.
   * <p/>
   * Called by {@link PartitionCollector} the first time a path is written to, or again after its collector
   * was purged from the cache.
   *
   * @param flowProcess the current FlowProcess
   * @param parent      the parent Tap of this PartitionTap
   * @param path        the partition path to write to
   * @param sequence    the number of collectors opened by this tap before this one
   * @return a new collector for the given path
   * @throws IOException if the collector cannot be created
   */
  protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap parent, String path, long sequence ) throws IOException;
251
  /**
   * Method createTupleEntrySchemeIterator creates the iterator used to read a single partition path.
   *
   * @param flowProcess the current FlowProcess
   * @param parent      the parent Tap of this PartitionTap
   * @param path        the identifier of the partition to read
   * @param input       the already opened input, null when reading all child partitions
   * @return a new iterator over the given partition
   * @throws IOException if the iterator cannot be created
   */
  protected abstract TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap parent, String path, Input input ) throws IOException;
253
254  public enum Counters
255    {
256      Paths_Opened, Paths_Closed, Path_Purges
257    }
258
259  protected BasePartitionTap( Tap parent, Partition partition, int openWritesThreshold )
260    {
261    super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), parent.getSinkMode() );
262    this.parent = parent;
263    this.partition = partition;
264    this.openWritesThreshold = openWritesThreshold;
265    }
266
267  protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode )
268    {
269    super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode );
270    this.parent = parent;
271    this.partition = partition;
272    }
273
274  protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
275    {
276    super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode );
277    this.parent = parent;
278    this.partition = partition;
279    this.keepParentOnDelete = keepParentOnDelete;
280    this.openWritesThreshold = openWritesThreshold;
281    }
282
283  /**
284   * Method getParent returns the parent Tap of this PartitionTap object.
285   *
286   * @return the parent (type Tap) of this PartitionTap object.
287   */
288  public Tap getParent()
289    {
290    return parent;
291    }
292
293  /**
294   * Method getPartition returns the {@link Partition} instance used by this PartitionTap
295   *
296   * @return the partition instance
297   */
298  public Partition getPartition()
299    {
300    return partition;
301    }
302
303  /**
304   * Method getChildPartitionIdentifiers returns an array of all identifiers for all available partitions.
305   * <p/>
306   * This method is used internally to set all incoming paths, override to limit applicable partitions.
307   * <p/>
308   * Note the returns array may be large.
309   *
310   * @param flowProcess    of type FlowProcess
311   * @param fullyQualified of type boolean
312   * @return a String[] of partition identifiers
313   * @throws IOException
314   */
315  public String[] getChildPartitionIdentifiers( FlowProcess<? extends Config> flowProcess, boolean fullyQualified ) throws IOException
316    {
317    String[] childIdentifiers = ( (FileType) parent ).getChildIdentifiers(
318      flowProcess.getConfig(),
319      partition.getPathDepth(),
320      fullyQualified
321    );
322
323    if( sourcePartitionFilters.isEmpty() )
324      return childIdentifiers;
325
326    return getFilteredPartitionIdentifiers( flowProcess, childIdentifiers );
327    }
328
329  protected String[] getFilteredPartitionIdentifiers( FlowProcess<? extends Config> flowProcess, String[] childIdentifiers )
330    {
331    Fields partitionFields = partition.getPartitionFields();
332    TupleEntry partitionEntry = new TupleEntry( partitionFields, Tuple.size( partitionFields.size() ) );
333
334    List<String> filteredIdentifiers = new ArrayList<>( childIdentifiers.length );
335
336    for( PartitionTapFilter filter : sourcePartitionFilters )
337      filter.prepare( flowProcess );
338
339    for( String childIdentifier : childIdentifiers )
340      {
341      partition.toTuple( childIdentifier.substring( parent.getFullIdentifier( flowProcess ).length() + 1 ), partitionEntry );
342
343      boolean isRemove = false;
344      for( PartitionTapFilter filter : sourcePartitionFilters )
345        {
346        if( filter.isRemove( flowProcess, partitionEntry ) )
347          {
348          isRemove = true;
349          break;
350          }
351        }
352
353      if( !isRemove )
354        filteredIdentifiers.add( childIdentifier );
355      }
356
357    for( PartitionTapFilter filter : sourcePartitionFilters )
358      filter.cleanup( flowProcess );
359
360    return filteredIdentifiers.toArray( new String[ filteredIdentifiers.size() ] );
361    }
362
363  /**
364   * Add a {@link Filter} with its associated argument selector when using this PartitionTap as a source. On read, each
365   * child identifier is converted to a {@link Tuple} using the provided {@link Partition}. Each {@link Filter} will be
366   * applied to the {@link Tuple} so that the input paths can be filtered to only accept those required for the
367   * {@link Flow}.
368   *
369   * @param argumentSelector field selector that selects Filter arguments from the input Tuple
370   * @param filter           Filter to be applied to each input Tuple
371   */
372  public void addSourcePartitionFilter( Fields argumentSelector, Filter filter )
373    {
374    Fields argumentFields;
375
376    if( argumentSelector.isAll() )
377      argumentFields = partition.getPartitionFields();
378    else
379      argumentFields = partition.getPartitionFields().select( argumentSelector );
380
381    sourcePartitionFilters.add( new PartitionTapFilter( argumentFields, filter ) );
382    }
383
384  @Override
385  public String getIdentifier()
386    {
387    return parent.getIdentifier();
388    }
389
  /**
   * Method getCurrentIdentifier returns the identifier of the partition being read when an already opened
   * input is handed to {@link #openForRead(FlowProcess, Object)}.
   *
   * @param flowProcess the current FlowProcess
   * @return the identifier of the current partition
   */
  protected abstract String getCurrentIdentifier( FlowProcess<? extends Config> flowProcess );
391
392  /**
393   * Method getOpenWritesThreshold returns the openTapsThreshold of this PartitionTap object.
394   *
395   * @return the openTapsThreshold (type int) of this PartitionTap object.
396   */
397  public int getOpenWritesThreshold()
398    {
399    return openWritesThreshold;
400    }
401
402  @Override
403  public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException
404    {
405    return new PartitionCollector( flowProcess );
406    }
407
408  @Override
409  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
410    {
411    return new PartitionIterator( flowProcess, input );
412    }
413
414  @Override
415  public boolean createResource( Config conf ) throws IOException
416    {
417    return parent.createResource( conf );
418    }
419
420  @Override
421  public boolean deleteResource( Config conf ) throws IOException
422    {
423    return keepParentOnDelete || parent.deleteResource( conf );
424    }
425
426  @Override
427  public boolean prepareResourceForRead( Config conf ) throws IOException
428    {
429    return parent.prepareResourceForRead( conf );
430    }
431
432  @Override
433  public boolean prepareResourceForWrite( Config conf ) throws IOException
434    {
435    return parent.prepareResourceForWrite( conf );
436    }
437
438  @Override
439  public boolean commitResource( Config conf ) throws IOException
440    {
441    return parent.commitResource( conf );
442    }
443
444  @Override
445  public boolean rollbackResource( Config conf ) throws IOException
446    {
447    return parent.rollbackResource( conf );
448    }
449
450  @Override
451  public boolean resourceExists( Config conf ) throws IOException
452    {
453    return parent.resourceExists( conf );
454    }
455
456  @Override
457  public long getModifiedTime( Config conf ) throws IOException
458    {
459    return parent.getModifiedTime( conf );
460    }
461
462  @Override
463  public boolean equals( Object object )
464    {
465    if( this == object )
466      return true;
467    if( object == null || getClass() != object.getClass() )
468      return false;
469    if( !super.equals( object ) )
470      return false;
471
472    BasePartitionTap that = (BasePartitionTap) object;
473
474    if( parent != null ? !parent.equals( that.parent ) : that.parent != null )
475      return false;
476    if( partition != null ? !partition.equals( that.partition ) : that.partition != null )
477      return false;
478    if( partition != null ? !sourcePartitionFilters.equals( that.sourcePartitionFilters ) : that.sourcePartitionFilters != null )
479      return false;
480
481    return true;
482    }
483
484  @Override
485  public int hashCode()
486    {
487    int result = super.hashCode();
488    result = 31 * result + ( parent != null ? parent.hashCode() : 0 );
489    result = 31 * result + ( partition != null ? partition.hashCode() : 0 );
490    result = 31 * result + ( sourcePartitionFilters != null ? sourcePartitionFilters.hashCode() : 0 );
491    return result;
492    }
493
494  @Override
495  public String toString()
496    {
497    return getClass().getSimpleName() + "[\"" + parent + "\"]" + "[\"" + partition + "\"]" + "[\"" + sourcePartitionFilters + "\"]";
498    }
499
500  public static class PartitionScheme<Config, Input, Output> extends Scheme<Config, Input, Output, Void, Void>
501    {
502    private final Scheme scheme;
503    private final Fields partitionFields;
504
505    public PartitionScheme( Scheme scheme )
506      {
507      this.scheme = scheme;
508      this.partitionFields = null;
509      }
510
511    public PartitionScheme( Scheme scheme, Fields partitionFields )
512      {
513      this.scheme = scheme;
514
515      if( partitionFields == null || partitionFields.isAll() )
516        this.partitionFields = null;
517      else if( partitionFields.isDefined() )
518        this.partitionFields = partitionFields;
519      else
520        throw new IllegalArgumentException( "partitionFields must be defined or the ALL substitution, got: " + partitionFields.printVerbose() );
521      }
522
523    public Fields getSinkFields()
524      {
525      if( partitionFields == null || scheme.getSinkFields().isAll() )
526        return scheme.getSinkFields();
527
528      return Fields.merge( scheme.getSinkFields(), partitionFields );
529      }
530
531    public void setSinkFields( Fields sinkFields )
532      {
533      scheme.setSinkFields( sinkFields );
534      }
535
536    public Fields getSourceFields()
537      {
538      if( partitionFields == null || scheme.getSourceFields().isUnknown() )
539        return scheme.getSourceFields();
540
541      return Fields.merge( scheme.getSourceFields(), partitionFields );
542      }
543
544    public void setSourceFields( Fields sourceFields )
545      {
546      scheme.setSourceFields( sourceFields );
547      }
548
549    public int getNumSinkParts()
550      {
551      return scheme.getNumSinkParts();
552      }
553
554    public void setNumSinkParts( int numSinkParts )
555      {
556      scheme.setNumSinkParts( numSinkParts );
557      }
558
559    @Override
560    public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf )
561      {
562      scheme.sourceConfInit( flowProcess, tap, conf );
563      }
564
565    @Override
566    public void sourcePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
567      {
568      scheme.sourcePrepare( flowProcess, sourceCall );
569      }
570
571    @Override
572    public boolean source( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
573      {
574      throw new UnsupportedOperationException( "should never be called" );
575      }
576
577    @Override
578    public void sourceCleanup( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
579      {
580      scheme.sourceCleanup( flowProcess, sourceCall );
581      }
582
583    @Override
584    public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf )
585      {
586      scheme.sinkConfInit( flowProcess, tap, conf );
587      }
588
589    @Override
590    public void sinkPrepare( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
591      {
592      scheme.sinkPrepare( flowProcess, sinkCall );
593      }
594
595    @Override
596    public void sink( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
597      {
598      throw new UnsupportedOperationException( "should never be called" );
599      }
600
601    @Override
602    public void sinkCleanup( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
603      {
604      scheme.sinkCleanup( flowProcess, sinkCall );
605      }
606    }
607  }