001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe;
022    
023    import java.util.Arrays;
024    import java.util.Collections;
025    import java.util.HashSet;
026    import java.util.Set;
027    
028    import cascading.util.TraceUtil;
029    import cascading.util.Util;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * Subclasses of SubAssembly encapsulate complex assemblies of {@link Pipe}s so they my be reused in the same manner
035     * a Pipe is used.
036     * <p/>
037     * That is, a typical SubAssembly subclass will accept a 'previous' Pipe instance, and a few
038     * arguments for configuring the resulting sub-assembly.
039     * <p/>
040     * The previous pipe (or pipes) must be passed on the super constructor, or set via {@link #setPrevious(Pipe...)}. This
041     * allows the current SubAssembly to become the parent of any Pipe instances between the previous and the tails,
042     * exclusive of the previous, and inclusive of the tails.
043     * <p/>
044     * Subsequently all tail Pipes must be set via the {@link #setTails(Pipe...)} method.
045     * <p/>
046     * Note if the SubAssembly represents a split in the pipeline process,
047     * all the 'tails' of the assembly must be passed to {@link #setTails(Pipe...)}. It is up the the developer to
048     * provide any other access to the tails so they may be chained into any subsequent Pipes.
049     * <p/>
050     * Any {@link cascading.property.ConfigDef} values on this SubAssembly will be honored by child Pipe instances via the
051     * {@link cascading.pipe.Pipe#getParent()} back link described above.
052     */
053    public abstract class SubAssembly extends Pipe
054      {
055      private static final Logger LOG = LoggerFactory.getLogger( SubAssembly.class );
056    
057      /** Field previous */
058      private Pipe[] previous; // actual previous pipes instances
059      /** Field tails */
060      private Pipe[] tails;
061    
062      private transient String[] names;
063    
064      /**
065       * Is responsible for unwinding nested SubAssembly instances.
066       *
067       * @param tails of type Pipe[]
068       * @return a Pipe[]
069       */
070      public static Pipe[] unwind( Pipe... tails )
071        {
072        Set<Pipe> previous = new HashSet<Pipe>();
073    
074        for( Pipe pipe : tails )
075          {
076          if( pipe instanceof SubAssembly )
077            Collections.addAll( previous, unwind( pipe.getPrevious() ) );
078          else
079            previous.add( pipe );
080          }
081    
082        return previous.toArray( new Pipe[ previous.size() ] );
083        }
084    
085      protected SubAssembly()
086        {
087        }
088    
089      protected SubAssembly( Pipe... previous )
090        {
091        setPrevious( previous );
092        }
093    
094      protected SubAssembly( String name, Pipe[] previous )
095        {
096        super( name );
097        setPrevious( previous );
098        }
099    
100      /**
101       * Must be called by subclasses to set the start end points of the assembly the subclass represents.
102       *
103       * @param previous of type Pipe
104       */
105      protected void setPrevious( Pipe... previous )
106        {
107        this.previous = previous;
108        }
109    
110      /**
111       * Must be called by subclasses to set the final end points of the assembly the subclass represents.
112       *
113       * @param tails of type Pipe
114       */
115      protected void setTails( Pipe... tails )
116        {
117        this.tails = tails;
118    
119        if( previous == null )
120          {
121          LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", this );
122          return;
123          }
124    
125        Set<Pipe> stopSet = new HashSet<Pipe>();
126    
127        Collections.addAll( stopSet, previous );
128    
129        setParent( stopSet, tails );
130        }
131    
132      private void setParent( Set<Pipe> stopSet, Pipe[] tails )
133        {
134        if( tails == null )
135          return;
136    
137        for( Pipe tail : tails )
138          {
139          if( stopSet.contains( tail ) )
140            continue;
141    
142          tail.setParent( this );
143    
144          Pipe[] current;
145    
146          if( tail instanceof SubAssembly )
147            current = ( (SubAssembly) tail ).previous;
148          else
149            current = tail.getPrevious();
150    
151          if( current == null && tail instanceof SubAssembly )
152            LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", tail );
153    
154          setParent( stopSet, current );
155          }
156        }
157    
158      /**
159       * Method getTails returns all the tails of this SubAssembly object. These values are set by {@link #setTails(Pipe...)}.
160       *
161       * @return the tails (type Pipe[]) of this SubAssembly object.
162       */
163      public Pipe[] getTails()
164        {
165        return getPrevious(); // just returns a clone of tails
166        }
167    
168      /**
169       * Method getTailNames returns the tailNames of this SubAssembly object.
170       *
171       * @return the tailNames (type String[]) of this SubAssembly object.
172       */
173      public String[] getTailNames()
174        {
175        if( tails == null )
176          throw new IllegalStateException( TraceUtil.formatRawTrace( this, "setTails must be called in the constructor" ) );
177    
178        if( names != null )
179          return names;
180    
181        names = new String[ tails.length ];
182    
183        for( int i = 0; i < tails.length; i++ )
184          names[ i ] = tails[ i ].getName();
185    
186        return names;
187        }
188    
189      @Override
190      public String getName()
191        {
192        if( name == null )
193          name = Util.join( getTailNames(), "+" );
194    
195        return name;
196        }
197    
198      @Override
199      public Pipe[] getPrevious()
200        {
201        // returns the semantically equivalent to Pipe#previous to simplify logic in the planner
202        // SubAssemblies are really aliases for their tails
203        if( tails == null )
204          throw new IllegalStateException( TraceUtil.formatRawTrace( this, "setTails must be called after the sub-assembly is assembled" ) );
205    
206        return Arrays.copyOf( tails, tails.length );
207        }
208      }