001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe; 022 023 import java.beans.ConstructorProperties; 024 import java.io.Serializable; 025 import java.util.Collections; 026 import java.util.HashSet; 027 import java.util.Set; 028 029 import cascading.flow.FlowElement; 030 import cascading.flow.planner.Scope; 031 import cascading.property.ConfigDef; 032 import cascading.tuple.Fields; 033 import cascading.util.TraceUtil; 034 import cascading.util.Traceable; 035 import cascading.util.Util; 036 037 import static java.util.Arrays.asList; 038 039 /** 040 * Class Pipe is used to name branches in pipe assemblies, and as a base class for core 041 * processing model types, specifically {@link Each}, {@link Every}, {@link GroupBy}, 042 * {@link CoGroup}, {@link Merge}, {@link HashJoin}, and {@link SubAssembly}. 043 * <p/> 044 * Pipes are chained together through their constructors. 045 * <p/> 046 * To effect a split in the pipe, 047 * simply pass a Pipe instance to two or more constructors of subsequent Pipe instances. 048 * </p> 049 * A join can be achieved by passing two or more Pipe instances to a {@link CoGroup} or {@link HashJoin} pipe. 050 * <p/> 051 * A merge can be achieved by passing two or more Pipe instances to a {@link GroupBy} or {@link Merge} pipe. 052 * 053 * @see Each 054 * @see Every 055 * @see GroupBy 056 * @see Merge 057 * @see CoGroup 058 * @see HashJoin 059 * @see SubAssembly 060 */ 061 public class Pipe implements FlowElement, Serializable, Traceable 062 { 063 /** Field serialVersionUID */ 064 private static final long serialVersionUID = 1L; 065 /** Field name */ 066 protected String name; 067 /** Field previous */ 068 protected Pipe previous; 069 /** Field parent */ 070 protected Pipe parent; 071 072 protected ConfigDef configDef; 073 074 protected ConfigDef stepConfigDef; 075 076 /** Field id */ 077 private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent 078 /** Field trace */ 079 private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override 080 081 public static synchronized String id( Pipe pipe ) 082 { 083 return pipe.id; 084 } 085 086 /** 087 * Convenience method to create an array of Pipe instances. 088 * 089 * @param pipes vararg list of pipes 090 * @return array of pipes 091 */ 092 public static Pipe[] pipes( Pipe... pipes ) 093 { 094 return pipes; 095 } 096 097 /** 098 * Convenience method for finding all Pipe names in an assembly. 099 * 100 * @param tails vararg list of all tails in given assembly 101 * @return array of Pipe names 102 */ 103 public static String[] names( Pipe... tails ) 104 { 105 Set<String> names = new HashSet<String>(); 106 107 collectNames( tails, names ); 108 109 return names.toArray( new String[ names.size() ] ); 110 } 111 112 private static void collectNames( Pipe[] pipes, Set<String> names ) 113 { 114 for( Pipe pipe : pipes ) 115 { 116 if( pipe instanceof SubAssembly ) 117 names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) ); 118 else 119 names.add( pipe.getName() ); 120 121 collectNames( SubAssembly.unwind( pipe.getPrevious() ), names ); 122 } 123 } 124 125 public static Pipe[] named( String name, Pipe... tails ) 126 { 127 Set<Pipe> pipes = new HashSet<Pipe>(); 128 129 collectPipes( name, tails, pipes ); 130 131 return pipes.toArray( new Pipe[ pipes.size() ] ); 132 } 133 134 private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes ) 135 { 136 for( Pipe tail : tails ) 137 { 138 if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) ) 139 pipes.add( tail ); 140 141 collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes ); 142 } 143 } 144 145 static Pipe[] resolvePreviousAll( Pipe... pipes ) 146 { 147 Pipe[] resolved = new Pipe[ pipes.length ]; 148 149 for( int i = 0; i < pipes.length; i++ ) 150 resolved[ i ] = resolvePrevious( pipes[ i ] ); 151 152 return resolved; 153 } 154 155 static Pipe resolvePrevious( Pipe pipe ) 156 { 157 if( pipe instanceof Splice || pipe instanceof Operator ) 158 return pipe; 159 160 Pipe[] pipes = pipe.getPrevious(); 161 162 if( pipes.length > 1 ) 163 throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" ); 164 165 for( Pipe previous : pipes ) 166 { 167 if( previous instanceof Splice || previous instanceof Operator ) 168 return previous; 169 170 return resolvePrevious( previous ); 171 } 172 173 return pipe; 174 } 175 176 protected Pipe() 177 { 178 } 179 180 @ConstructorProperties({"previous"}) 181 protected Pipe( Pipe previous ) 182 { 183 this.previous = previous; 184 185 verifyPipe(); 186 } 187 188 /** 189 * Constructor Pipe creates a new Pipe instance with the given name. This is useful as the 'start' or head 190 * of a pipe assembly. 191 * 192 * @param name name for this branch of Pipes 193 */ 194 @ConstructorProperties({"name"}) 195 public Pipe( String name ) 196 { 197 this.name = name; 198 } 199 200 /** 201 * Constructor Pipe creates a new Pipe instance with the given name and previous Pipe instance. This is useful for 202 * naming a branch in a pipe assembly. Or renaming the branch mid-way down. 203 * 204 * @param name name for this branch of Pipes 205 * @param previous previous Pipe to receive input Tuples from 206 */ 207 @ConstructorProperties({"name", "previous"}) 208 public Pipe( String name, Pipe previous ) 209 { 210 this.name = name; 211 this.previous = previous; 212 213 verifyPipe(); 214 } 215 216 private void verifyPipe() 217 { 218 if( !( previous instanceof SubAssembly ) ) 219 return; 220 221 String[] strings = ( (SubAssembly) previous ).getTailNames(); 222 if( strings.length != 1 ) 223 throw new IllegalArgumentException( "pipe assembly must not return more than one tail pipe instance, found " + Util.join( strings, ", " ) ); 224 } 225 226 /** 227 * Get the name of this pipe. Guaranteed non-null. 228 * 229 * @return String the name of this pipe 230 */ 231 public String getName() 232 { 233 if( name != null ) 234 return name; 235 236 if( previous != null ) 237 { 238 name = previous.getName(); 239 240 return name; 241 } 242 243 return "ANONYMOUS"; 244 } 245 246 /** 247 * Get all the upstream pipes this pipe is connected to. This method will return the Pipe instances 248 * passed on the constructors as inputs to this Pipe instance. 249 * 250 * @return all the upstream pipes this pipe is connected to. 251 */ 252 public Pipe[] getPrevious() 253 { 254 if( previous == null ) 255 return new Pipe[ 0 ]; 256 257 return new Pipe[]{previous}; 258 } 259 260 protected void setParent( Pipe parent ) 261 { 262 this.parent = parent; 263 } 264 265 /** 266 * Returns the enclosing parent Pipe instance, if any. A parent is typically a {@link SubAssembly} that wraps 267 * this instance. 268 * 269 * @return of type Pipe 270 */ 271 public Pipe getParent() 272 { 273 return parent; 274 } 275 276 /** 277 * Returns a {@link ConfigDef} instance that allows for local properties to be set and made available via 278 * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked. 279 * <p/> 280 * Any properties set on the configDef will not show up in any {@link cascading.flow.Flow} or 281 * {@link cascading.flow.FlowStep} process level configuration, but will override any of those values as seen by the 282 * current Pipe instance. 283 * 284 * @return an instance of ConfigDef 285 */ 286 @Override 287 public ConfigDef getConfigDef() 288 { 289 if( configDef == null ) 290 configDef = new ConfigDef(); 291 292 return configDef; 293 } 294 295 /** 296 * Returns {@code true} if there are properties in the configDef instance. 297 * 298 * @return true if there are configDef properties 299 */ 300 @Override 301 public boolean hasConfigDef() 302 { 303 return configDef != null && !configDef.isEmpty(); 304 } 305 306 /** 307 * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via 308 * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked. 309 * <p/> 310 * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in 311 * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the 312 * stepConfigDef will be overridden by the pipe local {@code #getConfigDef} instance. 313 * </p> 314 * Use this method to tweak properties in the process step this pipe instance is planned into. In the case of the 315 * Hadoop platform, when set on a {@link GroupBy} instance, the number of reducers can be modified. 316 * 317 * @return an instance of ConfigDef 318 */ 319 @Override 320 public ConfigDef getStepConfigDef() 321 { 322 if( stepConfigDef == null ) 323 stepConfigDef = new ConfigDef(); 324 325 return stepConfigDef; 326 } 327 328 /** 329 * Returns {@code true} if there are properties in the processConfigDef instance. 330 * 331 * @return true if there are processConfigDef properties 332 */ 333 @Override 334 public boolean hasStepConfigDef() 335 { 336 return stepConfigDef != null && !stepConfigDef.isEmpty(); 337 } 338 339 /** 340 * Method getHeads returns the first Pipe instances in this pipe assembly. 341 * 342 * @return the first (type Pipe[]) of this Pipe object. 343 */ 344 public Pipe[] getHeads() 345 { 346 Pipe[] pipes = getPrevious(); 347 348 if( pipes.length == 0 ) 349 return new Pipe[]{this}; 350 351 if( pipes.length == 1 ) 352 return pipes[ 0 ].getHeads(); 353 354 Set<Pipe> heads = new HashSet<Pipe>(); 355 356 for( Pipe pipe : pipes ) 357 Collections.addAll( heads, pipe.getHeads() ); 358 359 return heads.toArray( new Pipe[ heads.size() ] ); 360 } 361 362 @Override 363 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 364 { 365 return incomingScopes.iterator().next(); 366 } 367 368 @Override 369 public Fields resolveIncomingOperationArgumentFields( Scope incomingScope ) 370 { 371 throw new IllegalStateException( "resolveIncomingOperationFields should never be called" ); 372 } 373 374 @Override 375 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 376 { 377 throw new IllegalStateException( "resolveIncomingOperationPassThroughFields should never be called" ); 378 } 379 380 @Override 381 public String getTrace() 382 { 383 return trace; 384 } 385 386 @Override 387 public String toString() 388 { 389 return getClass().getSimpleName() + "(" + getName() + ")"; 390 } 391 392 Scope getFirst( Set<Scope> incomingScopes ) 393 { 394 return incomingScopes.iterator().next(); 395 } 396 397 @Override 398 public boolean isEquivalentTo( FlowElement element ) 399 { 400 if( element == null ) 401 return false; 402 403 if( this == element ) 404 return true; 405 406 return getClass() == element.getClass(); 407 } 408 409 @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"}) 410 @Override 411 public boolean equals( Object object ) 412 { 413 // we cannot test equality by names for this class, prevents detection of dupe names in heads or tails 414 return this == object; 415 } 416 417 @Override 418 public int hashCode() 419 { 420 return 31 * getName().hashCode() + getClass().hashCode(); 421 } 422 423 /** 424 * Method print is used internally. 425 * 426 * @param scope of type Scope 427 * @return String 428 */ 429 public String print( Scope scope ) 430 { 431 StringBuffer buffer = new StringBuffer(); 432 433 printInternal( buffer, scope ); 434 435 return buffer.toString(); 436 } 437 438 protected void printInternal( StringBuffer buffer, Scope scope ) 439 { 440 buffer.append( getClass().getSimpleName() ).append( "('" ).append( getName() ).append( "')" ); 441 } 442 }