001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.io.IOException;
024    import java.util.Collection;
025    import java.util.List;
026    import java.util.Map;
027    
028    import cascading.flow.planner.PlatformInfo;
029    import cascading.management.UnitOfWork;
030    import cascading.stats.FlowStats;
031    import cascading.tap.Tap;
032    import cascading.tuple.TupleEntryCollector;
033    import cascading.tuple.TupleEntryIterator;
034    
035    /**
036     * A Flow is a logical unit of work declared by an assembly of {@link cascading.pipe.Pipe} instances connected to source
037     * and sink {@link Tap} instances.
038     * <p/>
039     * A Flow is then executed to push the incoming source data through the assembly into one or more sinks.
040     * <p/>
041     * A Flow sub-class instance may not be instantiated directly in most cases, see the sub-classes of {@link FlowConnector}
042     * for supported platforms.
043     * <p/>
044     * Note that {@link cascading.pipe.Pipe} assemblies can be reused in multiple Flow instances. They maintain
045     * no state regarding the Flow execution. Subsequently, {@link cascading.pipe.Pipe} assemblies can be given
046     * parameters through their calling Flow so they can be built in a generic fashion.
047     * <p/>
048     * When a Flow is created, an optimized internal representation is created that is then executed
049     * on the underlying execution platform. This is typically done by creating one or more {@link FlowStep} instances.
050     * <p/>
051     * Flows are submitted in order of dependency when used with a {@link cascading.cascade.Cascade}. If two or more steps do not share the
052     * same dependencies and all can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines
053     * the order in which all steps will be submitted for execution. The default submit priority is 5.
054     * <p/>
055     * Use the {@link FlowListener} to receive any events on the life-cycle of the Flow as it executes. Any
056     * {@link Tap} instances owned by the Flow also implementing FlowListener will automatically be added to the
057     * set of listeners.
058     *
        * @param <Config> the type of the configuration object returned by {@link #getConfig()} and {@link #getConfigCopy()}
059     * @see FlowListener
060     * @see cascading.flow.FlowConnector
061     */
062    public interface Flow<Config> extends UnitOfWork<FlowStats>
063      {
         /**
          * Property name {@code cascading.flow.id}.
          * NOTE(review): presumably the key under which a Flow ID is published to the configuration -- confirm against implementations.
          */
064      String CASCADING_FLOW_ID = "cascading.flow.id";
065    
066      /**
067       * Method getName returns the name of this Flow object.
068       *
069       * @return the name (type String) of this Flow object.
070       */
071      @Override
072      String getName();
073    
074      /**
075       * Method prepare is used by a {@link cascading.cascade.Cascade} to notify the given Flow it should initialize or clear any resources
076       * necessary for {@link #start()} to be called successfully.
077       * <p/>
078       * Specifically, the {@link BaseFlow} implementation calls {@link BaseFlow#deleteSinksIfNotUpdate()} and {@link BaseFlow#deleteTrapsIfNotUpdate()}.
079       * <p/>
080       * This method declares no checked exceptions.
081       */
082      @Override
083      void prepare();
084    
085      /**
086       * Method start begins the execution of this Flow instance. It will return immediately. Use the method {@link #complete()}
087       * to block until this Flow completes.
088       */
089      @Override
090      void start();
091    
092      /** Method stop stops all running jobs, killing any currently executing. */
093      @Override
094      void stop();
095    
096      /** Method complete starts the current Flow instance if it has not been previously started, then blocks until completion. */
097      @Override
098      void complete();
099    
         /**
          * Method cleanup is inherited from {@link UnitOfWork}.
          * NOTE(review): presumably releases resources held by this Flow once it is done -- confirm against implementations.
          */
100      @Override
101      void cleanup();
102    
103      /**
104       * Method getConfig returns the internal configuration object.
105       * <p/>
106       * Any changes to this object will not be reflected in child steps. See {@link cascading.flow.FlowConnector} for setting
107       * default properties visible to children. Or see {@link cascading.flow.FlowStepStrategy} for setting properties on
108       * individual steps before they are executed.
109       *
110       * @return the default configuration of this Flow
111       */
112      Config getConfig();
113    
114      /**
115       * Method getConfigCopy returns a copy of the internal configuration object. This object can be safely
116       * modified.
117       *
118       * @return a copy of the default configuration of this Flow
119       */
120      Config getConfigCopy();
121    
122      /**
123       * Method getConfigAsProperties converts the internal configuration object into a {@link java.util.Map} of
124       * key value pairs.
125       *
126       * @return a Map of key/value pairs
127       */
128      Map<Object, Object> getConfigAsProperties();
129    
         /**
          * Method getProperty returns the String value for the given key.
          * NOTE(review): presumably looked up in the configuration of this Flow -- confirm against implementations.
          *
          * @param key of type String
          * @return String
          */
130      String getProperty( String key );
131    
132      /**
133       * Method getID returns the ID of this Flow object.
134       * <p/>
135       * The ID value is a long HEX String used to identify this instance globally. Subsequent Flow
136       * instances created with identical parameters will not return the same ID.
137       *
138       * @return the ID (type String) of this Flow object.
139       */
140      @Override
141      String getID();
142    
         /**
          * Method getTags returns the tags of this Flow object as a single String.
          * NOTE(review): the format of the returned value is not specified here -- see {@link UnitOfWork}.
          *
          * @return the tags (type String) of this Flow object.
          */
143      @Override
144      String getTags();
145    
146      /**
147       * Method getSubmitPriority returns the submitPriority of this Flow object.
148       * <p/>
149       * 10 is lowest, 1 is the highest, 5 is the default.
150       *
151       * @return the submitPriority (type int) of this Flow object.
152       */
153      int getSubmitPriority();
154    
155      /**
156       * Method setSubmitPriority sets the submitPriority of this Flow object.
157       * <p/>
158       * 10 is lowest, 1 is the highest, 5 is the default.
159       *
160       * @param submitPriority the submitPriority of this Flow object.
161       */
162      void setSubmitPriority( int submitPriority );
163    
         /**
          * Method getFlowProcess returns the {@link FlowProcess} of this Flow object.
          *
          * @return the flowProcess (type {@code FlowProcess<Config>}) of this Flow object.
          */
164      FlowProcess<Config> getFlowProcess();
165    
166      /**
167       * Method getFlowStats returns the flowStats of this Flow object.
168       *
169       * @return the flowStats (type FlowStats) of this Flow object.
170       */
171      FlowStats getFlowStats();
172    
173      /**
174       * Method hasListeners returns true if {@link FlowListener} instances have been registered.
175       *
176       * @return boolean
177       */
178      boolean hasListeners();
179    
180      /**
181       * Method addListener registers the given flowListener with this instance.
182       *
183       * @param flowListener of type FlowListener
184       */
185      void addListener( FlowListener flowListener );
186    
187      /**
188       * Method removeListener removes the given flowListener from this instance.
189       *
190       * @param flowListener of type FlowListener
191       * @return true if the listener was removed
192       */
193      boolean removeListener( FlowListener flowListener );
194    
195      /**
196       * Method hasStepListeners returns true if {@link FlowStepListener} instances have been registered
197       * with any of the {@link FlowStep}s belonging to this instance.
198       *
199       * @return boolean
200       */
201      boolean hasStepListeners();
202    
203      /**
204       * Method addStepListener registers the given flowStepListener with this instance.
205       *
206       * @param flowStepListener of type FlowStepListener
207       */
208      void addStepListener( FlowStepListener flowStepListener );
209    
210      /**
211       * Method removeStepListener removes the given flowStepListener from this instance.
212       *
213       * @param flowStepListener of type FlowStepListener
214       * @return true if the listener was removed from all the {@link FlowStep} belonging to this instance
215       */
216      boolean removeStepListener( FlowStepListener flowStepListener );
217    
218      /**
219       * Method getSources returns the sources of this Flow object.
220       *
221       * @return the sources (type {@code Map<String, Tap>}) of this Flow object.
222       */
223      Map<String, Tap> getSources();
224    
         /**
          * Method getSourceNames returns the source names of this Flow object.
          * NOTE(review): presumably the keys of {@link #getSources()} -- confirm against implementations.
          *
          * @return the sourceNames (type {@code List<String>}) of this Flow object.
          */
225      List<String> getSourceNames();
226    
         /**
          * Method getSource returns the source Tap for the given name.
          * NOTE(review): presumably the value mapped to the name in {@link #getSources()} -- confirm against implementations.
          *
          * @param name of type String
          * @return Tap
          */
227      Tap getSource( String name );
228    
229      /**
230       * Method getSourcesCollection returns a {@link Collection} of source {@link Tap}s for this Flow object.
231       *
232       * @return the sourcesCollection (type {@code Collection<Tap>}) of this Flow object.
233       */
234      Collection<Tap> getSourcesCollection();
235    
236      /**
237       * Method getSinks returns the sinks of this Flow object.
238       *
239       * @return the sinks (type {@code Map<String, Tap>}) of this Flow object.
240       */
241      Map<String, Tap> getSinks();
242    
         /**
          * Method getSinkNames returns the sink names of this Flow object.
          * NOTE(review): presumably the keys of {@link #getSinks()} -- confirm against implementations.
          *
          * @return the sinkNames (type {@code List<String>}) of this Flow object.
          */
243      List<String> getSinkNames();
244    
         /**
          * Method getSink returns the sink Tap for the given name.
          * NOTE(review): presumably the value mapped to the name in {@link #getSinks()} -- confirm against implementations.
          *
          * @param name of type String
          * @return Tap
          */
245      Tap getSink( String name );
246    
247      /**
248       * Method getSinksCollection returns a {@link Collection} of sink {@link Tap}s for this Flow object.
249       *
250       * @return the sinksCollection (type {@code Collection<Tap>}) of this Flow object.
251       */
252      Collection<Tap> getSinksCollection();
253    
254      /**
255       * Method getSink returns the first sink of this Flow object.
256       *
257       * @return the sink (type Tap) of this Flow object.
258       */
259      Tap getSink();
260    
261      /**
262       * Method getTraps returns the traps of this Flow object.
263       *
264       * @return the traps (type {@code Map<String, Tap>}) of this Flow object.
265       */
266      Map<String, Tap> getTraps();
267    
         /**
          * Method getTrapNames returns the trap names of this Flow object.
          * NOTE(review): presumably the keys of {@link #getTraps()} -- confirm against implementations.
          *
          * @return the trapNames (type {@code List<String>}) of this Flow object.
          */
268      List<String> getTrapNames();
269    
270      /**
271       * Method getTrapsCollection returns a {@link Collection} of trap {@link Tap}s for this Flow object.
272       *
273       * @return the trapsCollection (type {@code Collection<Tap>}) of this Flow object.
274       */
275      Collection<Tap> getTrapsCollection();
276    
277      /**
278       * Method getCheckpoints returns the checkpoint taps of this Flow object.
279       *
280       * @return the checkpoints (type {@code Map<String, Tap>}) of this Flow object.
281       */
282      Map<String, Tap> getCheckpoints();
283    
         /**
          * Method getCheckpointNames returns the checkpoint names of this Flow object.
          * NOTE(review): presumably the keys of {@link #getCheckpoints()} -- confirm against implementations.
          *
          * @return the checkpointNames (type {@code List<String>}) of this Flow object.
          */
284      List<String> getCheckpointNames();
285    
286      /**
287       * Method getCheckpointsCollection returns a {@link Collection} of checkpoint {@link Tap}s for this Flow object.
288       *
289       * @return the checkpointsCollection (type {@code Collection<Tap>}) of this Flow object.
290       */
291      Collection<Tap> getCheckpointsCollection();
292    
293    
294      /**
295       * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow.
296       *
297       * @return FlowSkipStrategy
298       */
299      FlowSkipStrategy getFlowSkipStrategy();
300    
301      /**
302       * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy is returned.
303       * <p/>
304       * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link FlowSkipIfSinkNotStale}.
305       * An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}.
306       * <p/>
307       * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()} or {@link #complete()}. Only
308       * when the Flow is executed through a {@link cascading.cascade.Cascade} instance.
309       *
310       * @param flowSkipStrategy of type FlowSkipStrategy
311       * @return FlowSkipStrategy
312       */
313      FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy );
314    
315      /**
316       * Method isSkipFlow returns true if the parent {@link cascading.cascade.Cascade} should skip this Flow instance. True is returned
317       * if the current {@link cascading.flow.FlowSkipStrategy} returns true.
318       *
319       * @return the skipFlow (type boolean) of this Flow object.
320       * @throws IOException when an I/O error occurs
321       */
322      boolean isSkipFlow() throws IOException;
323    
324      /**
325       * Method areSinksStale returns true if any of the sinks referenced are out of date in relation to the sources. Or
326       * if any sink method {@link cascading.tap.Tap#isReplace()} returns true.
327       *
328       * @return boolean
329       * @throws java.io.IOException when an I/O error occurs
330       */
331      boolean areSinksStale() throws IOException;
332    
333      /**
334       * Method areSourcesNewer returns true if any source is newer than the given sinkModified date value.
335       *
336       * @param sinkModified of type long
337       * @return boolean
338       * @throws java.io.IOException when an I/O error occurs
339       */
340      boolean areSourcesNewer( long sinkModified ) throws IOException;
341    
342      /**
343       * Method getSinkModified returns the youngest modified date of any sink {@link cascading.tap.Tap} managed by this Flow instance.
344       * <p/>
345       * If zero (0) is returned, at least one of the sink resources does not exist. If minus one (-1) is returned,
346       * at least one of the sinks is marked for delete ({@link cascading.tap.Tap#isReplace()} returns true).
347       *
348       * @return the sinkModified (type long) of this Flow object.
349       * @throws java.io.IOException when an I/O error occurs
350       */
351      long getSinkModified() throws IOException;
352    
353      /**
354       * Returns the current {@link FlowStepStrategy} instance.
355       *
356       * @return FlowStepStrategy
357       */
358      FlowStepStrategy getFlowStepStrategy();
359    
360      /**
361       * Sets a default {@link FlowStepStrategy} instance.
362       * <p/>
363       * Use a FlowStepStrategy to change {@link cascading.flow.FlowStep} configuration properties
364       * before the properties are submitted to the underlying platform for the step
365       * unit of work.
366       *
367       * @param flowStepStrategy The FlowStepStrategy to use.
368       */
369      void setFlowStepStrategy( FlowStepStrategy flowStepStrategy );
370    
371      /**
372       * Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order.
373       *
374       * @return the steps (type {@code List<FlowStep<Config>>}) of this Flow object.
375       */
376      List<FlowStep<Config>> getFlowSteps();
377    
378      /**
379       * Method openSource opens the first source Tap.
380       *
381       * @return TupleEntryIterator
382       * @throws IOException when an I/O error occurs
383       */
384      TupleEntryIterator openSource() throws IOException;
385    
386      /**
387       * Method openSource opens the named source Tap.
388       *
389       * @param name of type String
390       * @return TupleEntryIterator
391       * @throws IOException when an I/O error occurs
392       */
393      TupleEntryIterator openSource( String name ) throws IOException;
394    
395      /**
396       * Method openSink opens the first sink Tap.
397       *
398       * @return TupleEntryIterator
399       * @throws IOException when an I/O error occurs
400       */
401      TupleEntryIterator openSink() throws IOException;
402    
403      /**
404       * Method openSink opens the named sink Tap.
405       *
406       * @param name of type String
407       * @return TupleEntryIterator
408       * @throws IOException when an I/O error occurs
409       */
410      TupleEntryIterator openSink( String name ) throws IOException;
411    
412      /**
413       * Method openTrap opens the first trap Tap.
414       *
415       * @return TupleEntryIterator
416       * @throws IOException when an I/O error occurs
417       */
418      TupleEntryIterator openTrap() throws IOException;
419    
420      /**
421       * Method openTrap opens the named trap Tap.
422       *
423       * @param name of type String
424       * @return TupleEntryIterator
425       * @throws IOException when an I/O error occurs
426       */
427      TupleEntryIterator openTrap( String name ) throws IOException;
428    
429      /**
430       * Method resourceExists returns true if the resource represented by the given Tap instance exists.
431       *
432       * @param tap of type Tap
433       * @return boolean
434       * @throws IOException when an I/O error occurs
435       */
436      boolean resourceExists( Tap tap ) throws IOException;
437    
438      /**
439       * Method openTapForRead returns a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
440       * <p/>
441       * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
442       * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
443       * stored in a Collection.
444       *
445       * @param tap of type Tap
446       * @return TupleEntryIterator
447       * @throws IOException when there is an error opening the resource
448       */
449      TupleEntryIterator openTapForRead( Tap tap ) throws IOException;
450    
451      /**
452       * Method openTapForWrite returns a {@link TupleEntryCollector} for the given Tap instance.
453       *
454       * @param tap of type Tap
455       * @return TupleEntryCollector
456       * @throws IOException when there is an error opening the resource
457       */
458      TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;
459    
460      /**
461       * Method writeDOT writes this Flow instance to the given filename as a DOT file for import into a graphics package.
462       *
463       * @param filename of type String
464       */
465      void writeDOT( String filename );
466    
467      /**
468       * Method writeStepsDOT writes this Flow step graph to the given filename as a DOT file for import into a graphics package.
469       *
470       * @param filename of type String
471       */
472      void writeStepsDOT( String filename );
473    
         /**
          * Method getCascadeID returns the cascadeID of this Flow object.
          * NOTE(review): presumably the ID of the {@link cascading.cascade.Cascade} executing this Flow, if any -- confirm against implementations.
          *
          * @return the cascadeID (type String) of this Flow object.
          */
474      String getCascadeID();
475    
         /**
          * Method getRunID returns the runID of this Flow object.
          * NOTE(review): the meaning and origin of the run ID are not documented here -- confirm against implementations.
          *
          * @return the runID (type String) of this Flow object.
          */
476      String getRunID();
477    
         /**
          * Method getPlatformInfo returns the {@link PlatformInfo} of this Flow object.
          *
          * @return the platformInfo (type PlatformInfo) of this Flow object.
          */
478      PlatformInfo getPlatformInfo();
479    
480      /**
481       * Method stepsAreLocal returns true if all jobs are executed in-process as a single map and reduce task.
482       *
483       * @return boolean
484       */
485      boolean stepsAreLocal();
486    
487      /**
488       * Method isStopJobsOnExit returns the stopJobsOnExit of this Flow object. Defaults to {@code true}.
489       *
490       * @return the stopJobsOnExit (type boolean) of this Flow object.
491       */
492      boolean isStopJobsOnExit();
493      }