001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.process;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.Properties;
030
031import cascading.flow.BaseFlow;
032import cascading.flow.FlowException;
033import cascading.flow.FlowProcess;
034import cascading.flow.planner.PlatformInfo;
035import cascading.scheme.Scheme;
036import cascading.scheme.SinkCall;
037import cascading.scheme.SourceCall;
038import cascading.stats.process.ProcessFlowStats;
039import cascading.tap.Tap;
040import cascading.tuple.TupleEntryCollector;
041import cascading.tuple.TupleEntryIterator;
042import cascading.util.Version;
043import riffle.process.scheduler.ProcessException;
044import riffle.process.scheduler.ProcessWrapper;
045
046/**
047 * Class ProcessFlow is a {@link cascading.flow.Flow} subclass that supports custom Riffle jobs.
048 * <p/>
049 * Use this class to allow custom Riffle jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
050 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
051 * according to their dependencies (topologically).
052 * <p/>
053 * <p/>
054 * Currently {@link cascading.flow.FlowListener}s are supported but the
055 * {@link cascading.flow.FlowListener#onThrowable(cascading.flow.Flow, Throwable)} event is not.
056 */
057public class ProcessFlow<Process, Config> extends BaseFlow<Config>
058  {
059  /** Field process */
060  private final Process process;
061  /** Field processWrapper */
062  private final ProcessWrapper processWrapper;
063  /** Configuration object */
064  private Config config;
065
066  private boolean isStarted = false; // only used for event handling
067
068  /** flow related properties */
069  private Map<Object, Object> properties;
070
071  /**
072   * Constructor ProcessFlow creates a new ProcessFlow instance.
073   *
074   * @param name    of type String
075   * @param process of type JobConf
076   */
077  @ConstructorProperties({"name", "process"})
078  public ProcessFlow( String name, Process process )
079    {
080    this( new Properties(), name, process );
081    }
082
083  /**
084   * Constructor ProcessFlow creates a new ProcessFlow instance.
085   *
086   * @param properties of type Map<Object, Object>
087   * @param name       of type String
088   * @param process    of type P
089   */
090  @ConstructorProperties({"properties", "name", "process"})
091  public ProcessFlow( Map<Object, Object> properties, String name, Process process )
092    {
093    this( properties, name, process, null );
094    }
095
096  /**
097   * Constructor ProcessFlow creates a new ProcessFlow instance.
098   *
099   * @param properties     of type Map<Object, Object>
100   * @param name           of type String
101   * @param process        of type P
102   * @param flowDescriptor pf type LinkedHashMap<String, String>
103   */
104  @ConstructorProperties({"properties", "name", "process", "flowDescriptor"})
105  public ProcessFlow( Map<Object, Object> properties, String name, Process process, Map<String, String> flowDescriptor )
106    {
107    super( new PlatformInfo( "process", "Xplenty, Inc.", Version.getRelease() ), properties, null, name, flowDescriptor );
108    this.process = process;
109    this.processWrapper = new ProcessWrapper( this.process );
110    this.properties = properties;
111
112    setName( name );
113    setTapFromProcess();
114    initProcessConfig();
115    initStats();
116    }
117
118  private void initStats()
119    {
120    try
121      {
122      if( processWrapper.hasCounters() )
123        {
124        flowStats = new ProcessFlowStats( this, getFlowSession().getCascadingServices().createClientState( getID() ), processWrapper );
125        flowStats.prepare();
126        flowStats.markPending();
127        }
128      else
129        {
130        flowStats = createPrepareFlowStats();
131        }
132      }
133    catch( ProcessException exception )
134      {
135      throw new FlowException( exception );
136      }
137    }
138
139  /**
140   * Method setTapFromProcess build {@link Tap} instance for the give process incoming and outgoing dependencies.
141   * <p/>
142   * This method may be called repeatedly to re-configure the source and sink taps.
143   */
144  public void setTapFromProcess()
145    {
146    setSources( createSources( this.processWrapper ) );
147    setSinks( createSinks( this.processWrapper ) );
148    setTraps( createTraps( this.processWrapper ) );
149    }
150
151  /**
152   * Method getProcess returns the process of this ProcessFlow object.
153   *
154   * @return the process (type P) of this ProcessFlow object.
155   */
156  public Process getProcess()
157    {
158    return process;
159    }
160
161  @Override
162  protected void initConfig( Map<Object, Object> properties, Config parentConfig )
163    {
164
165    }
166
167  private void initProcessConfig()
168    {
169    try
170      {
171      config = (Config) processWrapper.getConfiguration();
172      }
173    catch( ProcessException exception )
174      {
175      if( exception.getCause() instanceof RuntimeException )
176        throw (RuntimeException) exception.getCause();
177
178      throw new FlowException( "could not get configuration from process", exception.getCause() );
179      }
180    }
181
182  @Override
183  protected void setConfigProperty( Config properties, Object key, Object value )
184    {
185
186    }
187
188  @Override
189  protected Config newConfig( Config defaultConfig )
190    {
191    return null;
192    }
193
194  @Override
195  public Config getConfig()
196    {
197    return config;
198    }
199
200  @Override
201  public Config getConfigCopy()
202    {
203    return null;
204    }
205
206  @Override
207  public Map<Object, Object> getConfigAsProperties()
208    {
209    Map<Object, Object> props = new HashMap<>();
210
211    if( properties != null )
212      props.putAll( this.properties );
213
214    return props;
215    }
216
217  @Override
218  public String getProperty( String key )
219    {
220    return null;
221    }
222
223  @Override
224  public FlowProcess<Config> getFlowProcess()
225    {
226    return FlowProcess.NULL;
227    }
228
229  @Override
230  public boolean stepsAreLocal()
231    {
232    return true;
233    }
234
235  @Override
236  public void prepare()
237    {
238    try
239      {
240      processWrapper.prepare();
241      }
242    catch( Throwable throwable )
243      {
244      if( throwable.getCause() instanceof RuntimeException )
245        throw (RuntimeException) throwable.getCause();
246
247      throw new FlowException( "could not call prepare on process", throwable.getCause() );
248      }
249    }
250
251  @Override
252  public void start()
253    {
254    try
255      {
256      flowStats.markPending();
257      fireOnStarting();
258      processWrapper.start();
259      flowStats.markStarted();
260      isStarted = true;
261      }
262    catch( Throwable throwable )
263      {
264      fireOnThrowable( throwable );
265
266      if( throwable.getCause() instanceof RuntimeException )
267        throw (RuntimeException) throwable.getCause();
268
269      throw new FlowException( "could not call start on process", throwable.getCause() );
270      }
271    }
272
273  @Override
274  protected void internalStart()
275    {
276    try
277      {
278      deleteSinksIfReplace();
279      deleteTrapsIfReplace();
280      deleteCheckpointsIfReplace();
281      }
282    catch( IOException exception )
283      {
284      throw new FlowException( "unable to delete sinks", exception );
285      }
286    }
287
288  @Override
289  public void stop()
290    {
291    try
292      {
293      fireOnStopping();
294      processWrapper.stop();
295
296      if( !flowStats.isFinished() )
297        flowStats.markStopped();
298      }
299    catch( Throwable throwable )
300      {
301      flowStats.markFailed( throwable );
302      fireOnThrowable( throwable );
303
304      if( throwable.getCause() instanceof RuntimeException )
305        throw (RuntimeException) throwable.getCause();
306
307      throw new FlowException( "could not call stop on process", throwable.getCause() );
308      }
309    }
310
311  @Override
312  protected void internalClean( boolean stop )
313    {
314
315    }
316
317  @Override
318  public void complete()
319    {
320    try
321      {
322      if( !isStarted )
323        {
324        flowStats.markPending();
325        fireOnStarting();
326        isStarted = true;
327        flowStats.markStarted();
328        }
329
330      flowStats.markRunning();
331      processWrapper.complete();
332      fireOnCompleted();
333      flowStats.markSuccessful();
334      }
335    catch( Throwable throwable )
336      {
337      flowStats.markFailed( throwable );
338      fireOnThrowable( throwable );
339
340      if( throwable.getCause() instanceof RuntimeException )
341        throw (RuntimeException) throwable.getCause();
342
343      throw new FlowException( "could not call complete on process", throwable.getCause() );
344      }
345    }
346
347  @Override
348  public void cleanup()
349    {
350    try
351      {
352      processWrapper.cleanup();
353      }
354    catch( Throwable throwable )
355      {
356      if( throwable.getCause() instanceof RuntimeException )
357        throw (RuntimeException) throwable.getCause();
358
359      throw new FlowException( "could not call cleanup on process", throwable.getCause() );
360      }
361    }
362
363  @Override
364  protected int getMaxNumParallelSteps()
365    {
366    return 1;
367    }
368
369  @Override
370  protected void internalShutdown()
371    {
372
373    }
374
375  private Map<String, Tap> createSources( ProcessWrapper processParent )
376    {
377    try
378      {
379      return makeTapMap( processParent.getDependencyIncoming() );
380      }
381    catch( ProcessException exception )
382      {
383      if( exception.getCause() instanceof RuntimeException )
384        throw (RuntimeException) exception.getCause();
385
386      throw new FlowException( "could not get process incoming dependency", exception.getCause() );
387      }
388    }
389
390  private Map<String, Tap> createSinks( ProcessWrapper processParent )
391    {
392    try
393      {
394      return makeTapMap( processParent.getDependencyOutgoing() );
395      }
396    catch( ProcessException exception )
397      {
398      if( exception.getCause() instanceof RuntimeException )
399        throw (RuntimeException) exception.getCause();
400
401      throw new FlowException( "could not get process outgoing dependency", exception.getCause() );
402      }
403    }
404
405  private Map<String, Tap> makeTapMap( Object resource )
406    {
407    Collection paths = makeCollection( resource );
408
409    Map<String, Tap> taps = new HashMap<String, Tap>();
410
411    for( Object path : paths )
412      {
413      if( path instanceof Tap && ( (Tap) path ).getIdentifier() != null )
414        taps.put( ( (Tap) path ).getIdentifier(), (Tap) path );
415      else
416        taps.put( path.toString(), new ProcessTap( new NullScheme(), path.toString() ) );
417      }
418
419    return taps;
420    }
421
422  private Collection makeCollection( Object resource )
423    {
424    if( resource instanceof Collection )
425      return (Collection) resource;
426    else if( resource instanceof Object[] )
427      return Arrays.asList( (Object[]) resource );
428    else
429      return Arrays.asList( resource );
430    }
431
432  private Map<String, Tap> createTraps( ProcessWrapper processParent )
433    {
434    return new HashMap<String, Tap>();
435    }
436
437  @Override
438  public String toString()
439    {
440    return getName() + ":" + process;
441    }
442
443  static class NullScheme extends Scheme
444    {
445    public void sourceConfInit( FlowProcess flowProcess, Tap tap, Object conf )
446      {
447      }
448
449    public void sinkConfInit( FlowProcess flowProcess, Tap tap, Object conf )
450      {
451      }
452
453    public boolean source( FlowProcess flowProcess, SourceCall sourceCall ) throws IOException
454      {
455      throw new UnsupportedOperationException( "sourcing is not supported in the scheme" );
456      }
457
458    @Override
459    public String toString()
460      {
461      return getClass().getSimpleName();
462      }
463
464    public void sink( FlowProcess flowProcess, SinkCall sinkCall ) throws IOException
465      {
466      throw new UnsupportedOperationException( "sinking is not supported in the scheme" );
467      }
468    }
469
470  /**
471   *
472   */
473  static class ProcessTap<Config> extends Tap<Config, Object, Object>
474    {
475    private final String token;
476
477    ProcessTap( NullScheme scheme, String token )
478      {
479      super( scheme );
480      this.token = token;
481      }
482
483    @Override
484    public String getIdentifier()
485      {
486      return token;
487      }
488
489    @Override
490    public String getFullIdentifier( Config conf )
491      {
492      return getIdentifier();
493      }
494
495    @Override
496    public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Object input ) throws IOException
497      {
498      return null;
499      }
500
501    @Override
502    public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Object output ) throws IOException
503      {
504      return null;
505      }
506
507    @Override
508    public boolean createResource( Config conf ) throws IOException
509      {
510      return false;
511      }
512
513    @Override
514    public boolean deleteResource( Config conf ) throws IOException
515      {
516      return false;
517      }
518
519    @Override
520    public boolean resourceExists( Config conf ) throws IOException
521      {
522      return false;
523      }
524
525    @Override
526    public long getModifiedTime( Config conf ) throws IOException
527      {
528      return 0;
529      }
530
531    @Override
532    public String toString()
533      {
534      return token;
535      }
536    }
537  }