001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe;
022
023import java.beans.ConstructorProperties;
024import java.io.Serializable;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Set;
028
029import cascading.flow.FlowElement;
030import cascading.flow.planner.Scope;
031import cascading.property.ConfigDef;
032import cascading.tuple.Fields;
033import cascading.util.TraceUtil;
034import cascading.util.Traceable;
035import cascading.util.Util;
036
037import static java.util.Arrays.asList;
038
039/**
040 * Class Pipe is used to name branches in pipe assemblies, and as a base class for core
041 * processing model types, specifically {@link Each}, {@link Every}, {@link GroupBy},
042 * {@link CoGroup}, {@link Merge}, {@link HashJoin}, and {@link SubAssembly}.
043 * <p/>
044 * Pipes are chained together through their constructors.
045 * <p/>
 * To effect a split in the pipe,
 * simply pass a Pipe instance to two or more constructors of subsequent Pipe instances.
 * <p/>
049 * A join can be achieved by passing two or more Pipe instances to a {@link CoGroup} or {@link HashJoin} pipe.
050 * <p/>
051 * A merge can be achieved by passing two or more Pipe instances to a {@link GroupBy} or {@link Merge} pipe.
052 *
053 * @see Each
054 * @see Every
055 * @see GroupBy
056 * @see Merge
057 * @see CoGroup
058 * @see HashJoin
059 * @see SubAssembly
060 */
061public class Pipe implements FlowElement, Serializable, Traceable
062  {
063  /** Field serialVersionUID */
064  private static final long serialVersionUID = 1L;
065  /** Field name */
066  protected String name;
067  /** Field previous */
068  protected Pipe previous;
069  /** Field parent */
070  protected Pipe parent;
071
072  protected ConfigDef configDef;
073
074  protected ConfigDef stepConfigDef;
075
076  protected ConfigDef nodeConfigDef;
077
078  /** Field id */
079  private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent
080  /** Field trace */
081  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
082
083  public static synchronized String id( Pipe pipe )
084    {
085    return pipe.id;
086    }
087
088  /**
089   * Convenience method to create an array of Pipe instances.
090   *
091   * @param pipes vararg list of pipes
092   * @return array of pipes
093   */
094  public static Pipe[] pipes( Pipe... pipes )
095    {
096    return pipes;
097    }
098
099  /**
100   * Convenience method for finding all Pipe names in an assembly.
101   *
102   * @param tails vararg list of all tails in given assembly
103   * @return array of Pipe names
104   */
105  public static String[] names( Pipe... tails )
106    {
107    Set<String> names = new HashSet<String>();
108
109    collectNames( tails, names );
110
111    return names.toArray( new String[ names.size() ] );
112    }
113
114  private static void collectNames( Pipe[] pipes, Set<String> names )
115    {
116    for( Pipe pipe : pipes )
117      {
118      if( pipe instanceof SubAssembly )
119        names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) );
120      else
121        names.add( pipe.getName() );
122
123      collectNames( SubAssembly.unwind( pipe.getPrevious() ), names );
124      }
125    }
126
127  public static Pipe[] named( String name, Pipe... tails )
128    {
129    Set<Pipe> pipes = new HashSet<Pipe>();
130
131    collectPipes( name, tails, pipes );
132
133    return pipes.toArray( new Pipe[ pipes.size() ] );
134    }
135
136  private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes )
137    {
138    for( Pipe tail : tails )
139      {
140      if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) )
141        pipes.add( tail );
142
143      collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes );
144      }
145    }
146
147  static Pipe[] resolvePreviousAll( Pipe... pipes )
148    {
149    Pipe[] resolved = new Pipe[ pipes.length ];
150
151    for( int i = 0; i < pipes.length; i++ )
152      resolved[ i ] = resolvePrevious( pipes[ i ] );
153
154    return resolved;
155    }
156
157  static Pipe resolvePrevious( Pipe pipe )
158    {
159    if( pipe instanceof Splice || pipe instanceof Operator )
160      return pipe;
161
162    Pipe[] pipes = pipe.getPrevious();
163
164    if( pipes.length > 1 )
165      throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" );
166
167    for( Pipe previous : pipes )
168      {
169      if( previous instanceof Splice || previous instanceof Operator )
170        return previous;
171
172      return resolvePrevious( previous );
173      }
174
175    return pipe;
176    }
177
178  protected Pipe()
179    {
180    }
181
182  @ConstructorProperties({"previous"})
183  protected Pipe( Pipe previous )
184    {
185    this.previous = previous;
186
187    verifyPipe();
188    }
189
190  /**
191   * Constructor Pipe creates a new Pipe instance with the given name. This is useful as the 'start' or head
192   * of a pipe assembly.
193   *
194   * @param name name for this branch of Pipes
195   */
196  @ConstructorProperties({"name"})
197  public Pipe( String name )
198    {
199    this.name = name;
200    }
201
202  /**
203   * Constructor Pipe creates a new Pipe instance with the given name and previous Pipe instance. This is useful for
204   * naming a branch in a pipe assembly. Or renaming the branch mid-way down.
205   *
206   * @param name     name for this branch of Pipes
207   * @param previous previous Pipe to receive input Tuples from
208   */
209  @ConstructorProperties({"name", "previous"})
210  public Pipe( String name, Pipe previous )
211    {
212    this.name = name;
213    this.previous = previous;
214
215    verifyPipe();
216    }
217
218  private void verifyPipe()
219    {
220    if( !( previous instanceof SubAssembly ) )
221      return;
222
223    String[] strings = ( (SubAssembly) previous ).getTailNames();
224    if( strings.length != 1 )
225      throw new IllegalArgumentException( "pipe assembly must not return more than one tail pipe instance, found " + Util.join( strings, ", " ) );
226    }
227
228  /**
229   * Get the name of this pipe. Guaranteed non-null.
230   *
231   * @return String the name of this pipe
232   */
233  public String getName()
234    {
235    if( name != null )
236      return name;
237
238    if( previous != null )
239      {
240      name = previous.getName();
241
242      return name;
243      }
244
245    return "ANONYMOUS";
246    }
247
248  /**
249   * Get all the upstream pipes this pipe is connected to. This method will return the Pipe instances
250   * passed on the constructors as inputs to this Pipe instance.
251   *
252   * @return all the upstream pipes this pipe is connected to.
253   */
254  public Pipe[] getPrevious()
255    {
256    if( previous == null )
257      return new Pipe[ 0 ];
258
259    return new Pipe[]{previous};
260    }
261
262  protected void setParent( Pipe parent )
263    {
264    this.parent = parent;
265    }
266
267  /**
268   * Returns the enclosing parent Pipe instance, if any. A parent is typically a {@link SubAssembly} that wraps
269   * this instance.
270   *
271   * @return of type Pipe
272   */
273  public Pipe getParent()
274    {
275    return parent;
276    }
277
278  /**
279   * Returns a {@link ConfigDef} instance that allows for local properties to be set and made available via
280   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
281   * <p/>
282   * Any properties set on the configDef will not show up in any {@link cascading.flow.Flow} or
283   * {@link cascading.flow.FlowStep} process level configuration, but will override any of those values as seen by the
284   * current Pipe instance.
285   *
286   * @return an instance of ConfigDef
287   */
288  @Override
289  public ConfigDef getConfigDef()
290    {
291    if( configDef == null )
292      configDef = new ConfigDef();
293
294    return configDef;
295    }
296
297  /**
298   * Returns {@code true} if there are properties in the configDef instance.
299   *
300   * @return true if there are configDef properties
301   */
302  @Override
303  public boolean hasConfigDef()
304    {
305    return configDef != null && !configDef.isEmpty();
306    }
307
308  /**
309   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
310   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
311   * <p/>
312   * Any properties set on the nodeConfigDef will not show up in any Flow configuration, but will show up in
313   * the current process {@link cascading.flow.FlowNode} (in Apache Tez the Vertex configuration). Any value set in the
314   * nodeConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
315   * </p>
316   * Use this method to tweak properties in the process node this pipe instance is planned into. In the case of the
317   * Apache Tez platform, when set on a {@link GroupBy} instance, the number of gather partitions can be modified.
318   * <p/>
319   * In the case of any Pipe that spans FlowNode boundaries, like GroupBy and CoGroup may on some platforms,
320   * any ConfigDef properties will be applied to the downstream FlowNode. That is, if a GroupBy is the source
321   * to a node, any node ConfigDef properties will be applied. If the GroupBy encountered when applying properties
322   * is on the sink side of a node, the properties will be ignored.
323   *
324   * @return an instance of ConfigDef
325   */
326  @Override
327  public ConfigDef getNodeConfigDef()
328    {
329    if( nodeConfigDef == null )
330      nodeConfigDef = new ConfigDef();
331
332    return nodeConfigDef;
333    }
334
335  /**
336   * Returns {@code true} if there are properties in the nodeConfigDef instance.
337   *
338   * @return true if there are nodeConfigDef properties
339   */
340  @Override
341  public boolean hasNodeConfigDef()
342    {
343    return nodeConfigDef != null && !nodeConfigDef.isEmpty();
344    }
345
346  /**
347   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
348   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
349   * <p/>
350   * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in
351   * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the
352   * stepConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
353   * </p>
354   * Use this method to tweak properties in the process step this pipe instance is planned into. In the case of the
355   * Hadoop platform, when set on a {@link GroupBy} instance, the number of reducers can be modified.
356   *
357   * @return an instance of ConfigDef
358   */
359  @Override
360  public ConfigDef getStepConfigDef()
361    {
362    if( stepConfigDef == null )
363      stepConfigDef = new ConfigDef();
364
365    return stepConfigDef;
366    }
367
368  /**
369   * Returns {@code true} if there are properties in the stepConfigDef instance.
370   *
371   * @return true if there are stepConfigDef properties
372   */
373  @Override
374  public boolean hasStepConfigDef()
375    {
376    return stepConfigDef != null && !stepConfigDef.isEmpty();
377    }
378
379  /**
380   * Method getHeads returns the first Pipe instances in this pipe assembly.
381   *
382   * @return the first (type Pipe[]) of this Pipe object.
383   */
384  public Pipe[] getHeads()
385    {
386    Pipe[] pipes = getPrevious();
387
388    if( pipes.length == 0 )
389      return new Pipe[]{this};
390
391    if( pipes.length == 1 )
392      return pipes[ 0 ].getHeads();
393
394    Set<Pipe> heads = new HashSet<Pipe>();
395
396    for( Pipe pipe : pipes )
397      Collections.addAll( heads, pipe.getHeads() );
398
399    return heads.toArray( new Pipe[ heads.size() ] );
400    }
401
402  @Override
403  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
404    {
405    return incomingScopes.iterator().next();
406    }
407
408  @Override
409  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
410    {
411    throw new IllegalStateException( "resolveIncomingOperationFields should never be called" );
412    }
413
414  @Override
415  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
416    {
417    throw new IllegalStateException( "resolveIncomingOperationPassThroughFields should never be called" );
418    }
419
420  @Override
421  public String getTrace()
422    {
423    return trace;
424    }
425
426  @Override
427  public String toString()
428    {
429    return getClass().getSimpleName() + "(" + getName() + ")";
430    }
431
432  Scope getFirst( Set<Scope> incomingScopes )
433    {
434    return incomingScopes.iterator().next();
435    }
436
437  @Override
438  public boolean isEquivalentTo( FlowElement element )
439    {
440    if( element == null )
441      return false;
442
443    if( this == element )
444      return true;
445
446    return getClass() == element.getClass();
447    }
448
449  @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"})
450  @Override
451  public boolean equals( Object object )
452    {
453    // we cannot test equality by names for this class, prevents detection of dupe names in heads or tails
454    return this == object;
455    }
456
457  @Override
458  public int hashCode()
459    {
460    return 31 * getName().hashCode() + getClass().hashCode();
461    }
462
463  /**
464   * Method print is used internally.
465   *
466   * @param scope of type Scope
467   * @return String
468   */
469  public String print( Scope scope )
470    {
471    StringBuffer buffer = new StringBuffer();
472
473    printInternal( buffer, scope );
474
475    return buffer.toString();
476    }
477
478  protected void printInternal( StringBuffer buffer, Scope scope )
479    {
480    buffer.append( getClass().getSimpleName() ).append( "('" ).append( getName() ).append( "')" );
481    }
482  }