001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.partition;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.LinkedHashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import cascading.flow.FlowProcess;
033import cascading.scheme.Scheme;
034import cascading.scheme.SinkCall;
035import cascading.scheme.SourceCall;
036import cascading.tap.SinkMode;
037import cascading.tap.Tap;
038import cascading.tap.TapException;
039import cascading.tap.type.FileType;
040import cascading.tuple.Fields;
041import cascading.tuple.Tuple;
042import cascading.tuple.TupleEntry;
043import cascading.tuple.TupleEntryCollector;
044import cascading.tuple.TupleEntryIterableChainIterator;
045import cascading.tuple.TupleEntryIterator;
046import cascading.tuple.TupleEntrySchemeCollector;
047import cascading.tuple.TupleEntrySchemeIterator;
048import cascading.tuple.util.TupleViews;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
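052/**
 * Class BasePartitionTap is the base class for Taps that read from and write to child partitions of a
 * parent {@link Tap}.
 * <p/>
 * A {@link Partition} maps tuple values to a child path when writing and is handed to the per partition
 * iterators when reading. Subclasses supply the actual per path collectors and iterators.
053 *
054 */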
055public abstract class BasePartitionTap<Config, Input, Output> extends Tap<Config, Input, Output>
056  {
057  /** Field LOG */
058  private static final Logger LOG = LoggerFactory.getLogger( BasePartitionTap.class );
059  /** Field OPEN_WRITES_THRESHOLD_DEFAULT is the open writes threshold used when a constructor is not given one */
060  protected static final int OPEN_WRITES_THRESHOLD_DEFAULT = 300;
061
062  private class PartitionIterator extends TupleEntryIterableChainIterator
063    {
064    public PartitionIterator( final FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
065      {
066      super( getSourceFields() );
067
068      List<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>();
069
070      if( input != null )
071        {
072        String identifier = parent.getFullIdentifier( flowProcess );
073        iterators.add( createPartitionEntryIterator( flowProcess, input, identifier, getCurrentIdentifier( flowProcess ) ) );
074        }
075      else
076        {
077        String[] childIdentifiers = getChildPartitionIdentifiers( flowProcess, false );
078
079        for( String childIdentifier : childIdentifiers )
080          iterators.add( createPartitionEntryIterator( flowProcess, null, parent.getIdentifier(), childIdentifier ) );
081        }
082
083      reset( iterators );
084      }
085
086    private PartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<? extends Config> flowProcess, Input input, String parentIdentifier, String childIdentifier ) throws IOException
087      {
088      TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, childIdentifier, input );
089
090      return new PartitionTupleEntryIterator( getSourceFields(), partition, parentIdentifier, childIdentifier, schemeIterator );
091      }
092    }
093
094  public class PartitionCollector extends TupleEntryCollector
095    {
096    private final FlowProcess<? extends Config> flowProcess;
097    private final Config conf;
098    private final Fields parentFields;
099    private final Fields partitionFields;
100    private TupleEntry partitionEntry;
101    private final Tuple partitionTuple;
102    private final Tuple parentTuple;
103
104    public PartitionCollector( FlowProcess<? extends Config> flowProcess )
105      {
106      super( Fields.asDeclaration( getSinkFields() ) );
107      this.flowProcess = flowProcess;
108      this.conf = flowProcess.getConfigCopy();
109      this.parentFields = parent.getSinkFields();
110      this.partitionFields = ( (PartitionScheme) getScheme() ).partitionFields;
111      this.partitionEntry = new TupleEntry( this.partitionFields );
112
113      this.partitionTuple = TupleViews.createNarrow( getSinkFields().getPos( this.partitionFields ) );
114      this.parentTuple = TupleViews.createNarrow( getSinkFields().getPos( this.parentFields ) );
115
116      this.partitionEntry.setTuple( partitionTuple );
117      }
118
119    TupleEntryCollector getCollector( String path )
120      {
121      TupleEntryCollector collector = collectors.get( path );
122
123      if( collector != null )
124        return collector;
125
126      try
127        {
128        LOG.debug( "creating collector for parent: {}, path: {}", parent.getFullIdentifier( conf ), path );
129
130        collector = createTupleEntrySchemeCollector( flowProcess, parent, path, openedCollectors );
131
132        openedCollectors++;
133        flowProcess.increment( Counters.Paths_Opened, 1 );
134        }
135      catch( IOException exception )
136        {
137        throw new TapException( "unable to open partition path: " + path, exception );
138        }
139
140      if( collectors.size() > openWritesThreshold )
141        purgeCollectors();
142
143      collectors.put( path, collector );
144
145      if( LOG.isInfoEnabled() && collectors.size() % 100 == 0 )
146        LOG.info( "caching {} open Taps", collectors.size() );
147
148      return collector;
149      }
150
151    private void purgeCollectors()
152      {
153      int numToClose = Math.max( 1, (int) ( openWritesThreshold * .10 ) );
154
155      if( LOG.isInfoEnabled() )
156        LOG.info( "removing {} open Taps from cache of size {}", numToClose, collectors.size() );
157
158      Set<String> removeKeys = new HashSet<String>();
159      Set<String> keys = collectors.keySet();
160
161      for( String key : keys )
162        {
163        if( numToClose-- == 0 )
164          break;
165
166        removeKeys.add( key );
167        }
168
169      for( String removeKey : removeKeys )
170        {
171        closeCollector( removeKey );
172        collectors.remove( removeKey );
173        }
174
175      flowProcess.increment( Counters.Path_Purges, 1 );
176      }
177
178    @Override
179    public void close()
180      {
181      super.close();
182
183      try
184        {
185        for( String path : new ArrayList<String>( collectors.keySet() ) )
186          closeCollector( path );
187        }
188      finally
189        {
190        collectors.clear();
191        }
192      }
193
194    public void closeCollector( String path )
195      {
196      TupleEntryCollector collector = collectors.get( path );
197      if( collector == null )
198        return;
199      try
200        {
201        collector.close();
202
203        flowProcess.increment( Counters.Paths_Closed, 1 );
204        }
205      catch( Exception exception )
206        {
207        LOG.error( "exception while closing TupleEntryCollector {}", path, exception );
208
209        boolean failOnError = false;
210        Object failProperty = flowProcess.getProperty( PartitionTapProps.FAIL_ON_CLOSE );
211
212        if( failProperty != null )
213          failOnError = Boolean.parseBoolean( failProperty.toString() );
214
215        if( failOnError )
216          throw new TapException( exception );
217        }
218      }
219
220    protected void collect( TupleEntry tupleEntry ) throws IOException
221      {
222      // reset select views
223      TupleViews.reset( partitionTuple, tupleEntry.getTuple() ); // partitionTuple is inside partitionEntry
224      TupleViews.reset( parentTuple, tupleEntry.getTuple() );
225
226      String path = partition.toPartition( partitionEntry );
227
228      getCollector( path ).add( parentTuple );
229      }
230    }
231
232  /** Field parent */
233  protected Tap parent;
234  /** Field partition */
235  protected Partition partition;
236  /** Field keepParentOnDelete */
237  protected boolean keepParentOnDelete = false;
238  /** Field openWritesThreshold */
239  protected int openWritesThreshold = OPEN_WRITES_THRESHOLD_DEFAULT;
240
241  /** Field openedCollectors counts the child collectors created so far */
242  private long openedCollectors = 0;
243  /** Field collectors caches the open child collectors by path, kept in access order */
244  private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<String, TupleEntryCollector>( 1000, .75f, true );
245
  /**
   * Method createTupleEntrySchemeCollector creates the collector used to write to the given child path.
   *
   * @param flowProcess of type FlowProcess
   * @param parent      of type Tap
   * @param path        of type String, the partition path to write to
   * @param sequence    of type long, the number of collectors opened before this one
   * @return a new TupleEntrySchemeCollector
   * @throws IOException when the collector cannot be created
   */
246  protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap parent, String path, long sequence ) throws IOException;
247
  /**
   * Method createTupleEntrySchemeIterator creates the iterator used to read the given child path.
   *
   * @param flowProcess of type FlowProcess
   * @param parent      of type Tap
   * @param path        of type String, the child partition identifier to read
   * @param input       of type Input, null when no input was handed to openForRead
   * @return a new TupleEntrySchemeIterator
   * @throws IOException when the iterator cannot be created
   */
248  protected abstract TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap parent, String path, Input input ) throws IOException;
249
250  public enum Counters
251    {
252      Paths_Opened, Paths_Closed, Path_Purges
253    }
254
255  protected BasePartitionTap( Tap parent, Partition partition, int openWritesThreshold )
256    {
257    super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), parent.getSinkMode() );
258    this.parent = parent;
259    this.partition = partition;
260    this.openWritesThreshold = openWritesThreshold;
261    }
262
263  protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode )
264    {
265    super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode );
266    this.parent = parent;
267    this.partition = partition;
268    }
269
270  protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
271    {
272    super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode );
273    this.parent = parent;
274    this.partition = partition;
275    this.keepParentOnDelete = keepParentOnDelete;
276    this.openWritesThreshold = openWritesThreshold;
277    }
278
279  /**
280   * Method getParent returns the parent Tap of this PartitionTap object.
281   *
282   * @return the parent (type Tap) of this PartitionTap object.
283   */
284  public Tap getParent()
285    {
286    return parent;
287    }
288
289  /**
290   * Method getPartition returns the {@link Partition} instance used by this PartitionTap
291   *
292   * @return the partition instance
293   */
294  public Partition getPartition()
295    {
296    return partition;
297    }
298
299  /**
300   * Method getChildPartitionIdentifiers returns an array of all identifiers for all available partitions.
301   * <p/>
302   * This method is used internally to set all incoming paths, override to limit applicable partitions.
303   * <p/>
304   * Note the returns array may be large.
305   *
306   * @param flowProcess    of type FlowProcess
307   * @param fullyQualified of type boolean
308   * @return a String[] of partition identifiers
309   * @throws IOException
310   */
311  public String[] getChildPartitionIdentifiers( FlowProcess<? extends Config> flowProcess, boolean fullyQualified ) throws IOException
312    {
313    return ( (FileType) parent ).getChildIdentifiers( flowProcess.getConfig(), partition.getPathDepth(), fullyQualified );
314    }
315
316  @Override
317  public String getIdentifier()
318    {
319    return parent.getIdentifier();
320    }
321
  /**
   * Method getCurrentIdentifier returns the identifier used as the child partition identifier when reading
   * from an input handed to openForRead.
   *
   * @param flowProcess of type FlowProcess
   * @return the current identifier
   */
322  protected abstract String getCurrentIdentifier( FlowProcess<? extends Config> flowProcess );
323
324  /**
325   * Method getOpenWritesThreshold returns the openTapsThreshold of this PartitionTap object.
326   *
327   * @return the openTapsThreshold (type int) of this PartitionTap object.
328   */
329  public int getOpenWritesThreshold()
330    {
331    return openWritesThreshold;
332    }
333
334  @Override
335  public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException
336    {
337    return new PartitionCollector( flowProcess );
338    }
339
340  @Override
341  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
342    {
343    return new PartitionIterator( flowProcess, input );
344    }
345
346  @Override
347  public boolean createResource( Config conf ) throws IOException
348    {
349    return parent.createResource( conf );
350    }
351
352  @Override
353  public boolean deleteResource( Config conf ) throws IOException
354    {
355    return keepParentOnDelete || parent.deleteResource( conf );
356    }
357
358  @Override
359  public boolean prepareResourceForRead( Config conf ) throws IOException
360    {
361    return parent.prepareResourceForRead( conf );
362    }
363
364  @Override
365  public boolean prepareResourceForWrite( Config conf ) throws IOException
366    {
367    return parent.prepareResourceForWrite( conf );
368    }
369
370  @Override
371  public boolean commitResource( Config conf ) throws IOException
372    {
373    return parent.commitResource( conf );
374    }
375
376  @Override
377  public boolean rollbackResource( Config conf ) throws IOException
378    {
379    return parent.rollbackResource( conf );
380    }
381
382  @Override
383  public boolean resourceExists( Config conf ) throws IOException
384    {
385    return parent.resourceExists( conf );
386    }
387
388  @Override
389  public long getModifiedTime( Config conf ) throws IOException
390    {
391    return parent.getModifiedTime( conf );
392    }
393
394  @Override
395  public boolean equals( Object object )
396    {
397    if( this == object )
398      return true;
399    if( object == null || getClass() != object.getClass() )
400      return false;
401    if( !super.equals( object ) )
402      return false;
403
404    BasePartitionTap that = (BasePartitionTap) object;
405
406    if( parent != null ? !parent.equals( that.parent ) : that.parent != null )
407      return false;
408    if( partition != null ? !partition.equals( that.partition ) : that.partition != null )
409      return false;
410
411    return true;
412    }
413
414  @Override
415  public int hashCode()
416    {
417    int result = super.hashCode();
418    result = 31 * result + ( parent != null ? parent.hashCode() : 0 );
419    result = 31 * result + ( partition != null ? partition.hashCode() : 0 );
420    return result;
421    }
422
423  @Override
424  public String toString()
425    {
426    return getClass().getSimpleName() + "[\"" + parent + "\"]" + "[\"" + partition + "\"]";
427    }
428
429  public static class PartitionScheme<Config, Input, Output> extends Scheme<Config, Input, Output, Void, Void>
430    {
431    private final Scheme scheme;
432    private final Fields partitionFields;
433
434    public PartitionScheme( Scheme scheme )
435      {
436      this.scheme = scheme;
437      this.partitionFields = null;
438      }
439
440    public PartitionScheme( Scheme scheme, Fields partitionFields )
441      {
442      this.scheme = scheme;
443
444      if( partitionFields == null || partitionFields.isAll() )
445        this.partitionFields = null;
446      else if( partitionFields.isDefined() )
447        this.partitionFields = partitionFields;
448      else
449        throw new IllegalArgumentException( "partitionFields must be defined or the ALL substitution, got: " + partitionFields.printVerbose() );
450      }
451
452    public Fields getSinkFields()
453      {
454      if( partitionFields == null || scheme.getSinkFields().isAll() )
455        return scheme.getSinkFields();
456
457      return Fields.merge( scheme.getSinkFields(), partitionFields );
458      }
459
460    public void setSinkFields( Fields sinkFields )
461      {
462      scheme.setSinkFields( sinkFields );
463      }
464
465    public Fields getSourceFields()
466      {
467      if( partitionFields == null || scheme.getSourceFields().isUnknown() )
468        return scheme.getSourceFields();
469
470      return Fields.merge( scheme.getSourceFields(), partitionFields );
471      }
472
473    public void setSourceFields( Fields sourceFields )
474      {
475      scheme.setSourceFields( sourceFields );
476      }
477
478    public int getNumSinkParts()
479      {
480      return scheme.getNumSinkParts();
481      }
482
483    public void setNumSinkParts( int numSinkParts )
484      {
485      scheme.setNumSinkParts( numSinkParts );
486      }
487
488    @Override
489    public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf )
490      {
491      scheme.sourceConfInit( flowProcess, tap, conf );
492      }
493
494    @Override
495    public void sourcePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
496      {
497      scheme.sourcePrepare( flowProcess, sourceCall );
498      }
499
500    @Override
501    public boolean source( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
502      {
503      throw new UnsupportedOperationException( "should never be called" );
504      }
505
506    @Override
507    public void sourceCleanup( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
508      {
509      scheme.sourceCleanup( flowProcess, sourceCall );
510      }
511
512    @Override
513    public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf )
514      {
515      scheme.sinkConfInit( flowProcess, tap, conf );
516      }
517
518    @Override
519    public void sinkPrepare( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
520      {
521      scheme.sinkPrepare( flowProcess, sinkCall );
522      }
523
524    @Override
525    public void sink( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
526      {
527      throw new UnsupportedOperationException( "should never be called" );
528      }
529
530    @Override
531    public void sinkCleanup( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
532      {
533      scheme.sinkCleanup( flowProcess, sinkCall );
534      }
535    }
536  }