/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.Set;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.planner.Scope;
import cascading.property.ConfigDef;
import cascading.scheme.Scheme;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;

/**
 * A {@link Tap} that wraps another Tap and forwards every overridden call to it.
 * <p>
 * Alongside the wrapped instance, a DecoratorTap carries one extra, generically typed value (the
 * "meta-info"). Applications can use it to attach arbitrary meta-information to the tap being
 * decorated.
 * <p>
 * Sub-classes of DecoratorTap can also be used to decorate Tap instances associated with, or
 * created for use by, the {@link cascading.pipe.Checkpoint} {@link cascading.pipe.Pipe}.
 * <p>
 * Sub-classing is optional when the only goal is to attach meta-info for use by an application.
 * It is required, however, when the meta-info must be passed to a management service via the
 * {@link cascading.management.annotation.Property} annotation.
 *
 * @param <MetaInfo> type of the meta-information value attached to the wrapped tap
 * @param <Config>   configuration type, passed through to {@link Tap}
 * @param <Input>    input type, passed through to {@link Tap}
 * @param <Output>   output type, passed through to {@link Tap}
 */
public class DecoratorTap<MetaInfo, Config, Input, Output> extends Tap<Config, Input, Output> {

  /** Application supplied meta-information; stays {@code null} unless explicitly set. */
  protected MetaInfo metaInfo;

  /** The tap every overridden call is forwarded to. */
  protected Tap<Config, Input, Output> original;

  /**
   * Wraps the given tap and attaches the given meta-information to this decorator.
   * <p>
   * Both values are stored through {@link #setMetaInfo(Object)} and {@link #setOriginal(Tap)},
   * in that order; these setters are overridable, so a sub-class override runs during
   * construction.
   *
   * @param metaInfo meta-information about the wrapped tap
   * @param original the tap to decorate
   * @throws IllegalArgumentException if {@code original} is {@code null}
   */
  @ConstructorProperties({"metaInfo", "original"})
  public DecoratorTap(MetaInfo metaInfo, Tap<Config, Input, Output> original) {
    setMetaInfo(metaInfo);
    setOriginal(original);
  }

  /**
   * Wraps the given tap without attaching any meta-information.
   * <p>
   * The tap is stored through the overridable {@link #setOriginal(Tap)}.
   *
   * @param original the tap to decorate
   * @throws IllegalArgumentException if {@code original} is {@code null}
   */
  @ConstructorProperties({"original"})
  public DecoratorTap(Tap<Config, Input, Output> original) {
    setOriginal(original);
  }

  /**
   * Returns the meta-information attached to this decorator.
   *
   * @return the attached value, or {@code null} if none was set
   */
  public MetaInfo getMetaInfo() {
    return metaInfo;
  }

  /**
   * Returns the tap being decorated.
   *
   * @return the wrapped tap
   */
  public Tap<Config, Input, Output> getOriginal() {
    return original;
  }

  /**
   * Replaces the wrapped tap.
   *
   * @param original the tap to decorate
   * @throws IllegalArgumentException if {@code original} is {@code null}
   */
  protected void setOriginal(Tap<Config, Input, Output> original) {
    if (original == null) {
      throw new IllegalArgumentException("wrapped tap value may not be null");
    }

    this.original = original;
  }

  /**
   * Replaces the attached meta-information; no validation is performed.
   *
   * @param metaInfo the new value
   */
  protected void setMetaInfo(MetaInfo metaInfo) {
    this.metaInfo = metaInfo;
  }

  // ---------------------------------------------------------------------------------------------
  // Everything below, except toString(), forwards unchanged to the wrapped tap.
  // ---------------------------------------------------------------------------------------------

  /** Returns the scheme reported by the wrapped tap. */
  @Override
  public Scheme<Config, Input, Output, ?, ?> getScheme() {
    return original.getScheme();
  }

  /** Returns the trace reported by the wrapped tap. */
  @Override
  public String getTrace() {
    return original.getTrace();
  }

  /** Forwards flow configuration initialization to the wrapped tap. */
  @Override
  public void flowConfInit(Flow<Config> flow) {
    original.flowConfInit(flow);
  }

  /** Forwards source configuration initialization to the wrapped tap. */
  @Override
  public void sourceConfInit(FlowProcess<? extends Config> flowProcess, Config conf) {
    original.sourceConfInit(flowProcess, conf);
  }

