001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe;
022    
import java.beans.ConstructorProperties;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.planner.Scope;
import cascading.property.ConfigDef;
import cascading.tuple.Fields;
import cascading.util.Util;

import static java.util.Arrays.asList;
036    
037    /**
038     * Class Pipe is used to name branches in pipe assemblies, and as a base class for core
039     * processing model types, specifically {@link Each}, {@link Every}, {@link GroupBy},
040     * {@link CoGroup}, {@link Merge}, {@link HashJoin}, and {@link SubAssembly}.
041     * <p/>
042     * Pipes are chained together through their constructors.
043     * <p/>
044     * To effect a split in the pipe,
045     * simply pass a Pipe instance to two or more constructors of subsequent Pipe instances.
046     * </p>
047     * A join can be achieved by passing two or more Pipe instances to a {@link CoGroup} or {@link HashJoin} pipe.
048     * <p/>
049     * A merge can be achieved by passing two or more Pipe instances to a {@link GroupBy} or {@link Merge} pipe.
050     *
051     * @see Each
052     * @see Every
053     * @see GroupBy
054     * @see Merge
055     * @see CoGroup
056     * @see HashJoin
057     * @see SubAssembly
058     */
059    public class Pipe implements FlowElement, Serializable
060      {
061      /** Field serialVersionUID */
062      private static final long serialVersionUID = 1L;
063      /** Field name */
064      protected String name;
065      /** Field previous */
066      protected Pipe previous;
067      /** Field parent */
068      protected Pipe parent;
069    
070      protected ConfigDef configDef;
071    
072      protected ConfigDef stepConfigDef;
073    
074      /** Field id */
075      private String id;
076      /** Field trace */
077      private final String trace = Util.captureDebugTrace( getClass() );
078    
079      public static synchronized String id( Pipe pipe )
080        {
081        if( pipe.id == null )
082          pipe.id = Util.createUniqueID();
083    
084        return pipe.id;
085        }
086    
087      /**
088       * Convenience method to create an array of Pipe instances.
089       *
090       * @param pipes vararg list of pipes
091       * @return array of pipes
092       */
093      public static Pipe[] pipes( Pipe... pipes )
094        {
095        return pipes;
096        }
097    
098      /**
099       * Convenience method for finding all Pipe names in an assembly.
100       *
101       * @param tails vararg list of all tails in given assembly
102       * @return array of Pipe names
103       */
104      public static String[] names( Pipe... tails )
105        {
106        Set<String> names = new HashSet<String>();
107    
108        collectNames( tails, names );
109    
110        return names.toArray( new String[ names.size() ] );
111        }
112    
113      private static void collectNames( Pipe[] pipes, Set<String> names )
114        {
115        for( Pipe pipe : pipes )
116          {
117          if( pipe instanceof SubAssembly )
118            names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) );
119          else
120            names.add( pipe.getName() );
121    
122          collectNames( SubAssembly.unwind( pipe.getPrevious() ), names );
123          }
124        }
125    
126      public static Pipe[] named( String name, Pipe... tails )
127        {
128        Set<Pipe> pipes = new HashSet<Pipe>();
129    
130        collectPipes( name, tails, pipes );
131    
132        return pipes.toArray( new Pipe[ pipes.size() ] );
133        }
134    
135      private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes )
136        {
137        for( Pipe tail : tails )
138          {
139          if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) )
140            pipes.add( tail );
141    
142          collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes );
143          }
144        }
145    
146      static Pipe[] resolvePreviousAll( Pipe... pipes )
147        {
148        Pipe[] resolved = new Pipe[ pipes.length ];
149    
150        for( int i = 0; i < pipes.length; i++ )
151          resolved[ i ] = resolvePrevious( pipes[ i ] );
152    
153        return resolved;
154        }
155    
156      static Pipe resolvePrevious( Pipe pipe )
157        {
158        if( pipe instanceof Splice || pipe instanceof Operator )
159          return pipe;
160    
161        Pipe[] pipes = pipe.getPrevious();
162    
163        if( pipes.length > 1 )
164          throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" );
165    
166        for( Pipe previous : pipes )
167          {
168          if( previous instanceof Splice || previous instanceof Operator )
169            return previous;
170    
171          return resolvePrevious( previous );
172          }
173    
174        return pipe;
175        }
176    
177      protected Pipe()
178        {
179        }
180    
181      @ConstructorProperties({"previous"})
182      protected Pipe( Pipe previous )
183        {
184        this.previous = previous;
185    
186        verifyPipe();
187        }
188    
189      /**
190       * Constructor Pipe creates a new Pipe instance with the given name. This is useful as the 'start' or head
191       * of a pipe assembly.
192       *
193       * @param name name for this branch of Pipes
194       */
195      @ConstructorProperties({"name"})
196      public Pipe( String name )
197        {
198        this.name = name;
199        }
200    
201      /**
202       * Constructor Pipe creates a new Pipe instance with the given name and previous Pipe instance. This is useful for
203       * naming a branch in a pipe assembly. Or renaming the branch mid-way down.
204       *
205       * @param name     name for this branch of Pipes
206       * @param previous previous Pipe to receive input Tuples from
207       */
208      @ConstructorProperties({"name", "previous"})
209      public Pipe( String name, Pipe previous )
210        {
211        this.name = name;
212        this.previous = previous;
213    
214        verifyPipe();
215        }
216    
217      private void verifyPipe()
218        {
219        if( !( previous instanceof SubAssembly ) )
220          return;
221    
222        String[] strings = ( (SubAssembly) previous ).getTailNames();
223        if( strings.length != 1 )
224          throw new IllegalArgumentException( "pipe assembly must not return more than one tail pipe instance, found " + Util.join( strings, ", " ) );
225        }
226    
227      /**
228       * Get the name of this pipe. Guaranteed non-null.
229       *
230       * @return String the name of this pipe
231       */
232      public String getName()
233        {
234        if( name != null )
235          return name;
236    
237        if( previous != null )
238          {
239          name = previous.getName();
240    
241          return name;
242          }
243    
244        return "ANONYMOUS";
245        }
246    
247      /**
248       * Get all the upstream pipes this pipe is connected to. This method will return the Pipe instances
249       * passed on the constructors as inputs to this Pipe instance.
250       *
251       * @return all the upstream pipes this pipe is connected to.
252       */
253      public Pipe[] getPrevious()
254        {
255        if( previous == null )
256          return new Pipe[ 0 ];
257    
258        return new Pipe[]{previous};
259        }
260    
261      protected void setParent( Pipe parent )
262        {
263        this.parent = parent;
264        }
265    
266      /**
267       * Returns the enclosing parent Pipe instance, if any. A parent is typically a {@link SubAssembly} that wraps
268       * this instance.
269       *
270       * @return of type Pipe
271       */
272      public Pipe getParent()
273        {
274        return parent;
275        }
276    
277      /**
278       * Returns a {@link ConfigDef} instance that allows for local properties to be set and made available via
279       * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
280       * <p/>
281       * Any properties set on the configDef will not show up in any {@link cascading.flow.Flow} or
282       * {@link cascading.flow.FlowStep} process level configuration, but will override any of those values as seen by the
283       * current Pipe instance.
284       *
285       * @return an instance of ConfigDef
286       */
287      @Override
288      public ConfigDef getConfigDef()
289        {
290        if( configDef == null )
291          configDef = new ConfigDef();
292    
293        return configDef;
294        }
295    
296      /**
297       * Returns {@code true} if there are properties in the configDef instance.
298       *
299       * @return true if there are configDef properties
300       */
301      @Override
302      public boolean hasConfigDef()
303        {
304        return configDef != null && !configDef.isEmpty();
305        }
306    
307      /**
308       * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
309       * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
310       * <p/>
311       * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in
312       * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the
313       * stepConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
314       * </p>
315       * Use this method to tweak properties in the process step this pipe instance is planned into. In the case of the
316       * Hadoop platform, when set on a {@link GroupBy} instance, the number of reducers can be modified.
317       *
318       * @return an instance of ConfigDef
319       */
320      @Override
321      public ConfigDef getStepConfigDef()
322        {
323        if( stepConfigDef == null )
324          stepConfigDef = new ConfigDef();
325    
326        return stepConfigDef;
327        }
328    
329      /**
330       * Returns {@code true} if there are properties in the processConfigDef instance.
331       *
332       * @return true if there are processConfigDef properties
333       */
334      @Override
335      public boolean hasStepConfigDef()
336        {
337        return stepConfigDef != null && !stepConfigDef.isEmpty();
338        }
339    
340      /**
341       * Method getHeads returns the first Pipe instances in this pipe assembly.
342       *
343       * @return the first (type Pipe[]) of this Pipe object.
344       */
345      public Pipe[] getHeads()
346        {
347        Pipe[] pipes = getPrevious();
348    
349        if( pipes.length == 0 )
350          return new Pipe[]{this};
351    
352        if( pipes.length == 1 )
353          return pipes[ 0 ].getHeads();
354    
355        Set<Pipe> heads = new HashSet<Pipe>();
356    
357        for( Pipe pipe : pipes )
358          Collections.addAll( heads, pipe.getHeads() );
359    
360        return heads.toArray( new Pipe[ heads.size() ] );
361        }
362    
363      @Override
364      public Scope outgoingScopeFor( Set<Scope> incomingScopes )
365        {
366        return incomingScopes.iterator().next();
367        }
368    
369      @Override
370      public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
371        {
372        throw new IllegalStateException( "resolveIncomingOperationFields should never be called" );
373        }
374    
375      @Override
376      public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
377        {
378        throw new IllegalStateException( "resolveIncomingOperationPassThroughFields should never be called" );
379        }
380    
381      /**
382       * Method getTrace returns a String that pinpoint where this instance was created for debugging.
383       *
384       * @return String
385       */
386      public String getTrace()
387        {
388        return trace;
389        }
390    
391      @Override
392      public String toString()
393        {
394        return getClass().getSimpleName() + "(" + getName() + ")";
395        }
396    
397      Scope getFirst( Set<Scope> incomingScopes )
398        {
399        return incomingScopes.iterator().next();
400        }
401    
402      @Override
403      public boolean isEquivalentTo( FlowElement element )
404        {
405        if( element == null )
406          return false;
407    
408        if( this == element )
409          return true;
410    
411        return getClass() == element.getClass();
412        }
413    
414      @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"})
415      @Override
416      public boolean equals( Object object )
417        {
418        // we cannot test equality by names for this class, prevents detection of dupe names in heads or tails
419        return this == object;
420        }
421    
422      @Override
423      public int hashCode()
424        {
425        return 31 * getName().hashCode() + getClass().hashCode();
426        }
427    
428      /**
429       * Method print is used internally.
430       *
431       * @param scope of type Scope
432       * @return String
433       */
434      public String print( Scope scope )
435        {
436        StringBuffer buffer = new StringBuffer();
437    
438        printInternal( buffer, scope );
439    
440        return buffer.toString();
441        }
442    
443      protected void printInternal( StringBuffer buffer, Scope scope )
444        {
445        buffer.append( getClass().getSimpleName() ).append( "('" ).append( getName() ).append( "')" );
446        }
447      }