001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.List;
026import java.util.Map;
027
028import cascading.flow.planner.PlannerInfo;
029import cascading.flow.planner.PlatformInfo;
030import cascading.management.UnitOfWork;
031import cascading.stats.FlowStats;
032import cascading.tap.Tap;
033import cascading.tuple.TupleEntryCollector;
034import cascading.tuple.TupleEntryIterator;
035
036/**
037 * A Flow is a logical unit of work declared by an assembly of {@link cascading.pipe.Pipe} instances connected to source
038 * and sink {@link Tap} instances.
039 * <p/>
040 * A Flow is then executed to push the incoming source data through the assembly into one or more sinks.
041 * <p/>
042 * A Flow sub-class instance may not be instantiated directly in most cases, see sub-classes of {@link FlowConnector} class
043 * for supported platforms.
044 * <p/>
045 * Note that {@link cascading.pipe.Pipe} assemblies can be reused in multiple Flow instances. They maintain
046 * no state regarding the Flow execution. Subsequently, {@link cascading.pipe.Pipe} assemblies can be given
047 * parameters through its calling Flow so they can be built in a generic fashion.
048 * <p/>
049 * When a Flow is created, an optimized internal representation is created that is then executed
050 * on the underlying execution platform. This is typically done by creating one or more {@link FlowStep} instances.
051 * <p/>
052 * Flows are submitted in order of dependency when used with a {@link cascading.cascade.Cascade}. If two or more steps do not share the
053 * same dependencies and all can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines
054 * the order in which all steps will be submitted for execution. The default submit priority is 5.
055 * <p/>
056 * Use the {@link FlowListener} to receive any events on the life-cycle of the Flow as it executes. Any
057 * {@link Tap} instances owned by the Flow also implementing FlowListener will automatically be added to the
058 * set of listeners.
059 *
060 * @see FlowListener
061 * @see cascading.flow.FlowConnector
062 */
063public interface Flow<Config> extends UnitOfWork<FlowStats>
064  {
065  String CASCADING_FLOW_ID = "cascading.flow.id"; // NOTE(review): presumably the property key holding the Flow ID - confirm
066
067  /**
068   * Method getName returns the name of this Flow object.
069   *
070   * @return the name (type String) of this Flow object.
071   */
072  @Override
073  String getName();
074
075  /**
076   * Method prepare is used by a {@link cascading.cascade.Cascade} to notify the given Flow it should initialize or clear any resources
077   * necessary for {@link #start()} to be called successfully.
078   * <p/>
079   * Specifically, the {@link BaseFlow} implementation calls {@link BaseFlow#deleteSinksIfNotUpdate()}
080   * and {@link BaseFlow#deleteTrapsIfNotUpdate()}. No checked exception is declared by this method, so any
081   * failure must surface as an unchecked exception.
082   */
083  @Override
084  void prepare();
085
086  /**
087   * Method start begins the execution of this Flow instance. It will return immediately. Use the method {@link #complete()}
088   * to block until this Flow completes.
089   */
090  @Override
091  void start();
092
093  /** Method stop stops all running jobs, killing any currently executing. */
094  @Override
095  void stop();
096
097  /** Method complete starts the current Flow instance if it has not been previously started, then blocks until completion. */
098  @Override
099  void complete();
100  /** Method cleanup is inherited from {@link UnitOfWork}; presumably it releases resources once the Flow is done (TODO confirm). */
101  @Override
102  void cleanup();
103
104  /**
105   * Returns any meta-data about the planner that created this Flow instance.
106   *
107   * @return an instance of PlannerInfo
108   */
109  PlannerInfo getPlannerInfo();
110
111  /**
112   * Returns any meta-data about the underlying platform this Flow instance will run against.
113   *
114   * @return an instance of PlatformInfo
115   */
116  PlatformInfo getPlatformInfo();
117
118  /**
119   * Method getConfig returns the internal configuration object.
120   * <p/>
121   * Any changes to this object will not be reflected in child steps. See {@link cascading.flow.FlowConnector} for setting
122   * default properties visible to children. Or see {@link cascading.flow.FlowStepStrategy} for setting properties on
123   * individual steps before they are executed.
124   *
125   * @return the default configuration of this Flow
126   */
127  Config getConfig();
128
129  /**
130   * Method getConfigCopy returns a copy of the internal configuration object. This object can be safely
131   * modified.
132   *
133   * @return a copy of the default configuration of this Flow
134   */
135  Config getConfigCopy();
136
137  /**
138   * Method getConfigAsProperties converts the internal configuration object into a {@link java.util.Map} of
139   * key value pairs.
140   *
141   * @return a Map of key/value pairs
142   */
143  Map<Object, Object> getConfigAsProperties();
144
145  /**
146   * Returns the String property associated with the given key from the current Configuration instance.
147   *
148   * @param key of type String
149   * @return the String value
150   */
151  String getProperty( String key );
152
153  /**
154   * Method getID returns the ID of this Flow object.
155   * <p/>
156   * The ID value is a long HEX String used to identify this instance globally. Subsequent Flow
157   * instances created with identical parameters will not return the same ID.
158   *
159   * @return the ID (type String) of this Flow object.
160   */
161  @Override
162  String getID();
163
164  /**
165   * Returns an immutable map of properties giving more details about the Flow object.
166   * <p/>
167   * See {@link cascading.flow.FlowDef#addDescription(String, String)} to set values on a given Flow.
168   * <p/>
169   * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
170   * For known description types, see {@link FlowDescriptors}.
171   *
172   * @return {@code Map<String,String>}
173   */
174  Map<String, String> getFlowDescriptor();
175  /** Method getTags returns the tags associated with this Flow object as a single String. */
176  @Override
177  String getTags();
178
179  /**
180   * Method getSubmitPriority returns the submitPriority of this Flow object.
181   * <p/>
182   * 10 is lowest, 1 is the highest, 5 is the default.
183   *
184   * @return the submitPriority (type int) of this Flow object.
185   */
186  int getSubmitPriority();
187
188  /**
189   * Method setSubmitPriority sets the submitPriority of this Flow object.
190   * <p/>
191   * 10 is lowest, 1 is the highest, 5 is the default.
192   *
193   * @param submitPriority the submitPriority of this Flow object.
194   */
195  void setSubmitPriority( int submitPriority );
196  /** Method getFlowProcess returns the {@link FlowProcess} of this Flow object. */
197  FlowProcess<Config> getFlowProcess();
198
199  /**
200   * Method getFlowStats returns the flowStats of this Flow object.
201   *
202   * @return the flowStats (type FlowStats) of this Flow object.
203   */
204  FlowStats getFlowStats();
205
206  /**
207   * Method hasListeners returns true if {@link FlowListener} instances have been registered.
208   *
209   * @return boolean
210   */
211  boolean hasListeners();
212
213  /**
214   * Method addListener registers the given flowListener with this instance.
215   *
216   * @param flowListener of type FlowListener
217   */
218  void addListener( FlowListener flowListener );
219
220  /**
221   * Method removeListener removes the given flowListener from this instance.
222   *
223   * @param flowListener of type FlowListener
224   * @return true if the listener was removed
225   */
226  boolean removeListener( FlowListener flowListener );
227
228  /**
229   * Method hasStepListeners returns true if {@link FlowStepListener} instances have been registered
230   * with any of the {@link FlowStep}s belonging to this instance.
231   *
232   * @return boolean
233   */
234  boolean hasStepListeners();
235
236  /**
237   * Method addStepListener registers the given flowStepListener with this instance.
238   *
239   * @param flowStepListener of type FlowStepListener
240   */
241  void addStepListener( FlowStepListener flowStepListener );
242
243  /**
244   * Method removeStepListener removes the given flowStepListener from this instance.
245   *
246   * @param flowStepListener of type FlowStepListener
247   * @return true if the listener was removed from all the {@link FlowStep} belonging to this instance
248   */
249  boolean removeStepListener( FlowStepListener flowStepListener );
250
251  /**
252   * Method getSources returns the sources of this Flow object.
253   *
254   * @return the sources (type Map) of this Flow object.
255   */
256  Map<String, Tap> getSources();
257  /** Method getSourceNames returns the names of the source {@link Tap}s of this Flow object. */
258  List<String> getSourceNames();
259  /** Method getSource returns the source {@link Tap} with the given name. */
260  Tap getSource( String name );
261
262  /**
263   * Method getSourcesCollection returns a {@link Collection} of source {@link Tap}s for this Flow object.
264   *
265   * @return the sourcesCollection (type {@code Collection<Tap>}) of this Flow object.
266   */
267  Collection<Tap> getSourcesCollection();
268
269  /**
270   * Method getSinks returns the sinks of this Flow object.
271   *
272   * @return the sinks (type Map) of this Flow object.
273   */
274  Map<String, Tap> getSinks();
275  /** Method getSinkNames returns the names of the sink {@link Tap}s of this Flow object. */
276  List<String> getSinkNames();
277  /** Method getSink returns the sink {@link Tap} with the given name. */
278  Tap getSink( String name );
279
280  /**
281   * Method getSinksCollection returns a {@link Collection} of sink {@link Tap}s for this Flow object.
282   *
283   * @return the sinksCollection (type {@code Collection<Tap>}) of this Flow object.
284   */
285  Collection<Tap> getSinksCollection();
286
287  /**
288   * Method getSink returns the first sink of this Flow object.
289   *
290   * @return the sink (type Tap) of this Flow object.
291   */
292  Tap getSink();
293
294  /**
295   * Method getTraps returns the traps of this Flow object.
296   *
297   * @return the traps (type {@code Map<String, Tap>}) of this Flow object.
298   */
299  Map<String, Tap> getTraps();
300  /** Method getTrapNames returns the names of the trap {@link Tap}s of this Flow object. */
301  List<String> getTrapNames();
302
303  /**
304   * Method getTrapsCollection returns a {@link Collection} of trap {@link Tap}s for this Flow object.
305   *
306   * @return the trapsCollection (type {@code Collection<Tap>}) of this Flow object.
307   */
308  Collection<Tap> getTrapsCollection();
309
310  /**
311   * Method getCheckpoints returns the checkpoint taps of this Flow object.
312   *
313   * @return the checkpoints (type {@code Map<String, Tap>}) of this Flow object.
314   */
315  Map<String, Tap> getCheckpoints();
316  /** Method getCheckpointNames returns the names of the checkpoint {@link Tap}s of this Flow object. */
317  List<String> getCheckpointNames();
318
319  /**
320   * Method getCheckpointsCollection returns a {@link Collection} of checkpoint {@link Tap}s for this Flow object.
321   *
322   * @return the checkpointsCollection (type {@code Collection<Tap>}) of this Flow object.
323   */
324  Collection<Tap> getCheckpointsCollection();
325
326  /**
327   * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow.
328   *
329   * @return FlowSkipStrategy
330   */
331  FlowSkipStrategy getFlowSkipStrategy();
332
333  /**
334   * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy is returned.
335   * <p/>
336   * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link FlowSkipIfSinkNotStale}.
337   * An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}.
338   * <p/>
339   * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()} or {@link #complete()}. Only
340   * when the Flow is executed through a {@link cascading.cascade.Cascade} instance.
341   *
342   * @param flowSkipStrategy of type FlowSkipStrategy
343   * @return the FlowSkipStrategy that was in effect before this call
344   */
345  FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy );
346
347  /**
348   * Method isSkipFlow returns true if the parent {@link cascading.cascade.Cascade} should skip this Flow instance. True is returned
349   * if the current {@link cascading.flow.FlowSkipStrategy} returns true.
350   *
351   * @return the skipFlow (type boolean) of this Flow object.
352   * @throws IOException if an I/O error occurs
353   */
354  boolean isSkipFlow() throws IOException;
355
356  /**
357   * Method areSinksStale returns true if any of the sinks referenced are out of date in relation to the sources. Or
358   * if any sink method {@link cascading.tap.Tap#isReplace()} returns true.
359   *
360   * @return boolean
361   * @throws java.io.IOException if an I/O error occurs
362   */
363  boolean areSinksStale() throws IOException;
364
365  /**
366   * Method areSourcesNewer returns true if any source is newer than the given sinkModified date value.
367   *
368   * @param sinkModified of type long
369   * @return boolean
370   * @throws java.io.IOException if an I/O error occurs
371   */
372  boolean areSourcesNewer( long sinkModified ) throws IOException;
373
374  /**
375   * Method getSinkModified returns the youngest modified date of any sink {@link cascading.tap.Tap} managed by this Flow instance.
376   * <p/>
377   * If zero (0) is returned, at least one of the sink resources does not exist. If minus one (-1) is returned,
378   * at least one of the sinks is marked for delete ({@link cascading.tap.Tap#isReplace()} returns true).
379   *
380   * @return the sinkModified (type long) of this Flow object.
381   * @throws java.io.IOException if an I/O error occurs
382   */
383  long getSinkModified() throws IOException;
384
385  /**
386   * Returns the current {@link FlowStepStrategy} instance.
387   *
388   * @return FlowStepStrategy
389   */
390  FlowStepStrategy getFlowStepStrategy();
391
392  /**
393   * Sets a default {@link FlowStepStrategy} instance.
394   * <p/>
395   * Use a FlowStepStrategy to change {@link cascading.flow.FlowStep} configuration properties
396   * before the properties are submitted to the underlying platform for the step
397   * unit of work.
398   *
399   * @param flowStepStrategy The FlowStepStrategy to use.
400   */
401  void setFlowStepStrategy( FlowStepStrategy flowStepStrategy );
402
403  /**
404   * Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order.
405   *
406   * @return the steps (type {@code List<FlowStep>}) of this Flow object.
407   */
408  List<FlowStep<Config>> getFlowSteps();
409
410  /**
411   * Method openSource opens the first source Tap.
412   *
413   * @return TupleEntryIterator
414   * @throws IOException if an I/O error occurs
415   */
416  TupleEntryIterator openSource() throws IOException;
417
418  /**
419   * Method openSource opens the named source Tap.
420   *
421   * @param name of type String
422   * @return TupleEntryIterator
423   * @throws IOException if an I/O error occurs
424   */
425  TupleEntryIterator openSource( String name ) throws IOException;
426
427  /**
428   * Method openSink opens the first sink Tap.
429   *
430   * @return TupleEntryIterator
431   * @throws IOException if an I/O error occurs
432   */
433  TupleEntryIterator openSink() throws IOException;
434
435  /**
436   * Method openSink opens the named sink Tap.
437   *
438   * @param name of type String
439   * @return TupleEntryIterator
440   * @throws IOException if an I/O error occurs
441   */
442  TupleEntryIterator openSink( String name ) throws IOException;
443
444  /**
445   * Method openTrap opens the first trap Tap.
446   *
447   * @return TupleEntryIterator
448   * @throws IOException if an I/O error occurs
449   */
450  TupleEntryIterator openTrap() throws IOException;
451
452  /**
453   * Method openTrap opens the named trap Tap.
454   *
455   * @param name of type String
456   * @return TupleEntryIterator
457   * @throws IOException if an I/O error occurs
458   */
459  TupleEntryIterator openTrap( String name ) throws IOException;
460
461  /**
462   * Method resourceExists returns true if the resource represented by the given Tap instance exists.
463   *
464   * @param tap of type Tap
465   * @return boolean
466   * @throws IOException if an I/O error occurs
467   */
468  boolean resourceExists( Tap tap ) throws IOException;
469
470  /**
471   * Method openTapForRead returns a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
472   * <p/>
473   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
474   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
475   * stored in a Collection.
476   *
477   * @param tap of type Tap
478   * @return TupleEntryIterator
479   * @throws IOException when there is an error opening the resource
480   */
481  TupleEntryIterator openTapForRead( Tap tap ) throws IOException;
482
483  /**
484   * Method openTapForWrite returns a {@link TupleEntryCollector} for the given Tap instance.
485   *
486   * @param tap of type Tap
487   * @return TupleEntryCollector
488   * @throws IOException when there is an error opening the resource
489   */
490  TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;
491
492  /**
493   * Method writeDOT writes this Flow instance to the given filename as a DOT file for import into a graphics package.
494   *
495   * @param filename of type String
496   */
497  void writeDOT( String filename );
498
499  /**
500   * Method writeStepsDOT writes this Flow step graph to the given filename as a DOT file for import into a graphics package.
501   *
502   * @param filename of type String
503   */
504  void writeStepsDOT( String filename );
505
506  /**
507   * Returns the parent Cascade ID that owns this Flow instance.
508   *
509   * @return the parent Cascade ID (type String)
510   */
511  String getCascadeID();
512
513  /**
514   * Returns the run ID given when this Flow instance was defined in the FlowDef.
515   *
516   * @return the run ID (type String)
517   */
518  String getRunID();
519
520  /**
521   * Method stepsAreLocal returns true if all jobs are executed in-process as a single map and reduce task.
522   *
523   * @return boolean
524   */
525  boolean stepsAreLocal();
526
527  /**
528   * Method isStopJobsOnExit returns the stopJobsOnExit of this Flow object. Defaults to {@code true}.
529   *
530   * @return the stopJobsOnExit (type boolean) of this Flow object.
531   */
532  boolean isStopJobsOnExit();
533  }