001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tap; 022 023 import java.beans.ConstructorProperties; 024 import java.io.IOException; 025 import java.util.Set; 026 027 import cascading.flow.Flow; 028 import cascading.flow.FlowElement; 029 import cascading.flow.FlowProcess; 030 import cascading.flow.planner.Scope; 031 import cascading.property.ConfigDef; 032 import cascading.scheme.Scheme; 033 import cascading.tuple.Fields; 034 import cascading.tuple.TupleEntryCollector; 035 import cascading.tuple.TupleEntryIterator; 036 037 /** 038 * Class DecoratorTap wraps a given {@link Tap} instance, delegating all calls to the original. 039 * <p/> 040 * It also provides an additional generic field that may hold any custom type, this allows implementations 041 * to attach any meta-info to the tap being decorated. 042 * <p/> 043 * Further, sub-classes of DecoratorTap can be used to decorate any Tap instances associated or created for 044 * use by the {@link cascading.pipe.Checkpoint} {@link cascading.pipe.Pipe}. 045 * <p/> 046 * Sub-classing this is optional if the aim is to simply attach relevant meta-info for use by a given application. 047 * <p/> 048 * In order to pass any meta-info to a management service via the {@link cascading.management.annotation.Property} 049 * annotation, a sub-class of DecoratorTap must be provided. 050 */ 051 public class DecoratorTap<MetaInfo, Config, Input, Output> extends Tap<Config, Input, Output> 052 { 053 protected MetaInfo metaInfo; 054 protected Tap<Config, Input, Output> original; 055 056 /** 057 * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with 058 * the wrapped Tap instance. 059 * 060 * @param metaInfo meta-information about the current Tap 061 * @param original the decorated Tap instance 062 */ 063 @ConstructorProperties( {"metaInfo", "original"} ) 064 public DecoratorTap( MetaInfo metaInfo, Tap<Config, Input, Output> original ) 065 { 066 setMetaInfo( metaInfo ); 067 setOriginal( original ); 068 } 069 070 /** 071 * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with 072 * the wrapped Tap instance. 073 * 074 * @param original the decorated Tap instance 075 */ 076 @ConstructorProperties( {"original"} ) 077 public DecoratorTap( Tap<Config, Input, Output> original ) 078 { 079 setOriginal( original ); 080 } 081 082 public MetaInfo getMetaInfo() 083 { 084 return metaInfo; 085 } 086 087 public Tap<Config, Input, Output> getOriginal() 088 { 089 return original; 090 } 091 092 protected void setOriginal( Tap<Config, Input, Output> original ) 093 { 094 if( original == null ) 095 throw new IllegalArgumentException( "wrapped tap value may not be null" ); 096 097 this.original = original; 098 } 099 100 protected void setMetaInfo( MetaInfo metaInfo ) 101 { 102 this.metaInfo = metaInfo; 103 } 104 105 @Override 106 public Scheme<Config, Input, Output, ?, ?> getScheme() 107 { 108 return original.getScheme(); 109 } 110 111 @Override 112 public String getTrace() 113 { 114 return original.getTrace(); 115 } 116 117 public void flowConfInit( Flow<Config> flow ) 118 { 119 original.flowConfInit( flow ); 120 } 121 122 public void sourceConfInit( FlowProcess<Config> flowProcess, Config conf ) 123 { 124 original.sourceConfInit( flowProcess, conf ); 125 } 126 127 public void sinkConfInit( FlowProcess<Config> flowProcess, Config conf ) 128 { 129 original.sinkConfInit( flowProcess, conf ); 130 } 131 132 public String getIdentifier() 133 { 134 return original.getIdentifier(); 135 } 136 137 @Override 138 public Fields getSourceFields() 139 { 140 return original.getSourceFields(); 141 } 142 143 @Override 144 public Fields getSinkFields() 145 { 146 return original.getSinkFields(); 147 } 148 149 public TupleEntryIterator openForRead( FlowProcess<Config> flowProcess, Input input ) throws IOException 150 { 151 return original.openForRead( flowProcess, input ); 152 } 153 154 public TupleEntryIterator openForRead( FlowProcess<Config> flowProcess ) throws IOException 155 { 156 return original.openForRead( flowProcess ); 157 } 158 159 public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException 160 { 161 return original.openForWrite( flowProcess, output ); 162 } 163 164 public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess ) throws IOException 165 { 166 return original.openForWrite( flowProcess ); 167 } 168 169 @Override 170 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 171 { 172 return original.outgoingScopeFor( incomingScopes ); 173 } 174 175 public Fields retrieveSourceFields( FlowProcess<Config> flowProcess ) 176 { 177 return original.retrieveSourceFields( flowProcess ); 178 } 179 180 public void presentSourceFields( FlowProcess<Config> flowProcess, Fields fields ) 181 { 182 original.presentSourceFields( flowProcess, fields ); 183 } 184 185 public Fields retrieveSinkFields( FlowProcess<Config> flowProcess ) 186 { 187 return original.retrieveSinkFields( flowProcess ); 188 } 189 190 public void presentSinkFields( FlowProcess<Config> flowProcess, Fields fields ) 191 { 192 original.presentSinkFields( flowProcess, fields ); 193 } 194 195 @Override 196 public Fields resolveIncomingOperationArgumentFields( Scope incomingScope ) 197 { 198 return original.resolveIncomingOperationArgumentFields( incomingScope ); 199 } 200 201 @Override 202 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 203 { 204 return original.resolveIncomingOperationPassThroughFields( incomingScope ); 205 } 206 207 public String getFullIdentifier( FlowProcess<Config> flowProcess ) 208 { 209 return original.getFullIdentifier( flowProcess ); 210 } 211 212 public String getFullIdentifier( Config conf ) 213 { 214 return original.getFullIdentifier( conf ); 215 } 216 217 public boolean createResource( FlowProcess<Config> flowProcess ) throws IOException 218 { 219 return original.createResource( flowProcess ); 220 } 221 222 public boolean createResource( Config conf ) throws IOException 223 { 224 return original.createResource( conf ); 225 } 226 227 public boolean deleteResource( FlowProcess<Config> flowProcess ) throws IOException 228 { 229 return original.deleteResource( flowProcess ); 230 } 231 232 public boolean deleteResource( Config conf ) throws IOException 233 { 234 return original.deleteResource( conf ); 235 } 236 237 public boolean commitResource( Config conf ) throws IOException 238 { 239 return original.commitResource( conf ); 240 } 241 242 public boolean rollbackResource( Config conf ) throws IOException 243 { 244 return original.rollbackResource( conf ); 245 } 246 247 public boolean resourceExists( FlowProcess<Config> flowProcess ) throws IOException 248 { 249 return original.resourceExists( flowProcess ); 250 } 251 252 public boolean resourceExists( Config conf ) throws IOException 253 { 254 return original.resourceExists( conf ); 255 } 256 257 public long getModifiedTime( FlowProcess<Config> flowProcess ) throws IOException 258 { 259 return original.getModifiedTime( flowProcess ); 260 } 261 262 public long getModifiedTime( Config conf ) throws IOException 263 { 264 return original.getModifiedTime( conf ); 265 } 266 267 @Override 268 public SinkMode getSinkMode() 269 { 270 return original.getSinkMode(); 271 } 272 273 @Override 274 public boolean isKeep() 275 { 276 return original.isKeep(); 277 } 278 279 @Override 280 public boolean isReplace() 281 { 282 return original.isReplace(); 283 } 284 285 @Override 286 public boolean isUpdate() 287 { 288 return original.isUpdate(); 289 } 290 291 @Override 292 public boolean isSink() 293 { 294 return original.isSink(); 295 } 296 297 @Override 298 public boolean isSource() 299 { 300 return original.isSource(); 301 } 302 303 @Override 304 public boolean isTemporary() 305 { 306 return original.isTemporary(); 307 } 308 309 @Override 310 public ConfigDef getConfigDef() 311 { 312 return original.getConfigDef(); 313 } 314 315 @Override 316 public boolean hasConfigDef() 317 { 318 return original.hasConfigDef(); 319 } 320 321 @Override 322 public ConfigDef getStepConfigDef() 323 { 324 return original.getStepConfigDef(); 325 } 326 327 @Override 328 public boolean hasStepConfigDef() 329 { 330 return original.hasStepConfigDef(); 331 } 332 333 @Override 334 public boolean isEquivalentTo( FlowElement element ) 335 { 336 return original.isEquivalentTo( element ); 337 } 338 339 @Override 340 public String toString() 341 { 342 return original.toString(); 343 } 344 }