001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.process;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.Properties;
030
031import cascading.flow.BaseFlow;
032import cascading.flow.FlowException;
033import cascading.flow.FlowProcess;
034import cascading.flow.planner.PlatformInfo;
035import cascading.scheme.Scheme;
036import cascading.scheme.SinkCall;
037import cascading.scheme.SourceCall;
038import cascading.stats.process.ProcessFlowStats;
039import cascading.tap.Tap;
040import cascading.tuple.TupleEntryCollector;
041import cascading.tuple.TupleEntryIterator;
042import cascading.util.Version;
043import riffle.process.scheduler.ProcessException;
044import riffle.process.scheduler.ProcessWrapper;
045
046/**
047 * Class ProcessFlow is a {@link cascading.flow.Flow} subclass that supports custom Riffle jobs.
048 * <p/>
049 * Use this class to allow custom Riffle jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
050 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
051 * according to their dependencies (topologically).
052 * <p/>
053 * <p/>
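 * Currently {@link cascading.flow.FlowListener}s are supported, but the
 * {@link cascading.flow.FlowListener#onThrowable(cascading.flow.Flow, Throwable)} event is only partially
 * supported: failures in {@link #start()}, {@link #stop()} and {@link #complete()} are handed to
 * {@code fireOnThrowable}, while failures in {@link #prepare()} and {@link #cleanup()} are not, and the
 * exception is always rethrown regardless of any listener.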
056 */
057public class ProcessFlow<Process, Config> extends BaseFlow<Config>
058  {
059  /** Field process */
060  private final Process process;
061  /** Field processWrapper */
062  private final ProcessWrapper processWrapper;
063  /** Configuration object */
064  private Config config;
065
066  private boolean isStarted = false; // only used for event handling
067
068  /** flow related properties */
069  private Map<Object, Object> properties;
070
071  /**
072   * Constructor ProcessFlow creates a new ProcessFlow instance.
073   *
074   * @param name    of type String
075   * @param process of type JobConf
076   */
077  @ConstructorProperties({"name", "process"})
078  public ProcessFlow( String name, Process process )
079    {
080    this( new Properties(), name, process );
081    }
082
083  /**
084   * Constructor ProcessFlow creates a new ProcessFlow instance.
085   *
086   * @param properties of type Map<Object, Object>
087   * @param name       of type String
088   * @param process    of type P
089   */
090  @ConstructorProperties({"properties", "name", "process"})
091  public ProcessFlow( Map<Object, Object> properties, String name, Process process )
092    {
093    this( properties, name, process, null );
094    }
095
096  /**
097   * Constructor ProcessFlow creates a new ProcessFlow instance.
098   *
099   * @param properties     of type Map<Object, Object>
100   * @param name           of type String
101   * @param process        of type P
102   * @param flowDescriptor pf type LinkedHashMap<String, String>
103   */
104  @ConstructorProperties({"properties", "name", "process", "flowDescriptor"})
105  public ProcessFlow( Map<Object, Object> properties, String name, Process process, Map<String, String> flowDescriptor )
106    {
107    super( new PlatformInfo( "process", "Concurrent, Inc.", Version.getRelease() ), properties, null, name, flowDescriptor );
108    this.process = process;
109    this.processWrapper = new ProcessWrapper( this.process );
110    this.properties = properties;
111
112    setName( name );
113    setTapFromProcess();
114    initProcessConfig();
115    initStats();
116    }
117
118  private void initStats()
119    {
120    try
121      {
122      if( processWrapper.hasCounters() )
123        this.flowStats = new ProcessFlowStats( this, getFlowSession().getCascadingServices().createClientState( getID() ), processWrapper );
124      }
125    catch( ProcessException exception )
126      {
127      throw new FlowException( exception );
128      }
129    }
130
131  /**
132   * Method setTapFromProcess build {@link Tap} instance for the give process incoming and outgoing dependencies.
133   * <p/>
134   * This method may be called repeatedly to re-configure the source and sink taps.
135   */
136  public void setTapFromProcess()
137    {
138    setSources( createSources( this.processWrapper ) );
139    setSinks( createSinks( this.processWrapper ) );
140    setTraps( createTraps( this.processWrapper ) );
141    }
142
143  /**
144   * Method getProcess returns the process of this ProcessFlow object.
145   *
146   * @return the process (type P) of this ProcessFlow object.
147   */
148  public Process getProcess()
149    {
150    return process;
151    }
152
153  @Override
154  protected void initConfig( Map<Object, Object> properties, Config parentConfig )
155    {
156
157    }
158
159  private void initProcessConfig()
160    {
161    try
162      {
163      config = (Config) processWrapper.getConfiguration();
164      }
165    catch( ProcessException exception )
166      {
167      if( exception.getCause() instanceof RuntimeException )
168        throw (RuntimeException) exception.getCause();
169
170      throw new FlowException( "could not get configuration from process", exception.getCause() );
171      }
172    }
173
174  @Override
175  protected void setConfigProperty( Config properties, Object key, Object value )
176    {
177
178    }
179
180  @Override
181  protected Config newConfig( Config defaultConfig )
182    {
183    return null;
184    }
185
186  @Override
187  public Config getConfig()
188    {
189    return config;
190    }
191
192  @Override
193  public Config getConfigCopy()
194    {
195    return null;
196    }
197
198  @Override
199  public Map<Object, Object> getConfigAsProperties()
200    {
201    Map<Object, Object> props = new HashMap<>();
202
203    if( properties != null )
204      props.putAll( this.properties );
205
206    return props;
207    }
208
209  @Override
210  public String getProperty( String key )
211    {
212    return null;
213    }
214
215  @Override
216  public FlowProcess<Config> getFlowProcess()
217    {
218    return FlowProcess.NULL;
219    }
220
221  @Override
222  public boolean stepsAreLocal()
223    {
224    return true;
225    }
226
227  @Override
228  public void prepare()
229    {
230    try
231      {
232      processWrapper.prepare();
233      }
234    catch( Throwable throwable )
235      {
236      if( throwable.getCause() instanceof RuntimeException )
237        throw (RuntimeException) throwable.getCause();
238
239      throw new FlowException( "could not call prepare on process", throwable.getCause() );
240      }
241    }
242
243  @Override
244  public void start()
245    {
246    try
247      {
248      flowStats.markPending();
249      fireOnStarting();
250      processWrapper.start();
251      flowStats.markStarted();
252      isStarted = true;
253      }
254    catch( Throwable throwable )
255      {
256      fireOnThrowable( throwable );
257
258      if( throwable.getCause() instanceof RuntimeException )
259        throw (RuntimeException) throwable.getCause();
260
261      throw new FlowException( "could not call start on process", throwable.getCause() );
262      }
263    }
264
265  @Override
266  protected void internalStart()
267    {
268    try
269      {
270      deleteSinksIfReplace();
271      deleteTrapsIfReplace();
272      deleteCheckpointsIfReplace();
273      }
274    catch( IOException exception )
275      {
276      throw new FlowException( "unable to delete sinks", exception );
277      }
278    }
279
280  @Override
281  public void stop()
282    {
283    try
284      {
285      fireOnStopping();
286      processWrapper.stop();
287
288      if( !flowStats.isFinished() )
289        flowStats.markStopped();
290      }
291    catch( Throwable throwable )
292      {
293      flowStats.markFailed( throwable );
294      fireOnThrowable( throwable );
295
296      if( throwable.getCause() instanceof RuntimeException )
297        throw (RuntimeException) throwable.getCause();
298
299      throw new FlowException( "could not call stop on process", throwable.getCause() );
300      }
301    }
302
303  @Override
304  protected void internalClean( boolean stop )
305    {
306
307    }
308
309  @Override
310  public void complete()
311    {
312    try
313      {
314      if( !isStarted )
315        {
316        flowStats.markPending();
317        fireOnStarting();
318        isStarted = true;
319        flowStats.markStarted();
320        }
321
322      flowStats.markRunning();
323      processWrapper.complete();
324      fireOnCompleted();
325      flowStats.markSuccessful();
326      }
327    catch( Throwable throwable )
328      {
329      flowStats.markFailed( throwable );
330      fireOnThrowable( throwable );
331
332      if( throwable.getCause() instanceof RuntimeException )
333        throw (RuntimeException) throwable.getCause();
334
335      throw new FlowException( "could not call complete on process", throwable.getCause() );
336      }
337    }
338
339  @Override
340  public void cleanup()
341    {
342    try
343      {
344      processWrapper.cleanup();
345      }
346    catch( Throwable throwable )
347      {
348      if( throwable.getCause() instanceof RuntimeException )
349        throw (RuntimeException) throwable.getCause();
350
351      throw new FlowException( "could not call cleanup on process", throwable.getCause() );
352      }
353    }
354
355  @Override
356  protected int getMaxNumParallelSteps()
357    {
358    return 1;
359    }
360
361  @Override
362  protected void internalShutdown()
363    {
364
365    }
366
367  private Map<String, Tap> createSources( ProcessWrapper processParent )
368    {
369    try
370      {
371      return makeTapMap( processParent.getDependencyIncoming() );
372      }
373    catch( ProcessException exception )
374      {
375      if( exception.getCause() instanceof RuntimeException )
376        throw (RuntimeException) exception.getCause();
377
378      throw new FlowException( "could not get process incoming dependency", exception.getCause() );
379      }
380    }
381
382  private Map<String, Tap> createSinks( ProcessWrapper processParent )
383    {
384    try
385      {
386      return makeTapMap( processParent.getDependencyOutgoing() );
387      }
388    catch( ProcessException exception )
389      {
390      if( exception.getCause() instanceof RuntimeException )
391        throw (RuntimeException) exception.getCause();
392
393      throw new FlowException( "could not get process outgoing dependency", exception.getCause() );
394      }
395    }
396
397  private Map<String, Tap> makeTapMap( Object resource )
398    {
399    Collection paths = makeCollection( resource );
400
401    Map<String, Tap> taps = new HashMap<String, Tap>();
402
403    for( Object path : paths )
404      {
405      if( path instanceof Tap )
406        taps.put( ( (Tap) path ).getIdentifier(), (Tap) path );
407      else
408        taps.put( path.toString(), new ProcessTap( new NullScheme(), path.toString() ) );
409      }
410
411    return taps;
412    }
413
414  private Collection makeCollection( Object resource )
415    {
416    if( resource instanceof Collection )
417      return (Collection) resource;
418    else if( resource instanceof Object[] )
419      return Arrays.asList( (Object[]) resource );
420    else
421      return Arrays.asList( resource );
422    }
423
424  private Map<String, Tap> createTraps( ProcessWrapper processParent )
425    {
426    return new HashMap<String, Tap>();
427    }
428
429  @Override
430  public String toString()
431    {
432    return getName() + ":" + process;
433    }
434
435  static class NullScheme extends Scheme
436    {
437    public void sourceConfInit( FlowProcess flowProcess, Tap tap, Object conf )
438      {
439      }
440
441    public void sinkConfInit( FlowProcess flowProcess, Tap tap, Object conf )
442      {
443      }
444
445    public boolean source( FlowProcess flowProcess, SourceCall sourceCall ) throws IOException
446      {
447      throw new UnsupportedOperationException( "sourcing is not supported in the scheme" );
448      }
449
450    @Override
451    public String toString()
452      {
453      return getClass().getSimpleName();
454      }
455
456    public void sink( FlowProcess flowProcess, SinkCall sinkCall ) throws IOException
457      {
458      throw new UnsupportedOperationException( "sinking is not supported in the scheme" );
459      }
460    }
461
462  /**
463   *
464   */
465  static class ProcessTap<Config> extends Tap<Config, Object, Object>
466    {
467    private final String token;
468
469    ProcessTap( NullScheme scheme, String token )
470      {
471      super( scheme );
472      this.token = token;
473      }
474
475    @Override
476    public String getIdentifier()
477      {
478      return token;
479      }
480
481    @Override
482    public String getFullIdentifier( Config conf )
483      {
484      return getIdentifier();
485      }
486
487    @Override
488    public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Object input ) throws IOException
489      {
490      return null;
491      }
492
493    @Override
494    public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Object output ) throws IOException
495      {
496      return null;
497      }
498
499    @Override
500    public boolean createResource( Config conf ) throws IOException
501      {
502      return false;
503      }
504
505    @Override
506    public boolean deleteResource( Config conf ) throws IOException
507      {
508      return false;
509      }
510
511    @Override
512    public boolean resourceExists( Config conf ) throws IOException
513      {
514      return false;
515      }
516
517    @Override
518    public long getModifiedTime( Config conf ) throws IOException
519      {
520      return 0;
521      }
522
523    @Override
524    public String toString()
525      {
526      return token;
527      }
528    }
529  }