001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe; 022 023import java.beans.ConstructorProperties; 024import java.io.Serializable; 025import java.util.Collections; 026import java.util.HashSet; 027import java.util.Set; 028 029import cascading.flow.FlowElement; 030import cascading.flow.planner.Scope; 031import cascading.property.ConfigDef; 032import cascading.tuple.Fields; 033import cascading.util.TraceUtil; 034import cascading.util.Traceable; 035import cascading.util.Util; 036 037import static java.util.Arrays.asList; 038 039/** 040 * Class Pipe is used to name branches in pipe assemblies, and as a base class for core 041 * processing model types, specifically {@link Each}, {@link Every}, {@link GroupBy}, 042 * {@link CoGroup}, {@link Merge}, {@link HashJoin}, and {@link SubAssembly}. 043 * <p/> 044 * Pipes are chained together through their constructors. 045 * <p/> 046 * To effect a split in the pipe, 047 * simply pass a Pipe instance to two or more constructors of subsequent Pipe instances. 048 * </p> 049 * A join can be achieved by passing two or more Pipe instances to a {@link CoGroup} or {@link HashJoin} pipe. 050 * <p/> 051 * A merge can be achieved by passing two or more Pipe instances to a {@link GroupBy} or {@link Merge} pipe. 052 * 053 * @see Each 054 * @see Every 055 * @see GroupBy 056 * @see Merge 057 * @see CoGroup 058 * @see HashJoin 059 * @see SubAssembly 060 */ 061public class Pipe implements FlowElement, Serializable, Traceable 062 { 063 /** Field serialVersionUID */ 064 private static final long serialVersionUID = 1L; 065 /** Field name */ 066 protected String name; 067 /** Field previous */ 068 protected Pipe previous; 069 /** Field parent */ 070 protected Pipe parent; 071 072 protected ConfigDef configDef; 073 074 protected ConfigDef stepConfigDef; 075 076 protected ConfigDef nodeConfigDef; 077 078 /** Field id */ 079 private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent 080 /** Field trace */ 081 private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override 082 083 public static synchronized String id( Pipe pipe ) 084 { 085 return pipe.id; 086 } 087 088 /** 089 * Convenience method to create an array of Pipe instances. 090 * 091 * @param pipes vararg list of pipes 092 * @return array of pipes 093 */ 094 public static Pipe[] pipes( Pipe... pipes ) 095 { 096 return pipes; 097 } 098 099 /** 100 * Convenience method for finding all Pipe names in an assembly. 101 * 102 * @param tails vararg list of all tails in given assembly 103 * @return array of Pipe names 104 */ 105 public static String[] names( Pipe... tails ) 106 { 107 Set<String> names = new HashSet<String>(); 108 109 collectNames( tails, names ); 110 111 return names.toArray( new String[ names.size() ] ); 112 } 113 114 private static void collectNames( Pipe[] pipes, Set<String> names ) 115 { 116 for( Pipe pipe : pipes ) 117 { 118 if( pipe instanceof SubAssembly ) 119 names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) ); 120 else 121 names.add( pipe.getName() ); 122 123 collectNames( SubAssembly.unwind( pipe.getPrevious() ), names ); 124 } 125 } 126 127 public static Pipe[] named( String name, Pipe... tails ) 128 { 129 Set<Pipe> pipes = new HashSet<Pipe>(); 130 131 collectPipes( name, tails, pipes ); 132 133 return pipes.toArray( new Pipe[ pipes.size() ] ); 134 } 135 136 private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes ) 137 { 138 for( Pipe tail : tails ) 139 { 140 if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) ) 141 pipes.add( tail ); 142 143 collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes ); 144 } 145 } 146 147 static Pipe[] resolvePreviousAll( Pipe... pipes ) 148 { 149 Pipe[] resolved = new Pipe[ pipes.length ]; 150 151 for( int i = 0; i < pipes.length; i++ ) 152 resolved[ i ] = resolvePrevious( pipes[ i ] ); 153 154 return resolved; 155 } 156 157 static Pipe resolvePrevious( Pipe pipe ) 158 { 159 if( pipe instanceof Splice || pipe instanceof Operator ) 160 return pipe; 161 162 Pipe[] pipes = pipe.getPrevious(); 163 164 if( pipes.length > 1 ) 165 throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" ); 166 167 for( Pipe previous : pipes ) 168 { 169 if( previous instanceof Splice || previous instanceof Operator ) 170 return previous; 171 172 return resolvePrevious( previous ); 173 } 174 175 return pipe; 176 } 177 178 protected Pipe() 179 { 180 } 181 182 @ConstructorProperties({"previous"}) 183 protected Pipe( Pipe previous ) 184 { 185 this.previous = previous; 186 187 verifyPipe(); 188 } 189 190 /** 191 * Constructor Pipe creates a new Pipe instance with the given name. This is useful as the 'start' or head 192 * of a pipe assembly. 193 * 194 * @param name name for this branch of Pipes 195 */ 196 @ConstructorProperties({"name"}) 197 public Pipe( String name ) 198 { 199 this.name = name; 200 } 201 202 /** 203 * Constructor Pipe creates a new Pipe instance with the given name and previous Pipe instance. This is useful for 204 * naming a branch in a pipe assembly. Or renaming the branch mid-way down. 205 * 206 * @param name name for this branch of Pipes 207 * @param previous previous Pipe to receive input Tuples from 208 */ 209 @ConstructorProperties({"name", "previous"}) 210 public Pipe( String name, Pipe previous ) 211 { 212 this.name = name; 213 this.previous = previous; 214 215 verifyPipe(); 216 } 217 218 private void verifyPipe() 219 { 220 if( !( previous instanceof SubAssembly ) ) 221 return; 222 223 String[] strings = ( (SubAssembly) previous ).getTailNames(); 224 if( strings.length != 1 ) 225 throw new IllegalArgumentException( "pipe assembly must not return more than one tail pipe instance, found " + Util.join( strings, ", " ) ); 226 } 227 228 /** 229 * Get the name of this pipe. Guaranteed non-null. 230 * 231 * @return String the name of this pipe 232 */ 233 public String getName() 234 { 235 if( name != null ) 236 return name; 237 238 if( previous != null ) 239 { 240 name = previous.getName(); 241 242 return name; 243 } 244 245 return "ANONYMOUS"; 246 } 247 248 /** 249 * Get all the upstream pipes this pipe is connected to. This method will return the Pipe instances 250 * passed on the constructors as inputs to this Pipe instance. 251 * 252 * @return all the upstream pipes this pipe is connected to. 253 */ 254 public Pipe[] getPrevious() 255 { 256 if( previous == null ) 257 return new Pipe[ 0 ]; 258 259 return new Pipe[]{previous}; 260 } 261 262 protected void setParent( Pipe parent ) 263 { 264 this.parent = parent; 265 } 266 267 /** 268 * Returns the enclosing parent Pipe instance, if any. A parent is typically a {@link SubAssembly} that wraps 269 * this instance. 270 * 271 * @return of type Pipe 272 */ 273 public Pipe getParent() 274 { 275 return parent; 276 } 277 278 /** 279 * Returns a {@link ConfigDef} instance that allows for local properties to be set and made available via 280 * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked. 281 * <p/> 282 * Any properties set on the configDef will not show up in any {@link cascading.flow.Flow} or 283 * {@link cascading.flow.FlowStep} process level configuration, but will override any of those values as seen by the 284 * current Pipe instance. 285 * 286 * @return an instance of ConfigDef 287 */ 288 @Override 289 public ConfigDef getConfigDef() 290 { 291 if( configDef == null ) 292 configDef = new ConfigDef(); 293 294 return configDef; 295 } 296 297 /** 298 * Returns {@code true} if there are properties in the configDef instance. 299 * 300 * @return true if there are configDef properties 301 */ 302 @Override 303 public boolean hasConfigDef() 304 { 305 return configDef != null && !configDef.isEmpty(); 306 } 307 308 /** 309 * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via 310 * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked. 311 * <p/> 312 * Any properties set on the nodeConfigDef will not show up in any Flow configuration, but will show up in 313 * the current process {@link cascading.flow.FlowNode} (in Apache Tez the Vertex configuration). Any value set in the 314 * nodeConfigDef will be overridden by the pipe local {@code #getConfigDef} instance. 315 * </p> 316 * Use this method to tweak properties in the process node this pipe instance is planned into. In the case of the 317 * Apache Tez platform, when set on a {@link GroupBy} instance, the number of gather partitions can be modified. 318 * <p/> 319 * In the case of any Pipe that spans FlowNode boundaries, like GroupBy and CoGroup may on some platforms, 320 * any ConfigDef properties will be applied to the downstream FlowNode. That is, if a GroupBy is the source 321 * to a node, any node ConfigDef properties will be applied. If the GroupBy encountered when applying properties 322 * is on the sink side of a node, the properties will be ignored. 323 * 324 * @return an instance of ConfigDef 325 */ 326 @Override 327 public ConfigDef getNodeConfigDef() 328 { 329 if( nodeConfigDef == null ) 330 nodeConfigDef = new ConfigDef(); 331 332 return nodeConfigDef; 333 } 334 335 /** 336 * Returns {@code true} if there are properties in the nodeConfigDef instance. 337 * 338 * @return true if there are nodeConfigDef properties 339 */ 340 @Override 341 public boolean hasNodeConfigDef() 342 { 343 return nodeConfigDef != null && !nodeConfigDef.isEmpty(); 344 } 345 346 /** 347 * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via 348 * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked. 349 * <p/> 350 * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in 351 * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the 352 * stepConfigDef will be overridden by the pipe local {@code #getConfigDef} instance. 353 * </p> 354 * Use this method to tweak properties in the process step this pipe instance is planned into. In the case of the 355 * Hadoop platform, when set on a {@link GroupBy} instance, the number of reducers can be modified. 356 * 357 * @return an instance of ConfigDef 358 */ 359 @Override 360 public ConfigDef getStepConfigDef() 361 { 362 if( stepConfigDef == null ) 363 stepConfigDef = new ConfigDef(); 364 365 return stepConfigDef; 366 } 367 368 /** 369 * Returns {@code true} if there are properties in the stepConfigDef instance. 370 * 371 * @return true if there are stepConfigDef properties 372 */ 373 @Override 374 public boolean hasStepConfigDef() 375 { 376 return stepConfigDef != null && !stepConfigDef.isEmpty(); 377 } 378 379 /** 380 * Method getHeads returns the first Pipe instances in this pipe assembly. 381 * 382 * @return the first (type Pipe[]) of this Pipe object. 383 */ 384 public Pipe[] getHeads() 385 { 386 Pipe[] pipes = getPrevious(); 387 388 if( pipes.length == 0 ) 389 return new Pipe[]{this}; 390 391 if( pipes.length == 1 ) 392 return pipes[ 0 ].getHeads(); 393 394 Set<Pipe> heads = new HashSet<Pipe>(); 395 396 for( Pipe pipe : pipes ) 397 Collections.addAll( heads, pipe.getHeads() ); 398 399 return heads.toArray( new Pipe[ heads.size() ] ); 400 } 401 402 @Override 403 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 404 { 405 return incomingScopes.iterator().next(); 406 } 407 408 @Override 409 public Fields resolveIncomingOperationArgumentFields( Scope incomingScope ) 410 { 411 throw new IllegalStateException( "resolveIncomingOperationFields should never be called" ); 412 } 413 414 @Override 415 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 416 { 417 throw new IllegalStateException( "resolveIncomingOperationPassThroughFields should never be called" ); 418 } 419 420 @Override 421 public String getTrace() 422 { 423 return trace; 424 } 425 426 @Override 427 public String toString() 428 { 429 return getClass().getSimpleName() + "(" + getName() + ")"; 430 } 431 432 Scope getFirst( Set<Scope> incomingScopes ) 433 { 434 return incomingScopes.iterator().next(); 435 } 436 437 @Override 438 public boolean isEquivalentTo( FlowElement element ) 439 { 440 if( element == null ) 441 return false; 442 443 if( this == element ) 444 return true; 445 446 return getClass() == element.getClass(); 447 } 448 449 @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"}) 450 @Override 451 public boolean equals( Object object ) 452 { 453 // we cannot test equality by names for this class, prevents detection of dupe names in heads or tails 454 return this == object; 455 } 456 457 @Override 458 public int hashCode() 459 { 460 return 31 * getName().hashCode() + getClass().hashCode(); 461 } 462 463 /** 464 * Method print is used internally. 465 * 466 * @param scope of type Scope 467 * @return String 468 */ 469 public String print( Scope scope ) 470 { 471 StringBuffer buffer = new StringBuffer(); 472 473 printInternal( buffer, scope ); 474 475 return buffer.toString(); 476 } 477 478 protected void printInternal( StringBuffer buffer, Scope scope ) 479 { 480 buffer.append( getClass().getSimpleName() ).append( "('" ).append( getName() ).append( "')" ); 481 } 482 }