  /** Forwards sink configuration initialization to the wrapped tap. */
  @Override
  public void sinkConfInit(FlowProcess<? extends Config> flowProcess, Config conf) {
    original.sinkConfInit(flowProcess, conf);
  }

  /** Returns the identifier reported by the wrapped tap. */
  @Override
  public String getIdentifier() {
    return original.getIdentifier();
  }

  /** Returns the source fields reported by the wrapped tap. */
  @Override
  public Fields getSourceFields() {
    return original.getSourceFields();
  }

  /** Returns the sink fields reported by the wrapped tap. */
  @Override
  public Fields getSinkFields() {
    return original.getSinkFields();
  }

  /** Opens the wrapped tap for reading using the given input. */
  @Override
  public TupleEntryIterator openForRead(FlowProcess<? extends Config> flowProcess, Input input)
      throws IOException {
    return original.openForRead(flowProcess, input);
  }

  /** Opens the wrapped tap for reading. */
  @Override
  public TupleEntryIterator openForRead(FlowProcess<? extends Config> flowProcess)
      throws IOException {
    return original.openForRead(flowProcess);
  }

  /** Opens the wrapped tap for writing using the given output. */
  @Override
  public TupleEntryCollector openForWrite(FlowProcess<? extends Config> flowProcess, Output output)
      throws IOException {
    return original.openForWrite(flowProcess, output);
  }

  /** Opens the wrapped tap for writing. */
  @Override
  public TupleEntryCollector openForWrite(FlowProcess<? extends Config> flowProcess)
      throws IOException {
    return original.openForWrite(flowProcess);
  }

  /** Returns the outgoing scope computed by the wrapped tap for the given incoming scopes. */
  @Override
  public Scope outgoingScopeFor(Set<Scope> incomingScopes) {
    return original.outgoingScopeFor(incomingScopes);
  }

  /** Forwards source field retrieval to the wrapped tap. */
  @Override
  public Fields retrieveSourceFields(FlowProcess<? extends Config> flowProcess) {
    return original.retrieveSourceFields(flowProcess);
  }

  /** Forwards the presented source fields to the wrapped tap. */
  @Override
  public void presentSourceFields(FlowProcess<? extends Config> flowProcess, Fields fields) {
    original.presentSourceFields(flowProcess, fields);
  }

  /** Forwards sink field retrieval to the wrapped tap. */
  @Override
  public Fields retrieveSinkFields(FlowProcess<? extends Config> flowProcess) {
    return original.retrieveSinkFields(flowProcess);
  }

  /** Forwards the presented sink fields to the wrapped tap. */
  @Override
  public void presentSinkFields(FlowProcess<? extends Config> flowProcess, Fields fields) {
    original.presentSinkFields(flowProcess, fields);
  }

  /** Forwards incoming operation argument field resolution to the wrapped tap. */
  @Override
  public Fields resolveIncomingOperationArgumentFields(Scope incomingScope) {
    return original.resolveIncomingOperationArgumentFields(incomingScope);
  }

  /** Forwards incoming operation pass-through field resolution to the wrapped tap. */
  @Override
  public Fields resolveIncomingOperationPassThroughFields(Scope incomingScope) {
    return original.resolveIncomingOperationPassThroughFields(incomingScope);
  }

  /** Returns the full identifier reported by the wrapped tap for the given flow process. */
  @Override
  public String getFullIdentifier(FlowProcess<? extends Config> flowProcess) {
    return original.getFullIdentifier(flowProcess);
  }

  /** Returns the full identifier reported by the wrapped tap for the given configuration. */
  @Override
  public String getFullIdentifier(Config conf) {
    return original.getFullIdentifier(conf);
  }

  /** Forwards resource creation to the wrapped tap and returns its result. */
  @Override
  public boolean createResource(FlowProcess<? extends Config> flowProcess) throws IOException {
    return original.createResource(flowProcess);
  }

  /** Forwards resource creation to the wrapped tap and returns its result. */
  @Override
  public boolean createResource(Config conf) throws IOException {
    return original.createResource(conf);
  }

  /** Forwards resource deletion to the wrapped tap and returns its result. */
  @Override
  public boolean deleteResource(FlowProcess<? extends Config> flowProcess) throws IOException {
    return original.deleteResource(flowProcess);
  }

