001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap;
022    
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntrySchemeCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
040    
041    /**
042     *
043     */
044    public abstract class BaseTemplateTap<Config, Output> extends SinkTap<Config, Output>
045      {
046      /** Field LOG */
047      private static final Logger LOG = LoggerFactory.getLogger( BaseTemplateTap.class );
048      /** Field OPEN_FILES_THRESHOLD_DEFAULT */
049      protected static final int OPEN_TAPS_THRESHOLD_DEFAULT = 300;
050    
051      private class TemplateCollector extends TupleEntryCollector
052        {
053        private final FlowProcess<Config> flowProcess;
054        private final Config conf;
055        private final Fields parentFields;
056        private final Fields pathFields;
057    
058        public TemplateCollector( FlowProcess<Config> flowProcess )
059          {
060          super( Fields.asDeclaration( getSinkFields() ) );
061          this.flowProcess = flowProcess;
062          this.conf = flowProcess.getConfigCopy();
063          this.parentFields = parent.getSinkFields();
064          this.pathFields = ( (TemplateScheme) getScheme() ).pathFields;
065          }
066    
067        private TupleEntryCollector getCollector( String path )
068          {
069          TupleEntryCollector collector = collectors.get( path );
070    
071          if( collector != null )
072            return collector;
073    
074          try
075            {
076            LOG.debug( "creating collector for parent: {}, path: {}", parent.getFullIdentifier( conf ), path );
077    
078            collector = createTupleEntrySchemeCollector( flowProcess, parent, path );
079    
080            flowProcess.increment( Counters.Paths_Opened, 1 );
081            }
082          catch( IOException exception )
083            {
084            throw new TapException( "unable to open template path: " + path, exception );
085            }
086    
087          if( collectors.size() > openTapsThreshold )
088            purgeCollectors();
089    
090          collectors.put( path, collector );
091    
092          if( LOG.isInfoEnabled() && collectors.size() % 100 == 0 )
093            LOG.info( "caching {} open Taps", collectors.size() );
094    
095          return collector;
096          }
097    
098        private void purgeCollectors()
099          {
100          int numToClose = Math.max( 1, (int) ( openTapsThreshold * .10 ) );
101    
102          if( LOG.isInfoEnabled() )
103            LOG.info( "removing {} open Taps from cache of size {}", numToClose, collectors.size() );
104    
105          Set<String> removeKeys = new HashSet<String>();
106          Set<String> keys = collectors.keySet();
107    
108          for( String key : keys )
109            {
110            if( numToClose-- == 0 )
111              break;
112    
113            removeKeys.add( key );
114            }
115    
116          for( String removeKey : removeKeys )
117            closeCollector( collectors.remove( removeKey ) );
118    
119          flowProcess.increment( Counters.Path_Purges, 1 );
120          }
121    
122        @Override
123        public void close()
124          {
125          super.close();
126    
127          try
128            {
129            for( TupleEntryCollector collector : collectors.values() )
130              closeCollector( collector );
131            }
132          finally
133            {
134            collectors.clear();
135            }
136          }
137    
138        private void closeCollector( TupleEntryCollector collector )
139          {
140          if( collector == null )
141            return;
142    
143          try
144            {
145            collector.close();
146    
147            flowProcess.increment( Counters.Paths_Closed, 1 );
148            }
149          catch( Exception exception )
150            {
151            // do nothing
152            }
153          }
154    
155        protected void collect( TupleEntry tupleEntry ) throws IOException
156          {
157          if( pathFields != null )
158            {
159            Tuple pathValues = tupleEntry.selectTuple( pathFields );
160            String path = pathValues.format( pathTemplate );
161    
162            getCollector( path ).add( tupleEntry.selectTuple( parentFields ) );
163            }
164          else
165            {
166            String path = tupleEntry.getTuple().format( pathTemplate );
167    
168            getCollector( path ).add( tupleEntry );
169            }
170          }
171        }
172    
173      /** Field parent */
174      protected Tap parent;
175      /** Field pathTemplate */
176      protected String pathTemplate;
177      /** Field keepParentOnDelete */
178      protected boolean keepParentOnDelete = false;
179      /** Field openTapsThreshold */
180      protected int openTapsThreshold = OPEN_TAPS_THRESHOLD_DEFAULT;
181      /** Field collectors */
182      private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<String, TupleEntryCollector>( 1000, .75f, true );
183    
184      protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<Config> flowProcess, Tap parent, String path ) throws IOException;
185    
186      /**
187       * Method getParent returns the parent Tap of this TemplateTap object.
188       *
189       * @return the parent (type Tap) of this TemplateTap object.
190       */
191      public Tap getParent()
192        {
193        return parent;
194        }
195    
196      /**
197       * Method getPathTemplate returns the pathTemplate {@link java.util.Formatter} format String of this TemplateTap object.
198       *
199       * @return the pathTemplate (type String) of this TemplateTap object.
200       */
201      public String getPathTemplate()
202        {
203        return pathTemplate;
204        }
205    
206      @Override
207      public String getIdentifier()
208        {
209        return parent.getIdentifier();
210        }
211    
212      /**
213       * Method getOpenTapsThreshold returns the openTapsThreshold of this TemplateTap object.
214       *
215       * @return the openTapsThreshold (type int) of this TemplateTap object.
216       */
217      public int getOpenTapsThreshold()
218        {
219        return openTapsThreshold;
220        }
221    
222      @Override
223      public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException
224        {
225        return new TemplateCollector( flowProcess );
226        }
227    
228      /** @see cascading.tap.Tap#createResource(Object) */
229      public boolean createResource( Config conf ) throws IOException
230        {
231        return parent.createResource( conf );
232        }
233    
234      /** @see cascading.tap.Tap#deleteResource(Object) */
235      public boolean deleteResource( Config conf ) throws IOException
236        {
237        return keepParentOnDelete || parent.deleteResource( conf );
238        }
239    
240      @Override
241      public boolean commitResource( Config conf ) throws IOException
242        {
243        return parent.commitResource( conf );
244        }
245    
246      @Override
247      public boolean rollbackResource( Config conf ) throws IOException
248        {
249        return parent.rollbackResource( conf );
250        }
251    
252      /** @see cascading.tap.Tap#resourceExists(Object) */
253      public boolean resourceExists( Config conf ) throws IOException
254        {
255        return parent.resourceExists( conf );
256        }
257    
258      /** @see cascading.tap.Tap#getModifiedTime(Object) */
259      @Override
260      public long getModifiedTime( Config conf ) throws IOException
261        {
262        return parent.getModifiedTime( conf );
263        }
264    
265      @Override
266      public boolean equals( Object object )
267        {
268        if( this == object )
269          return true;
270        if( object == null || getClass() != object.getClass() )
271          return false;
272        if( !super.equals( object ) )
273          return false;
274    
275        BaseTemplateTap that = (BaseTemplateTap) object;
276    
277        if( parent != null ? !parent.equals( that.parent ) : that.parent != null )
278          return false;
279        if( pathTemplate != null ? !pathTemplate.equals( that.pathTemplate ) : that.pathTemplate != null )
280          return false;
281    
282        return true;
283        }
284    
285      @Override
286      public int hashCode()
287        {
288        int result = super.hashCode();
289        result = 31 * result + ( parent != null ? parent.hashCode() : 0 );
290        result = 31 * result + ( pathTemplate != null ? pathTemplate.hashCode() : 0 );
291        return result;
292        }
293    
294      @Override
295      public String toString()
296        {
297        return getClass().getSimpleName() + "[\"" + parent + "\"]" + "[\"" + pathTemplate + "\"]";
298        }
299    
300      public enum Counters
301        {
302          Paths_Opened, Paths_Closed, Path_Purges
303        }
304    
305      protected BaseTemplateTap( Tap parent, String pathTemplate, int openTapsThreshold )
306        {
307        this( new TemplateScheme( parent.getScheme() ) );
308        this.parent = parent;
309        this.pathTemplate = pathTemplate;
310        this.openTapsThreshold = openTapsThreshold;
311        }
312    
313      protected BaseTemplateTap( Tap parent, String pathTemplate, SinkMode sinkMode )
314        {
315        super( new TemplateScheme( parent.getScheme() ), sinkMode );
316        this.parent = parent;
317        this.pathTemplate = pathTemplate;
318        }
319    
320      protected BaseTemplateTap( Tap parent, String pathTemplate, SinkMode sinkMode, boolean keepParentOnDelete, int openTapsThreshold )
321        {
322        super( new TemplateScheme( parent.getScheme() ), sinkMode );
323        this.parent = parent;
324        this.pathTemplate = pathTemplate;
325        this.keepParentOnDelete = keepParentOnDelete;
326        this.openTapsThreshold = openTapsThreshold;
327        }
328    
329      protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, int openTapsThreshold )
330        {
331        super( new TemplateScheme( parent.getScheme(), pathFields ) );
332        this.parent = parent;
333        this.pathTemplate = pathTemplate;
334        this.openTapsThreshold = openTapsThreshold;
335        }
336    
337      protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, SinkMode sinkMode )
338        {
339        super( new TemplateScheme( parent.getScheme(), pathFields ), sinkMode );
340        this.parent = parent;
341        this.pathTemplate = pathTemplate;
342        }
343    
344      protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, SinkMode sinkMode, boolean keepParentOnDelete, int openTapsThreshold )
345        {
346        super( new TemplateScheme( parent.getScheme(), pathFields ), sinkMode );
347        this.parent = parent;
348        this.pathTemplate = pathTemplate;
349        this.keepParentOnDelete = keepParentOnDelete;
350        this.openTapsThreshold = openTapsThreshold;
351        }
352    
353      protected BaseTemplateTap( Scheme<Config, ?, Output, ?, ?> scheme, SinkMode sinkMode )
354        {
355        super( scheme, sinkMode );
356        }
357    
358      protected BaseTemplateTap( Scheme<Config, ?, Output, ?, ?> scheme )
359        {
360        super( scheme );
361        }
362    
363      public static class TemplateScheme<Config, Output> extends Scheme<Config, Void, Output, Void, Void>
364        {
365        private final Scheme scheme;
366        private final Fields pathFields;
367    
368        public TemplateScheme( Scheme scheme )
369          {
370          this.scheme = scheme;
371          this.pathFields = null;
372          }
373    
374        public TemplateScheme( Scheme scheme, Fields pathFields )
375          {
376          this.scheme = scheme;
377    
378          if( pathFields == null || pathFields.isAll() )
379            this.pathFields = null;
380          else if( pathFields.isDefined() )
381            this.pathFields = pathFields;
382          else
383            throw new IllegalArgumentException( "pathFields must be defined or the ALL substitution, got: " + pathFields.printVerbose() );
384          }
385    
386        public Fields getSinkFields()
387          {
388          if( pathFields == null || scheme.getSinkFields().isAll() )
389            return scheme.getSinkFields();
390    
391          return Fields.merge( scheme.getSinkFields(), pathFields );
392          }
393    
394        public void setSinkFields( Fields sinkFields )
395          {
396          scheme.setSinkFields( sinkFields );
397          }
398    
399        public Fields getSourceFields()
400          {
401          return scheme.getSourceFields();
402          }
403    
404        public void setSourceFields( Fields sourceFields )
405          {
406          scheme.setSourceFields( sourceFields );
407          }
408    
409        public int getNumSinkParts()
410          {
411          return scheme.getNumSinkParts();
412          }
413    
414        public void setNumSinkParts( int numSinkParts )
415          {
416          scheme.setNumSinkParts( numSinkParts );
417          }
418    
419        @Override
420        public void sourceConfInit( FlowProcess<Config> flowProcess, Tap<Config, Void, Output> tap, Config conf )
421          {
422          scheme.sourceConfInit( flowProcess, tap, conf );
423          }
424    
425        @Override
426        public void sourcePrepare( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException
427          {
428          scheme.sourcePrepare( flowProcess, sourceCall );
429          }
430    
431        @Override
432        public boolean source( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException
433          {
434          throw new UnsupportedOperationException( "not supported" );
435          }
436    
437        @Override
438        public void sourceCleanup( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException
439          {
440          scheme.sourceCleanup( flowProcess, sourceCall );
441          }
442    
443        @Override
444        public void sinkConfInit( FlowProcess<Config> flowProcess, Tap<Config, Void, Output> tap, Config conf )
445          {
446          scheme.sinkConfInit( flowProcess, tap, conf );
447          }
448    
449        @Override
450        public void sinkPrepare( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
451          {
452          scheme.sinkPrepare( flowProcess, sinkCall );
453          }
454    
455        @Override
456        public void sink( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
457          {
458          throw new UnsupportedOperationException( "should never be called" );
459          }
460    
461        @Override
462        public void sinkCleanup( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
463          {
464          scheme.sinkCleanup( flowProcess, sinkCall );
465          }
466        }
467      }