001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe; 022 023 import java.beans.ConstructorProperties; 024 import java.io.Serializable; 025 import java.util.Collections; 026 import java.util.HashSet; 027 import java.util.Set; 028 029 import cascading.flow.FlowElement; 030 import cascading.flow.planner.Scope; 031 import cascading.property.ConfigDef; 032 import cascading.tuple.Fields; 033 import cascading.util.Util; 034 035 import static java.util.Arrays.asList; 036 037 /** 038 * Class Pipe is used to name branches in pipe assemblies, and as a base class for core 039 * processing model types, specifically {@link Each}, {@link Every}, {@link GroupBy}, 040 * {@link CoGroup}, {@link Merge}, {@link HashJoin}, and {@link SubAssembly}. 041 * <p/> 042 * Pipes are chained together through their constructors. 043 * <p/> 044 * To effect a split in the pipe, 045 * simply pass a Pipe instance to two or more constructors of subsequent Pipe instances. 046 * </p> 047 * A join can be achieved by passing two or more Pipe instances to a {@link CoGroup} or {@link HashJoin} pipe. 048 * <p/> 049 * A merge can be achieved by passing two or more Pipe instances to a {@link GroupBy} or {@link Merge} pipe. 050 * 051 * @see Each 052 * @see Every 053 * @see GroupBy 054 * @see Merge 055 * @see CoGroup 056 * @see HashJoin 057 * @see SubAssembly 058 */ 059 public class Pipe implements FlowElement, Serializable 060 { 061 /** Field serialVersionUID */ 062 private static final long serialVersionUID = 1L; 063 /** Field name */ 064 protected String name; 065 /** Field previous */ 066 protected Pipe previous; 067 /** Field parent */ 068 protected Pipe parent; 069 070 protected ConfigDef configDef; 071 072 protected ConfigDef stepConfigDef; 073 074 /** Field id */ 075 private String id; 076 /** Field trace */ 077 private final String trace = Util.captureDebugTrace( getClass() ); 078 079 public static synchronized String id( Pipe pipe ) 080 { 081 if( pipe.id == null ) 082 pipe.id = Util.createUniqueID(); 083 084 return pipe.id; 085 } 086 087 /** 088 * Convenience method to create an array of Pipe instances. 089 * 090 * @param pipes vararg list of pipes 091 * @return array of pipes 092 */ 093 public static Pipe[] pipes( Pipe... pipes ) 094 { 095 return pipes; 096 } 097 098 /** 099 * Convenience method for finding all Pipe names in an assembly. 100 * 101 * @param tails vararg list of all tails in given assembly 102 * @return array of Pipe names 103 */ 104 public static String[] names( Pipe... tails ) 105 { 106 Set<String> names = new HashSet<String>(); 107 108 collectNames( tails, names ); 109 110 return names.toArray( new String[ names.size() ] ); 111 } 112 113 private static void collectNames( Pipe[] pipes, Set<String> names ) 114 { 115 for( Pipe pipe : pipes ) 116 { 117 if( pipe instanceof SubAssembly ) 118 names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) ); 119 else 120 names.add( pipe.getName() ); 121 122 collectNames( SubAssembly.unwind( pipe.getPrevious() ), names ); 123 } 124 } 125 126 public static Pipe[] named( String name, Pipe... tails ) 127 { 128 Set<Pipe> pipes = new HashSet<Pipe>(); 129 130 collectPipes( name, tails, pipes ); 131 132 return pipes.toArray( new Pipe[ pipes.size() ] ); 133 } 134 135 private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes ) 136 { 137 for( Pipe tail : tails ) 138 { 139 if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) ) 140 pipes.add( tail ); 141 142 collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes ); 143 } 144 } 145 146 static Pipe[] resolvePreviousAll( Pipe... pipes ) 147 { 148 Pipe[] resolved = new Pipe[ pipes.length ]; 149 150 for( int i = 0; i < pipes.length; i++ ) 151 resolved[ i ] = resolvePrevious( pipes[ i ] ); 152 153 return resolved; 154 } 155 156 static Pipe resolvePrevious( Pipe pipe ) 157 { 158 if( pipe instanceof Splice || pipe instanceof Operator ) 159 return pipe; 160 161 Pipe[] pipes = pipe.getPrevious(); 162 163 if( pipes.length > 1 ) 164 throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" ); 165 166 for( Pipe previous : pipes ) 167 { 168 if( previous instanceof Splice || previous instanceof Operator ) 169 return previous; 170 171 return resolvePrevious( previous ); 172 } 173 174 return pipe; 175 } 176 177 protected Pipe() 178 { 179 } 180 181 @ConstructorProperties({"previous"}) 182 protected Pipe( Pipe previous ) 183 { 184 this.previous = previous; 185 186 verifyPipe(); 187 } 188 189 /** 190 * Constructor Pipe creates a new Pipe instance with the given name. This is useful as the 'start' or head 191 * of a pipe assembly. 192 * 193 * @param name name for this branch of Pipes 194 */ 195 @ConstructorProperties({"name"}) 196 public Pipe( String name ) 197 { 198 this.name = name; 199 } 200 201 /** 202 * Constructor Pipe creates a new Pipe instance with the given name and previous Pipe instance. This is useful for 203 * naming a branch in a pipe assembly. Or renaming the branch mid-way down. 204 * 205 * @param name name for this branch of Pipes 206 * @param previous previous Pipe to receive input Tuples from 207 */ 208 @ConstructorProperties({"name", "previous"}) 209 public Pipe( String name, Pipe previous ) 210 { 211 this.name = name; 212 this.previous = previous; 213 214 verifyPipe(); 215 } 216 217 private void verifyPipe() 218 { 219 if( !( previous instanceof SubAssembly ) ) 220 return; 221 222 String[] strings = ( (SubAssembly) previous ).getTailNames(); 223 if( strings.length != 1 ) 224 throw new IllegalArgumentException( "pipe assembly must not return more than one tail pipe instance, found " + Util.join( strings, ", " ) ); 225 } 226 227 /** 228 * Get the name of this pipe. Guaranteed non-null. 229 * 230 * @return String the name of this pipe 231 */ 232 public String getName() 233 { 234 if( name != null ) 235 return name; 236 237 if( previous != null ) 238 { 239 name = previous.getName(); 240 241 return name; 242 } 243 244 return "ANONYMOUS"; 245 } 246 247 /** 248 * Get all the upstream pipes this pipe is connected to. This method will return the Pipe instances 249 * passed on the constructors as inputs to this Pipe instance. 250 * 251 * @return all the upstream pipes this pipe is connected to. 252 */ 253 public Pipe[] getPrevious() 254 { 255 if( previous == null ) 256 return new Pipe[ 0 ]; 257 258 return new Pipe[]{previous}; 259 } 260 261 protected void setParent( Pipe parent ) 262 { 263 this.parent = parent; 264 } 265 266 /** 267 * Returns the enclosing parent Pipe instance, if any. A parent is typically a {@link SubAssembly} that wraps 268 * this instance. 269 * 270 * @return of type Pipe 271 */ 272 public Pipe getParent() 273 { 274 return parent; 275 } 276 277 /** 278 * Returns a {@link ConfigDef} instance that allows for local properties to be set and made available via 279 * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked. 280 * <p/> 281 * Any properties set on the configDef will not show up in any {@link cascading.flow.Flow} or 282 * {@link cascading.flow.FlowStep} process level configuration, but will override any of those values as seen by the 283 * current Pipe instance. 284 * 285 * @return an instance of ConfigDef 286 */ 287 @Override 288 public ConfigDef getConfigDef() 289 { 290 if( configDef == null ) 291 configDef = new ConfigDef(); 292 293 return configDef; 294 } 295 296 /** 297 * Returns {@code true} if there are properties in the configDef instance. 298 * 299 * @return true if there are configDef properties 300 */ 301 @Override 302 public boolean hasConfigDef() 303 { 304 return configDef != null && !configDef.isEmpty(); 305 } 306 307 /** 308 * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via 309 * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked. 310 * <p/> 311 * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in 312 * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the 313 * stepConfigDef will be overridden by the pipe local {@code #getConfigDef} instance. 314 * </p> 315 * Use this method to tweak properties in the process step this pipe instance is planned into. In the case of the 316 * Hadoop platform, when set on a {@link GroupBy} instance, the number of reducers can be modified. 317 * 318 * @return an instance of ConfigDef 319 */ 320 @Override 321 public ConfigDef getStepConfigDef() 322 { 323 if( stepConfigDef == null ) 324 stepConfigDef = new ConfigDef(); 325 326 return stepConfigDef; 327 } 328 329 /** 330 * Returns {@code true} if there are properties in the processConfigDef instance. 331 * 332 * @return true if there are processConfigDef properties 333 */ 334 @Override 335 public boolean hasStepConfigDef() 336 { 337 return stepConfigDef != null && !stepConfigDef.isEmpty(); 338 } 339 340 /** 341 * Method getHeads returns the first Pipe instances in this pipe assembly. 342 * 343 * @return the first (type Pipe[]) of this Pipe object. 344 */ 345 public Pipe[] getHeads() 346 { 347 Pipe[] pipes = getPrevious(); 348 349 if( pipes.length == 0 ) 350 return new Pipe[]{this}; 351 352 if( pipes.length == 1 ) 353 return pipes[ 0 ].getHeads(); 354 355 Set<Pipe> heads = new HashSet<Pipe>(); 356 357 for( Pipe pipe : pipes ) 358 Collections.addAll( heads, pipe.getHeads() ); 359 360 return heads.toArray( new Pipe[ heads.size() ] ); 361 } 362 363 @Override 364 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 365 { 366 return incomingScopes.iterator().next(); 367 } 368 369 @Override 370 public Fields resolveIncomingOperationArgumentFields( Scope incomingScope ) 371 { 372 throw new IllegalStateException( "resolveIncomingOperationFields should never be called" ); 373 } 374 375 @Override 376 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 377 { 378 throw new IllegalStateException( "resolveIncomingOperationPassThroughFields should never be called" ); 379 } 380 381 /** 382 * Method getTrace returns a String that pinpoint where this instance was created for debugging. 383 * 384 * @return String 385 */ 386 public String getTrace() 387 { 388 return trace; 389 } 390 391 @Override 392 public String toString() 393 { 394 return getClass().getSimpleName() + "(" + getName() + ")"; 395 } 396 397 Scope getFirst( Set<Scope> incomingScopes ) 398 { 399 return incomingScopes.iterator().next(); 400 } 401 402 @Override 403 public boolean isEquivalentTo( FlowElement element ) 404 { 405 if( element == null ) 406 return false; 407 408 if( this == element ) 409 return true; 410 411 return getClass() == element.getClass(); 412 } 413 414 @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"}) 415 @Override 416 public boolean equals( Object object ) 417 { 418 // we cannot test equality by names for this class, prevents detection of dupe names in heads or tails 419 return this == object; 420 } 421 422 @Override 423 public int hashCode() 424 { 425 return 31 * getName().hashCode() + getClass().hashCode(); 426 } 427 428 /** 429 * Method print is used internally. 430 * 431 * @param scope of type Scope 432 * @return String 433 */ 434 public String print( Scope scope ) 435 { 436 StringBuffer buffer = new StringBuffer(); 437 438 printInternal( buffer, scope ); 439 440 return buffer.toString(); 441 } 442 443 protected void printInternal( StringBuffer buffer, Scope scope ) 444 { 445 buffer.append( getClass().getSimpleName() ).append( "('" ).append( getName() ).append( "')" ); 446 } 447 }