001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe;
022    
023    import java.util.Arrays;
024    import java.util.Collections;
025    import java.util.HashSet;
026    import java.util.Set;
027    
028    import cascading.util.Util;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * Subclasses of SubAssembly encapsulate complex assemblies of {@link Pipe}s so they may be reused in the same manner
034     * a Pipe is used.
035     * <p/>
036     * That is, a typical SubAssembly subclass will accept a 'previous' Pipe instance, and a few
037     * arguments for configuring the resulting sub-assembly.
038     * <p/>
039     * The previous pipe (or pipes) must be passed on the super constructor, or set via {@link #setPrevious(Pipe...)}. This
040     * allows the current SubAssembly to become the parent of any Pipe instances between the previous and the tails,
041     * exclusive of the previous, and inclusive of the tails.
042     * <p/>
043     * Subsequently all tail Pipes must be set via the {@link #setTails(Pipe...)} method.
044     * <p/>
045     * Note if the SubAssembly represents a split in the pipeline process,
046     * all the 'tails' of the assembly must be passed to {@link #setTails(Pipe...)}. It is up to the developer to
047     * provide any other access to the tails so they may be chained into any subsequent Pipes.
048     * <p/>
049     * Any {@link cascading.property.ConfigDef} values on this SubAssembly will be honored by child Pipe instances via the
050     * {@link cascading.pipe.Pipe#getParent()} back link described above.
051     */
052    public abstract class SubAssembly extends Pipe
053      {
054      private static final Logger LOG = LoggerFactory.getLogger( SubAssembly.class );
055    
056      /** Field previous */
057      private Pipe[] previous; // actual previous pipes instances
058      /** Field tails */
059      private Pipe[] tails;
060    
061      private transient String[] names;
062    
063      /**
064       * Is responsible for unwinding nested SubAssembly instances.
065       *
066       * @param tails of type Pipe[]
067       * @return a Pipe[]
068       */
069      public static Pipe[] unwind( Pipe... tails )
070        {
071        Set<Pipe> previous = new HashSet<Pipe>();
072    
073        for( Pipe pipe : tails )
074          {
075          if( pipe instanceof SubAssembly )
076            Collections.addAll( previous, unwind( pipe.getPrevious() ) );
077          else
078            previous.add( pipe );
079          }
080    
081        return previous.toArray( new Pipe[ previous.size() ] );
082        }
083    
084      protected SubAssembly()
085        {
086        }
087    
088      protected SubAssembly( Pipe... previous )
089        {
090        setPrevious( previous );
091        }
092    
093      protected SubAssembly( String name, Pipe[] previous )
094        {
095        super( name );
096        setPrevious( previous );
097        }
098    
099      /**
100       * Must be called by subclasses to set the start end points of the assembly the subclass represents.
101       *
102       * @param previous of type Pipe
103       */
104      protected void setPrevious( Pipe... previous )
105        {
106        this.previous = previous;
107        }
108    
109      /**
110       * Must be called by subclasses to set the final end points of the assembly the subclass represents.
111       *
112       * @param tails of type Pipe
113       */
114      protected void setTails( Pipe... tails )
115        {
116        this.tails = tails;
117    
118        if( previous == null )
119          {
120          LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", this );
121          return;
122          }
123    
124        Set<Pipe> stopSet = new HashSet<Pipe>();
125    
126        Collections.addAll( stopSet, previous );
127    
128        setParent( stopSet, tails );
129        }
130    
131      private void setParent( Set<Pipe> stopSet, Pipe[] tails )
132        {
133        if( tails == null )
134          return;
135    
136        for( Pipe tail : tails )
137          {
138          if( stopSet.contains( tail ) )
139            continue;
140    
141          tail.setParent( this );
142    
143          Pipe[] current;
144    
145          if( tail instanceof SubAssembly )
146            current = ( (SubAssembly) tail ).previous;
147          else
148            current = tail.getPrevious();
149    
150          if( current == null && tail instanceof SubAssembly )
151            LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", tail );
152    
153          setParent( stopSet, current );
154          }
155        }
156    
157      /**
158       * Method getTails returns all the tails of this SubAssembly object. These values are set by {@link #setTails(Pipe...)}.
159       *
160       * @return the tails (type Pipe[]) of this SubAssembly object.
161       */
162      public Pipe[] getTails()
163        {
164        return getPrevious(); // just returns a clone of tails
165        }
166    
167      /**
168       * Method getTailNames returns the tailNames of this SubAssembly object.
169       *
170       * @return the tailNames (type String[]) of this SubAssembly object.
171       */
172      public String[] getTailNames()
173        {
174        if( tails == null )
175          throw new IllegalStateException( Util.formatRawTrace( this, "setTails must be called in the constructor" ) );
176    
177        if( names != null )
178          return names;
179    
180        names = new String[ tails.length ];
181    
182        for( int i = 0; i < tails.length; i++ )
183          names[ i ] = tails[ i ].getName();
184    
185        return names;
186        }
187    
188      @Override
189      public String getName()
190        {
191        if( name == null )
192          name = Util.join( getTailNames(), "+" );
193    
194        return name;
195        }
196    
197      @Override
198      public Pipe[] getPrevious()
199        {
200        // returns the semantically equivalent to Pipe#previous to simplify logic in the planner
201        // SubAssemblies are really aliases for their tails
202        if( tails == null )
203          throw new IllegalStateException( Util.formatRawTrace( this, "setTails must be called after the sub-assembly is assembled" ) );
204    
205        return Arrays.copyOf( tails, tails.length );
206        }
207      }