001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.process; 022 023import java.beans.ConstructorProperties; 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.HashMap; 028import java.util.Map; 029import java.util.Properties; 030 031import cascading.flow.BaseFlow; 032import cascading.flow.FlowException; 033import cascading.flow.FlowProcess; 034import cascading.flow.planner.PlatformInfo; 035import cascading.scheme.Scheme; 036import cascading.scheme.SinkCall; 037import cascading.scheme.SourceCall; 038import cascading.stats.process.ProcessFlowStats; 039import cascading.tap.Tap; 040import cascading.tuple.TupleEntryCollector; 041import cascading.tuple.TupleEntryIterator; 042import cascading.util.Version; 043import riffle.process.scheduler.ProcessException; 044import riffle.process.scheduler.ProcessWrapper; 045 046/** 047 * Class ProcessFlow is a {@link cascading.flow.Flow} subclass that supports custom Riffle jobs. 048 * <p/> 049 * Use this class to allow custom Riffle jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If 050 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled 051 * according to their dependencies (topologically). 052 * <p/> 053 * <p/> 054 * Currently {@link cascading.flow.FlowListener}s are supported but the 055 * {@link cascading.flow.FlowListener#onThrowable(cascading.flow.Flow, Throwable)} event is not. 056 */ 057public class ProcessFlow<Process, Config> extends BaseFlow<Config> 058 { 059 /** Field process */ 060 private final Process process; 061 /** Field processWrapper */ 062 private final ProcessWrapper processWrapper; 063 /** Configuration object */ 064 private Config config; 065 066 private boolean isStarted = false; // only used for event handling 067 068 /** flow related properties */ 069 private Map<Object, Object> properties; 070 071 /** 072 * Constructor ProcessFlow creates a new ProcessFlow instance. 073 * 074 * @param name of type String 075 * @param process of type JobConf 076 */ 077 @ConstructorProperties({"name", "process"}) 078 public ProcessFlow( String name, Process process ) 079 { 080 this( new Properties(), name, process ); 081 } 082 083 /** 084 * Constructor ProcessFlow creates a new ProcessFlow instance. 085 * 086 * @param properties of type Map<Object, Object> 087 * @param name of type String 088 * @param process of type P 089 */ 090 @ConstructorProperties({"properties", "name", "process"}) 091 public ProcessFlow( Map<Object, Object> properties, String name, Process process ) 092 { 093 this( properties, name, process, null ); 094 } 095 096 /** 097 * Constructor ProcessFlow creates a new ProcessFlow instance. 098 * 099 * @param properties of type Map<Object, Object> 100 * @param name of type String 101 * @param process of type P 102 * @param flowDescriptor pf type LinkedHashMap<String, String> 103 */ 104 @ConstructorProperties({"properties", "name", "process", "flowDescriptor"}) 105 public ProcessFlow( Map<Object, Object> properties, String name, Process process, Map<String, String> flowDescriptor ) 106 { 107 super( new PlatformInfo( "process", "Concurrent, Inc.", Version.getRelease() ), properties, null, name, flowDescriptor ); 108 this.process = process; 109 this.processWrapper = new ProcessWrapper( this.process ); 110 this.properties = properties; 111 112 setName( name ); 113 setTapFromProcess(); 114 initProcessConfig(); 115 initStats(); 116 } 117 118 private void initStats() 119 { 120 try 121 { 122 if( processWrapper.hasCounters() ) 123 this.flowStats = new ProcessFlowStats( this, getFlowSession().getCascadingServices().createClientState( getID() ), processWrapper ); 124 } 125 catch( ProcessException exception ) 126 { 127 throw new FlowException( exception ); 128 } 129 } 130 131 /** 132 * Method setTapFromProcess build {@link Tap} instance for the give process incoming and outgoing dependencies. 133 * <p/> 134 * This method may be called repeatedly to re-configure the source and sink taps. 135 */ 136 public void setTapFromProcess() 137 { 138 setSources( createSources( this.processWrapper ) ); 139 setSinks( createSinks( this.processWrapper ) ); 140 setTraps( createTraps( this.processWrapper ) ); 141 } 142 143 /** 144 * Method getProcess returns the process of this ProcessFlow object. 145 * 146 * @return the process (type P) of this ProcessFlow object. 147 */ 148 public Process getProcess() 149 { 150 return process; 151 } 152 153 @Override 154 protected void initConfig( Map<Object, Object> properties, Config parentConfig ) 155 { 156 157 } 158 159 private void initProcessConfig() 160 { 161 try 162 { 163 config = (Config) processWrapper.getConfiguration(); 164 } 165 catch( ProcessException exception ) 166 { 167 if( exception.getCause() instanceof RuntimeException ) 168 throw (RuntimeException) exception.getCause(); 169 170 throw new FlowException( "could not get configuration from process", exception.getCause() ); 171 } 172 } 173 174 @Override 175 protected void setConfigProperty( Config properties, Object key, Object value ) 176 { 177 178 } 179 180 @Override 181 protected Config newConfig( Config defaultConfig ) 182 { 183 return null; 184 } 185 186 @Override 187 public Config getConfig() 188 { 189 return config; 190 } 191 192 @Override 193 public Config getConfigCopy() 194 { 195 return null; 196 } 197 198 @Override 199 public Map<Object, Object> getConfigAsProperties() 200 { 201 Map<Object, Object> props = new HashMap<>(); 202 203 if( properties != null ) 204 props.putAll( this.properties ); 205 206 return props; 207 } 208 209 @Override 210 public String getProperty( String key ) 211 { 212 return null; 213 } 214 215 @Override 216 public FlowProcess<Config> getFlowProcess() 217 { 218 return FlowProcess.NULL; 219 } 220 221 @Override 222 public boolean stepsAreLocal() 223 { 224 return true; 225 } 226 227 @Override 228 public void prepare() 229 { 230 try 231 { 232 processWrapper.prepare(); 233 } 234 catch( Throwable throwable ) 235 { 236 if( throwable.getCause() instanceof RuntimeException ) 237 throw (RuntimeException) throwable.getCause(); 238 239 throw new FlowException( "could not call prepare on process", throwable.getCause() ); 240 } 241 } 242 243 @Override 244 public void start() 245 { 246 try 247 { 248 flowStats.markPending(); 249 fireOnStarting(); 250 processWrapper.start(); 251 flowStats.markStarted(); 252 isStarted = true; 253 } 254 catch( Throwable throwable ) 255 { 256 fireOnThrowable( throwable ); 257 258 if( throwable.getCause() instanceof RuntimeException ) 259 throw (RuntimeException) throwable.getCause(); 260 261 throw new FlowException( "could not call start on process", throwable.getCause() ); 262 } 263 } 264 265 @Override 266 protected void internalStart() 267 { 268 try 269 { 270 deleteSinksIfReplace(); 271 deleteTrapsIfReplace(); 272 deleteCheckpointsIfReplace(); 273 } 274 catch( IOException exception ) 275 { 276 throw new FlowException( "unable to delete sinks", exception ); 277 } 278 } 279 280 @Override 281 public void stop() 282 { 283 try 284 { 285 fireOnStopping(); 286 processWrapper.stop(); 287 288 if( !flowStats.isFinished() ) 289 flowStats.markStopped(); 290 } 291 catch( Throwable throwable ) 292 { 293 flowStats.markFailed( throwable ); 294 fireOnThrowable( throwable ); 295 296 if( throwable.getCause() instanceof RuntimeException ) 297 throw (RuntimeException) throwable.getCause(); 298 299 throw new FlowException( "could not call stop on process", throwable.getCause() ); 300 } 301 } 302 303 @Override 304 protected void internalClean( boolean stop ) 305 { 306 307 } 308 309 @Override 310 public void complete() 311 { 312 try 313 { 314 if( !isStarted ) 315 { 316 flowStats.markPending(); 317 fireOnStarting(); 318 isStarted = true; 319 flowStats.markStarted(); 320 } 321 322 flowStats.markRunning(); 323 processWrapper.complete(); 324 fireOnCompleted(); 325 flowStats.markSuccessful(); 326 } 327 catch( Throwable throwable ) 328 { 329 flowStats.markFailed( throwable ); 330 fireOnThrowable( throwable ); 331 332 if( throwable.getCause() instanceof RuntimeException ) 333 throw (RuntimeException) throwable.getCause(); 334 335 throw new FlowException( "could not call complete on process", throwable.getCause() ); 336 } 337 } 338 339 @Override 340 public void cleanup() 341 { 342 try 343 { 344 processWrapper.cleanup(); 345 } 346 catch( Throwable throwable ) 347 { 348 if( throwable.getCause() instanceof RuntimeException ) 349 throw (RuntimeException) throwable.getCause(); 350 351 throw new FlowException( "could not call cleanup on process", throwable.getCause() ); 352 } 353 } 354 355 @Override 356 protected int getMaxNumParallelSteps() 357 { 358 return 1; 359 } 360 361 @Override 362 protected void internalShutdown() 363 { 364 365 } 366 367 private Map<String, Tap> createSources( ProcessWrapper processParent ) 368 { 369 try 370 { 371 return makeTapMap( processParent.getDependencyIncoming() ); 372 } 373 catch( ProcessException exception ) 374 { 375 if( exception.getCause() instanceof RuntimeException ) 376 throw (RuntimeException) exception.getCause(); 377 378 throw new FlowException( "could not get process incoming dependency", exception.getCause() ); 379 } 380 } 381 382 private Map<String, Tap> createSinks( ProcessWrapper processParent ) 383 { 384 try 385 { 386 return makeTapMap( processParent.getDependencyOutgoing() ); 387 } 388 catch( ProcessException exception ) 389 { 390 if( exception.getCause() instanceof RuntimeException ) 391 throw (RuntimeException) exception.getCause(); 392 393 throw new FlowException( "could not get process outgoing dependency", exception.getCause() ); 394 } 395 } 396 397 private Map<String, Tap> makeTapMap( Object resource ) 398 { 399 Collection paths = makeCollection( resource ); 400 401 Map<String, Tap> taps = new HashMap<String, Tap>(); 402 403 for( Object path : paths ) 404 { 405 if( path instanceof Tap ) 406 taps.put( ( (Tap) path ).getIdentifier(), (Tap) path ); 407 else 408 taps.put( path.toString(), new ProcessTap( new NullScheme(), path.toString() ) ); 409 } 410 411 return taps; 412 } 413 414 private Collection makeCollection( Object resource ) 415 { 416 if( resource instanceof Collection ) 417 return (Collection) resource; 418 else if( resource instanceof Object[] ) 419 return Arrays.asList( (Object[]) resource ); 420 else 421 return Arrays.asList( resource ); 422 } 423 424 private Map<String, Tap> createTraps( ProcessWrapper processParent ) 425 { 426 return new HashMap<String, Tap>(); 427 } 428 429 @Override 430 public String toString() 431 { 432 return getName() + ":" + process; 433 } 434 435 static class NullScheme extends Scheme 436 { 437 public void sourceConfInit( FlowProcess flowProcess, Tap tap, Object conf ) 438 { 439 } 440 441 public void sinkConfInit( FlowProcess flowProcess, Tap tap, Object conf ) 442 { 443 } 444 445 public boolean source( FlowProcess flowProcess, SourceCall sourceCall ) throws IOException 446 { 447 throw new UnsupportedOperationException( "sourcing is not supported in the scheme" ); 448 } 449 450 @Override 451 public String toString() 452 { 453 return getClass().getSimpleName(); 454 } 455 456 public void sink( FlowProcess flowProcess, SinkCall sinkCall ) throws IOException 457 { 458 throw new UnsupportedOperationException( "sinking is not supported in the scheme" ); 459 } 460 } 461 462 /** 463 * 464 */ 465 static class ProcessTap<Config> extends Tap<Config, Object, Object> 466 { 467 private final String token; 468 469 ProcessTap( NullScheme scheme, String token ) 470 { 471 super( scheme ); 472 this.token = token; 473 } 474 475 @Override 476 public String getIdentifier() 477 { 478 return token; 479 } 480 481 @Override 482 public String getFullIdentifier( Config conf ) 483 { 484 return getIdentifier(); 485 } 486 487 @Override 488 public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Object input ) throws IOException 489 { 490 return null; 491 } 492 493 @Override 494 public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Object output ) throws IOException 495 { 496 return null; 497 } 498 499 @Override 500 public boolean createResource( Config conf ) throws IOException 501 { 502 return false; 503 } 504 505 @Override 506 public boolean deleteResource( Config conf ) throws IOException 507 { 508 return false; 509 } 510 511 @Override 512 public boolean resourceExists( Config conf ) throws IOException 513 { 514 return false; 515 } 516 517 @Override 518 public long getModifiedTime( Config conf ) throws IOException 519 { 520 return 0; 521 } 522 523 @Override 524 public String toString() 525 { 526 return token; 527 } 528 } 529 }