001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.partition;
022    
023    import java.io.IOException;
024    import java.util.ArrayList;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.LinkedHashMap;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Set;
031    
032    import cascading.flow.FlowProcess;
033    import cascading.scheme.Scheme;
034    import cascading.scheme.SinkCall;
035    import cascading.scheme.SourceCall;
036    import cascading.tap.SinkMode;
037    import cascading.tap.Tap;
038    import cascading.tap.TapException;
039    import cascading.tap.type.FileType;
040    import cascading.tuple.Fields;
041    import cascading.tuple.Tuple;
042    import cascading.tuple.TupleEntry;
043    import cascading.tuple.TupleEntryCollector;
044    import cascading.tuple.TupleEntryIterableChainIterator;
045    import cascading.tuple.TupleEntryIterator;
046    import cascading.tuple.TupleEntrySchemeCollector;
047    import cascading.tuple.TupleEntrySchemeIterator;
048    import cascading.tuple.util.TupleViews;
049    import org.slf4j.Logger;
050    import org.slf4j.LoggerFactory;
051    
052    /**
053     *
054     */
055    public abstract class BasePartitionTap<Config, Input, Output> extends Tap<Config, Input, Output>
056      {
057      /** Field LOG */
058      private static final Logger LOG = LoggerFactory.getLogger( BasePartitionTap.class );
059      /** Field OPEN_FILES_THRESHOLD_DEFAULT */
060      protected static final int OPEN_WRITES_THRESHOLD_DEFAULT = 300;
061    
062      private class PartitionIterator extends TupleEntryIterableChainIterator
063        {
064        public PartitionIterator( final FlowProcess<Config> flowProcess, Input input ) throws IOException
065          {
066          super( getSourceFields() );
067    
068          List<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>();
069    
070          if( input != null )
071            {
072            String identifier = parent.getFullIdentifier( flowProcess );
073            iterators.add( createPartitionEntryIterator( flowProcess, input, identifier, getCurrentIdentifier( flowProcess ) ) );
074            }
075          else
076            {
077            String[] childIdentifiers = getChildPartitionIdentifiers( flowProcess, false );
078    
079            for( String childIdentifier : childIdentifiers )
080              iterators.add( createPartitionEntryIterator( flowProcess, null, parent.getIdentifier(), childIdentifier ) );
081            }
082    
083          reset( iterators );
084          }
085    
086        private PartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<Config> flowProcess, Input input, String parentIdentifier, String childIdentifier ) throws IOException
087          {
088          TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, childIdentifier, input );
089    
090          return new PartitionTupleEntryIterator( getSourceFields(), partition, parentIdentifier, childIdentifier, schemeIterator );
091          }
092        }
093    
094      public class PartitionCollector extends TupleEntryCollector
095        {
096        private final FlowProcess<Config> flowProcess;
097        private final Config conf;
098        private final Fields parentFields;
099        private final Fields partitionFields;
100        private TupleEntry partitionEntry;
101        private final Tuple partitionTuple;
102        private final Tuple parentTuple;
103    
104        public PartitionCollector( FlowProcess<Config> flowProcess )
105          {
106          super( Fields.asDeclaration( getSinkFields() ) );
107          this.flowProcess = flowProcess;
108          this.conf = flowProcess.getConfigCopy();
109          this.parentFields = parent.getSinkFields();
110          this.partitionFields = ( (PartitionScheme) getScheme() ).partitionFields;
111          this.partitionEntry = new TupleEntry( this.partitionFields );
112    
113          this.partitionTuple = TupleViews.createNarrow( getSinkFields().getPos( this.partitionFields ) );
114          this.parentTuple = TupleViews.createNarrow( getSinkFields().getPos( this.parentFields ) );
115    
116          this.partitionEntry.setTuple( partitionTuple );
117          }
118    
119        TupleEntryCollector getCollector( String path )
120          {
121          TupleEntryCollector collector = collectors.get( path );
122    
123          if( collector != null )
124            return collector;
125    
126          try
127            {
128            LOG.debug( "creating collector for parent: {}, path: {}", parent.getFullIdentifier( conf ), path );
129    
130            collector = createTupleEntrySchemeCollector( flowProcess, parent, path, openedCollectors );
131    
132            openedCollectors++;
133            flowProcess.increment( Counters.Paths_Opened, 1 );
134            }
135          catch( IOException exception )
136            {
137            throw new TapException( "unable to open partition path: " + path, exception );
138            }
139    
140          if( collectors.size() > openWritesThreshold )
141            purgeCollectors();
142    
143          collectors.put( path, collector );
144    
145          if( LOG.isInfoEnabled() && collectors.size() % 100 == 0 )
146            LOG.info( "caching {} open Taps", collectors.size() );
147    
148          return collector;
149          }
150    
151        private void purgeCollectors()
152          {
153          int numToClose = Math.max( 1, (int) ( openWritesThreshold * .10 ) );
154    
155          if( LOG.isInfoEnabled() )
156            LOG.info( "removing {} open Taps from cache of size {}", numToClose, collectors.size() );
157    
158          Set<String> removeKeys = new HashSet<String>();
159          Set<String> keys = collectors.keySet();
160    
161          for( String key : keys )
162            {
163            if( numToClose-- == 0 )
164              break;
165    
166            removeKeys.add( key );
167            }
168    
169          for( String removeKey : removeKeys )
170            {
171            closeCollector( removeKey );
172            collectors.remove( removeKey );
173            }
174    
175          flowProcess.increment( Counters.Path_Purges, 1 );
176          }
177    
178        @Override
179        public void close()
180          {
181          super.close();
182    
183          try
184            {
185            for( String path : new ArrayList<String>( collectors.keySet() ) )
186              closeCollector( path );
187            }
188          finally
189            {
190            collectors.clear();
191            }
192          }
193    
194        public void closeCollector( String path )
195          {
196          TupleEntryCollector collector = collectors.get( path );
197          if( collector == null )
198            return;
199          try
200            {
201            collector.close();
202    
203            flowProcess.increment( Counters.Paths_Closed, 1 );
204            }
205          catch( Exception exception )
206            {
207            LOG.error( "exception while closing TupleEntryCollector {}", path, exception );
208    
209            boolean failOnError = false;
210            Object failProperty = flowProcess.getProperty( PartitionTapProps.FAIL_ON_CLOSE ) ;
211    
212            if ( failProperty != null )
213              failOnError = Boolean.parseBoolean( failProperty.toString() );
214    
215            if ( failOnError )
216              throw new TapException( exception );
217            }
218          }
219    
220        protected void collect( TupleEntry tupleEntry ) throws IOException
221          {
222          // reset select views
223          TupleViews.reset( partitionTuple, tupleEntry.getTuple() ); // partitionTuple is inside partitionEntry
224          TupleViews.reset( parentTuple, tupleEntry.getTuple() );
225    
226          String path = partition.toPartition( partitionEntry );
227    
228          getCollector( path ).add( parentTuple );
229          }
230        }
231    
232      /** Field parent */
233      protected Tap parent;
234      /** Field partition */
235      protected Partition partition;
236      /** Field keepParentOnDelete */
237      protected boolean keepParentOnDelete = false;
238      /** Field openTapsThreshold */
239      protected int openWritesThreshold = OPEN_WRITES_THRESHOLD_DEFAULT;
240    
241      /** Field openedCollectors */
242      private long openedCollectors = 0;
243      /** Field collectors */
244      private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<String, TupleEntryCollector>( 1000, .75f, true );
245    
246      protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<Config> flowProcess, Tap parent, String path, long sequence ) throws IOException;
247    
248      protected abstract TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Tap parent, String path, Input input ) throws IOException;
249    
250      public enum Counters
251        {
252          Paths_Opened, Paths_Closed, Path_Purges
253        }
254    
255      protected BasePartitionTap( Tap parent, Partition partition, int openWritesThreshold )
256        {
257        super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), parent.getSinkMode() );
258        this.parent = parent;
259        this.partition = partition;
260        this.openWritesThreshold = openWritesThreshold;
261        }
262    
263      protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode )
264        {
265        super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode );
266        this.parent = parent;
267        this.partition = partition;
268        }
269    
270      protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
271        {
272        super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode );
273        this.parent = parent;
274        this.partition = partition;
275        this.keepParentOnDelete = keepParentOnDelete;
276        this.openWritesThreshold = openWritesThreshold;
277        }
278    
279      /**
280       * Method getParent returns the parent Tap of this PartitionTap object.
281       *
282       * @return the parent (type Tap) of this PartitionTap object.
283       */
284      public Tap getParent()
285        {
286        return parent;
287        }
288    
289      /**
290       * Method getPartition returns the {@link Partition} instance used by this PartitionTap
291       *
292       * @return the partition instance
293       */
294      public Partition getPartition()
295        {
296        return partition;
297        }
298    
299      /**
300       * Method getChildPartitionIdentifiers returns an array of all identifiers for all available partitions.
301       * <p/>
302       * This method is used internally to set all incoming paths, override to limit applicable partitions.
303       * <p/>
304       * Note the returns array may be large.
305       *
306       * @param flowProcess    of type FlowProcess
307       * @param fullyQualified of type boolean
308       * @return a String[] of partition identifiers
309       * @throws IOException
310       */
311      public String[] getChildPartitionIdentifiers( FlowProcess<Config> flowProcess, boolean fullyQualified ) throws IOException
312        {
313        return ( (FileType) parent ).getChildIdentifiers( flowProcess.getConfigCopy(), partition.getPathDepth(), fullyQualified );
314        }
315    
316      @Override
317      public String getIdentifier()
318        {
319        return parent.getIdentifier();
320        }
321    
322      protected abstract String getCurrentIdentifier( FlowProcess<Config> flowProcess );
323    
324      /**
325       * Method getOpenWritesThreshold returns the openTapsThreshold of this PartitionTap object.
326       *
327       * @return the openTapsThreshold (type int) of this PartitionTap object.
328       */
329      public int getOpenWritesThreshold()
330        {
331        return openWritesThreshold;
332        }
333    
334      @Override
335      public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException
336        {
337        return new PartitionCollector( flowProcess );
338        }
339    
340      @Override
341      public TupleEntryIterator openForRead( FlowProcess<Config> flowProcess, Input input ) throws IOException
342        {
343        return new PartitionIterator( flowProcess, input );
344        }
345    
346      @Override
347      public boolean createResource( Config conf ) throws IOException
348        {
349        return parent.createResource( conf );
350        }
351    
352      @Override
353      public boolean deleteResource( Config conf ) throws IOException
354        {
355        return keepParentOnDelete || parent.deleteResource( conf );
356        }
357    
358      @Override
359      public boolean commitResource( Config conf ) throws IOException
360        {
361        return parent.commitResource( conf );
362        }
363    
364      @Override
365      public boolean rollbackResource( Config conf ) throws IOException
366        {
367        return parent.rollbackResource( conf );
368        }
369    
370      @Override
371      public boolean resourceExists( Config conf ) throws IOException
372        {
373        return parent.resourceExists( conf );
374        }
375    
376      @Override
377      public long getModifiedTime( Config conf ) throws IOException
378        {
379        return parent.getModifiedTime( conf );
380        }
381    
382      @Override
383      public boolean equals( Object object )
384        {
385        if( this == object )
386          return true;
387        if( object == null || getClass() != object.getClass() )
388          return false;
389        if( !super.equals( object ) )
390          return false;
391    
392        BasePartitionTap that = (BasePartitionTap) object;
393    
394        if( parent != null ? !parent.equals( that.parent ) : that.parent != null )
395          return false;
396        if( partition != null ? !partition.equals( that.partition ) : that.partition != null )
397          return false;
398    
399        return true;
400        }
401    
402      @Override
403      public int hashCode()
404        {
405        int result = super.hashCode();
406        result = 31 * result + ( parent != null ? parent.hashCode() : 0 );
407        result = 31 * result + ( partition != null ? partition.hashCode() : 0 );
408        return result;
409        }
410    
411      @Override
412      public String toString()
413        {
414        return getClass().getSimpleName() + "[\"" + parent + "\"]" + "[\"" + partition + "\"]";
415        }
416    
417      public static class PartitionScheme<Config, Input, Output> extends Scheme<Config, Input, Output, Void, Void>
418        {
419        private final Scheme scheme;
420        private final Fields partitionFields;
421    
422        public PartitionScheme( Scheme scheme )
423          {
424          this.scheme = scheme;
425          this.partitionFields = null;
426          }
427    
428        public PartitionScheme( Scheme scheme, Fields partitionFields )
429          {
430          this.scheme = scheme;
431    
432          if( partitionFields == null || partitionFields.isAll() )
433            this.partitionFields = null;
434          else if( partitionFields.isDefined() )
435            this.partitionFields = partitionFields;
436          else
437            throw new IllegalArgumentException( "partitionFields must be defined or the ALL substitution, got: " + partitionFields.printVerbose() );
438          }
439    
440        public Fields getSinkFields()
441          {
442          if( partitionFields == null || scheme.getSinkFields().isAll() )
443            return scheme.getSinkFields();
444    
445          return Fields.merge( scheme.getSinkFields(), partitionFields );
446          }
447    
448        public void setSinkFields( Fields sinkFields )
449          {
450          scheme.setSinkFields( sinkFields );
451          }
452    
453        public Fields getSourceFields()
454          {
455          if( partitionFields == null || scheme.getSourceFields().isUnknown() )
456            return scheme.getSourceFields();
457    
458          return Fields.merge( scheme.getSourceFields(), partitionFields );
459          }
460    
461        public void setSourceFields( Fields sourceFields )
462          {
463          scheme.setSourceFields( sourceFields );
464          }
465    
466        public int getNumSinkParts()
467          {
468          return scheme.getNumSinkParts();
469          }
470    
471        public void setNumSinkParts( int numSinkParts )
472          {
473          scheme.setNumSinkParts( numSinkParts );
474          }
475    
476        @Override
477        public void sourceConfInit( FlowProcess<Config> flowProcess, Tap<Config, Input, Output> tap, Config conf )
478          {
479          scheme.sourceConfInit( flowProcess, tap, conf );
480          }
481    
482        @Override
483        public void sourcePrepare( FlowProcess<Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
484          {
485          scheme.sourcePrepare( flowProcess, sourceCall );
486          }
487    
488        @Override
489        public boolean source( FlowProcess<Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
490          {
491          throw new UnsupportedOperationException( "should never be called" );
492          }
493    
494        @Override
495        public void sourceCleanup( FlowProcess<Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
496          {
497          scheme.sourceCleanup( flowProcess, sourceCall );
498          }
499    
500        @Override
501        public void sinkConfInit( FlowProcess<Config> flowProcess, Tap<Config, Input, Output> tap, Config conf )
502          {
503          scheme.sinkConfInit( flowProcess, tap, conf );
504          }
505    
506        @Override
507        public void sinkPrepare( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
508          {
509          scheme.sinkPrepare( flowProcess, sinkCall );
510          }
511    
512        @Override
513        public void sink( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
514          {
515          throw new UnsupportedOperationException( "should never be called" );
516          }
517    
518        @Override
519        public void sinkCleanup( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
520          {
521          scheme.sinkCleanup( flowProcess, sinkCall );
522          }
523        }
524      }