001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow; 022 023 import java.io.IOException; 024 import java.util.Collection; 025 import java.util.List; 026 import java.util.Map; 027 028 import cascading.flow.planner.PlatformInfo; 029 import cascading.management.UnitOfWork; 030 import cascading.stats.FlowStats; 031 import cascading.tap.Tap; 032 import cascading.tuple.TupleEntryCollector; 033 import cascading.tuple.TupleEntryIterator; 034 035 /** 036 * A Flow is a logical unit of work declared by an assembly of {@link cascading.pipe.Pipe} instances connected to source 037 * and sink {@link Tap} instances. 038 * <p/> 039 * A Flow is then executed to push the incoming source data through the assembly into one or more sinks. 040 * <p/> 041 * A Flow sub-class instance may not be instantiated directly in most cases, see sub-classes of {@link FlowConnector} class 042 * for supported platforms. 043 * <p/> 044 * Note that {@link cascading.pipe.Pipe} assemblies can be reused in multiple Flow instances. They maintain 045 * no state regarding the Flow execution. Subsequently, {@link cascading.pipe.Pipe} assemblies can be given 046 * parameters through its calling Flow so they can be built in a generic fashion. 047 * <p/> 048 * When a Flow is created, an optimized internal representation is created that is then executed 049 * on the underlying execution platform. This is typically done by creating one or more {@link FlowStep} instances. 050 * </p> 051 * Flows are submitted in order of dependency when used with a {@link cascading.cascade.Cascade}. If two or more steps do not share the 052 * same dependencies and all can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines 053 * the order in which all steps will be submitted for execution. The default submit priority is 5. 054 * <p/> 055 * Use the {@link FlowListener} to receive any events on the life-cycle of the Flow as it executes. Any 056 * {@link Tap} instances owned by the Flow also implementing FlowListener will automatically be added to the 057 * set of listeners. 058 * 059 * @see FlowListener 060 * @see cascading.flow.FlowConnector 061 */ 062 public interface Flow<Config> extends UnitOfWork<FlowStats> 063 { 064 String CASCADING_FLOW_ID = "cascading.flow.id"; 065 066 /** 067 * Method getName returns the name of this Flow object. 068 * 069 * @return the name (type String) of this Flow object. 070 */ 071 @Override 072 String getName(); 073 074 /** 075 * Method prepare is used by a {@link cascading.cascade.Cascade} to notify the given Flow it should initialize or clear any resources 076 * necessary for {@link #start()} to be called successfully. 077 * <p/> 078 * Specifically, this implementation calls {@link BaseFlow#deleteSinksIfNotUpdate()} && {@link BaseFlow#deleteTrapsIfNotUpdate()}. 079 * 080 * @throws java.io.IOException when 081 */ 082 @Override 083 void prepare(); 084 085 /** 086 * Method start begins the execution of this Flow instance. It will return immediately. Use the method {@link #complete()} 087 * to block until this Flow completes. 088 */ 089 @Override 090 void start(); 091 092 /** Method stop stops all running jobs, killing any currently executing. */ 093 @Override 094 void stop(); 095 096 /** Method complete starts the current Flow instance if it has not be previously started, then block until completion. */ 097 @Override 098 void complete(); 099 100 @Override 101 void cleanup(); 102 103 /** 104 * Method getConfig returns the internal configuration object. 105 * <p/> 106 * Any changes to this object will not be reflected in child steps. See {@link cascading.flow.FlowConnector} for setting 107 * default properties visible to children. Or see {@link cascading.flow.FlowStepStrategy} for setting properties on 108 * individual steps before they are executed. 109 * 110 * @return the default configuration of this Flow 111 */ 112 Config getConfig(); 113 114 /** 115 * Method getConfigCopy returns a copy of the internal configuration object. This object can be safely 116 * modified. 117 * 118 * @return a copy of the default configuration of this Flow 119 */ 120 Config getConfigCopy(); 121 122 /** 123 * Method getConfiAsProperties converts the internal configuration object into a {@link java.util.Map} of 124 * key value pairs. 125 * 126 * @return a Map of key/value pairs 127 */ 128 Map<Object, Object> getConfigAsProperties(); 129 130 String getProperty( String key ); 131 132 /** 133 * Method getID returns the ID of this Flow object. 134 * <p/> 135 * The ID value is a long HEX String used to identify this instance globally. Subsequent Flow 136 * instances created with identical parameters will not return the same ID. 137 * 138 * @return the ID (type String) of this Flow object. 139 */ 140 @Override 141 String getID(); 142 143 @Override 144 String getTags(); 145 146 /** 147 * Method getSubmitPriority returns the submitPriority of this Flow object. 148 * <p/> 149 * 10 is lowest, 1 is the highest, 5 is the default. 150 * 151 * @return the submitPriority (type int) of this FlowStep object. 152 */ 153 int getSubmitPriority(); 154 155 /** 156 * Method setSubmitPriority sets the submitPriority of this Flow object. 157 * <p/> 158 * 10 is lowest, 1 is the highest, 5 is the default. 159 * 160 * @param submitPriority the submitPriority of this FlowStep object. 161 */ 162 void setSubmitPriority( int submitPriority ); 163 164 FlowProcess<Config> getFlowProcess(); 165 166 /** 167 * Method getFlowStats returns the flowStats of this Flow object. 168 * 169 * @return the flowStats (type FlowStats) of this Flow object. 170 */ 171 FlowStats getFlowStats(); 172 173 /** 174 * Method hasListeners returns true if {@link FlowListener} instances have been registered. 175 * 176 * @return boolean 177 */ 178 boolean hasListeners(); 179 180 /** 181 * Method addListener registers the given flowListener with this instance. 182 * 183 * @param flowListener of type FlowListener 184 */ 185 void addListener( FlowListener flowListener ); 186 187 /** 188 * Method removeListener removes the given flowListener from this instance. 189 * 190 * @param flowListener of type FlowListener 191 * @return true if the listener was removed 192 */ 193 boolean removeListener( FlowListener flowListener ); 194 195 /** 196 * Method hasStepListeners returns true if {@link FlowStepListener} instances have been registered 197 * with any of the {@link FlowStep}s belonging to this instance 198 * 199 * @return boolean 200 */ 201 boolean hasStepListeners(); 202 203 /** 204 * Method addStepListener registers the given flowStepListener with this instance. 205 * 206 * @param flowStepListener of type addStepListener 207 */ 208 void addStepListener( FlowStepListener flowStepListener ); 209 210 /** 211 * Method removeStepListener removes the given flowStepListener from this instance. 212 * 213 * @param flowStepListener of type FlowStepListener 214 * @return true if the listener was removed from all the {@link FlowStep} belonging to this instance 215 */ 216 boolean removeStepListener( FlowStepListener flowStepListener ); 217 218 /** 219 * Method getSources returns the sources of this Flow object. 220 * 221 * @return the sources (type Map) of this Flow object. 222 */ 223 Map<String, Tap> getSources(); 224 225 List<String> getSourceNames(); 226 227 Tap getSource( String name ); 228 229 /** 230 * Method getSourcesCollection returns a {@link Collection} of source {@link Tap}s for this Flow object. 231 * 232 * @return the sourcesCollection (type Collection<Tap>) of this Flow object. 233 */ 234 Collection<Tap> getSourcesCollection(); 235 236 /** 237 * Method getSinks returns the sinks of this Flow object. 238 * 239 * @return the sinks (type Map) of this Flow object. 240 */ 241 Map<String, Tap> getSinks(); 242 243 List<String> getSinkNames(); 244 245 Tap getSink( String name ); 246 247 /** 248 * Method getSinksCollection returns a {@link Collection} of sink {@link Tap}s for this Flow object. 249 * 250 * @return the sinkCollection (type Collection<Tap>) of this Flow object. 251 */ 252 Collection<Tap> getSinksCollection(); 253 254 /** 255 * Method getSink returns the first sink of this Flow object. 256 * 257 * @return the sink (type Tap) of this Flow object. 258 */ 259 Tap getSink(); 260 261 /** 262 * Method getTraps returns the traps of this Flow object. 263 * 264 * @return the traps (type Map<String, Tap>) of this Flow object. 265 */ 266 Map<String, Tap> getTraps(); 267 268 List<String> getTrapNames(); 269 270 /** 271 * Method getTrapsCollection returns a {@link Collection} of trap {@link Tap}s for this Flow object. 272 * 273 * @return the trapsCollection (type Collection<Tap>) of this Flow object. 274 */ 275 Collection<Tap> getTrapsCollection(); 276 277 /** 278 * Method getCheckpoints returns the checkpoint taps of this Flow object. 279 * 280 * @return the traps (type Map<String, Tap>) of this Flow object. 281 */ 282 Map<String, Tap> getCheckpoints(); 283 284 List<String> getCheckpointNames(); 285 286 /** 287 * Method getCheckpointsCollection returns a {@link Collection} of checkpoint {@link Tap}s for this Flow object. 288 * 289 * @return the trapsCollection (type Collection<Tap>) of this Flow object. 290 */ 291 Collection<Tap> getCheckpointsCollection(); 292 293 294 /** 295 * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow. 296 * 297 * @return FlowSkipStrategy 298 */ 299 FlowSkipStrategy getFlowSkipStrategy(); 300 301 /** 302 * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy is returned. 303 * <p/> 304 * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link FlowSkipIfSinkNotStale}. 305 * An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}. 306 * <p/> 307 * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()} or {@link #complete()}. Only 308 * when the Flow is executed through a {@link cascading.cascade.Cascade} instance. 309 * 310 * @param flowSkipStrategy of type FlowSkipStrategy 311 * @return FlowSkipStrategy 312 */ 313 FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ); 314 315 /** 316 * Method isSkipFlow returns true if the parent {@link cascading.cascade.Cascade} should skip this Flow instance. True is returned 317 * if the current {@link cascading.flow.FlowSkipStrategy} returns true. 318 * 319 * @return the skipFlow (type boolean) of this Flow object. 320 * @throws IOException when 321 */ 322 boolean isSkipFlow() throws IOException; 323 324 /** 325 * Method areSinksStale returns true if any of the sinks referenced are out of date in relation to the sources. Or 326 * if any sink method {@link cascading.tap.Tap#isReplace()} returns true. 327 * 328 * @return boolean 329 * @throws java.io.IOException when 330 */ 331 boolean areSinksStale() throws IOException; 332 333 /** 334 * Method areSourcesNewer returns true if any source is newer than the given sinkModified date value. 335 * 336 * @param sinkModified of type long 337 * @return boolean 338 * @throws java.io.IOException when 339 */ 340 boolean areSourcesNewer( long sinkModified ) throws IOException; 341 342 /** 343 * Method getSinkModified returns the youngest modified date of any sink {@link cascading.tap.Tap} managed by this Flow instance. 344 * <p/> 345 * If zero (0) is returned, at least one of the sink resources does not exist. If minus one (-1) is returned, 346 * atleast one of the sinks are marked for delete ({@link cascading.tap.Tap#isReplace() returns true}). 347 * 348 * @return the sinkModified (type long) of this Flow object. 349 * @throws java.io.IOException when 350 */ 351 long getSinkModified() throws IOException; 352 353 /** 354 * Returns the current {@link FlowStepStrategy} instance. 355 * 356 * @return FlowStepStrategy 357 */ 358 FlowStepStrategy getFlowStepStrategy(); 359 360 /** 361 * Sets a default {@link FlowStepStrategy} instance. 362 * <p/> 363 * Use a FlowStepStrategy to change {@link cascading.flow.FlowStep} configuration properties 364 * before the properties are submitted to the underlying platform for the step 365 * unit of work. 366 * 367 * @param flowStepStrategy The FlowStepStrategy to use. 368 */ 369 void setFlowStepStrategy( FlowStepStrategy flowStepStrategy ); 370 371 /** 372 * Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order. 373 * 374 * @return the steps (type List<FlowStep>) of this Flow object. 375 */ 376 List<FlowStep<Config>> getFlowSteps(); 377 378 /** 379 * Method openSource opens the first source Tap. 380 * 381 * @return TupleIterator 382 * @throws IOException when 383 */ 384 TupleEntryIterator openSource() throws IOException; 385 386 /** 387 * Method openSource opens the named source Tap. 388 * 389 * @param name of type String 390 * @return TupleIterator 391 * @throws IOException when 392 */ 393 TupleEntryIterator openSource( String name ) throws IOException; 394 395 /** 396 * Method openSink opens the first sink Tap. 397 * 398 * @return TupleIterator 399 * @throws IOException when 400 */ 401 TupleEntryIterator openSink() throws IOException; 402 403 /** 404 * Method openSink opens the named sink Tap. 405 * 406 * @param name of type String 407 * @return TupleIterator 408 * @throws IOException when 409 */ 410 TupleEntryIterator openSink( String name ) throws IOException; 411 412 /** 413 * Method openTrap opens the first trap Tap. 414 * 415 * @return TupleIterator 416 * @throws IOException when 417 */ 418 TupleEntryIterator openTrap() throws IOException; 419 420 /** 421 * Method openTrap opens the named trap Tap. 422 * 423 * @param name of type String 424 * @return TupleIterator 425 * @throws IOException when 426 */ 427 TupleEntryIterator openTrap( String name ) throws IOException; 428 429 /** 430 * Method resourceExists returns true if the resource represented by the given Tap instance exists. 431 * 432 * @param tap of type Tap 433 * @return boolean 434 * @throws IOException when 435 */ 436 boolean resourceExists( Tap tap ) throws IOException; 437 438 /** 439 * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance. 440 * <p/> 441 * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call, 442 * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be 443 * stored in a Collection. 444 * 445 * @param tap of type Tap 446 * @return TupleIterator 447 * @throws IOException when there is an error opening the resource 448 */ 449 TupleEntryIterator openTapForRead( Tap tap ) throws IOException; 450 451 /** 452 * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance. 453 * 454 * @param tap of type Tap 455 * @return TupleCollector 456 * @throws IOException when there is an error opening the resource 457 */ 458 TupleEntryCollector openTapForWrite( Tap tap ) throws IOException; 459 460 /** 461 * Method writeDOT writes this Flow instance to the given filename as a DOT file for import into a graphics package. 462 * 463 * @param filename of type String 464 */ 465 void writeDOT( String filename ); 466 467 /** 468 * Method writeStepsDOT writes this Flow step graph to the given filename as a DOT file for import into a graphics package. 469 * 470 * @param filename of type String 471 */ 472 void writeStepsDOT( String filename ); 473 474 String getCascadeID(); 475 476 String getRunID(); 477 478 PlatformInfo getPlatformInfo(); 479 480 /** 481 * Method jobsAreLocal returns true if all jobs are executed in-process as a single map and reduce task. 482 * 483 * @return boolean 484 */ 485 boolean stepsAreLocal(); 486 487 /** 488 * Method isStopJobsOnExit returns the stopJobsOnExit of this Flow object. Defaults to {@code true}. 489 * 490 * @return the stopJobsOnExit (type boolean) of this Flow object. 491 */ 492 boolean isStopJobsOnExit(); 493 }