  /** Forwards resource deletion to the wrapped tap and returns its result. */
  @Override
  public boolean deleteResource(Config conf) throws IOException {
    return original.deleteResource(conf);
  }

  /** Forwards read preparation to the wrapped tap and returns its result. */
  @Override
  public boolean prepareResourceForRead(Config conf) throws IOException {
    return original.prepareResourceForRead(conf);
  }

  /** Forwards write preparation to the wrapped tap and returns its result. */
  @Override
  public boolean prepareResourceForWrite(Config conf) throws IOException {
    return original.prepareResourceForWrite(conf);
  }

  /** Forwards the commit request to the wrapped tap and returns its result. */
  @Override
  public boolean commitResource(Config conf) throws IOException {
    return original.commitResource(conf);
  }

  /** Forwards the rollback request to the wrapped tap and returns its result. */
  @Override
  public boolean rollbackResource(Config conf) throws IOException {
    return original.rollbackResource(conf);
  }

  /** Forwards the existence check to the wrapped tap and returns its result. */
  @Override
  public boolean resourceExists(FlowProcess<? extends Config> flowProcess) throws IOException {
    return original.resourceExists(flowProcess);
  }

  /** Forwards the existence check to the wrapped tap and returns its result. */
  @Override
  public boolean resourceExists(Config conf) throws IOException {
    return original.resourceExists(conf);
  }

  /** Returns the modified time reported by the wrapped tap. */
  @Override
  public long getModifiedTime(FlowProcess<? extends Config> flowProcess) throws IOException {
    return original.getModifiedTime(flowProcess);
  }

  /** Returns the modified time reported by the wrapped tap. */
  @Override
  public long getModifiedTime(Config conf) throws IOException {
    return original.getModifiedTime(conf);
  }

  /** Returns the sink mode reported by the wrapped tap. */
  @Override
  public SinkMode getSinkMode() {
    return original.getSinkMode();
  }

  /** Returns the wrapped tap's {@code isKeep()} result. */
  @Override
  public boolean isKeep() {
    return original.isKeep();
  }

  /** Returns the wrapped tap's {@code isReplace()} result. */
  @Override
  public boolean isReplace() {
    return original.isReplace();
  }

  /** Returns the wrapped tap's {@code isUpdate()} result. */
  @Override
  public boolean isUpdate() {
    return original.isUpdate();
  }

  /** Returns the wrapped tap's {@code isSink()} result. */
  @Override
  public boolean isSink() {
    return original.isSink();
  }

  /** Returns the wrapped tap's {@code isSource()} result. */
  @Override
  public boolean isSource() {
    return original.isSource();
  }

  /** Returns the wrapped tap's {@code isTemporary()} result. */
  @Override
  public boolean isTemporary() {
    return original.isTemporary();
  }

  /** Returns the {@link ConfigDef} reported by the wrapped tap. */
  @Override
  public ConfigDef getConfigDef() {
    return original.getConfigDef();
  }

  /** Returns the wrapped tap's {@code hasConfigDef()} result. */
  @Override
  public boolean hasConfigDef() {
    return original.hasConfigDef();
  }

  /** Returns the node-level {@link ConfigDef} reported by the wrapped tap. */
  @Override
  public ConfigDef getNodeConfigDef() {
    return original.getNodeConfigDef();
  }

  /** Returns the wrapped tap's {@code hasNodeConfigDef()} result. */
  @Override
  public boolean hasNodeConfigDef() {
    return original.hasNodeConfigDef();
  }

  /** Returns the step-level {@link ConfigDef} reported by the wrapped tap. */
  @Override
  public ConfigDef getStepConfigDef() {
    return original.getStepConfigDef();
  }

  /** Returns the wrapped tap's {@code hasStepConfigDef()} result. */
  @Override
  public boolean hasStepConfigDef() {
    return original.hasStepConfigDef();
  }

  /**
   * Renders this decorator as its own simple class name followed by the wrapped tap's string
   * form enclosed in angle brackets.
   *
   * @return {@code SimpleClassName<wrapped.toString()>}
   */
  @Override
  public String toString() {
    final String decoratorName = getClass().getSimpleName();
    final String wrappedText = original.toString();

    return decoratorName + "<" + wrappedText + ">";
  }
}