001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.util.ArrayList;
024    import java.util.Arrays;
025    import java.util.Collection;
026    import java.util.HashMap;
027    import java.util.List;
028    import java.util.Map;
029    
030    import cascading.operation.AssertionLevel;
031    import cascading.operation.DebugLevel;
032    import cascading.pipe.Checkpoint;
033    import cascading.pipe.Pipe;
034    import cascading.property.UnitOfWorkDef;
035    import cascading.tap.Tap;
036    
037    /**
038     * Class FlowDef is a fluent interface for defining a {@link Flow}.
039     * <p/>
040     * This allows for ad-hoc building of Flow data and meta-data like tags.
041     * <p/>
042     * Instead of calling one of the {@link FlowConnector} connect methods, {@link FlowConnector#connect(FlowDef)}
043     * can be called.
044     */
045    public class FlowDef extends UnitOfWorkDef<FlowDef>
046      {
047      protected Map<String, Tap> sources = new HashMap<String, Tap>();
048      protected Map<String, Tap> sinks = new HashMap<String, Tap>();
049      protected Map<String, Tap> traps = new HashMap<String, Tap>();
050      protected Map<String, Tap> checkpoints = new HashMap<String, Tap>();
051    
052      protected List<String> classPath = new ArrayList<String>();
053      protected List<Pipe> tails = new ArrayList<Pipe>();
054      protected List<AssemblyPlanner> assemblyPlanners = new ArrayList<AssemblyPlanner>();
055    
056      protected AssertionLevel assertionLevel;
057      protected DebugLevel debugLevel;
058    
059      protected String runID;
060    
061      /**
062       * Creates a new instance of a FlowDef.
063       *
064       * @return a FlowDef
065       */
066      public static FlowDef flowDef()
067        {
068        return new FlowDef();
069        }
070    
071      /** Constructor FlowDef creates a new FlowDef instance. */
072      public FlowDef()
073        {
074        }
075    
076      /**
077       * Method getAssemblyPlanners returns the current registered AssemblyPlanners.
078       *
079       * @return a List of AssemblyPlanner instances
080       */
081      public List<AssemblyPlanner> getAssemblyPlanners()
082        {
083        return assemblyPlanners;
084        }
085    
086      /**
087       * Method addAssemblyPlanner adds new AssemblyPlanner instances to be evaluated.
088       *
089       * @param assemblyPlanner of type AssemblyPlanner
090       * @return a FlowDef
091       */
092      public FlowDef addAssemblyPlanner( AssemblyPlanner assemblyPlanner )
093        {
094        assemblyPlanners.add( assemblyPlanner );
095    
096        return this;
097        }
098    
099      /**
100       * Method getSources returns the sources of this FlowDef object.
101       *
102       * @return the sources (type Map<String, Tap>) of this FlowDef object.
103       */
104      public Map<String, Tap> getSources()
105        {
106        return sources;
107        }
108    
109      /**
110       * Method getSourcesCopy returns a copy of the sources Map.
111       *
112       * @return the sourcesCopy (type Map<String, Tap>) of this FlowDef object.
113       */
114      public Map<String, Tap> getSourcesCopy()
115        {
116        return new HashMap<String, Tap>( sources );
117        }
118    
119      /**
120       * Method addSource adds a new named source {@link Tap} for use in the resulting {@link Flow}.
121       *
122       * @param name   of String
123       * @param source of Tap
124       * @return FlowDef
125       */
126      public FlowDef addSource( String name, Tap source )
127        {
128        if( sources.containsKey( name ) )
129          throw new IllegalArgumentException( "cannot add duplicate source: " + name );
130    
131        sources.put( name, source );
132        return this;
133        }
134    
135      /**
136       * Method addSource adds a new source {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
137       * <p/>
138       * If the given pipe is not a head pipe, it will be resolved. If more than one is found, an
139       * {@link IllegalArgumentException} will be thrown.
140       *
141       * @param pipe   of Pipe
142       * @param source of Tap
143       * @return FlowDef
144       */
145      public FlowDef addSource( Pipe pipe, Tap source )
146        {
147        if( pipe == null )
148          throw new IllegalArgumentException( "pipe may not be null" );
149    
150        Pipe[] heads = pipe.getHeads();
151    
152        if( heads.length != 1 )
153          throw new IllegalArgumentException( "pipe has too many heads, found: " + Arrays.toString( Pipe.names( heads ) ) );
154    
155        addSource( heads[ 0 ].getName(), source );
156        return this;
157        }
158    
159      /**
160       * Method addSources adds a map of name and {@link Tap} pairs.
161       *
162       * @param sources of Map<String, Tap>
163       * @return FlowDef
164       */
165      public FlowDef addSources( Map<String, Tap> sources )
166        {
167        if( sources != null )
168          {
169          for( Map.Entry<String, Tap> entry : sources.entrySet() )
170            addSource( entry.getKey(), entry.getValue() );
171          }
172    
173        return this;
174        }
175    
176      /**
177       * Method getSinks returns the sinks of this FlowDef object.
178       *
179       * @return the sinks (type Map<String, Tap>) of this FlowDef object.
180       */
181      public Map<String, Tap> getSinks()
182        {
183        return sinks;
184        }
185    
186      /**
187       * Method getSinksCopy returns a copy of the sink Map.
188       *
189       * @return the sinksCopy (type Map<String, Tap>) of this FlowDef object.
190       */
191      public Map<String, Tap> getSinksCopy()
192        {
193        return new HashMap<String, Tap>( sinks );
194        }
195    
196      /**
197       * Method addSink adds a new named sink {@link Tap} for use in the resulting {@link Flow}.
198       *
199       * @param name of String
200       * @param sink of Tap
201       * @return FlowDef
202       */
203      public FlowDef addSink( String name, Tap sink )
204        {
205        if( sinks.containsKey( name ) )
206          throw new IllegalArgumentException( "cannot add duplicate sink: " + name );
207    
208        sinks.put( name, sink );
209        return this;
210        }
211    
212      /**
213       * Method addSink adds a new sink {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
214       *
215       * @param tail of Pipe
216       * @param sink of Tap
217       * @return FlowDef
218       */
219      public FlowDef addSink( Pipe tail, Tap sink )
220        {
221        addSink( tail.getName(), sink );
222        return this;
223        }
224    
225      /**
226       * Method addTailSink adds the tail {@link Pipe} and sink {@link Tap} to this FlowDef.
227       * <p/>
228       * This is a convenience method for adding both a tail and sink simultaneously. There isn't a similar method
229       * for heads and sources as the head Pipe can always be derived.
230       *
231       * @param tail of Pipe
232       * @param sink of Tap
233       * @return FlowDef
234       */
235      public FlowDef addTailSink( Pipe tail, Tap sink )
236        {
237        addSink( tail.getName(), sink );
238        addTail( tail );
239        return this;
240        }
241    
242      /**
243       * Method addSinks adds a Map of the named and {@link Tap} pairs.
244       *
245       * @param sinks of Map<String, Tap>
246       * @return FlowDef
247       */
248      public FlowDef addSinks( Map<String, Tap> sinks )
249        {
250        if( sinks != null )
251          {
252          for( Map.Entry<String, Tap> entry : sinks.entrySet() )
253            addSink( entry.getKey(), entry.getValue() );
254          }
255    
256        return this;
257        }
258    
259      /**
260       * Method getTraps returns the traps of this FlowDef object.
261       *
262       * @return the traps (type Map<String, Tap>) of this FlowDef object.
263       */
264      public Map<String, Tap> getTraps()
265        {
266        return traps;
267        }
268    
269      /**
270       * Method getTrapsCopy returns a copy of the trap Map.
271       *
272       * @return the trapsCopy (type Map<String, Tap>) of this FlowDef object.
273       */
274      public Map<String, Tap> getTrapsCopy()
275        {
276        return new HashMap<String, Tap>( traps );
277        }
278    
279      /**
280       * Method addTrap adds a new named trap {@link Tap} for use in the resulting {@link Flow}.
281       *
282       * @param name of String
283       * @param trap of Tap
284       * @return FlowDef
285       */
286      public FlowDef addTrap( String name, Tap trap )
287        {
288        if( traps.containsKey( name ) )
289          throw new IllegalArgumentException( "cannot add duplicate trap: " + name );
290    
291        traps.put( name, trap );
292        return this;
293        }
294    
295      /**
296       * Method addTrap adds a new trap {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
297       *
298       * @param pipe of Pipe
299       * @param trap of Tap
300       * @return FlowDef
301       */
302      public FlowDef addTrap( Pipe pipe, Tap trap )
303        {
304        addTrap( pipe.getName(), trap );
305        return this;
306        }
307    
308      /**
309       * Method addTraps adds a Map of the names and {@link Tap} pairs.
310       *
311       * @param traps of Map<String, Tap>
312       * @return FlowDef
313       */
314      public FlowDef addTraps( Map<String, Tap> traps )
315        {
316        if( traps != null )
317          {
318          for( Map.Entry<String, Tap> entry : traps.entrySet() )
319            addTrap( entry.getKey(), entry.getValue() );
320          }
321    
322        return this;
323        }
324    
325      /**
326       * Method getCheckpoints returns the checkpoint taps of this FlowDef object.
327       *
328       * @return the checkpoints (type Map<String, Tap>) of this FlowDef object.
329       */
330      public Map<String, Tap> getCheckpoints()
331        {
332        return checkpoints;
333        }
334    
335      /**
336       * Method getCheckpointsCopy returns a copy of the checkpoint tap Map.
337       *
338       * @return the checkpointsCopy (type Map<String, Tap>) of this FlowDef object.
339       */
340      public Map<String, Tap> getCheckpointsCopy()
341        {
342        return new HashMap<String, Tap>( checkpoints );
343        }
344    
345      /**
346       * Method addCheckpoint adds a new named checkpoint {@link Tap} for use in the resulting {@link Flow}.
347       *
348       * @param name       of String
349       * @param checkpoint of Tap
350       * @return FlowDef
351       */
352      public FlowDef addCheckpoint( String name, Tap checkpoint )
353        {
354        if( checkpoints.containsKey( name ) )
355          throw new IllegalArgumentException( "cannot add duplicate checkpoint: " + name );
356    
357        checkpoints.put( name, checkpoint );
358        return this;
359        }
360    
361      /**
362       * Method addCheckpoint adds a new checkpoint {@link Tap} named after the given {@link Checkpoint} for use in the resulting {@link Flow}.
363       *
364       * @param pipe       of Pipe
365       * @param checkpoint of Tap
366       * @return FlowDef
367       */
368      public FlowDef addCheckpoint( Checkpoint pipe, Tap checkpoint )
369        {
370        addCheckpoint( pipe.getName(), checkpoint );
371        return this;
372        }
373    
374      /**
375       * Method addCheckpoints adds a Map of the names and {@link Tap} pairs.
376       *
377       * @param checkpoints of Map<String, Tap>
378       * @return FlowDef
379       */
380      public FlowDef addCheckpoints( Map<String, Tap> checkpoints )
381        {
382        if( checkpoints != null )
383          {
384          for( Map.Entry<String, Tap> entry : checkpoints.entrySet() )
385            addCheckpoint( entry.getKey(), entry.getValue() );
386          }
387    
388        return this;
389        }
390    
391      /**
392       * Method getTails returns all the current pipe assembly tails the FlowDef holds.
393       *
394       * @return the tails (type List<Pipe>) of this FlowDef object.
395       */
396      public List<Pipe> getTails()
397        {
398        return tails;
399        }
400    
401      /**
402       * Method getTailsArray returns all the current pipe assembly tails the FlowDef holds.
403       *
404       * @return the tailsArray (type Pipe[]) of this FlowDef object.
405       */
406      public Pipe[] getTailsArray()
407        {
408        return tails.toArray( new Pipe[ tails.size() ] );
409        }
410    
411      /**
412       * Method addTail adds a new {@link Pipe} to this FlowDef that represents a tail in a pipe assembly.
413       * <p/>
414       * Be sure to add a sink tap that has the same name as this tail.
415       *
416       * @param tail of Pipe
417       * @return FlowDef
418       */
419      public FlowDef addTail( Pipe tail )
420        {
421        if( tail != null )
422          this.tails.add( tail );
423    
424        return this;
425        }
426    
427      /**
428       * Method addTails adds a Collection of tails.
429       *
430       * @param tails of Collection<Pipe>
431       * @return FlowDef
432       */
433      public FlowDef addTails( Collection<Pipe> tails )
434        {
435        for( Pipe tail : tails )
436          addTail( tail );
437    
438        return this;
439        }
440    
441      /**
442       * Method addTails adds an array of tails.
443       *
444       * @param tails of Pipe...
445       * @return FlowDef
446       */
447      public FlowDef addTails( Pipe... tails )
448        {
449        for( Pipe tail : tails )
450          addTail( tail );
451    
452        return this;
453        }
454    
455      public FlowDef setAssertionLevel( AssertionLevel assertionLevel )
456        {
457        this.assertionLevel = assertionLevel;
458    
459        return this;
460        }
461    
462      public AssertionLevel getAssertionLevel()
463        {
464        return assertionLevel;
465        }
466    
467      public FlowDef setDebugLevel( DebugLevel debugLevel )
468        {
469        this.debugLevel = debugLevel;
470    
471        return this;
472        }
473    
474      public DebugLevel getDebugLevel()
475        {
476        return debugLevel;
477        }
478    
479      /**
480       * Method setRunID sets the checkpoint run or execution ID to be used to find prior failed runs against
481       * this runID.
482       * <p/>
483       * When given, and a {@link Flow} fails to execute, a subsequent attempt to run the same Flow with the same
484       * runID will allow the Flow instance to start where it left off.
485       * <p/>
486       * Not all planners support this feature.
487       * <p/>
488       * A Flow name is required when using a runID.
489       *
490       * @param runID of type String
491       * @return FlowDef
492       */
493      public FlowDef setRunID( String runID )
494        {
495        if( runID != null && runID.isEmpty() )
496          return this;
497    
498        this.runID = runID;
499    
500        return this;
501        }
502    
503      public String getRunID()
504        {
505        return runID;
506        }
507    
508      public List<String> getClassPath()
509        {
510        return classPath;
511        }
512    
513      /**
514       * Adds each given artifact to the classpath the assembly will execute under allowing
515       * {@link cascading.pipe.Operator}s to dynamically load classes and resources from a {@link ClassLoader}.
516       *
517       * @param artifact a jar or other file String path
518       * @return FlowDef
519       */
520      public FlowDef addToClassPath( String artifact )
521        {
522        if( artifact == null || artifact.isEmpty() )
523          return this;
524    
525        classPath.add( artifact );
526    
527        return this;
528        }
529      }