/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.scheme;

import java.io.IOException;
import java.io.Serializable;

import cascading.flow.FlowProcess;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.util.TraceUtil;
import cascading.util.Traceable;

/**
 * A Scheme defines what is stored in a {@link Tap} instance by declaring the {@link Tuple}
 * field names, and alternately parsing or rendering the incoming or outgoing {@link Tuple}
 * stream, respectively.
 * <p/>
 * A Scheme defines the type of resource data will be sourced from or sinked to.
 * <p/>
 * The default sourceFields are {@link Fields#UNKNOWN} and the default sinkFields are {@link Fields#ALL}.
 * <p/>
 * Any given sourceFields only label the values in the {@link Tuple}s as they are sourced.
 * It does not necessarily filter the output since a given implementation may choose to
 * collapse values and ignore keys depending on the format.
 * <p/>
 * If the sinkFields are {@link Fields#ALL}, the Cascading planner will attempt to resolve the actual field names
 * and make them available via the {@link cascading.scheme.SinkCall#getOutgoingEntry()} method. Sometimes this may
 * not be possible, for example when the {@link Tap#openForWrite(cascading.flow.FlowProcess)} method is called
 * from user code directly (without planner intervention).
 * <p/>
 * If the sinkFields are a valid selector, the {@link #sink(cascading.flow.FlowProcess, SinkCall)} method will
 * only see the fields expected.
 * <p/>
 * Setting the {@code numSinkParts} value to 1 (one) attempts to ensure the output resource has only one part.
 * In the case of MapReduce, this is only a suggestion for the Map side, on the Reduce side it does this by
 * setting the number of reducers to the given value. This may affect performance, so be cautioned.
 * <p/>
 * Note that setting numSinkParts does not force the planner to insert a final Reduce operation in the job, so
 * numSinkParts may be ignored entirely if the final job is Map only. To force the Flow to have a final Reduce,
 * add a {@link cascading.pipe.GroupBy} to the assembly before sinking.
 */
