001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow; 022 023 import java.util.Collection; 024 import java.util.Collections; 025 import java.util.HashMap; 026 import java.util.HashSet; 027 import java.util.Map; 028 import java.util.Properties; 029 import java.util.Set; 030 031 import cascading.CascadingException; 032 import cascading.flow.planner.FlowPlanner; 033 import cascading.flow.planner.PlatformInfo; 034 import cascading.pipe.Pipe; 035 import cascading.property.AppProps; 036 import cascading.property.PropertyUtil; 037 import cascading.scheme.Scheme; 038 import cascading.tap.Tap; 039 import cascading.util.Util; 040 041 import static cascading.flow.FlowDef.flowDef; 042 043 /** 044 * Class FlowConnector is the base class for all platform planners. 045 * <p/> 046 * See the {@link FlowDef} class for a fluent way to define a new Flow. 047 * <p/> 048 * Use the FlowConnector to link source and sink {@link Tap} instances with an assembly of {@link Pipe} instances into 049 * an executable {@link cascading.flow.Flow}. 050 * <p/> 051 * FlowConnector invokes a planner for the target execution environment. 052 * <p/> 053 * For executing Flows in local memory against local files, see {@link cascading.flow.local.LocalFlowConnector}. 054 * <p/> 055 * For Apache Hadoop, see the {@link cascading.flow.hadoop.HadoopFlowConnector}. 056 * Or if you have a pre-existing custom Hadoop job to execute, see {@link cascading.flow.hadoop.MapReduceFlow}, which 057 * doesn't require a planner. 058 * <p/> 059 * Note that all {@code connect} methods take a single {@code tail} or an array of {@code tail} Pipe instances. "tail" 060 * refers to the last connected Pipe instances in a pipe-assembly. Pipe-assemblies are graphs of object with "heads" 061 * and "tails". From a given "tail", all connected heads can be found, but not the reverse. So "tails" must be 062 * supplied by the user. 063 * <p/> 064 * The FlowConnector and the underlying execution framework (Hadoop or local mode) can be configured via a 065 * {@link Map} or {@link Properties} instance given to the constructor. 066 * <p/> 067 * This properties map must be populated before constructing a FlowConnector instance. Many planner specific 068 * properties can be set through the {@link FlowConnectorProps} fluent interface. 069 * <p/> 070 * Some planners have required properties. Hadoop expects {@link AppProps#setApplicationJarPath(java.util.Map, String)} or 071 * {@link AppProps#setApplicationJarClass(java.util.Map, Class)} to be set. 072 * <p/> 073 * Any properties set and passed through the FlowConnector constructor will be global to all Flow instances created through 074 * the that FlowConnector instance. Some properties are on the {@link FlowDef} and would only be applicable to the 075 * resulting Flow instance. 076 * <p/> 077 * These properties are used to influence the current planner and are also passed down to the 078 * execution framework to override any default values. For example when using the Hadoop planner, the number of reducers 079 * or mappers can be set by using platform specific properties. 080 * <p/> 081 * Custom operations (Functions, Filter, etc) may also retrieve these property values at runtime through calls to 082 * {@link cascading.flow.FlowProcess#getProperty(String)} or {@link FlowProcess#getStringProperty(String)}. 083 * <p/> 084 * Most applications will need to call {@link cascading.property.AppProps#setApplicationJarClass(java.util.Map, Class)} or 085 * {@link cascading.property.AppProps#setApplicationJarPath(java.util.Map, String)} so that 086 * the correct application jar file is passed through to all child processes. The Class or path must reference 087 * the custom application jar, not a Cascading library class or jar. The easiest thing to do is give setApplicationJarClass 088 * the Class with your static main function and let Cascading figure out which jar to use. 089 * <p/> 090 * Note that Map<Object,Object> is compatible with the {@link Properties} class, so properties can be loaded at 091 * runtime from a configuration file. 092 * <p/> 093 * By default, all {@link cascading.operation.Assertion}s are planned into the resulting Flow instance. This can be 094 * changed for a given Flow by calling {@link FlowDef#setAssertionLevel(cascading.operation.AssertionLevel)} or globally 095 * via {@link FlowConnectorProps#setAssertionLevel(cascading.operation.AssertionLevel)}. 096 * <p/> 097 * Also by default, all {@link cascading.operation.Debug}s are planned into the resulting Flow instance. This can be 098 * changed for a given flow by calling {@link FlowDef#setDebugLevel(cascading.operation.DebugLevel)} or globally via 099 * {@link FlowConnectorProps#setDebugLevel(cascading.operation.DebugLevel)}. 100 * 101 * @see cascading.flow.local.LocalFlowConnector 102 * @see cascading.flow.hadoop.HadoopFlowConnector 103 */ 104 public abstract class FlowConnector 105 { 106 /** Field properties */ 107 protected Map<Object, Object> properties; // may be a Map or Properties instance. see PropertyUtil 108 109 /** 110 * Method getIntermediateSchemeClass is used for debugging. 111 * 112 * @param properties of type Map<Object, Object> 113 * @return Class 114 */ 115 public Class getIntermediateSchemeClass( Map<Object, Object> properties ) 116 { 117 // supporting stuffed classes to overcome classloading issue 118 Object type = PropertyUtil.getProperty( properties, FlowConnectorProps.INTERMEDIATE_SCHEME_CLASS, null ); 119 120 if( type == null ) 121 return getDefaultIntermediateSchemeClass(); 122 123 if( type instanceof Class ) 124 return (Class) type; 125 126 try 127 { 128 return FlowConnector.class.getClassLoader().loadClass( type.toString() ); 129 } 130 catch( ClassNotFoundException exception ) 131 { 132 throw new CascadingException( "unable to load class: " + type.toString(), exception ); 133 } 134 } 135 136 /** 137 * This has moved to {@link cascading.property.AppProps#setApplicationJarClass(java.util.Map, Class)}. 138 * 139 * @param properties 140 * @param type 141 */ 142 @Deprecated 143 public static void setApplicationJarClass( Map<Object, Object> properties, Class type ) 144 { 145 AppProps.setApplicationJarClass( properties, type ); 146 } 147 148 /** 149 * This has moved to {@link cascading.property.AppProps#setApplicationJarPath(java.util.Map, String)}. 150 * 151 * @param properties 152 * @param path 153 */ 154 @Deprecated 155 public static void setApplicationJarPath( Map<Object, Object> properties, String path ) 156 { 157 AppProps.setApplicationJarPath( properties, path ); 158 } 159 160 protected abstract Class<? extends Scheme> getDefaultIntermediateSchemeClass(); 161 162 protected FlowConnector() 163 { 164 this.properties = new HashMap<Object, Object>(); 165 } 166 167 protected FlowConnector( Map<Object, Object> properties ) 168 { 169 if( properties == null ) 170 this.properties = new HashMap<Object, Object>(); 171 else if( properties instanceof Properties ) 172 this.properties = new Properties( (Properties) properties ); 173 else 174 this.properties = new HashMap<Object, Object>( properties ); 175 } 176 177 /** 178 * Method getProperties returns the properties of this FlowConnector object. The returned Map instance 179 * is immutable to prevent changes to the underlying property values in this FlowConnector instance. 180 * <p/> 181 * If a {@link Properties} instance was passed to the constructor, the returned object will be a flattened 182 * {@link Map} instance. 183 * 184 * @return the properties (type Map<Object, Object>) of this FlowConnector object. 185 */ 186 public Map<Object, Object> getProperties() 187 { 188 // Sub-classes of FlowConnector should rely on PropertyUtil to manage access to properties objects internally. 189 return Collections.unmodifiableMap( PropertyUtil.asFlatMap( properties ) ); 190 } 191 192 /** 193 * Method connect links the given source and sink Taps to the given pipe assembly. 194 * 195 * @param source source Tap to bind to the head of the given tail Pipe 196 * @param sink sink Tap to bind to the given tail Pipe 197 * @param tail tail end of a pipe assembly 198 * @return Flow 199 */ 200 public Flow connect( Tap source, Tap sink, Pipe tail ) 201 { 202 return connect( null, source, sink, tail ); 203 } 204 205 /** 206 * Method connect links the given source and sink Taps to the given pipe assembly. 207 * 208 * @param name name to give the resulting Flow 209 * @param source source Tap to bind to the head of the given tail Pipe 210 * @param sink sink Tap to bind to the given tail Pipe 211 * @param tail tail end of a pipe assembly 212 * @return Flow 213 */ 214 public Flow connect( String name, Tap source, Tap sink, Pipe tail ) 215 { 216 Map<String, Tap> sources = new HashMap<String, Tap>(); 217 218 sources.put( tail.getHeads()[ 0 ].getName(), source ); 219 220 return connect( name, sources, sink, tail ); 221 } 222 223 /** 224 * Method connect links the given source, sink, and trap Taps to the given pipe assembly. The given trap will 225 * be linked to the assembly head along with the source. 226 * 227 * @param name name to give the resulting Flow 228 * @param source source Tap to bind to the head of the given tail Pipe 229 * @param sink sink Tap to bind to the given tail Pipe 230 * @param trap trap Tap to sink all failed Tuples into 231 * @param tail tail end of a pipe assembly 232 * @return Flow 233 */ 234 public Flow connect( String name, Tap source, Tap sink, Tap trap, Pipe tail ) 235 { 236 Map<String, Tap> sources = new HashMap<String, Tap>(); 237 238 sources.put( tail.getHeads()[ 0 ].getName(), source ); 239 240 Map<String, Tap> traps = new HashMap<String, Tap>(); 241 242 traps.put( tail.getHeads()[ 0 ].getName(), trap ); 243 244 return connect( name, sources, sink, traps, tail ); 245 } 246 247 /** 248 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 249 * 250 * @param sources all head names and source Taps to bind to the heads of the given tail Pipe 251 * @param sink sink Tap to bind to the given tail Pipe 252 * @param tail tail end of a pipe assembly 253 * @return Flow 254 */ 255 public Flow connect( Map<String, Tap> sources, Tap sink, Pipe tail ) 256 { 257 return connect( null, sources, sink, tail ); 258 } 259 260 /** 261 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 262 * 263 * @param name name to give the resulting Flow 264 * @param sources all head names and source Taps to bind to the heads of the given tail Pipe 265 * @param sink sink Tap to bind to the given tail Pipe 266 * @param tail tail end of a pipe assembly 267 * @return Flow 268 */ 269 public Flow connect( String name, Map<String, Tap> sources, Tap sink, Pipe tail ) 270 { 271 Map<String, Tap> sinks = new HashMap<String, Tap>(); 272 273 sinks.put( tail.getName(), sink ); 274 275 return connect( name, sources, sinks, tail ); 276 } 277 278 /** 279 * Method connect links the named source and trap Taps and sink Tap to the given pipe assembly. 280 * 281 * @param name name to give the resulting Flow 282 * @param sources all head names and source Taps to bind to the heads of the given tail Pipe 283 * @param sink sink Tap to bind to the given tail Pipe 284 * @param traps all pipe names and trap Taps to sink all failed Tuples into 285 * @param tail tail end of a pipe assembly 286 * @return Flow 287 */ 288 public Flow connect( String name, Map<String, Tap> sources, Tap sink, Map<String, Tap> traps, Pipe tail ) 289 { 290 Map<String, Tap> sinks = new HashMap<String, Tap>(); 291 292 sinks.put( tail.getName(), sink ); 293 294 return connect( name, sources, sinks, traps, tail ); 295 } 296 297 /** 298 * Method connect links the named trap Taps, source and sink Tap to the given pipe assembly. 299 * 300 * @param name name to give the resulting Flow 301 * @param source source Tap to bind to the head of the given tail Pipe 302 * @param sink sink Tap to bind to the given tail Pipe 303 * @param traps all pipe names and trap Taps to sink all failed Tuples into 304 * @param tail tail end of a pipe assembly 305 * @return Flow 306 */ 307 public Flow connect( String name, Tap source, Tap sink, Map<String, Tap> traps, Pipe tail ) 308 { 309 Map<String, Tap> sources = new HashMap<String, Tap>(); 310 311 sources.put( tail.getHeads()[ 0 ].getName(), source ); 312 313 Map<String, Tap> sinks = new HashMap<String, Tap>(); 314 315 sinks.put( tail.getName(), sink ); 316 317 return connect( name, sources, sinks, traps, tail ); 318 } 319 320 /** 321 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 322 * <p/> 323 * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe. 324 * So the head pipe does not need to be included as an argument. 325 * 326 * @param source source Tap to bind to the head of the given tail Pipes 327 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 328 * @param tails all tail ends of a pipe assembly 329 * @return Flow 330 */ 331 public Flow connect( Tap source, Map<String, Tap> sinks, Collection<Pipe> tails ) 332 { 333 return connect( null, source, sinks, tails.toArray( new Pipe[ tails.size() ] ) ); 334 } 335 336 /** 337 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 338 * <p/> 339 * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe. 340 * So the head pipe does not need to be included as an argument. 341 * 342 * @param name name to give the resulting Flow 343 * @param source source Tap to bind to the head of the given tail Pipes 344 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 345 * @param tails all tail ends of a pipe assembly 346 * @return Flow 347 */ 348 public Flow connect( String name, Tap source, Map<String, Tap> sinks, Collection<Pipe> tails ) 349 { 350 return connect( name, source, sinks, tails.toArray( new Pipe[ tails.size() ] ) ); 351 } 352 353 /** 354 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 355 * <p/> 356 * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe. 357 * So the head pipe does not need to be included as an argument. 358 * 359 * @param source source Tap to bind to the head of the given tail Pipes 360 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 361 * @param tails all tail ends of a pipe assembly 362 * @return Flow 363 */ 364 public Flow connect( Tap source, Map<String, Tap> sinks, Pipe... tails ) 365 { 366 return connect( null, source, sinks, tails ); 367 } 368 369 /** 370 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 371 * <p/> 372 * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe. 373 * So the head pipe does not need to be included as an argument. 374 * 375 * @param name name to give the resulting Flow 376 * @param source source Tap to bind to the head of the given tail Pipes 377 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 378 * @param tails all tail ends of a pipe assembly 379 * @return Flow 380 */ 381 public Flow connect( String name, Tap source, Map<String, Tap> sinks, Pipe... tails ) 382 { 383 Set<Pipe> heads = new HashSet<Pipe>(); 384 385 for( Pipe pipe : tails ) 386 Collections.addAll( heads, pipe.getHeads() ); 387 388 if( heads.isEmpty() ) 389 throw new IllegalArgumentException( "no pipe instance found" ); 390 391 if( heads.size() != 1 ) 392 throw new IllegalArgumentException( "there may be only 1 head pipe instance, found " + heads.size() ); 393 394 Map<String, Tap> sources = new HashMap<String, Tap>(); 395 396 for( Pipe pipe : heads ) 397 sources.put( pipe.getName(), source ); 398 399 return connect( name, sources, sinks, tails ); 400 } 401 402 /** 403 * Method connect links the named sources and sinks to the given pipe assembly. 404 * 405 * @param sources all head names and source Taps to bind to the heads of the given tail Pipes 406 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 407 * @param tails all tail ends of a pipe assembly 408 * @return Flow 409 */ 410 public Flow connect( Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails ) 411 { 412 return connect( null, sources, sinks, tails ); 413 } 414 415 /** 416 * Method connect links the named sources and sinks to the given pipe assembly. 417 * 418 * @param name name to give the resulting Flow 419 * @param sources all head names and source Taps to bind to the heads of the given tail Pipes 420 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 421 * @param tails all tail ends of a pipe assembly 422 * @return Flow 423 */ 424 public Flow connect( String name, Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails ) 425 { 426 return connect( name, sources, sinks, new HashMap<String, Tap>(), tails ); 427 } 428 429 /** 430 * Method connect links the named sources, sinks and traps to the given pipe assembly. 431 * 432 * @param name name to give the resulting Flow 433 * @param sources all head names and source Taps to bind to the heads of the given tail Pipes 434 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 435 * @param traps all pipe names and trap Taps to sink all failed Tuples into 436 * @param tails all tail ends of a pipe assembly 437 * @return Flow 438 */ 439 public Flow connect( String name, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Pipe... tails ) 440 { 441 name = name == null ? makeName( tails ) : name; 442 443 FlowDef flowDef = flowDef() 444 .setName( name ) 445 .addSources( sources ) 446 .addSinks( sinks ) 447 .addTraps( traps ) 448 .addTails( tails ); 449 450 return connect( flowDef ); 451 } 452 453 public Flow connect( FlowDef flowDef ) 454 { 455 FlowPlanner flowPlanner = createFlowPlanner(); 456 457 flowPlanner.initialize( this, properties ); 458 459 return flowPlanner.buildFlow( flowDef ); 460 } 461 462 protected abstract FlowPlanner createFlowPlanner(); 463 464 /** 465 * Method getPlatformInfo returns an instance of {@link PlatformInfo} for the underlying platform. 466 * 467 * @return of type PlatformInfo 468 */ 469 public PlatformInfo getPlatformInfo() 470 { 471 return createFlowPlanner().getPlatformInfo(); 472 } 473 474 ///////// 475 // UTIL 476 ///////// 477 478 private String makeName( Pipe[] pipes ) 479 { 480 String[] names = new String[ pipes.length ]; 481 482 for( int i = 0; i < pipes.length; i++ ) 483 names[ i ] = pipes[ i ].getName(); 484 485 String name = Util.join( names, "+" ); 486 487 if( name.length() > 32 ) 488 name = name.substring( 0, 32 ); 489 490 return name; 491 } 492 }