001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.util.Set;
026    
027    import cascading.flow.Flow;
028    import cascading.flow.FlowElement;
029    import cascading.flow.FlowProcess;
030    import cascading.flow.planner.Scope;
031    import cascading.property.ConfigDef;
032    import cascading.scheme.Scheme;
033    import cascading.tuple.Fields;
034    import cascading.tuple.TupleEntryCollector;
035    import cascading.tuple.TupleEntryIterator;
036    
037    /**
038     * Class DecoratorTap wraps a given {@link Tap} instance, delegating all calls to the original.
039     * <p/>
040     * It also provides an additional generic field that may hold any custom type, this allows implementations
041     * to attach any meta-info to the tap being decorated.
042     * <p/>
043     * Further, sub-classes of DecoratorTap can be used to decorate any Tap instances associated or created for
044     * use by the {@link cascading.pipe.Checkpoint} {@link cascading.pipe.Pipe}.
045     * <p/>
046     * Sub-classing this is optional if the aim is to simply attach relevant meta-info for use by a given application.
047     * <p/>
048     * In order to pass any meta-info to a management service via the {@link cascading.management.annotation.Property}
049     * annotation, a sub-class of DecoratorTap must be provided.
050     */
051    public class DecoratorTap<MetaInfo, Config, Input, Output> extends Tap<Config, Input, Output>
052      {
053      protected MetaInfo metaInfo;
054      protected Tap<Config, Input, Output> original;
055    
056      /**
057       * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with
058       * the wrapped Tap instance.
059       *
060       * @param metaInfo meta-information about the current Tap
061       * @param original the decorated Tap instance
062       */
063      @ConstructorProperties( {"metaInfo", "original"} )
064      public DecoratorTap( MetaInfo metaInfo, Tap<Config, Input, Output> original )
065        {
066        setMetaInfo( metaInfo );
067        setOriginal( original );
068        }
069    
070      /**
071       * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with
072       * the wrapped Tap instance.
073       *
074       * @param original the decorated Tap instance
075       */
076      @ConstructorProperties( {"original"} )
077      public DecoratorTap( Tap<Config, Input, Output> original )
078        {
079        setOriginal( original );
080        }
081    
082      public MetaInfo getMetaInfo()
083        {
084        return metaInfo;
085        }
086    
087      public Tap<Config, Input, Output> getOriginal()
088        {
089        return original;
090        }
091    
092      protected void setOriginal( Tap<Config, Input, Output> original )
093        {
094        if( original == null )
095          throw new IllegalArgumentException( "wrapped tap value may not be null" );
096    
097        this.original = original;
098        }
099    
100      protected void setMetaInfo( MetaInfo metaInfo )
101        {
102        this.metaInfo = metaInfo;
103        }
104    
105      @Override
106      public Scheme<Config, Input, Output, ?, ?> getScheme()
107        {
108        return original.getScheme();
109        }
110    
111      @Override
112      public String getTrace()
113        {
114        return original.getTrace();
115        }
116    
117      public void flowConfInit( Flow<Config> flow )
118        {
119        original.flowConfInit( flow );
120        }
121    
122      public void sourceConfInit( FlowProcess<Config> flowProcess, Config conf )
123        {
124        original.sourceConfInit( flowProcess, conf );
125        }
126    
127      public void sinkConfInit( FlowProcess<Config> flowProcess, Config conf )
128        {
129        original.sinkConfInit( flowProcess, conf );
130        }
131    
132      public String getIdentifier()
133        {
134        return original.getIdentifier();
135        }
136    
137      @Override
138      public Fields getSourceFields()
139        {
140        return original.getSourceFields();
141        }
142    
143      @Override
144      public Fields getSinkFields()
145        {
146        return original.getSinkFields();
147        }
148    
149      public TupleEntryIterator openForRead( FlowProcess<Config> flowProcess, Input input ) throws IOException
150        {
151        return original.openForRead( flowProcess, input );
152        }
153    
154      public TupleEntryIterator openForRead( FlowProcess<Config> flowProcess ) throws IOException
155        {
156        return original.openForRead( flowProcess );
157        }
158    
159      public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException
160        {
161        return original.openForWrite( flowProcess, output );
162        }
163    
164      public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess ) throws IOException
165        {
166        return original.openForWrite( flowProcess );
167        }
168    
169      @Override
170      public Scope outgoingScopeFor( Set<Scope> incomingScopes )
171        {
172        return original.outgoingScopeFor( incomingScopes );
173        }
174    
175      public Fields retrieveSourceFields( FlowProcess<Config> flowProcess )
176        {
177        return original.retrieveSourceFields( flowProcess );
178        }
179    
180      public void presentSourceFields( FlowProcess<Config> flowProcess, Fields fields )
181        {
182        original.presentSourceFields( flowProcess, fields );
183        }
184    
185      public Fields retrieveSinkFields( FlowProcess<Config> flowProcess )
186        {
187        return original.retrieveSinkFields( flowProcess );
188        }
189    
190      public void presentSinkFields( FlowProcess<Config> flowProcess, Fields fields )
191        {
192        original.presentSinkFields( flowProcess, fields );
193        }
194    
195      @Override
196      public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
197        {
198        return original.resolveIncomingOperationArgumentFields( incomingScope );
199        }
200    
201      @Override
202      public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
203        {
204        return original.resolveIncomingOperationPassThroughFields( incomingScope );
205        }
206    
207      public String getFullIdentifier( FlowProcess<Config> flowProcess )
208        {
209        return original.getFullIdentifier( flowProcess );
210        }
211    
212      public String getFullIdentifier( Config conf )
213        {
214        return original.getFullIdentifier( conf );
215        }
216    
217      public boolean createResource( FlowProcess<Config> flowProcess ) throws IOException
218        {
219        return original.createResource( flowProcess );
220        }
221    
222      public boolean createResource( Config conf ) throws IOException
223        {
224        return original.createResource( conf );
225        }
226    
227      public boolean deleteResource( FlowProcess<Config> flowProcess ) throws IOException
228        {
229        return original.deleteResource( flowProcess );
230        }
231    
232      public boolean deleteResource( Config conf ) throws IOException
233        {
234        return original.deleteResource( conf );
235        }
236    
237      public boolean commitResource( Config conf ) throws IOException
238        {
239        return original.commitResource( conf );
240        }
241    
242      public boolean rollbackResource( Config conf ) throws IOException
243        {
244        return original.rollbackResource( conf );
245        }
246    
247      public boolean resourceExists( FlowProcess<Config> flowProcess ) throws IOException
248        {
249        return original.resourceExists( flowProcess );
250        }
251    
252      public boolean resourceExists( Config conf ) throws IOException
253        {
254        return original.resourceExists( conf );
255        }
256    
257      public long getModifiedTime( FlowProcess<Config> flowProcess ) throws IOException
258        {
259        return original.getModifiedTime( flowProcess );
260        }
261    
262      public long getModifiedTime( Config conf ) throws IOException
263        {
264        return original.getModifiedTime( conf );
265        }
266    
267      @Override
268      public SinkMode getSinkMode()
269        {
270        return original.getSinkMode();
271        }
272    
273      @Override
274      public boolean isKeep()
275        {
276        return original.isKeep();
277        }
278    
279      @Override
280      public boolean isReplace()
281        {
282        return original.isReplace();
283        }
284    
285      @Override
286      public boolean isUpdate()
287        {
288        return original.isUpdate();
289        }
290    
291      @Override
292      public boolean isSink()
293        {
294        return original.isSink();
295        }
296    
297      @Override
298      public boolean isSource()
299        {
300        return original.isSource();
301        }
302    
303      @Override
304      public boolean isTemporary()
305        {
306        return original.isTemporary();
307        }
308    
309      @Override
310      public ConfigDef getConfigDef()
311        {
312        return original.getConfigDef();
313        }
314    
315      @Override
316      public boolean hasConfigDef()
317        {
318        return original.hasConfigDef();
319        }
320    
321      @Override
322      public ConfigDef getStepConfigDef()
323        {
324        return original.getStepConfigDef();
325        }
326    
327      @Override
328      public boolean hasStepConfigDef()
329        {
330        return original.hasStepConfigDef();
331        }
332    
333      @Override
334      public boolean isEquivalentTo( FlowElement element )
335        {
336        return original.isEquivalentTo( element );
337        }
338    
339      @Override
340      public String toString()
341        {
342        return original.toString();
343        }
344      }