001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow;
022
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.Map;
028import java.util.Properties;
029import java.util.Set;
030
031import cascading.CascadingException;
032import cascading.flow.planner.FlowPlanner;
033import cascading.flow.planner.PlatformInfo;
034import cascading.flow.planner.rule.RuleRegistrySet;
035import cascading.pipe.Pipe;
036import cascading.property.AppProps;
037import cascading.property.PropertyUtil;
038import cascading.scheme.Scheme;
039import cascading.tap.Tap;
040import cascading.util.Util;
041
042import static cascading.flow.FlowDef.flowDef;
043
044/**
045 * Class FlowConnector is the base class for all platform planners.
046 * <p/>
047 * See the {@link FlowDef} class for a fluent way to define a new Flow.
048 * <p/>
049 * Use the FlowConnector to link source and sink {@link Tap} instances with an assembly of {@link Pipe} instances into
050 * an executable {@link cascading.flow.Flow}.
051 * <p/>
052 * FlowConnector invokes a planner for the target execution environment.
053 * <p/>
054 * For executing Flows in local memory against local files, see {@link cascading.flow.local.LocalFlowConnector}.
055 * <p/>
056 * For Apache Hadoop, see the {@link cascading.flow.hadoop.HadoopFlowConnector}.
057 * Or if you have a pre-existing custom Hadoop job to execute, see {@link cascading.flow.hadoop.MapReduceFlow}, which
058 * doesn't require a planner.
059 * <p/>
060 * Note that all {@code connect} methods take a single {@code tail} or an array of {@code tail} Pipe instances. "tail"
061 * refers to the last connected Pipe instances in a pipe-assembly. Pipe-assemblies are graphs of objects with "heads"
062 * and "tails". From a given "tail", all connected heads can be found, but not the reverse. So "tails" must be
063 * supplied by the user.
064 * <p/>
065 * The FlowConnector and the underlying execution framework (Hadoop or local mode) can be configured via a
066 * {@link Map} or {@link Properties} instance given to the constructor.
067 * <p/>
068 * This properties map must be populated before constructing a FlowConnector instance. Many planner specific
069 * properties can be set through the {@link FlowConnectorProps} fluent interface.
070 * <p/>
071 * Some planners have required properties. Hadoop expects {@link AppProps#setApplicationJarPath(java.util.Map, String)} or
072 * {@link AppProps#setApplicationJarClass(java.util.Map, Class)} to be set.
073 * <p/>
074 * Any properties set and passed through the FlowConnector constructor will be global to all Flow instances created through
075 * that FlowConnector instance. Some properties are on the {@link FlowDef} and would only be applicable to the
076 * resulting Flow instance.
077 * <p/>
078 * These properties are used to influence the current planner and are also passed down to the
079 * execution framework to override any default values. For example when using the Hadoop planner, the number of reducers
080 * or mappers can be set by using platform specific properties.
081 * <p/>
082 * Custom operations (Functions, Filter, etc) may also retrieve these property values at runtime through calls to
083 * {@link cascading.flow.FlowProcess#getProperty(String)} or {@link FlowProcess#getStringProperty(String)}.
084 * <p/>
085 * Most applications will need to call {@link cascading.property.AppProps#setApplicationJarClass(java.util.Map, Class)} or
086 * {@link cascading.property.AppProps#setApplicationJarPath(java.util.Map, String)} so that
087 * the correct application jar file is passed through to all child processes. The Class or path must reference
088 * the custom application jar, not a Cascading library class or jar. The easiest thing to do is give setApplicationJarClass
089 * the Class with your static main function and let Cascading figure out which jar to use.
090 * <p/>
091 * Note that {@code Map<Object,Object>} is compatible with the {@link Properties} class, so properties can be loaded at
092 * runtime from a configuration file.
093 * <p/>
094 * By default, all {@link cascading.operation.Assertion}s are planned into the resulting Flow instance. This can be
095 * changed for a given Flow by calling {@link FlowDef#setAssertionLevel(cascading.operation.AssertionLevel)} or globally
096 * via {@link FlowConnectorProps#setAssertionLevel(cascading.operation.AssertionLevel)}.
097 * <p/>
098 * Also by default, all {@link cascading.operation.Debug}s are planned into the resulting Flow instance. This can be
099 * changed for a given flow by calling {@link FlowDef#setDebugLevel(cascading.operation.DebugLevel)} or globally via
100 * {@link FlowConnectorProps#setDebugLevel(cascading.operation.DebugLevel)}.
101 * <p/>
102 * As of version 3.0, custom {@link cascading.flow.planner.rule.RuleRegistry} instances can be provided to customize
103 * a given planner.
104 *
105 * @see cascading.flow.local.LocalFlowConnector
106 * @see cascading.flow.hadoop.HadoopFlowConnector
107 * @see cascading.flow.hadoop2.Hadoop2MR1FlowConnector
108 */
109public abstract class FlowConnector
110  {
111  /** Field properties */
112  protected Map<Object, Object> properties; // may be a Map or Properties instance. see PropertyUtil
113
114  private RuleRegistrySet ruleRegistrySet;
115
116  /**
117   * Method getIntermediateSchemeClass is used for debugging.
118   *
119   * @param properties of type Map<Object, Object>
120   * @return Class
121   */
122  public Class getIntermediateSchemeClass( Map<Object, Object> properties )
123    {
124    // supporting stuffed classes to overcome classloading issue
125    Object type = PropertyUtil.getProperty( properties, FlowConnectorProps.INTERMEDIATE_SCHEME_CLASS, null );
126
127    if( type == null )
128      return getDefaultIntermediateSchemeClass();
129
130    if( type instanceof Class )
131      return (Class) type;
132
133    try
134      {
135      return FlowConnector.class.getClassLoader().loadClass( type.toString() );
136      }
137    catch( ClassNotFoundException exception )
138      {
139      throw new CascadingException( "unable to load class: " + type.toString(), exception );
140      }
141    }
142
143  protected abstract Class<? extends Scheme> getDefaultIntermediateSchemeClass();
144
145  protected FlowConnector()
146    {
147    this.properties = new HashMap<>();
148    }
149
150  protected FlowConnector( RuleRegistrySet ruleRegistrySet )
151    {
152    this();
153    this.ruleRegistrySet = ruleRegistrySet;
154    }
155
156  protected FlowConnector( Map<Object, Object> properties )
157    {
158    if( properties == null )
159      this.properties = new HashMap<>();
160    else if( properties instanceof Properties )
161      this.properties = new Properties( (Properties) properties );
162    else
163      this.properties = new HashMap<>( properties );
164    }
165
166  protected FlowConnector( Map<Object, Object> properties, RuleRegistrySet ruleRegistrySet )
167    {
168    this( properties );
169    this.ruleRegistrySet = ruleRegistrySet;
170    }
171
172  /**
173   * Method getProperties returns the properties of this FlowConnector object. The returned Map instance
174   * is immutable to prevent changes to the underlying property values in this FlowConnector instance.
175   * <p/>
176   * If a {@link Properties} instance was passed to the constructor, the returned object will be a flattened
177   * {@link Map} instance.
178   *
179   * @return the properties (type Map<Object, Object>) of this FlowConnector object.
180   */
181  public Map<Object, Object> getProperties()
182    {
183    // Sub-classes of FlowConnector should rely on PropertyUtil to manage access to properties objects internally.
184    return Collections.unmodifiableMap( PropertyUtil.asFlatMap( properties ) );
185    }
186
187  /**
188   * Method connect links the given source and sink Taps to the given pipe assembly.
189   *
190   * @param source source Tap to bind to the head of the given tail Pipe
191   * @param sink   sink Tap to bind to the given tail Pipe
192   * @param tail   tail end of a pipe assembly
193   * @return Flow
194   */
195  public Flow connect( Tap source, Tap sink, Pipe tail )
196    {
197    return connect( null, source, sink, tail );
198    }
199
200  /**
201   * Method connect links the given source and sink Taps to the given pipe assembly.
202   *
203   * @param name   name to give the resulting Flow
204   * @param source source Tap to bind to the head of the given tail Pipe
205   * @param sink   sink Tap to bind to the given tail Pipe
206   * @param tail   tail end of a pipe assembly
207   * @return Flow
208   */
209  public Flow connect( String name, Tap source, Tap sink, Pipe tail )
210    {
211    Map<String, Tap> sources = new HashMap<String, Tap>();
212
213    sources.put( tail.getHeads()[ 0 ].getName(), source );
214
215    return connect( name, sources, sink, tail );
216    }
217
218  /**
219   * Method connect links the given source, sink, and trap Taps to the given pipe assembly. The given trap will
220   * be linked to the assembly head along with the source.
221   *
222   * @param name   name to give the resulting Flow
223   * @param source source Tap to bind to the head of the given tail Pipe
224   * @param sink   sink Tap to bind to the given tail Pipe
225   * @param trap   trap Tap to sink all failed Tuples into
226   * @param tail   tail end of a pipe assembly
227   * @return Flow
228   */
229  public Flow connect( String name, Tap source, Tap sink, Tap trap, Pipe tail )
230    {
231    Map<String, Tap> sources = new HashMap<String, Tap>();
232
233    sources.put( tail.getHeads()[ 0 ].getName(), source );
234
235    Map<String, Tap> traps = new HashMap<String, Tap>();
236
237    traps.put( tail.getHeads()[ 0 ].getName(), trap );
238
239    return connect( name, sources, sink, traps, tail );
240    }
241
242  /**
243   * Method connect links the named source Taps and sink Tap to the given pipe assembly.
244   *
245   * @param sources all head names and source Taps to bind to the heads of the given tail Pipe
246   * @param sink    sink Tap to bind to the given tail Pipe
247   * @param tail    tail end of a pipe assembly
248   * @return Flow
249   */
250  public Flow connect( Map<String, Tap> sources, Tap sink, Pipe tail )
251    {
252    return connect( null, sources, sink, tail );
253    }
254
255  /**
256   * Method connect links the named source Taps and sink Tap to the given pipe assembly.
257   *
258   * @param name    name to give the resulting Flow
259   * @param sources all head names and source Taps to bind to the heads of the given tail Pipe
260   * @param sink    sink Tap to bind to the given tail Pipe
261   * @param tail    tail end of a pipe assembly
262   * @return Flow
263   */
264  public Flow connect( String name, Map<String, Tap> sources, Tap sink, Pipe tail )
265    {
266    Map<String, Tap> sinks = new HashMap<String, Tap>();
267
268    sinks.put( tail.getName(), sink );
269
270    return connect( name, sources, sinks, tail );
271    }
272
273  /**
274   * Method connect links the named source and trap Taps and sink Tap to the given pipe assembly.
275   *
276   * @param name    name to give the resulting Flow
277   * @param sources all head names and source Taps to bind to the heads of the given tail Pipe
278   * @param sink    sink Tap to bind to the given tail Pipe
279   * @param traps   all pipe names and trap Taps to sink all failed Tuples into
280   * @param tail    tail end of a pipe assembly
281   * @return Flow
282   */
283  public Flow connect( String name, Map<String, Tap> sources, Tap sink, Map<String, Tap> traps, Pipe tail )
284    {
285    Map<String, Tap> sinks = new HashMap<String, Tap>();
286
287    sinks.put( tail.getName(), sink );
288
289    return connect( name, sources, sinks, traps, tail );
290    }
291
292  /**
293   * Method connect links the named trap Taps, source and sink Tap to the given pipe assembly.
294   *
295   * @param name   name to give the resulting Flow
296   * @param source source Tap to bind to the head of the given tail Pipe
297   * @param sink   sink Tap to bind to the given tail Pipe
298   * @param traps  all pipe names and trap Taps to sink all failed Tuples into
299   * @param tail   tail end of a pipe assembly
300   * @return Flow
301   */
302  public Flow connect( String name, Tap source, Tap sink, Map<String, Tap> traps, Pipe tail )
303    {
304    Map<String, Tap> sources = new HashMap<String, Tap>();
305
306    sources.put( tail.getHeads()[ 0 ].getName(), source );
307
308    Map<String, Tap> sinks = new HashMap<String, Tap>();
309
310    sinks.put( tail.getName(), sink );
311
312    return connect( name, sources, sinks, traps, tail );
313    }
314
315  /**
316   * Method connect links the named source Taps and sink Tap to the given pipe assembly.
317   * <p/>
318   * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe.
319   * So the head pipe does not need to be included as an argument.
320   *
321   * @param source source Tap to bind to the head of the given tail Pipes
322   * @param sinks  all tail names and sink Taps to bind to the given tail Pipes
323   * @param tails  all tail ends of a pipe assembly
324   * @return Flow
325   */
326  public Flow connect( Tap source, Map<String, Tap> sinks, Collection<Pipe> tails )
327    {
328    return connect( null, source, sinks, tails.toArray( new Pipe[ tails.size() ] ) );
329    }
330
331  /**
332   * Method connect links the named source Taps and sink Tap to the given pipe assembly.
333   * <p/>
334   * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe.
335   * So the head pipe does not need to be included as an argument.
336   *
337   * @param name   name to give the resulting Flow
338   * @param source source Tap to bind to the head of the given tail Pipes
339   * @param sinks  all tail names and sink Taps to bind to the given tail Pipes
340   * @param tails  all tail ends of a pipe assembly
341   * @return Flow
342   */
343  public Flow connect( String name, Tap source, Map<String, Tap> sinks, Collection<Pipe> tails )
344    {
345    return connect( name, source, sinks, tails.toArray( new Pipe[ tails.size() ] ) );
346    }
347
348  /**
349   * Method connect links the named source Taps and sink Tap to the given pipe assembly.
350   * <p/>
351   * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe.
352   * So the head pipe does not need to be included as an argument.
353   *
354   * @param source source Tap to bind to the head of the given tail Pipes
355   * @param sinks  all tail names and sink Taps to bind to the given tail Pipes
356   * @param tails  all tail ends of a pipe assembly
357   * @return Flow
358   */
359  public Flow connect( Tap source, Map<String, Tap> sinks, Pipe... tails )
360    {
361    return connect( null, source, sinks, tails );
362    }
363
364  /**
365   * Method connect links the named source Taps and sink Tap to the given pipe assembly.
366   * <p/>
367   * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe.
368   * So the head pipe does not need to be included as an argument.
369   *
370   * @param name   name to give the resulting Flow
371   * @param source source Tap to bind to the head of the given tail Pipes
372   * @param sinks  all tail names and sink Taps to bind to the given tail Pipes
373   * @param tails  all tail ends of a pipe assembly
374   * @return Flow
375   */
376  public Flow connect( String name, Tap source, Map<String, Tap> sinks, Pipe... tails )
377    {
378    Set<Pipe> heads = new HashSet<Pipe>();
379
380    for( Pipe pipe : tails )
381      Collections.addAll( heads, pipe.getHeads() );
382
383    if( heads.isEmpty() )
384      throw new IllegalArgumentException( "no pipe instance found" );
385
386    if( heads.size() != 1 )
387      throw new IllegalArgumentException( "there may be only 1 head pipe instance, found " + heads.size() );
388
389    Map<String, Tap> sources = new HashMap<String, Tap>();
390
391    for( Pipe pipe : heads )
392      sources.put( pipe.getName(), source );
393
394    return connect( name, sources, sinks, tails );
395    }
396
397  /**
398   * Method connect links the named sources and sinks to the given pipe assembly.
399   *
400   * @param sources all head names and source Taps to bind to the heads of the given tail Pipes
401   * @param sinks   all tail names and sink Taps to bind to the given tail Pipes
402   * @param tails   all tail ends of a pipe assembly
403   * @return Flow
404   */
405  public Flow connect( Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails )
406    {
407    return connect( null, sources, sinks, tails );
408    }
409
410  /**
411   * Method connect links the named sources and sinks to the given pipe assembly.
412   *
413   * @param name    name to give the resulting Flow
414   * @param sources all head names and source Taps to bind to the heads of the given tail Pipes
415   * @param sinks   all tail names and sink Taps to bind to the given tail Pipes
416   * @param tails   all tail ends of a pipe assembly
417   * @return Flow
418   */
419  public Flow connect( String name, Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails )
420    {
421    return connect( name, sources, sinks, new HashMap<String, Tap>(), tails );
422    }
423
424  /**
425   * Method connect links the named sources, sinks and traps to the given pipe assembly.
426   *
427   * @param name    name to give the resulting Flow
428   * @param sources all head names and source Taps to bind to the heads of the given tail Pipes
429   * @param sinks   all tail names and sink Taps to bind to the given tail Pipes
430   * @param traps   all pipe names and trap Taps to sink all failed Tuples into
431   * @param tails   all tail ends of a pipe assembly
432   * @return Flow
433   */
434  public Flow connect( String name, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Pipe... tails )
435    {
436    name = name == null ? makeName( tails ) : name;
437
438    FlowDef flowDef = flowDef()
439      .setName( name )
440      .addSources( sources )
441      .addSinks( sinks )
442      .addTraps( traps )
443      .addTails( tails );
444
445    return connect( flowDef );
446    }
447
448  public Flow connect( FlowDef flowDef )
449    {
450    FlowPlanner flowPlanner = createFlowPlanner();
451
452    flowPlanner.initialize( this, properties );
453
454    RuleRegistrySet ruleRegistrySet = getRuleRegistrySet();
455
456    return flowPlanner.buildFlow( flowDef, ruleRegistrySet );
457    }
458
459  protected abstract FlowPlanner createFlowPlanner();
460
461  /**
462   * Returns the configured RuleRegistry, or the default for this platform.
463   * <p/>
464   * The registry is mutable, and will be applied to all subsequent planner operations via {@link #connect(FlowDef)}.
465   *
466   * @return the current RuleRegistry instance
467   */
468  public RuleRegistrySet getRuleRegistrySet()
469    {
470    if( ruleRegistrySet != null )
471      return ruleRegistrySet;
472
473    ruleRegistrySet = createDefaultRuleRegistrySet();
474
475    return ruleRegistrySet;
476    }
477
478  protected abstract RuleRegistrySet createDefaultRuleRegistrySet();
479
480  /**
481   * Method getPlatformInfo returns an instance of {@link PlatformInfo} for the underlying platform.
482   *
483   * @return of type PlatformInfo
484   */
485  public PlatformInfo getPlatformInfo()
486    {
487    return createFlowPlanner().getPlatformInfo();
488    }
489
490  /////////
491  // UTIL
492  /////////
493
494  private String makeName( Pipe[] pipes )
495    {
496    String[] names = new String[ pipes.length ];
497
498    for( int i = 0; i < pipes.length; i++ )
499      names[ i ] = pipes[ i ].getName();
500
501    String name = Util.join( names, "+" );
502
503    if( name.length() > 32 )
504      name = name.substring( 0, 32 );
505
506    return name;
507    }
508  }