001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.List; 026import java.util.Map; 027 028import cascading.flow.planner.PlannerInfo; 029import cascading.flow.planner.PlatformInfo; 030import cascading.management.UnitOfWork; 031import cascading.stats.FlowStats; 032import cascading.tap.Tap; 033import cascading.tuple.TupleEntryCollector; 034import cascading.tuple.TupleEntryIterator; 035 036/** 037 * A Flow is a logical unit of work declared by an assembly of {@link cascading.pipe.Pipe} instances connected to source 038 * and sink {@link Tap} instances. 039 * <p/> 040 * A Flow is then executed to push the incoming source data through the assembly into one or more sinks. 041 * <p/> 042 * A Flow sub-class instance may not be instantiated directly in most cases, see sub-classes of {@link FlowConnector} class 043 * for supported platforms. 044 * <p/> 045 * Note that {@link cascading.pipe.Pipe} assemblies can be reused in multiple Flow instances. They maintain 046 * no state regarding the Flow execution. Subsequently, {@link cascading.pipe.Pipe} assemblies can be given 047 * parameters through its calling Flow so they can be built in a generic fashion. 048 * <p/> 049 * When a Flow is created, an optimized internal representation is created that is then executed 050 * on the underlying execution platform. This is typically done by creating one or more {@link FlowStep} instances. 051 * </p> 052 * Flows are submitted in order of dependency when used with a {@link cascading.cascade.Cascade}. If two or more steps do not share the 053 * same dependencies and all can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines 054 * the order in which all steps will be submitted for execution. The default submit priority is 5. 055 * <p/> 056 * Use the {@link FlowListener} to receive any events on the life-cycle of the Flow as it executes. Any 057 * {@link Tap} instances owned by the Flow also implementing FlowListener will automatically be added to the 058 * set of listeners. 059 * 060 * @see FlowListener 061 * @see cascading.flow.FlowConnector 062 */ 063public interface Flow<Config> extends UnitOfWork<FlowStats> 064 { 065 String CASCADING_FLOW_ID = "cascading.flow.id"; 066 067 /** 068 * Method getName returns the name of this Flow object. 069 * 070 * @return the name (type String) of this Flow object. 071 */ 072 @Override 073 String getName(); 074 075 /** 076 * Method prepare is used by a {@link cascading.cascade.Cascade} to notify the given Flow it should initialize or clear any resources 077 * necessary for {@link #start()} to be called successfully. 078 * <p/> 079 * Specifically, this implementation calls {@link BaseFlow#deleteSinksIfNotUpdate()} && {@link BaseFlow#deleteTrapsIfNotUpdate()}. 080 * 081 * @throws java.io.IOException when 082 */ 083 @Override 084 void prepare(); 085 086 /** 087 * Method start begins the execution of this Flow instance. It will return immediately. Use the method {@link #complete()} 088 * to block until this Flow completes. 089 */ 090 @Override 091 void start(); 092 093 /** Method stop stops all running jobs, killing any currently executing. */ 094 @Override 095 void stop(); 096 097 /** Method complete starts the current Flow instance if it has not be previously started, then block until completion. */ 098 @Override 099 void complete(); 100 101 @Override 102 void cleanup(); 103 104 /** 105 * Returns any meta-data about the planner that created this Flow instance. 106 * 107 * @return an instance of PlannerInfo 108 */ 109 PlannerInfo getPlannerInfo(); 110 111 /** 112 * Returns any meta-data about the underlying platform this Flow instance will run against. 113 * 114 * @return an instance of PlatformInfo 115 */ 116 PlatformInfo getPlatformInfo(); 117 118 /** 119 * Method getConfig returns the internal configuration object. 120 * <p/> 121 * Any changes to this object will not be reflected in child steps. See {@link cascading.flow.FlowConnector} for setting 122 * default properties visible to children. Or see {@link cascading.flow.FlowStepStrategy} for setting properties on 123 * individual steps before they are executed. 124 * 125 * @return the default configuration of this Flow 126 */ 127 Config getConfig(); 128 129 /** 130 * Method getConfigCopy returns a copy of the internal configuration object. This object can be safely 131 * modified. 132 * 133 * @return a copy of the default configuration of this Flow 134 */ 135 Config getConfigCopy(); 136 137 /** 138 * Method getConfigAsProperties converts the internal configuration object into a {@link java.util.Map} of 139 * key value pairs. 140 * 141 * @return a Map of key/value pairs 142 */ 143 Map<Object, Object> getConfigAsProperties(); 144 145 /** 146 * Returns the String property associated with the given key from the current Configuration instance. 147 * 148 * @param key of type String 149 * @return the String value 150 */ 151 String getProperty( String key ); 152 153 /** 154 * Method getID returns the ID of this Flow object. 155 * <p/> 156 * The ID value is a long HEX String used to identify this instance globally. Subsequent Flow 157 * instances created with identical parameters will not return the same ID. 158 * 159 * @return the ID (type String) of this Flow object. 160 */ 161 @Override 162 String getID(); 163 164 /** 165 * Returns an immutable map of properties giving more details about the Flow object. 166 * <p/> 167 * See {@link cascading.flow.FlowDef#addDescription(String, String)} to set values on a given Flow. 168 * <p/> 169 * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents. 170 * For known description types, see {@link FlowDescriptors}. 171 * 172 * @return Map<String,String> 173 */ 174 public Map<String, String> getFlowDescriptor(); 175 176 @Override 177 String getTags(); 178 179 /** 180 * Method getSubmitPriority returns the submitPriority of this Flow object. 181 * <p/> 182 * 10 is lowest, 1 is the highest, 5 is the default. 183 * 184 * @return the submitPriority (type int) of this FlowStep object. 185 */ 186 int getSubmitPriority(); 187 188 /** 189 * Method setSubmitPriority sets the submitPriority of this Flow object. 190 * <p/> 191 * 10 is lowest, 1 is the highest, 5 is the default. 192 * 193 * @param submitPriority the submitPriority of this FlowStep object. 194 */ 195 void setSubmitPriority( int submitPriority ); 196 197 FlowProcess<Config> getFlowProcess(); 198 199 /** 200 * Method getFlowStats returns the flowStats of this Flow object. 201 * 202 * @return the flowStats (type FlowStats) of this Flow object. 203 */ 204 FlowStats getFlowStats(); 205 206 /** 207 * Method hasListeners returns true if {@link FlowListener} instances have been registered. 208 * 209 * @return boolean 210 */ 211 boolean hasListeners(); 212 213 /** 214 * Method addListener registers the given flowListener with this instance. 215 * 216 * @param flowListener of type FlowListener 217 */ 218 void addListener( FlowListener flowListener ); 219 220 /** 221 * Method removeListener removes the given flowListener from this instance. 222 * 223 * @param flowListener of type FlowListener 224 * @return true if the listener was removed 225 */ 226 boolean removeListener( FlowListener flowListener ); 227 228 /** 229 * Method hasStepListeners returns true if {@link FlowStepListener} instances have been registered 230 * with any of the {@link FlowStep}s belonging to this instance 231 * 232 * @return boolean 233 */ 234 boolean hasStepListeners(); 235 236 /** 237 * Method addStepListener registers the given flowStepListener with this instance. 238 * 239 * @param flowStepListener of type addStepListener 240 */ 241 void addStepListener( FlowStepListener flowStepListener ); 242 243 /** 244 * Method removeStepListener removes the given flowStepListener from this instance. 245 * 246 * @param flowStepListener of type FlowStepListener 247 * @return true if the listener was removed from all the {@link FlowStep} belonging to this instance 248 */ 249 boolean removeStepListener( FlowStepListener flowStepListener ); 250 251 /** 252 * Method getSources returns the sources of this Flow object. 253 * 254 * @return the sources (type Map) of this Flow object. 255 */ 256 Map<String, Tap> getSources(); 257 258 List<String> getSourceNames(); 259 260 Tap getSource( String name ); 261 262 /** 263 * Method getSourcesCollection returns a {@link Collection} of source {@link Tap}s for this Flow object. 264 * 265 * @return the sourcesCollection (type Collection<Tap>) of this Flow object. 266 */ 267 Collection<Tap> getSourcesCollection(); 268 269 /** 270 * Method getSinks returns the sinks of this Flow object. 271 * 272 * @return the sinks (type Map) of this Flow object. 273 */ 274 Map<String, Tap> getSinks(); 275 276 List<String> getSinkNames(); 277 278 Tap getSink( String name ); 279 280 /** 281 * Method getSinksCollection returns a {@link Collection} of sink {@link Tap}s for this Flow object. 282 * 283 * @return the sinkCollection (type Collection<Tap>) of this Flow object. 284 */ 285 Collection<Tap> getSinksCollection(); 286 287 /** 288 * Method getSink returns the first sink of this Flow object. 289 * 290 * @return the sink (type Tap) of this Flow object. 291 */ 292 Tap getSink(); 293 294 /** 295 * Method getTraps returns the traps of this Flow object. 296 * 297 * @return the traps (type Map<String, Tap>) of this Flow object. 298 */ 299 Map<String, Tap> getTraps(); 300 301 List<String> getTrapNames(); 302 303 /** 304 * Method getTrapsCollection returns a {@link Collection} of trap {@link Tap}s for this Flow object. 305 * 306 * @return the trapsCollection (type Collection<Tap>) of this Flow object. 307 */ 308 Collection<Tap> getTrapsCollection(); 309 310 /** 311 * Method getCheckpoints returns the checkpoint taps of this Flow object. 312 * 313 * @return the traps (type Map<String, Tap>) of this Flow object. 314 */ 315 Map<String, Tap> getCheckpoints(); 316 317 List<String> getCheckpointNames(); 318 319 /** 320 * Method getCheckpointsCollection returns a {@link Collection} of checkpoint {@link Tap}s for this Flow object. 321 * 322 * @return the trapsCollection (type Collection<Tap>) of this Flow object. 323 */ 324 Collection<Tap> getCheckpointsCollection(); 325 326 /** 327 * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow. 328 * 329 * @return FlowSkipStrategy 330 */ 331 FlowSkipStrategy getFlowSkipStrategy(); 332 333 /** 334 * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy is returned. 335 * <p/> 336 * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link FlowSkipIfSinkNotStale}. 337 * An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}. 338 * <p/> 339 * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()} or {@link #complete()}. Only 340 * when the Flow is executed through a {@link cascading.cascade.Cascade} instance. 341 * 342 * @param flowSkipStrategy of type FlowSkipStrategy 343 * @return FlowSkipStrategy 344 */ 345 FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ); 346 347 /** 348 * Method isSkipFlow returns true if the parent {@link cascading.cascade.Cascade} should skip this Flow instance. True is returned 349 * if the current {@link cascading.flow.FlowSkipStrategy} returns true. 350 * 351 * @return the skipFlow (type boolean) of this Flow object. 352 * @throws IOException when 353 */ 354 boolean isSkipFlow() throws IOException; 355 356 /** 357 * Method areSinksStale returns true if any of the sinks referenced are out of date in relation to the sources. Or 358 * if any sink method {@link cascading.tap.Tap#isReplace()} returns true. 359 * 360 * @return boolean 361 * @throws java.io.IOException when 362 */ 363 boolean areSinksStale() throws IOException; 364 365 /** 366 * Method areSourcesNewer returns true if any source is newer than the given sinkModified date value. 367 * 368 * @param sinkModified of type long 369 * @return boolean 370 * @throws java.io.IOException when 371 */ 372 boolean areSourcesNewer( long sinkModified ) throws IOException; 373 374 /** 375 * Method getSinkModified returns the youngest modified date of any sink {@link cascading.tap.Tap} managed by this Flow instance. 376 * <p/> 377 * If zero (0) is returned, at least one of the sink resources does not exist. If minus one (-1) is returned, 378 * atleast one of the sinks are marked for delete ({@link cascading.tap.Tap#isReplace() returns true}). 379 * 380 * @return the sinkModified (type long) of this Flow object. 381 * @throws java.io.IOException when 382 */ 383 long getSinkModified() throws IOException; 384 385 /** 386 * Returns the current {@link FlowStepStrategy} instance. 387 * 388 * @return FlowStepStrategy 389 */ 390 FlowStepStrategy getFlowStepStrategy(); 391 392 /** 393 * Sets a default {@link FlowStepStrategy} instance. 394 * <p/> 395 * Use a FlowStepStrategy to change {@link cascading.flow.FlowStep} configuration properties 396 * before the properties are submitted to the underlying platform for the step 397 * unit of work. 398 * 399 * @param flowStepStrategy The FlowStepStrategy to use. 400 */ 401 void setFlowStepStrategy( FlowStepStrategy flowStepStrategy ); 402 403 /** 404 * Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order. 405 * 406 * @return the steps (type List<FlowStep>) of this Flow object. 407 */ 408 List<FlowStep<Config>> getFlowSteps(); 409 410 /** 411 * Method openSource opens the first source Tap. 412 * 413 * @return TupleIterator 414 * @throws IOException when 415 */ 416 TupleEntryIterator openSource() throws IOException; 417 418 /** 419 * Method openSource opens the named source Tap. 420 * 421 * @param name of type String 422 * @return TupleIterator 423 * @throws IOException when 424 */ 425 TupleEntryIterator openSource( String name ) throws IOException; 426 427 /** 428 * Method openSink opens the first sink Tap. 429 * 430 * @return TupleIterator 431 * @throws IOException when 432 */ 433 TupleEntryIterator openSink() throws IOException; 434 435 /** 436 * Method openSink opens the named sink Tap. 437 * 438 * @param name of type String 439 * @return TupleIterator 440 * @throws IOException when 441 */ 442 TupleEntryIterator openSink( String name ) throws IOException; 443 444 /** 445 * Method openTrap opens the first trap Tap. 446 * 447 * @return TupleIterator 448 * @throws IOException when 449 */ 450 TupleEntryIterator openTrap() throws IOException; 451 452 /** 453 * Method openTrap opens the named trap Tap. 454 * 455 * @param name of type String 456 * @return TupleIterator 457 * @throws IOException when 458 */ 459 TupleEntryIterator openTrap( String name ) throws IOException; 460 461 /** 462 * Method resourceExists returns true if the resource represented by the given Tap instance exists. 463 * 464 * @param tap of type Tap 465 * @return boolean 466 * @throws IOException when 467 */ 468 boolean resourceExists( Tap tap ) throws IOException; 469 470 /** 471 * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance. 472 * <p/> 473 * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call, 474 * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be 475 * stored in a Collection. 476 * 477 * @param tap of type Tap 478 * @return TupleIterator 479 * @throws IOException when there is an error opening the resource 480 */ 481 TupleEntryIterator openTapForRead( Tap tap ) throws IOException; 482 483 /** 484 * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance. 485 * 486 * @param tap of type Tap 487 * @return TupleCollector 488 * @throws IOException when there is an error opening the resource 489 */ 490 TupleEntryCollector openTapForWrite( Tap tap ) throws IOException; 491 492 /** 493 * Method writeDOT writes this Flow instance to the given filename as a DOT file for import into a graphics package. 494 * 495 * @param filename of type String 496 */ 497 void writeDOT( String filename ); 498 499 /** 500 * Method writeStepsDOT writes this Flow step graph to the given filename as a DOT file for import into a graphics package. 501 * 502 * @param filename of type String 503 */ 504 void writeStepsDOT( String filename ); 505 506 /** 507 * Returns the parent Cascade ID that owns this Flow instance. 508 * 509 * @return of type String 510 */ 511 String getCascadeID(); 512 513 /** 514 * Returns the run ID given when this Flow instance was defined in the FlowDef. 515 * 516 * @return of type String 517 */ 518 String getRunID(); 519 520 /** 521 * Method jobsAreLocal returns true if all jobs are executed in-process as a single map and reduce task. 522 * 523 * @return boolean 524 */ 525 boolean stepsAreLocal(); 526 527 /** 528 * Method isStopJobsOnExit returns the stopJobsOnExit of this Flow object. Defaults to {@code true}. 529 * 530 * @return the stopJobsOnExit (type boolean) of this Flow object. 531 */ 532 boolean isStopJobsOnExit(); 533 }