001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tap; 022 023import java.beans.ConstructorProperties; 024import java.io.IOException; 025import java.util.Set; 026 027import cascading.flow.Flow; 028import cascading.flow.FlowElement; 029import cascading.flow.FlowProcess; 030import cascading.flow.planner.Scope; 031import cascading.property.ConfigDef; 032import cascading.scheme.Scheme; 033import cascading.tuple.Fields; 034import cascading.tuple.TupleEntryCollector; 035import cascading.tuple.TupleEntryIterator; 036 037/** 038 * Class DecoratorTap wraps a given {@link Tap} instance, delegating all calls to the original. 039 * <p/> 040 * It also provides an additional generic field that may hold any custom type, this allows implementations 041 * to attach any meta-info to the tap being decorated. 042 * <p/> 043 * Further, sub-classes of DecoratorTap can be used to decorate any Tap instances associated or created for 044 * use by the {@link cascading.pipe.Checkpoint} {@link cascading.pipe.Pipe}. 045 * <p/> 046 * Sub-classing this is optional if the aim is to simply attach relevant meta-info for use by a given application. 047 * <p/> 048 * In order to pass any meta-info to a management service via the {@link cascading.management.annotation.Property} 049 * annotation, a sub-class of DecoratorTap must be provided. 050 */ 051public class DecoratorTap<MetaInfo, Config, Input, Output> extends Tap<Config, Input, Output> 052 { 053 protected MetaInfo metaInfo; 054 protected Tap<Config, Input, Output> original; 055 056 /** 057 * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with 058 * the wrapped Tap instance. 059 * 060 * @param metaInfo meta-information about the current Tap 061 * @param original the decorated Tap instance 062 */ 063 @ConstructorProperties({"metaInfo", "original"}) 064 public DecoratorTap( MetaInfo metaInfo, Tap<Config, Input, Output> original ) 065 { 066 setMetaInfo( metaInfo ); 067 setOriginal( original ); 068 } 069 070 /** 071 * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with 072 * the wrapped Tap instance. 073 * 074 * @param original the decorated Tap instance 075 */ 076 @ConstructorProperties({"original"}) 077 public DecoratorTap( Tap<Config, Input, Output> original ) 078 { 079 setOriginal( original ); 080 } 081 082 public MetaInfo getMetaInfo() 083 { 084 return metaInfo; 085 } 086 087 public Tap<Config, Input, Output> getOriginal() 088 { 089 return original; 090 } 091 092 protected void setOriginal( Tap<Config, Input, Output> original ) 093 { 094 if( original == null ) 095 throw new IllegalArgumentException( "wrapped tap value may not be null" ); 096 097 this.original = original; 098 } 099 100 protected void setMetaInfo( MetaInfo metaInfo ) 101 { 102 this.metaInfo = metaInfo; 103 } 104 105 @Override 106 public Scheme<Config, Input, Output, ?, ?> getScheme() 107 { 108 return original.getScheme(); 109 } 110 111 @Override 112 public String getTrace() 113 { 114 return original.getTrace(); 115 } 116 117 @Override 118 public void flowConfInit( Flow<Config> flow ) 119 { 120 original.flowConfInit( flow ); 121 } 122 123 @Override 124 public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Config conf ) 125 { 126 original.sourceConfInit( flowProcess, conf ); 127 } 128 129 @Override 130 public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf ) 131 { 132 original.sinkConfInit( flowProcess, conf ); 133 } 134 135 @Override 136 public String getIdentifier() 137 { 138 return original.getIdentifier(); 139 } 140 141 @Override 142 public Fields getSourceFields() 143 { 144 return original.getSourceFields(); 145 } 146 147 @Override 148 public Fields getSinkFields() 149 { 150 return original.getSinkFields(); 151 } 152 153 @Override 154 public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException 155 { 156 return original.openForRead( flowProcess, input ); 157 } 158 159 @Override 160 public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess ) throws IOException 161 { 162 return original.openForRead( flowProcess ); 163 } 164 165 @Override 166 public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException 167 { 168 return original.openForWrite( flowProcess, output ); 169 } 170 171 @Override 172 public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess ) throws IOException 173 { 174 return original.openForWrite( flowProcess ); 175 } 176 177 @Override 178 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 179 { 180 return original.outgoingScopeFor( incomingScopes ); 181 } 182 183 @Override 184 public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess ) 185 { 186 return original.retrieveSourceFields( flowProcess ); 187 } 188 189 @Override 190 public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Fields fields ) 191 { 192 original.presentSourceFields( flowProcess, fields ); 193 } 194 195 @Override 196 public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess ) 197 { 198 return original.retrieveSinkFields( flowProcess ); 199 } 200 201 @Override 202 public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Fields fields ) 203 { 204 original.presentSinkFields( flowProcess, fields ); 205 } 206 207 @Override 208 public Fields resolveIncomingOperationArgumentFields( Scope incomingScope ) 209 { 210 return original.resolveIncomingOperationArgumentFields( incomingScope ); 211 } 212 213 @Override 214 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 215 { 216 return original.resolveIncomingOperationPassThroughFields( incomingScope ); 217 } 218 219 @Override 220 public String getFullIdentifier( FlowProcess<? extends Config> flowProcess ) 221 { 222 return original.getFullIdentifier( flowProcess ); 223 } 224 225 @Override 226 public String getFullIdentifier( Config conf ) 227 { 228 return original.getFullIdentifier( conf ); 229 } 230 231 @Override 232 public boolean createResource( FlowProcess<? extends Config> flowProcess ) throws IOException 233 { 234 return original.createResource( flowProcess ); 235 } 236 237 @Override 238 public boolean createResource( Config conf ) throws IOException 239 { 240 return original.createResource( conf ); 241 } 242 243 @Override 244 public boolean deleteResource( FlowProcess<? extends Config> flowProcess ) throws IOException 245 { 246 return original.deleteResource( flowProcess ); 247 } 248 249 @Override 250 public boolean deleteResource( Config conf ) throws IOException 251 { 252 return original.deleteResource( conf ); 253 } 254 255 @Override 256 public boolean prepareResourceForRead( Config conf ) throws IOException 257 { 258 return original.prepareResourceForRead( conf ); 259 } 260 261 @Override 262 public boolean prepareResourceForWrite( Config conf ) throws IOException 263 { 264 return original.prepareResourceForWrite( conf ); 265 } 266 267 @Override 268 public boolean commitResource( Config conf ) throws IOException 269 { 270 return original.commitResource( conf ); 271 } 272 273 @Override 274 public boolean rollbackResource( Config conf ) throws IOException 275 { 276 return original.rollbackResource( conf ); 277 } 278 279 @Override 280 public boolean resourceExists( FlowProcess<? extends Config> flowProcess ) throws IOException 281 { 282 return original.resourceExists( flowProcess ); 283 } 284 285 @Override 286 public boolean resourceExists( Config conf ) throws IOException 287 { 288 return original.resourceExists( conf ); 289 } 290 291 @Override 292 public long getModifiedTime( FlowProcess<? extends Config> flowProcess ) throws IOException 293 { 294 return original.getModifiedTime( flowProcess ); 295 } 296 297 @Override 298 public long getModifiedTime( Config conf ) throws IOException 299 { 300 return original.getModifiedTime( conf ); 301 } 302 303 @Override 304 public SinkMode getSinkMode() 305 { 306 return original.getSinkMode(); 307 } 308 309 @Override 310 public boolean isKeep() 311 { 312 return original.isKeep(); 313 } 314 315 @Override 316 public boolean isReplace() 317 { 318 return original.isReplace(); 319 } 320 321 @Override 322 public boolean isUpdate() 323 { 324 return original.isUpdate(); 325 } 326 327 @Override 328 public boolean isSink() 329 { 330 return original.isSink(); 331 } 332 333 @Override 334 public boolean isSource() 335 { 336 return original.isSource(); 337 } 338 339 @Override 340 public boolean isTemporary() 341 { 342 return original.isTemporary(); 343 } 344 345 @Override 346 public ConfigDef getConfigDef() 347 { 348 return original.getConfigDef(); 349 } 350 351 @Override 352 public boolean hasConfigDef() 353 { 354 return original.hasConfigDef(); 355 } 356 357 @Override 358 public ConfigDef getNodeConfigDef() 359 { 360 return original.getNodeConfigDef(); 361 } 362 363 @Override 364 public boolean hasNodeConfigDef() 365 { 366 return original.hasNodeConfigDef(); 367 } 368 369 @Override 370 public ConfigDef getStepConfigDef() 371 { 372 return original.getStepConfigDef(); 373 } 374 375 @Override 376 public boolean hasStepConfigDef() 377 { 378 return original.hasStepConfigDef(); 379 } 380 381 @Override 382 public boolean isEquivalentTo( FlowElement element ) 383 { 384 return original.isEquivalentTo( element ); 385 } 386 387 @Override 388 public String toString() 389 { 390 return original.toString(); 391 } 392 }