001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe; 022 023 import java.util.Arrays; 024 import java.util.Collections; 025 import java.util.HashSet; 026 import java.util.Set; 027 028 import cascading.util.Util; 029 import org.slf4j.Logger; 030 import org.slf4j.LoggerFactory; 031 032 /** 033 * Subclasses of SubAssembly encapsulate complex assemblies of {@link Pipe}s so they my be reused in the same manner 034 * a Pipe is used. 035 * <p/> 036 * That is, a typical SubAssembly subclass will accept a 'previous' Pipe instance, and a few 037 * arguments for configuring the resulting sub-assembly. 038 * <p/> 039 * The previous pipe (or pipes) must be passed on the super constructor, or set via {@link #setPrevious(Pipe...)}. This 040 * allows the current SubAssembly to become the parent of any Pipe instances between the previous and the tails, 041 * exclusive of the previous, and inclusive of the tails. 042 * <p/> 043 * Subsequently all tail Pipes must be set via the {@link #setTails(Pipe...)} method. 044 * <p/> 045 * Note if the SubAssembly represents a split in the pipeline process, 046 * all the 'tails' of the assembly must be passed to {@link #setTails(Pipe...)}. It is up the the developer to 047 * provide any other access to the tails so they may be chained into any subsequent Pipes. 048 * <p/> 049 * Any {@link cascading.property.ConfigDef} values on this SubAssembly will be honored by child Pipe instances via the 050 * {@link cascading.pipe.Pipe#getParent()} back link described above. 051 */ 052 public abstract class SubAssembly extends Pipe 053 { 054 private static final Logger LOG = LoggerFactory.getLogger( SubAssembly.class ); 055 056 /** Field previous */ 057 private Pipe[] previous; // actual previous pipes instances 058 /** Field tails */ 059 private Pipe[] tails; 060 061 private transient String[] names; 062 063 /** 064 * Is responsible for unwinding nested SubAssembly instances. 065 * 066 * @param tails of type Pipe[] 067 * @return a Pipe[] 068 */ 069 public static Pipe[] unwind( Pipe... tails ) 070 { 071 Set<Pipe> previous = new HashSet<Pipe>(); 072 073 for( Pipe pipe : tails ) 074 { 075 if( pipe instanceof SubAssembly ) 076 Collections.addAll( previous, unwind( pipe.getPrevious() ) ); 077 else 078 previous.add( pipe ); 079 } 080 081 return previous.toArray( new Pipe[ previous.size() ] ); 082 } 083 084 protected SubAssembly() 085 { 086 } 087 088 protected SubAssembly( Pipe... previous ) 089 { 090 setPrevious( previous ); 091 } 092 093 protected SubAssembly( String name, Pipe[] previous ) 094 { 095 super( name ); 096 setPrevious( previous ); 097 } 098 099 /** 100 * Must be called by subclasses to set the start end points of the assembly the subclass represents. 101 * 102 * @param previous of type Pipe 103 */ 104 protected void setPrevious( Pipe... previous ) 105 { 106 this.previous = previous; 107 } 108 109 /** 110 * Must be called by subclasses to set the final end points of the assembly the subclass represents. 111 * 112 * @param tails of type Pipe 113 */ 114 protected void setTails( Pipe... tails ) 115 { 116 this.tails = tails; 117 118 if( previous == null ) 119 { 120 LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", this ); 121 return; 122 } 123 124 Set<Pipe> stopSet = new HashSet<Pipe>(); 125 126 Collections.addAll( stopSet, previous ); 127 128 setParent( stopSet, tails ); 129 } 130 131 private void setParent( Set<Pipe> stopSet, Pipe[] tails ) 132 { 133 if( tails == null ) 134 return; 135 136 for( Pipe tail : tails ) 137 { 138 if( stopSet.contains( tail ) ) 139 continue; 140 141 tail.setParent( this ); 142 143 Pipe[] current; 144 145 if( tail instanceof SubAssembly ) 146 current = ( (SubAssembly) tail ).previous; 147 else 148 current = tail.getPrevious(); 149 150 if( current == null && tail instanceof SubAssembly ) 151 LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", tail ); 152 153 setParent( stopSet, current ); 154 } 155 } 156 157 /** 158 * Method getTails returns all the tails of this SubAssembly object. These values are set by {@link #setTails(Pipe...)}. 159 * 160 * @return the tails (type Pipe[]) of this SubAssembly object. 161 */ 162 public Pipe[] getTails() 163 { 164 return getPrevious(); // just returns a clone of tails 165 } 166 167 /** 168 * Method getTailNames returns the tailNames of this SubAssembly object. 169 * 170 * @return the tailNames (type String[]) of this SubAssembly object. 171 */ 172 public String[] getTailNames() 173 { 174 if( tails == null ) 175 throw new IllegalStateException( Util.formatRawTrace( this, "setTails must be called in the constructor" ) ); 176 177 if( names != null ) 178 return names; 179 180 names = new String[ tails.length ]; 181 182 for( int i = 0; i < tails.length; i++ ) 183 names[ i ] = tails[ i ].getName(); 184 185 return names; 186 } 187 188 @Override 189 public String getName() 190 { 191 if( name == null ) 192 name = Util.join( getTailNames(), "+" ); 193 194 return name; 195 } 196 197 @Override 198 public Pipe[] getPrevious() 199 { 200 // returns the semantically equivalent to Pipe#previous to simplify logic in the planner 201 // SubAssemblies are really aliases for their tails 202 if( tails == null ) 203 throw new IllegalStateException( Util.formatRawTrace( this, "setTails must be called after the sub-assembly is assembled" ) ); 204 205 return Arrays.copyOf( tails, tails.length ); 206 } 207 }