001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025import java.util.Set;
026
027import cascading.flow.Flow;
028import cascading.flow.FlowElement;
029import cascading.flow.FlowProcess;
030import cascading.flow.planner.Scope;
031import cascading.property.ConfigDef;
032import cascading.scheme.Scheme;
033import cascading.tuple.Fields;
034import cascading.tuple.TupleEntryCollector;
035import cascading.tuple.TupleEntryIterator;
036
037/**
038 * Class DecoratorTap wraps a given {@link Tap} instance, delegating all calls to the original.
039 * <p/>
040 * It also provides an additional generic field that may hold any custom type, this allows implementations
041 * to attach any meta-info to the tap being decorated.
042 * <p/>
043 * Further, sub-classes of DecoratorTap can be used to decorate any Tap instances associated or created for
044 * use by the {@link cascading.pipe.Checkpoint} {@link cascading.pipe.Pipe}.
045 * <p/>
046 * Sub-classing this is optional if the aim is to simply attach relevant meta-info for use by a given application.
047 * <p/>
048 * In order to pass any meta-info to a management service via the {@link cascading.management.annotation.Property}
049 * annotation, a sub-class of DecoratorTap must be provided.
050 */
051public class DecoratorTap<MetaInfo, Config, Input, Output> extends Tap<Config, Input, Output>
052  {
053  protected MetaInfo metaInfo;
054  protected Tap<Config, Input, Output> original;
055
056  /**
057   * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with
058   * the wrapped Tap instance.
059   *
060   * @param metaInfo meta-information about the current Tap
061   * @param original the decorated Tap instance
062   */
063  @ConstructorProperties({"metaInfo", "original"})
064  public DecoratorTap( MetaInfo metaInfo, Tap<Config, Input, Output> original )
065    {
066    setMetaInfo( metaInfo );
067    setOriginal( original );
068    }
069
070  /**
071   * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with
072   * the wrapped Tap instance.
073   *
074   * @param original the decorated Tap instance
075   */
076  @ConstructorProperties({"original"})
077  public DecoratorTap( Tap<Config, Input, Output> original )
078    {
079    setOriginal( original );
080    }
081
082  public MetaInfo getMetaInfo()
083    {
084    return metaInfo;
085    }
086
087  public Tap<Config, Input, Output> getOriginal()
088    {
089    return original;
090    }
091
092  protected void setOriginal( Tap<Config, Input, Output> original )
093    {
094    if( original == null )
095      throw new IllegalArgumentException( "wrapped tap value may not be null" );
096
097    this.original = original;
098    }
099
100  protected void setMetaInfo( MetaInfo metaInfo )
101    {
102    this.metaInfo = metaInfo;
103    }
104
105  @Override
106  public Scheme<Config, Input, Output, ?, ?> getScheme()
107    {
108    return original.getScheme();
109    }
110
111  @Override
112  public String getTrace()
113    {
114    return original.getTrace();
115    }
116
117  @Override
118  public void flowConfInit( Flow<Config> flow )
119    {
120    original.flowConfInit( flow );
121    }
122
123  @Override
124  public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
125    {
126    original.sourceConfInit( flowProcess, conf );
127    }
128
129  @Override
130  public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
131    {
132    original.sinkConfInit( flowProcess, conf );
133    }
134
135  @Override
136  public String getIdentifier()
137    {
138    return original.getIdentifier();
139    }
140
141  @Override
142  public Fields getSourceFields()
143    {
144    return original.getSourceFields();
145    }
146
147  @Override
148  public Fields getSinkFields()
149    {
150    return original.getSinkFields();
151    }
152
153  @Override
154  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
155    {
156    return original.openForRead( flowProcess, input );
157    }
158
159  @Override
160  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess ) throws IOException
161    {
162    return original.openForRead( flowProcess );
163    }
164
165  @Override
166  public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException
167    {
168    return original.openForWrite( flowProcess, output );
169    }
170
171  @Override
172  public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess ) throws IOException
173    {
174    return original.openForWrite( flowProcess );
175    }
176
177  @Override
178  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
179    {
180    return original.outgoingScopeFor( incomingScopes );
181    }
182
183  @Override
184  public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess )
185    {
186    return original.retrieveSourceFields( flowProcess );
187    }
188
189  @Override
190  public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Fields fields )
191    {
192    original.presentSourceFields( flowProcess, fields );
193    }
194
195  @Override
196  public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess )
197    {
198    return original.retrieveSinkFields( flowProcess );
199    }
200
201  @Override
202  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Fields fields )
203    {
204    original.presentSinkFields( flowProcess, fields );
205    }
206
207  @Override
208  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
209    {
210    return original.resolveIncomingOperationArgumentFields( incomingScope );
211    }
212
213  @Override
214  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
215    {
216    return original.resolveIncomingOperationPassThroughFields( incomingScope );
217    }
218
219  @Override
220  public String getFullIdentifier( FlowProcess<? extends Config> flowProcess )
221    {
222    return original.getFullIdentifier( flowProcess );
223    }
224
225  @Override
226  public String getFullIdentifier( Config conf )
227    {
228    return original.getFullIdentifier( conf );
229    }
230
231  @Override
232  public boolean createResource( FlowProcess<? extends Config> flowProcess ) throws IOException
233    {
234    return original.createResource( flowProcess );
235    }
236
237  @Override
238  public boolean createResource( Config conf ) throws IOException
239    {
240    return original.createResource( conf );
241    }
242
243  @Override
244  public boolean deleteResource( FlowProcess<? extends Config> flowProcess ) throws IOException
245    {
246    return original.deleteResource( flowProcess );
247    }
248
249  @Override
250  public boolean deleteResource( Config conf ) throws IOException
251    {
252    return original.deleteResource( conf );
253    }
254
255  @Override
256  public boolean prepareResourceForRead( Config conf ) throws IOException
257    {
258    return original.prepareResourceForRead( conf );
259    }
260
261  @Override
262  public boolean prepareResourceForWrite( Config conf ) throws IOException
263    {
264    return original.prepareResourceForWrite( conf );
265    }
266
267  @Override
268  public boolean commitResource( Config conf ) throws IOException
269    {
270    return original.commitResource( conf );
271    }
272
273  @Override
274  public boolean rollbackResource( Config conf ) throws IOException
275    {
276    return original.rollbackResource( conf );
277    }
278
279  @Override
280  public boolean resourceExists( FlowProcess<? extends Config> flowProcess ) throws IOException
281    {
282    return original.resourceExists( flowProcess );
283    }
284
285  @Override
286  public boolean resourceExists( Config conf ) throws IOException
287    {
288    return original.resourceExists( conf );
289    }
290
291  @Override
292  public long getModifiedTime( FlowProcess<? extends Config> flowProcess ) throws IOException
293    {
294    return original.getModifiedTime( flowProcess );
295    }
296
297  @Override
298  public long getModifiedTime( Config conf ) throws IOException
299    {
300    return original.getModifiedTime( conf );
301    }
302
303  @Override
304  public SinkMode getSinkMode()
305    {
306    return original.getSinkMode();
307    }
308
309  @Override
310  public boolean isKeep()
311    {
312    return original.isKeep();
313    }
314
315  @Override
316  public boolean isReplace()
317    {
318    return original.isReplace();
319    }
320
321  @Override
322  public boolean isUpdate()
323    {
324    return original.isUpdate();
325    }
326
327  @Override
328  public boolean isSink()
329    {
330    return original.isSink();
331    }
332
333  @Override
334  public boolean isSource()
335    {
336    return original.isSource();
337    }
338
339  @Override
340  public boolean isTemporary()
341    {
342    return original.isTemporary();
343    }
344
345  @Override
346  public ConfigDef getConfigDef()
347    {
348    return original.getConfigDef();
349    }
350
351  @Override
352  public boolean hasConfigDef()
353    {
354    return original.hasConfigDef();
355    }
356
357  @Override
358  public ConfigDef getNodeConfigDef()
359    {
360    return original.getNodeConfigDef();
361    }
362
363  @Override
364  public boolean hasNodeConfigDef()
365    {
366    return original.hasNodeConfigDef();
367    }
368
369  @Override
370  public ConfigDef getStepConfigDef()
371    {
372    return original.getStepConfigDef();
373    }
374
375  @Override
376  public boolean hasStepConfigDef()
377    {
378    return original.hasStepConfigDef();
379    }
380
381  @Override
382  public boolean isEquivalentTo( FlowElement element )
383    {
384    return original.isEquivalentTo( element );
385    }
386
387  @Override
388  public String toString()
389    {
390    return original.toString();
391    }
392  }