001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025import java.util.Set;
026
027import cascading.flow.Flow;
028import cascading.flow.FlowProcess;
029import cascading.flow.planner.Scope;
030import cascading.property.ConfigDef;
031import cascading.scheme.Scheme;
032import cascading.tuple.Fields;
033import cascading.tuple.TupleEntryCollector;
034import cascading.tuple.TupleEntryIterator;
035
036/**
037 * Class DecoratorTap wraps a given {@link Tap} instance, delegating all calls to the original.
038 * <p/>
039 * It also provides an additional generic field that may hold any custom type, this allows implementations
040 * to attach any meta-info to the tap being decorated.
041 * <p/>
042 * Further, sub-classes of DecoratorTap can be used to decorate any Tap instances associated or created for
043 * use by the {@link cascading.pipe.Checkpoint} {@link cascading.pipe.Pipe}.
044 * <p/>
045 * Sub-classing this is optional if the aim is to simply attach relevant meta-info for use by a given application.
046 * <p/>
047 * In order to pass any meta-info to a management service via the {@link cascading.management.annotation.Property}
048 * annotation, a sub-class of DecoratorTap must be provided.
049 */
050public class DecoratorTap<MetaInfo, Config, Input, Output> extends Tap<Config, Input, Output>
051  {
052  protected MetaInfo metaInfo;
053  protected Tap<Config, Input, Output> original;
054
055  /**
056   * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with
057   * the wrapped Tap instance.
058   *
059   * @param metaInfo meta-information about the current Tap
060   * @param original the decorated Tap instance
061   */
062  @ConstructorProperties({"metaInfo", "original"})
063  public DecoratorTap( MetaInfo metaInfo, Tap<Config, Input, Output> original )
064    {
065    setMetaInfo( metaInfo );
066    setOriginal( original );
067    }
068
069  /**
070   * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with
071   * the wrapped Tap instance.
072   *
073   * @param original the decorated Tap instance
074   */
075  @ConstructorProperties({"original"})
076  public DecoratorTap( Tap<Config, Input, Output> original )
077    {
078    setOriginal( original );
079    }
080
081  public MetaInfo getMetaInfo()
082    {
083    return metaInfo;
084    }
085
086  public Tap<Config, Input, Output> getOriginal()
087    {
088    return original;
089    }
090
091  protected void setOriginal( Tap<Config, Input, Output> original )
092    {
093    if( original == null )
094      throw new IllegalArgumentException( "wrapped tap value may not be null" );
095
096    this.original = original;
097    }
098
099  protected void setMetaInfo( MetaInfo metaInfo )
100    {
101    this.metaInfo = metaInfo;
102    }
103
104  @Override
105  public Scheme<Config, Input, Output, ?, ?> getScheme()
106    {
107    return original.getScheme();
108    }
109
110  @Override
111  public String getTrace()
112    {
113    return original.getTrace();
114    }
115
116  @Override
117  public void flowConfInit( Flow<Config> flow )
118    {
119    original.flowConfInit( flow );
120    }
121
122  @Override
123  public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
124    {
125    original.sourceConfInit( flowProcess, conf );
126    }
127
128  @Override
129  public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
130    {
131    original.sinkConfInit( flowProcess, conf );
132    }
133
134  @Override
135  public String getIdentifier()
136    {
137    return original.getIdentifier();
138    }
139
140  @Override
141  public Fields getSourceFields()
142    {
143    return original.getSourceFields();
144    }
145
146  @Override
147  public Fields getSinkFields()
148    {
149    return original.getSinkFields();
150    }
151
152  @Override
153  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
154    {
155    return original.openForRead( flowProcess, input );
156    }
157
158  @Override
159  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess ) throws IOException
160    {
161    return original.openForRead( flowProcess );
162    }
163
164  @Override
165  public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException
166    {
167    return original.openForWrite( flowProcess, output );
168    }
169
170  @Override
171  public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess ) throws IOException
172    {
173    return original.openForWrite( flowProcess );
174    }
175
176  @Override
177  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
178    {
179    return original.outgoingScopeFor( incomingScopes );
180    }
181
182  @Override
183  public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess )
184    {
185    return original.retrieveSourceFields( flowProcess );
186    }
187
188  @Override
189  public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Fields fields )
190    {
191    original.presentSourceFields( flowProcess, fields );
192    }
193
194  @Override
195  public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess )
196    {
197    return original.retrieveSinkFields( flowProcess );
198    }
199
200  @Override
201  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Fields fields )
202    {
203    original.presentSinkFields( flowProcess, fields );
204    }
205
206  @Override
207  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
208    {
209    return original.resolveIncomingOperationArgumentFields( incomingScope );
210    }
211
212  @Override
213  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
214    {
215    return original.resolveIncomingOperationPassThroughFields( incomingScope );
216    }
217
218  @Override
219  public String getFullIdentifier( FlowProcess<? extends Config> flowProcess )
220    {
221    return original.getFullIdentifier( flowProcess );
222    }
223
224  @Override
225  public String getFullIdentifier( Config conf )
226    {
227    return original.getFullIdentifier( conf );
228    }
229
230  @Override
231  public boolean createResource( FlowProcess<? extends Config> flowProcess ) throws IOException
232    {
233    return original.createResource( flowProcess );
234    }
235
236  @Override
237  public boolean createResource( Config conf ) throws IOException
238    {
239    return original.createResource( conf );
240    }
241
242  @Override
243  public boolean deleteResource( FlowProcess<? extends Config> flowProcess ) throws IOException
244    {
245    return original.deleteResource( flowProcess );
246    }
247
248  @Override
249  public boolean deleteResource( Config conf ) throws IOException
250    {
251    return original.deleteResource( conf );
252    }
253
254  @Override
255  public boolean prepareResourceForRead( Config conf ) throws IOException
256    {
257    return original.prepareResourceForRead( conf );
258    }
259
260  @Override
261  public boolean prepareResourceForWrite( Config conf ) throws IOException
262    {
263    return original.prepareResourceForWrite( conf );
264    }
265
266  @Override
267  public boolean commitResource( Config conf ) throws IOException
268    {
269    return original.commitResource( conf );
270    }
271
272  @Override
273  public boolean rollbackResource( Config conf ) throws IOException
274    {
275    return original.rollbackResource( conf );
276    }
277
278  @Override
279  public boolean resourceExists( FlowProcess<? extends Config> flowProcess ) throws IOException
280    {
281    return original.resourceExists( flowProcess );
282    }
283
284  @Override
285  public boolean resourceExists( Config conf ) throws IOException
286    {
287    return original.resourceExists( conf );
288    }
289
290  @Override
291  public long getModifiedTime( FlowProcess<? extends Config> flowProcess ) throws IOException
292    {
293    return original.getModifiedTime( flowProcess );
294    }
295
296  @Override
297  public long getModifiedTime( Config conf ) throws IOException
298    {
299    return original.getModifiedTime( conf );
300    }
301
302  @Override
303  public SinkMode getSinkMode()
304    {
305    return original.getSinkMode();
306    }
307
308  @Override
309  public boolean isKeep()
310    {
311    return original.isKeep();
312    }
313
314  @Override
315  public boolean isReplace()
316    {
317    return original.isReplace();
318    }
319
320  @Override
321  public boolean isUpdate()
322    {
323    return original.isUpdate();
324    }
325
326  @Override
327  public boolean isSink()
328    {
329    return original.isSink();
330    }
331
332  @Override
333  public boolean isSource()
334    {
335    return original.isSource();
336    }
337
338  @Override
339  public boolean isTemporary()
340    {
341    return original.isTemporary();
342    }
343
344  @Override
345  public ConfigDef getConfigDef()
346    {
347    return original.getConfigDef();
348    }
349
350  @Override
351  public boolean hasConfigDef()
352    {
353    return original.hasConfigDef();
354    }
355
356  @Override
357  public ConfigDef getNodeConfigDef()
358    {
359    return original.getNodeConfigDef();
360    }
361
362  @Override
363  public boolean hasNodeConfigDef()
364    {
365    return original.hasNodeConfigDef();
366    }
367
368  @Override
369  public ConfigDef getStepConfigDef()
370    {
371    return original.getStepConfigDef();
372    }
373
374  @Override
375  public boolean hasStepConfigDef()
376    {
377    return original.hasStepConfigDef();
378    }
379
380  @Override
381  public String toString()
382    {
383    return getClass().getSimpleName() + "<" + original.toString() + ">";
384    }
385  }