public abstract class Scheme<Config, Input, Output, SourceContext, SinkContext> implements Serializable, Traceable
  {
  /** Field sinkFields, defaults to {@link Fields#ALL} */
  Fields sinkFields = Fields.ALL;
  /** Field sourceFields, defaults to {@link Fields#UNKNOWN} */
  Fields sourceFields = Fields.UNKNOWN;
  /** Field numSinkParts */
  int numSinkParts;
  /** Field trace, captured when the instance is created; not part of {@link #equals(Object)} or {@link #hashCode()} */
  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override

  /** Constructor Scheme creates a new Scheme instance using the default source and sink fields. */
  protected Scheme()
    {
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   */
  protected Scheme( Fields sourceFields )
    {
    setSourceFields( sourceFields );
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   * @param numSinkParts of type int
   */
  protected Scheme( Fields sourceFields, int numSinkParts )
    {
    setSourceFields( sourceFields );
    this.numSinkParts = numSinkParts;
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   * @param sinkFields   of type Fields
   */
  protected Scheme( Fields sourceFields, Fields sinkFields )
    {
    setSourceFields( sourceFields );
    setSinkFields( sinkFields );
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   * @param sinkFields   of type Fields
   * @param numSinkParts of type int
   */
  protected Scheme( Fields sourceFields, Fields sinkFields, int numSinkParts )
    {
    setSourceFields( sourceFields );
    setSinkFields( sinkFields );
    this.numSinkParts = numSinkParts;
    }

  /**
   * Method getSinkFields returns the sinkFields of this Scheme object.
   *
   * @return the sinkFields (type Fields) of this Scheme object.
   */
  public Fields getSinkFields()
    {
    return sinkFields;
    }

  /**
   * Method setSinkFields sets the sinkFields of this Scheme object.
   * <p/>
   * If the given fields are unknown (see {@link Fields#isUnknown()}), {@link Fields#ALL} is stored instead.
   *
   * @param sinkFields the sinkFields of this Scheme object, must not be {@code null}.
   */
  public void setSinkFields( Fields sinkFields )
    {
    if( sinkFields.isUnknown() )
      this.sinkFields = Fields.ALL;
    else
      this.sinkFields = sinkFields;
    }

  /**
   * Method getSourceFields returns the sourceFields of this Scheme object.
   *
   * @return the sourceFields (type Fields) of this Scheme object.
   */
  public Fields getSourceFields()
    {
    return sourceFields;
    }

  /**
   * Method setSourceFields sets the sourceFields of this Scheme object.
   * <p/>
   * If the given fields are "all" (see {@link Fields#isAll()}), {@link Fields#UNKNOWN} is stored instead.
   *
   * @param sourceFields the sourceFields of this Scheme object, must not be {@code null}.
   */
  public void setSourceFields( Fields sourceFields )
    {
    if( sourceFields.isAll() )
      this.sourceFields = Fields.UNKNOWN;
    else
      this.sourceFields = sourceFields;
    }

  /**
   * Method getNumSinkParts returns the numSinkParts of this Scheme object.
   *
   * @return the numSinkParts (type int) of this Scheme object.
   */
  public int getNumSinkParts()
    {
    return numSinkParts;
    }

  /**
   * Method setNumSinkParts sets the numSinkParts of this Scheme object.
   *
   * @param numSinkParts the numSinkParts of this Scheme object.
   */
  public void setNumSinkParts( int numSinkParts )
    {
    this.numSinkParts = numSinkParts;
    }

  /**
   * Method getTrace returns the trace value held by this Scheme object.
   *
   * @return the trace (type String) of this Scheme object.
   */
  @Override
  public String getTrace()
    {
    return trace;
    }

  /**
   * Method isSymmetrical returns {@code true} if the sink fields equal the source fields. That is, this
   * scheme sources the same fields as it sinks.
   * <p/>
   * The default pairing of {@link Fields#UNKNOWN} source fields and {@link Fields#ALL} sink fields is also
   * considered symmetrical.
   *
   * @return the symmetrical (type boolean) of this Scheme object.
   */
  public boolean isSymmetrical()
    {
    // explicit grouping: (defaults on both sides) OR (declared fields are equal)
    return ( getSourceFields().equals( Fields.UNKNOWN ) && getSinkFields().equals( Fields.ALL ) ) || getSinkFields().equals( getSourceFields() );
    }

  /**
   * Method isSource returns true if this Scheme instance can be used as a source.
   *
   * @return boolean, always {@code true} in this base implementation
   */
  public boolean isSource()
    {
    return true;
    }

  /**
   * Method isSink returns true if this Scheme instance can be used as a sink.
   *
   * @return boolean, always {@code true} in this base implementation
   */
  public boolean isSink()
    {
    return true;
    }

  /**
   * Method retrieveSourceFields notifies a Scheme when it is appropriate to dynamically
   * update the fields it sources. By default the current declared fields are returned.
   * <p/>
   * The {@code FlowProcess} presents all known properties resolved by the current planner.
   * <p/>
   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @return Fields
   */
  public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap )
    {
    return getSourceFields();
    }

  /**
   * Method presentSourceFields is called after the planner is invoked and all fields are resolved. This
   * method presents to the Scheme the actual source fields after any planner intervention.
   * <p/>
   * This method is called after {@link #retrieveSourceFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param fields      of type Fields
   */
  public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
    {
    presentSourceFieldsInternal( fields );
    }

  /**
   * Method presentSourceFieldsInternal adopts the given fields as the source fields, but only if the
   * current source fields are still {@link Fields#UNKNOWN}. Explicitly declared source fields are left untouched.
   *
   * @param fields of type Fields
   */
  protected void presentSourceFieldsInternal( Fields fields )
    {
    if( getSourceFields().equals( Fields.UNKNOWN ) )
      setSourceFields( fields );
    }

  /**
   * Method retrieveSinkFields notifies a Scheme when it is appropriate to dynamically
   * update the fields it sinks. By default the current declared fields are returned.
   * <p/>
   * The {@code FlowProcess} presents all known properties resolved by the current planner.
   * <p/>
   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @return Fields
   */
  public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap )
    {
    return getSinkFields();
    }

  /**
   * Method presentSinkFields is called after the planner is invoked and all fields are resolved. This
   * method presents to the Scheme the actual sink fields after any planner intervention.
   * <p/>
   * This method is called after {@link #retrieveSinkFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param fields      of type Fields
   */
  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
    {
    presentSinkFieldsInternal( fields );
    }

  /**
   * Method presentSinkFieldsInternal adopts the given fields as the sink fields, but only if the
   * current sink fields are still {@link Fields#ALL}. Explicitly declared sink fields are left untouched.
   *
   * @param fields of type Fields
   */
  protected void presentSinkFieldsInternal( Fields fields )
    {
    if( getSinkFields().equals( Fields.ALL ) )
      setSinkFields( fields );
    }

  /**
   * Method sourceConfInit initializes this instance as a source.
   * <p/>
   * This method is executed client side as a means to provide necessary configuration parameters
   * used by the underlying platform.
   * <p/>
   * It is not intended to initialize resources that would be necessary during the execution of this
   * class, like a "formatter" or "parser".
   * <p/>
   * See {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)} if resources must be initialized
   * before use. And {@link #sourceCleanup(cascading.flow.FlowProcess, SourceCall)} if resources must be
   * destroyed after use.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param conf        of type Config
   */
  public abstract void sourceConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );

  /**
   * Method sinkConfInit initializes this instance as a sink.
   * <p/>
   * This method is executed client side as a means to provide necessary configuration parameters
   * used by the underlying platform.
   * <p/>
   * It is not intended to initialize resources that would be necessary during the execution of this
   * class, like a "formatter" or "parser".
   * <p/>
   * See {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)} if resources must be initialized
   * before use. And {@link #sinkCleanup(cascading.flow.FlowProcess, SinkCall)} if resources must be
   * destroyed after use.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param conf        of type Config
   */
  public abstract void sinkConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );

  /**
   * Method sourcePrepare is used to initialize resources needed during each call of
   * {@link #source(cascading.flow.FlowProcess, SourceCall)}.
   * <p/>
   * Be sure to place any initialized objects in the {@code SourceContext} so each instance
   * will remain threadsafe.
   * <p/>
   * This base implementation does nothing.
   *
   * @param flowProcess of type FlowProcess
   * @param sourceCall  of type {@code SourceCall<SourceContext, Input>}
   * @throws IOException may be thrown by overriding implementations
   */
  public void sourcePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
    {
    }

  /**
   * Method source will read a new "record" or value from {@link cascading.scheme.SourceCall#getInput()} and populate
   * the available {@link Tuple} via {@link cascading.scheme.SourceCall#getIncomingEntry()} and return {@code true}
   * on success or {@code false} if no more values available.
   * <p/>
   * It's ok to set a new Tuple instance on the {@code incomingEntry} {@link cascading.tuple.TupleEntry}, or
   * to simply re-use the existing instance.
   * <p/>
   * Note this is the only time it is safe to modify a Tuple instance handed over via a method call.
   * <p/>
   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
   * any applicable failure trap Tap.
   *
   * @param flowProcess of type FlowProcess
   * @param sourceCall  of type {@code SourceCall<SourceContext, Input>}
   * @return returns {@code true} when a Tuple was successfully read
   * @throws IOException may be thrown by implementations
   */
  public abstract boolean source( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException;

  /**
   * Method sourceCleanup is used to destroy resources created by
   * {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)}.
   * <p/>
   * This base implementation does nothing.
   *
   * @param flowProcess of type FlowProcess
   * @param sourceCall  of type {@code SourceCall<SourceContext, Input>}
   * @throws IOException may be thrown by overriding implementations
   */
  public void sourceCleanup( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
    {
    }

  /**
   * Method sinkPrepare is used to initialize resources needed during each call of
   * {@link #sink(cascading.flow.FlowProcess, SinkCall)}.
   * <p/>
   * Be sure to place any initialized objects in the {@code SinkContext} so each instance
   * will remain threadsafe.
   * <p/>
   * This base implementation does nothing.
   *
   * @param flowProcess of type FlowProcess
   * @param sinkCall    of type {@code SinkCall<SinkContext, Output>}
   * @throws IOException may be thrown by overriding implementations
   */
  public void sinkPrepare( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
    {
    }

  /**
   * Method sink writes out the given {@link Tuple} found on {@link cascading.scheme.SinkCall#getOutgoingEntry()} to
   * the {@link cascading.scheme.SinkCall#getOutput()}.
   * <p/>
   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
   * any applicable failure trap Tap. If not set, the incoming Tuple will be written instead.
   *
   * @param flowProcess of type FlowProcess
   * @param sinkCall    of type {@code SinkCall<SinkContext, Output>}
   * @throws IOException may be thrown by implementations
   */
  public abstract void sink( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException;

  /**
   * Method sinkCleanup is used to destroy resources created by
   * {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)}.
   * <p/>
   * This base implementation does nothing.
   *
   * @param flowProcess of type FlowProcess
   * @param sinkCall    of type {@code SinkCall<SinkContext, Output>}
   * @throws IOException may be thrown by overriding implementations
   */
  public void sinkCleanup( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
    {
    }

  /**
   * Method equals compares this Scheme with the given object. Two instances are equal when they are of
   * exactly the same class and have equal numSinkParts, sinkFields and sourceFields. The trace value
   * is not compared.
   *
   * @param object of type Object
   * @return boolean
   */
  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( object == null || getClass() != object.getClass() )
      return false;

    // wildcard instead of a raw type; the type parameters play no role in equality
    Scheme<?, ?, ?, ?, ?> scheme = (Scheme<?, ?, ?, ?, ?>) object;

    if( numSinkParts != scheme.numSinkParts )
      return false;
    if( sinkFields != null ? !sinkFields.equals( scheme.sinkFields ) : scheme.sinkFields != null )
      return false;
    if( sourceFields != null ? !sourceFields.equals( scheme.sourceFields ) : scheme.sourceFields != null )
      return false;

    return true;
    }

  /**
   * Method toString renders the simple class name followed by the source fields, or by the source and sink
   * fields separated by {@code ->} when they differ.
   *
   * @return String
   */
  @Override
  public String toString()
    {
    if( getSinkFields().equals( getSourceFields() ) )
      return getClass().getSimpleName() + "[" + getSourceFields().print() + "]";
    else
      return getClass().getSimpleName() + "[" + getSourceFields().print() + "->" + getSinkFields().print() + "]";
    }

  /**
   * Method hashCode combines the same values compared by {@link #equals(Object)}: sinkFields, sourceFields
   * and numSinkParts.
   *
   * @return int
   */
  @Override
  public int hashCode()
    {
    int result;
    result = sinkFields != null ? sinkFields.hashCode() : 0;
    result = 31 * result + ( sourceFields != null ? sourceFields.hashCode() : 0 );
    result = 31 * result + numSinkParts;
    return result;
    }
  }