/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.stats.hadoop.ProcessFlowStats;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import riffle.process.scheduler.ProcessException;
import riffle.process.scheduler.ProcessWrapper;

/**
 * Class ProcessFlow is a {@link cascading.flow.Flow} subclass that supports custom Riffle jobs.
 * <p/>
 * Use this class to allow custom Riffle jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * according to their dependencies (topologically).
 * <p/>
 * Though this class sub-classes {@link HadoopFlow}, it does not support all the methods available or features.
 * <p/>
 * Currently {@link cascading.flow.FlowListener}s are supported but the
 * {@link cascading.flow.FlowListener#onThrowable(cascading.flow.Flow, Throwable)} event is not.
 * <p/>
 * NOTE(review): {@link #start()}, {@link #stop()} and {@link #complete()} do call {@code fireOnThrowable} when they
 * fail; confirm whether the statement above is still accurate.
 *
 * @param <P> the type of the wrapped process object
 * @deprecated ProcessFlow will be decoupled from Hadoop and moved to a different package in Cascading 3.0.
 */
@Deprecated
public class ProcessFlow<P> extends HadoopFlow
  {
  /** Field process */
  private final P process;
  /** Field processWrapper, created from {@link #process} in the constructor; all lifecycle calls are delegated to it */
  private final ProcessWrapper processWrapper;

  private boolean isStarted = false; // only used for event handling

  /**
   * Constructor ProcessFlow creates a new ProcessFlow instance with an empty {@link Properties} instance.
   *
   * @param name    of type String
   * @param process of type P
   */
  @ConstructorProperties({"name", "process"})
  public ProcessFlow( String name, P process )
    {
    this( new Properties(), name, process );
    }

  /**
   * Constructor ProcessFlow creates a new ProcessFlow instance without a flow descriptor.
   *
   * @param properties of type Map<Object, Object>
   * @param name       of type String
   * @param process    of type P
   */
  @ConstructorProperties({"properties", "name", "process"})
  public ProcessFlow( Map<Object, Object> properties, String name, P process )
    {
    this( properties, name, process, null );
    }

  /**
   * Constructor ProcessFlow creates a new ProcessFlow instance.
   * <p/>
   * The given process is wrapped in a {@link ProcessWrapper}, the source, sink and trap taps are populated via
   * {@link #setTapFromProcess()}, and the flow stats are initialized.
   *
   * @param properties     of type Map<Object, Object>
   * @param name           of type String
   * @param process        of type P
   * @param flowDescriptor of type Map<String, String>, may be null
   * @throws FlowException if the process dependencies or counters cannot be queried
   */
  @ConstructorProperties({"properties", "name", "process", "flowDescriptor"})
  public ProcessFlow( Map<Object, Object> properties, String name, P process, Map<String, String> flowDescriptor )
    {
    super( HadoopUtil.getPlatformInfo(), properties, null, name, flowDescriptor );
    this.process = process;
    this.processWrapper = new ProcessWrapper( this.process );
    setName( name );
    setTapFromProcess();
    initStats();
    }

  /**
   * Replaces {@code flowStats} with a {@link ProcessFlowStats} instance when {@code processWrapper.hasCounters()}
   * returns true; otherwise the inherited value is left untouched.
   *
   * @throws FlowException wrapping any {@link ProcessException} raised while querying the wrapper
   */
  private void initStats()
    {
    try
      {
      if( processWrapper.hasCounters() )
        this.flowStats = new ProcessFlowStats( this, getFlowSession().getCascadingServices().createClientState( getID() ), processWrapper );
      }
    catch( ProcessException exception )
      {
      throw new FlowException( exception );
      }
    }

  /**
   * Method setTapFromProcess builds {@link Tap} instances for the given process incoming and outgoing dependencies.
   * <p/>
   * This method may be called repeatedly to re-configure the source and sink taps.
   */
  public void setTapFromProcess()
    {
    setSources( createSources( this.processWrapper ) );
    setSinks( createSinks( this.processWrapper ) );
    setTraps( createTraps( this.processWrapper ) );
    }

  /**
   * Method getProcess returns the process of this ProcessFlow object.
   *
   * @return the process (type P) of this ProcessFlow object.
   */
  public P getProcess()
    {
    return process;
    }

  /**
   * Delegates to {@code processWrapper.prepare()}.
   *
   * @throws RuntimeException the cause of the failure when that cause is a RuntimeException
   * @throws FlowException    for any other failure
   */
  @Override
  public void prepare()
    {
    try
      {
      processWrapper.prepare();
      }
    catch( Throwable throwable )
      {
      throw unwrap( "could not call prepare on process", throwable );
      }
    }

  /**
   * Marks the stats pending, fires the starting event, starts the wrapped process and marks the stats started.
   * <p/>
   * On failure the throwable event is fired before the failure is rethrown.
   *
   * @throws RuntimeException the cause of the failure when that cause is a RuntimeException
   * @throws FlowException    for any other failure
   */
  @Override
  public void start()
    {
    try
      {
      flowStats.markPending();
      fireOnStarting();
      processWrapper.start();
      flowStats.markStarted();
      isStarted = true;
      }
    catch( Throwable throwable )
      {
      // NOTE(review): unlike stop() and complete(), flowStats.markFailed() is not called here -- confirm intended
      fireOnThrowable( throwable );

      throw unwrap( "could not call start on process", throwable );
      }
    }

  /**
   * Fires the stopping event, stops the wrapped process and marks the stats stopped unless they are already finished.
   * <p/>
   * On failure the stats are marked failed and the throwable event is fired before the failure is rethrown.
   *
   * @throws RuntimeException the cause of the failure when that cause is a RuntimeException
   * @throws FlowException    for any other failure
   */
  @Override
  public void stop()
    {
    try
      {
      fireOnStopping();
      processWrapper.stop();

      if( !flowStats.isFinished() )
        flowStats.markStopped();
      }
    catch( Throwable throwable )
      {
      flowStats.markFailed( throwable );
      fireOnThrowable( throwable );

      throw unwrap( "could not call stop on process", throwable );
      }
    }

  /**
   * Calls {@code processWrapper.complete()}, then fires the completed event and marks the stats successful.
   * <p/>
   * If {@link #start()} was not called first, the pending/starting/started transitions are performed here so that
   * listeners and stats still observe them.
   * <p/>
   * On failure the stats are marked failed and the throwable event is fired before the failure is rethrown.
   *
   * @throws RuntimeException the cause of the failure when that cause is a RuntimeException
   * @throws FlowException    for any other failure
   */
  @Override
  public void complete()
    {
    try
      {
      if( !isStarted )
        {
        flowStats.markPending();
        fireOnStarting();
        isStarted = true;
        flowStats.markStarted();
        }

      flowStats.markRunning();
      processWrapper.complete();
      fireOnCompleted();
      flowStats.markSuccessful();
      }
    catch( Throwable throwable )
      {
      flowStats.markFailed( throwable );
      fireOnThrowable( throwable );

      throw unwrap( "could not call complete on process", throwable );
      }
    }

  /**
   * Delegates to {@code processWrapper.cleanup()}.
   *
   * @throws RuntimeException the cause of the failure when that cause is a RuntimeException
   * @throws FlowException    for any other failure
   */
  @Override
  public void cleanup()
    {
    try
      {
      processWrapper.cleanup();
      }
    catch( Throwable throwable )
      {
      throw unwrap( "could not call cleanup on process", throwable );
      }
    }

  /**
   * Converts a caught failure into the exception that should be thrown to the caller.
   * <p/>
   * If the failure's cause is a RuntimeException, that cause is returned as is. Otherwise a {@link FlowException}
   * with the given message is returned; its cause is the failure's cause or, when the failure has no cause, the
   * failure itself so the original stack trace is never dropped.
   *
   * @param message   the message for the FlowException, if one is created
   * @param throwable the caught failure
   * @return the exception to throw, never null
   */
  private static RuntimeException unwrap( String message, Throwable throwable )
    {
    Throwable cause = throwable.getCause();

    if( cause instanceof RuntimeException )
      return (RuntimeException) cause;

    // a failure raised directly (not wrapped) has no cause; keep the failure itself instead of passing null
    return new FlowException( message, cause != null ? cause : throwable );
    }

  /**
   * Builds the source tap map from the value returned by {@code processParent.getDependencyIncoming()}.
   *
   * @param processParent the wrapper to query
   * @return a map of identifier to Tap
   */
  private Map<String, Tap> createSources( ProcessWrapper processParent )
    {
    try
      {
      return makeTapMap( processParent.getDependencyIncoming() );
      }
    catch( ProcessException exception )
      {
      throw unwrap( "could not get process incoming dependency", exception );
      }
    }

  /**
   * Builds the sink tap map from the value returned by {@code processParent.getDependencyOutgoing()}.
   *
   * @param processParent the wrapper to query
   * @return a map of identifier to Tap
   */
  private Map<String, Tap> createSinks( ProcessWrapper processParent )
    {
    try
      {
      return makeTapMap( processParent.getDependencyOutgoing() );
      }
    catch( ProcessException exception )
      {
      throw unwrap( "could not get process outgoing dependency", exception );
      }
    }

  /**
   * Creates a map of taps keyed by identifier.
   * <p/>
   * Elements that already are {@link Tap} instances are registered under {@link Tap#getIdentifier()}; any other
   * element is represented by a {@link ProcessTap} whose identifier is the element's {@code toString()} value.
   *
   * @param resource a Collection, an Object array, or a single resource object
   * @return a new mutable map of identifier to Tap
   */
  private Map<String, Tap> makeTapMap( Object resource )
    {
    Collection<?> paths = makeCollection( resource );

    Map<String, Tap> taps = new HashMap<String, Tap>();

    for( Object path : paths )
      {
      if( path instanceof Tap )
        taps.put( ( (Tap) path ).getIdentifier(), (Tap) path );
      else
        taps.put( path.toString(), new ProcessTap( new NullScheme(), path.toString() ) );
      }

    return taps;
    }

  /**
   * Normalizes the given resource into a Collection.
   *
   * @param resource a Collection (returned as is), an Object array (viewed as a list), or any other object
   *                 (wrapped in a single element list)
   * @return the resource as a Collection
   */
  private Collection<?> makeCollection( Object resource )
    {
    if( resource instanceof Collection )
      return (Collection<?>) resource;
    else if( resource instanceof Object[] )
      return Arrays.asList( (Object[]) resource );
    else
      return Arrays.asList( resource );
    }

  /**
   * Returns a new, empty trap map; the given wrapper is not consulted.
   *
   * @param processParent unused
   * @return an empty mutable map
   */
  private Map<String, Tap> createTraps( ProcessWrapper processParent )
    {
    return new HashMap<String, Tap>();
    }

  @Override
  public String toString()
    {
    return getName() + ":" + process;
    }

  /**
   * Placeholder {@link Scheme} handed to {@link ProcessTap}; configuration hooks are no-ops and both
   * {@code source} and {@code sink} throw {@link UnsupportedOperationException}.
   */
  static class NullScheme extends Scheme
    {
    /** Does nothing. */
    public void sourceConfInit( FlowProcess flowProcess, Tap tap, Object conf )
      {
      }

    /** Does nothing. */
    public void sinkConfInit( FlowProcess flowProcess, Tap tap, Object conf )
      {
      }

    /** Always throws {@link UnsupportedOperationException}. */
    public boolean source( FlowProcess flowProcess, SourceCall sourceCall ) throws IOException
      {
      throw new UnsupportedOperationException( "sourcing is not supported in the scheme" );
      }

    @Override
    public String toString()
      {
      return getClass().getSimpleName();
      }

    /** Always throws {@link UnsupportedOperationException}. */
    public void sink( FlowProcess flowProcess, SinkCall sinkCall ) throws IOException
      {
      throw new UnsupportedOperationException( "sinking is not supported in the scheme" );
      }
    }

  /**
   * Placeholder {@link Tap} identified only by a String token.
   * <p/>
   * It is created by {@code makeTapMap} for dependencies that are not Tap instances. The open methods return null,
   * the resource methods return false and the modified time is 0.
   */
  static class ProcessTap extends Tap
    {
    /** Field token, returned as both the identifier and the String form of this tap */
    private final String token;

    ProcessTap( NullScheme scheme, String token )
      {
      super( scheme );
      this.token = token;
      }

    @Override
    public String getIdentifier()
      {
      return token;
      }

    @Override
    public TupleEntryIterator openForRead( FlowProcess flowProcess, Object input ) throws IOException
      {
      return null;
      }

    @Override
    public TupleEntryCollector openForWrite( FlowProcess flowProcess, Object output ) throws IOException
      {
      return null;
      }

    @Override
    public boolean createResource( Object conf ) throws IOException
      {
      return false;
      }

    @Override
    public boolean deleteResource( Object conf ) throws IOException
      {
      return false;
      }

    @Override
    public boolean resourceExists( Object conf ) throws IOException
      {
      return false;
      }

    @Override
    public long getModifiedTime( Object conf ) throws IOException
      {
      return 0;
      }

    @Override
    public String toString()
      {
      return token;
      }
    }
  }