001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow; 022 023 import java.io.IOException; 024 import java.util.Collection; 025 import java.util.List; 026 import java.util.Map; 027 028 import cascading.flow.planner.PlatformInfo; 029 import cascading.management.UnitOfWork; 030 import cascading.stats.FlowStats; 031 import cascading.tap.Tap; 032 import cascading.tuple.TupleEntryCollector; 033 import cascading.tuple.TupleEntryIterator; 034 035 /** 036 * A Flow is a logical unit of work declared by an assembly of {@link cascading.pipe.Pipe} instances connected to source 037 * and sink {@link Tap} instances. 038 * <p/> 039 * A Flow is then executed to push the incoming source data through the assembly into one or more sinks. 040 * <p/> 041 * A Flow sub-class instance may not be instantiated directly in most cases, see sub-classes of {@link FlowConnector} class 042 * for supported platforms. 043 * <p/> 044 * Note that {@link cascading.pipe.Pipe} assemblies can be reused in multiple Flow instances. They maintain 045 * no state regarding the Flow execution. Subsequently, {@link cascading.pipe.Pipe} assemblies can be given 046 * parameters through its calling Flow so they can be built in a generic fashion. 047 * <p/> 048 * When a Flow is created, an optimized internal representation is created that is then executed 049 * on the underlying execution platform. This is typically done by creating one or more {@link FlowStep} instances. 050 * </p> 051 * Flows are submitted in order of dependency when used with a {@link cascading.cascade.Cascade}. If two or more steps do not share the 052 * same dependencies and all can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines 053 * the order in which all steps will be submitted for execution. The default submit priority is 5. 054 * <p/> 055 * Use the {@link FlowListener} to receive any events on the life-cycle of the Flow as it executes. Any 056 * {@link Tap} instances owned by the Flow also implementing FlowListener will automatically be added to the 057 * set of listeners. 058 * 059 * @see FlowListener 060 * @see cascading.flow.FlowConnector 061 */ 062 public interface Flow<Config> extends UnitOfWork<FlowStats> 063 { 064 String CASCADING_FLOW_ID = "cascading.flow.id"; 065 066 /** 067 * Method getName returns the name of this Flow object. 068 * 069 * @return the name (type String) of this Flow object. 070 */ 071 @Override 072 String getName(); 073 074 /** 075 * Method prepare is used by a {@link cascading.cascade.Cascade} to notify the given Flow it should initialize or clear any resources 076 * necessary for {@link #start()} to be called successfully. 077 * <p/> 078 * Specifically, this implementation calls {@link BaseFlow#deleteSinksIfNotUpdate()} && {@link BaseFlow#deleteTrapsIfNotUpdate()}. 079 * 080 * @throws java.io.IOException when 081 */ 082 @Override 083 void prepare(); 084 085 /** 086 * Method start begins the execution of this Flow instance. It will return immediately. Use the method {@link #complete()} 087 * to block until this Flow completes. 088 */ 089 @Override 090 void start(); 091 092 /** Method stop stops all running jobs, killing any currently executing. */ 093 @Override 094 void stop(); 095 096 /** Method complete starts the current Flow instance if it has not be previously started, then block until completion. */ 097 @Override 098 void complete(); 099 100 @Override 101 void cleanup(); 102 103 /** 104 * Method getConfig returns the internal configuration object. 105 * <p/> 106 * Any changes to this object will not be reflected in child steps. See {@link cascading.flow.FlowConnector} for setting 107 * default properties visible to children. Or see {@link cascading.flow.FlowStepStrategy} for setting properties on 108 * individual steps before they are executed. 109 * 110 * @return the default configuration of this Flow 111 */ 112 Config getConfig(); 113 114 /** 115 * Method getConfigCopy returns a copy of the internal configuration object. This object can be safely 116 * modified. 117 * 118 * @return a copy of the default configuration of this Flow 119 */ 120 Config getConfigCopy(); 121 122 /** 123 * Method getConfiAsProperties converts the internal configuration object into a {@link java.util.Map} of 124 * key value pairs. 125 * 126 * @return a Map of key/value pairs 127 */ 128 Map<Object, Object> getConfigAsProperties(); 129 130 String getProperty( String key ); 131 132 /** 133 * Method getID returns the ID of this Flow object. 134 * <p/> 135 * The ID value is a long HEX String used to identify this instance globally. Subsequent Flow 136 * instances created with identical parameters will not return the same ID. 137 * 138 * @return the ID (type String) of this Flow object. 139 */ 140 @Override 141 String getID(); 142 143 /** 144 * Returns an immutable map of properties giving more details about the Flow object. 145 * <p/> 146 * See {@link cascading.flow.FlowDef#addDescription(String, String)} to set values on a given Flow. 147 * <p/> 148 * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents. 149 * For known description types, see {@link FlowDescriptors}. 150 * 151 * @return Map<String,String> 152 */ 153 public Map<String, String> getFlowDescriptor(); 154 155 @Override 156 String getTags(); 157 158 /** 159 * Method getSubmitPriority returns the submitPriority of this Flow object. 160 * <p/> 161 * 10 is lowest, 1 is the highest, 5 is the default. 162 * 163 * @return the submitPriority (type int) of this FlowStep object. 164 */ 165 int getSubmitPriority(); 166 167 /** 168 * Method setSubmitPriority sets the submitPriority of this Flow object. 169 * <p/> 170 * 10 is lowest, 1 is the highest, 5 is the default. 171 * 172 * @param submitPriority the submitPriority of this FlowStep object. 173 */ 174 void setSubmitPriority( int submitPriority ); 175 176 FlowProcess<Config> getFlowProcess(); 177 178 /** 179 * Method getFlowStats returns the flowStats of this Flow object. 180 * 181 * @return the flowStats (type FlowStats) of this Flow object. 182 */ 183 FlowStats getFlowStats(); 184 185 /** 186 * Method hasListeners returns true if {@link FlowListener} instances have been registered. 187 * 188 * @return boolean 189 */ 190 boolean hasListeners(); 191 192 /** 193 * Method addListener registers the given flowListener with this instance. 194 * 195 * @param flowListener of type FlowListener 196 */ 197 void addListener( FlowListener flowListener ); 198 199 /** 200 * Method removeListener removes the given flowListener from this instance. 201 * 202 * @param flowListener of type FlowListener 203 * @return true if the listener was removed 204 */ 205 boolean removeListener( FlowListener flowListener ); 206 207 /** 208 * Method hasStepListeners returns true if {@link FlowStepListener} instances have been registered 209 * with any of the {@link FlowStep}s belonging to this instance 210 * 211 * @return boolean 212 */ 213 boolean hasStepListeners(); 214 215 /** 216 * Method addStepListener registers the given flowStepListener with this instance. 217 * 218 * @param flowStepListener of type addStepListener 219 */ 220 void addStepListener( FlowStepListener flowStepListener ); 221 222 /** 223 * Method removeStepListener removes the given flowStepListener from this instance. 224 * 225 * @param flowStepListener of type FlowStepListener 226 * @return true if the listener was removed from all the {@link FlowStep} belonging to this instance 227 */ 228 boolean removeStepListener( FlowStepListener flowStepListener ); 229 230 /** 231 * Method getSources returns the sources of this Flow object. 232 * 233 * @return the sources (type Map) of this Flow object. 234 */ 235 Map<String, Tap> getSources(); 236 237 List<String> getSourceNames(); 238 239 Tap getSource( String name ); 240 241 /** 242 * Method getSourcesCollection returns a {@link Collection} of source {@link Tap}s for this Flow object. 243 * 244 * @return the sourcesCollection (type Collection<Tap>) of this Flow object. 245 */ 246 Collection<Tap> getSourcesCollection(); 247 248 /** 249 * Method getSinks returns the sinks of this Flow object. 250 * 251 * @return the sinks (type Map) of this Flow object. 252 */ 253 Map<String, Tap> getSinks(); 254 255 List<String> getSinkNames(); 256 257 Tap getSink( String name ); 258 259 /** 260 * Method getSinksCollection returns a {@link Collection} of sink {@link Tap}s for this Flow object. 261 * 262 * @return the sinkCollection (type Collection<Tap>) of this Flow object. 263 */ 264 Collection<Tap> getSinksCollection(); 265 266 /** 267 * Method getSink returns the first sink of this Flow object. 268 * 269 * @return the sink (type Tap) of this Flow object. 270 */ 271 Tap getSink(); 272 273 /** 274 * Method getTraps returns the traps of this Flow object. 275 * 276 * @return the traps (type Map<String, Tap>) of this Flow object. 277 */ 278 Map<String, Tap> getTraps(); 279 280 List<String> getTrapNames(); 281 282 /** 283 * Method getTrapsCollection returns a {@link Collection} of trap {@link Tap}s for this Flow object. 284 * 285 * @return the trapsCollection (type Collection<Tap>) of this Flow object. 286 */ 287 Collection<Tap> getTrapsCollection(); 288 289 /** 290 * Method getCheckpoints returns the checkpoint taps of this Flow object. 291 * 292 * @return the traps (type Map<String, Tap>) of this Flow object. 293 */ 294 Map<String, Tap> getCheckpoints(); 295 296 List<String> getCheckpointNames(); 297 298 /** 299 * Method getCheckpointsCollection returns a {@link Collection} of checkpoint {@link Tap}s for this Flow object. 300 * 301 * @return the trapsCollection (type Collection<Tap>) of this Flow object. 302 */ 303 Collection<Tap> getCheckpointsCollection(); 304 305 /** 306 * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow. 307 * 308 * @return FlowSkipStrategy 309 */ 310 FlowSkipStrategy getFlowSkipStrategy(); 311 312 /** 313 * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy is returned. 314 * <p/> 315 * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link FlowSkipIfSinkNotStale}. 316 * An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}. 317 * <p/> 318 * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()} or {@link #complete()}. Only 319 * when the Flow is executed through a {@link cascading.cascade.Cascade} instance. 320 * 321 * @param flowSkipStrategy of type FlowSkipStrategy 322 * @return FlowSkipStrategy 323 */ 324 FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ); 325 326 /** 327 * Method isSkipFlow returns true if the parent {@link cascading.cascade.Cascade} should skip this Flow instance. True is returned 328 * if the current {@link cascading.flow.FlowSkipStrategy} returns true. 329 * 330 * @return the skipFlow (type boolean) of this Flow object. 331 * @throws IOException when 332 */ 333 boolean isSkipFlow() throws IOException; 334 335 /** 336 * Method areSinksStale returns true if any of the sinks referenced are out of date in relation to the sources. Or 337 * if any sink method {@link cascading.tap.Tap#isReplace()} returns true. 338 * 339 * @return boolean 340 * @throws java.io.IOException when 341 */ 342 boolean areSinksStale() throws IOException; 343 344 /** 345 * Method areSourcesNewer returns true if any source is newer than the given sinkModified date value. 346 * 347 * @param sinkModified of type long 348 * @return boolean 349 * @throws java.io.IOException when 350 */ 351 boolean areSourcesNewer( long sinkModified ) throws IOException; 352 353 /** 354 * Method getSinkModified returns the youngest modified date of any sink {@link cascading.tap.Tap} managed by this Flow instance. 355 * <p/> 356 * If zero (0) is returned, at least one of the sink resources does not exist. If minus one (-1) is returned, 357 * atleast one of the sinks are marked for delete ({@link cascading.tap.Tap#isReplace() returns true}). 358 * 359 * @return the sinkModified (type long) of this Flow object. 360 * @throws java.io.IOException when 361 */ 362 long getSinkModified() throws IOException; 363 364 /** 365 * Returns the current {@link FlowStepStrategy} instance. 366 * 367 * @return FlowStepStrategy 368 */ 369 FlowStepStrategy getFlowStepStrategy(); 370 371 /** 372 * Sets a default {@link FlowStepStrategy} instance. 373 * <p/> 374 * Use a FlowStepStrategy to change {@link cascading.flow.FlowStep} configuration properties 375 * before the properties are submitted to the underlying platform for the step 376 * unit of work. 377 * 378 * @param flowStepStrategy The FlowStepStrategy to use. 379 */ 380 void setFlowStepStrategy( FlowStepStrategy flowStepStrategy ); 381 382 /** 383 * Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order. 384 * 385 * @return the steps (type List<FlowStep>) of this Flow object. 386 */ 387 List<FlowStep<Config>> getFlowSteps(); 388 389 /** 390 * Method openSource opens the first source Tap. 391 * 392 * @return TupleIterator 393 * @throws IOException when 394 */ 395 TupleEntryIterator openSource() throws IOException; 396 397 /** 398 * Method openSource opens the named source Tap. 399 * 400 * @param name of type String 401 * @return TupleIterator 402 * @throws IOException when 403 */ 404 TupleEntryIterator openSource( String name ) throws IOException; 405 406 /** 407 * Method openSink opens the first sink Tap. 408 * 409 * @return TupleIterator 410 * @throws IOException when 411 */ 412 TupleEntryIterator openSink() throws IOException; 413 414 /** 415 * Method openSink opens the named sink Tap. 416 * 417 * @param name of type String 418 * @return TupleIterator 419 * @throws IOException when 420 */ 421 TupleEntryIterator openSink( String name ) throws IOException; 422 423 /** 424 * Method openTrap opens the first trap Tap. 425 * 426 * @return TupleIterator 427 * @throws IOException when 428 */ 429 TupleEntryIterator openTrap() throws IOException; 430 431 /** 432 * Method openTrap opens the named trap Tap. 433 * 434 * @param name of type String 435 * @return TupleIterator 436 * @throws IOException when 437 */ 438 TupleEntryIterator openTrap( String name ) throws IOException; 439 440 /** 441 * Method resourceExists returns true if the resource represented by the given Tap instance exists. 442 * 443 * @param tap of type Tap 444 * @return boolean 445 * @throws IOException when 446 */ 447 boolean resourceExists( Tap tap ) throws IOException; 448 449 /** 450 * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance. 451 * <p/> 452 * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call, 453 * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be 454 * stored in a Collection. 455 * 456 * @param tap of type Tap 457 * @return TupleIterator 458 * @throws IOException when there is an error opening the resource 459 */ 460 TupleEntryIterator openTapForRead( Tap tap ) throws IOException; 461 462 /** 463 * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance. 464 * 465 * @param tap of type Tap 466 * @return TupleCollector 467 * @throws IOException when there is an error opening the resource 468 */ 469 TupleEntryCollector openTapForWrite( Tap tap ) throws IOException; 470 471 /** 472 * Method writeDOT writes this Flow instance to the given filename as a DOT file for import into a graphics package. 473 * 474 * @param filename of type String 475 */ 476 void writeDOT( String filename ); 477 478 /** 479 * Method writeStepsDOT writes this Flow step graph to the given filename as a DOT file for import into a graphics package. 480 * 481 * @param filename of type String 482 */ 483 void writeStepsDOT( String filename ); 484 485 String getCascadeID(); 486 487 String getRunID(); 488 489 PlatformInfo getPlatformInfo(); 490 491 /** 492 * Method jobsAreLocal returns true if all jobs are executed in-process as a single map and reduce task. 493 * 494 * @return boolean 495 */ 496 boolean stepsAreLocal(); 497 498 /** 499 * Method isStopJobsOnExit returns the stopJobsOnExit of this Flow object. Defaults to {@code true}. 500 * 501 * @return the stopJobsOnExit (type boolean) of this Flow object. 502 */ 503 boolean isStopJobsOnExit(); 504 }