001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop;
022    
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
030    
031    import cascading.flow.FlowException;
032    import cascading.flow.FlowProcess;
033    import cascading.flow.hadoop.util.HadoopUtil;
034    import cascading.scheme.Scheme;
035    import cascading.scheme.SinkCall;
036    import cascading.scheme.SourceCall;
037    import cascading.stats.hadoop.ProcessFlowStats;
038    import cascading.tap.Tap;
039    import cascading.tuple.TupleEntryCollector;
040    import cascading.tuple.TupleEntryIterator;
041    import riffle.process.scheduler.ProcessException;
042    import riffle.process.scheduler.ProcessWrapper;
043    
044    /**
045     * Class ProcessFlow is a {@link cascading.flow.Flow} subclass that supports custom Riffle jobs.
046     * <p/>
047     * Use this class to allow custom Riffle jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
048     * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
049     * according to their dependencies (topologically).
050     * <p/>
051     * Though this class sub-classes {@link HadoopFlow}, it does not support all the methods available or features.
052     * <p/>
053     * Currently {@link cascading.flow.FlowListener}s are supported but the
054     * {@link cascading.flow.FlowListener#onThrowable(cascading.flow.Flow, Throwable)} event is not.
055     *
056     * @deprecated ProcessFlow will be decoupled from Hadoop and moved to a different package in Cascading 3.0.
057     */
058    @Deprecated
059    public class ProcessFlow<P> extends HadoopFlow
060      {
061      /** Field process */
062      private final P process;
063      /** Field processWrapper */
064      private final ProcessWrapper processWrapper;
065    
066      private boolean isStarted = false; // only used for event handling
067    
068      /**
069       * Constructor ProcessFlow creates a new ProcessFlow instance.
070       *
071       * @param name    of type String
072       * @param process of type JobConf
073       */
074      @ConstructorProperties({"name", "process"})
075      public ProcessFlow( String name, P process )
076        {
077        this( new Properties(), name, process );
078        }
079    
080      /**
081       * Constructor ProcessFlow creates a new ProcessFlow instance.
082       *
083       * @param properties of type Map<Object, Object>
084       * @param name       of type String
085       * @param process    of type P
086       */
087      @ConstructorProperties({"properties", "name", "process"})
088      public ProcessFlow( Map<Object, Object> properties, String name, P process )
089        {
090        this( properties, name, process, null );
091        }
092    
093      /**
094       * Constructor ProcessFlow creates a new ProcessFlow instance.
095       *
096       * @param properties     of type Map<Object, Object>
097       * @param name           of type String
098       * @param process        of type P
099       * @param flowDescriptor pf type LinkedHashMap<String, String>
100       */
101      @ConstructorProperties({"properties", "name", "process", "flowDescriptor"})
102      public ProcessFlow( Map<Object, Object> properties, String name, P process, Map<String, String> flowDescriptor )
103        {
104        super( HadoopUtil.getPlatformInfo(), properties, null, name, flowDescriptor );
105        this.process = process;
106        this.processWrapper = new ProcessWrapper( this.process );
107        setName( name );
108        setTapFromProcess();
109        initStats();
110        }
111    
112      private void initStats()
113        {
114        try
115          {
116          if( processWrapper.hasCounters() )
117            this.flowStats = new ProcessFlowStats( this, getFlowSession().getCascadingServices().createClientState( getID() ), processWrapper );
118          }
119        catch( ProcessException exception )
120          {
121          throw new FlowException( exception );
122          }
123        }
124    
125      /**
126       * Method setTapFromProcess build {@link Tap} instance for the give process incoming and outgoing dependencies.
127       * <p/>
128       * This method may be called repeatedly to re-configure the source and sink taps.
129       */
130      public void setTapFromProcess()
131        {
132        setSources( createSources( this.processWrapper ) );
133        setSinks( createSinks( this.processWrapper ) );
134        setTraps( createTraps( this.processWrapper ) );
135        }
136    
137      /**
138       * Method getProcess returns the process of this ProcessFlow object.
139       *
140       * @return the process (type P) of this ProcessFlow object.
141       */
142      public P getProcess()
143        {
144        return process;
145        }
146    
147      @Override
148      public void prepare()
149        {
150        try
151          {
152          processWrapper.prepare();
153          }
154        catch( Throwable throwable )
155          {
156          if( throwable.getCause() instanceof RuntimeException )
157            throw (RuntimeException) throwable.getCause();
158    
159          throw new FlowException( "could not call prepare on process", throwable.getCause() );
160          }
161        }
162    
163      @Override
164      public void start()
165        {
166        try
167          {
168          flowStats.markPending();
169          fireOnStarting();
170          processWrapper.start();
171          flowStats.markStarted();
172          isStarted = true;
173          }
174        catch( Throwable throwable )
175          {
176          fireOnThrowable( throwable );
177    
178          if( throwable.getCause() instanceof RuntimeException )
179            throw (RuntimeException) throwable.getCause();
180    
181          throw new FlowException( "could not call start on process", throwable.getCause() );
182          }
183        }
184    
185      @Override
186      public void stop()
187        {
188        try
189          {
190          fireOnStopping();
191          processWrapper.stop();
192    
193          if( !flowStats.isFinished() )
194            flowStats.markStopped();
195          }
196        catch( Throwable throwable )
197          {
198          flowStats.markFailed( throwable );
199          fireOnThrowable( throwable );
200    
201          if( throwable.getCause() instanceof RuntimeException )
202            throw (RuntimeException) throwable.getCause();
203    
204          throw new FlowException( "could not call stop on process", throwable.getCause() );
205          }
206        }
207    
208      @Override
209      public void complete()
210        {
211        try
212          {
213          if( !isStarted )
214            {
215            flowStats.markPending();
216            fireOnStarting();
217            isStarted = true;
218            flowStats.markStarted();
219            }
220    
221          flowStats.markRunning();
222          processWrapper.complete();
223          fireOnCompleted();
224          flowStats.markSuccessful();
225          }
226        catch( Throwable throwable )
227          {
228          flowStats.markFailed( throwable );
229          fireOnThrowable( throwable );
230    
231          if( throwable.getCause() instanceof RuntimeException )
232            throw (RuntimeException) throwable.getCause();
233    
234          throw new FlowException( "could not call complete on process", throwable.getCause() );
235          }
236        }
237    
238      @Override
239      public void cleanup()
240        {
241        try
242          {
243          processWrapper.cleanup();
244          }
245        catch( Throwable throwable )
246          {
247          if( throwable.getCause() instanceof RuntimeException )
248            throw (RuntimeException) throwable.getCause();
249    
250          throw new FlowException( "could not call cleanup on process", throwable.getCause() );
251          }
252        }
253    
254      private Map<String, Tap> createSources( ProcessWrapper processParent )
255        {
256        try
257          {
258          return makeTapMap( processParent.getDependencyIncoming() );
259          }
260        catch( ProcessException exception )
261          {
262          if( exception.getCause() instanceof RuntimeException )
263            throw (RuntimeException) exception.getCause();
264    
265          throw new FlowException( "could not get process incoming dependency", exception.getCause() );
266          }
267        }
268    
269      private Map<String, Tap> createSinks( ProcessWrapper processParent )
270        {
271        try
272          {
273          return makeTapMap( processParent.getDependencyOutgoing() );
274          }
275        catch( ProcessException exception )
276          {
277          if( exception.getCause() instanceof RuntimeException )
278            throw (RuntimeException) exception.getCause();
279    
280          throw new FlowException( "could not get process outgoing dependency", exception.getCause() );
281          }
282        }
283    
284      private Map<String, Tap> makeTapMap( Object resource )
285        {
286        Collection paths = makeCollection( resource );
287    
288        Map<String, Tap> taps = new HashMap<String, Tap>();
289    
290        for( Object path : paths )
291          {
292          if( path instanceof Tap )
293            taps.put( ( (Tap) path ).getIdentifier(), (Tap) path );
294          else
295            taps.put( path.toString(), new ProcessTap( new NullScheme(), path.toString() ) );
296          }
297        return taps;
298        }
299    
300      private Collection makeCollection( Object resource )
301        {
302        if( resource instanceof Collection )
303          return (Collection) resource;
304        else if( resource instanceof Object[] )
305          return Arrays.asList( (Object[]) resource );
306        else
307          return Arrays.asList( resource );
308        }
309    
310      private Map<String, Tap> createTraps( ProcessWrapper processParent )
311        {
312        return new HashMap<String, Tap>();
313        }
314    
315      @Override
316      public String toString()
317        {
318        return getName() + ":" + process;
319        }
320    
321      static class NullScheme extends Scheme
322        {
323        public void sourceConfInit( FlowProcess flowProcess, Tap tap, Object conf )
324          {
325          }
326    
327        public void sinkConfInit( FlowProcess flowProcess, Tap tap, Object conf )
328          {
329          }
330    
331        public boolean source( FlowProcess flowProcess, SourceCall sourceCall ) throws IOException
332          {
333          throw new UnsupportedOperationException( "sourcing is not supported in the scheme" );
334          }
335    
336        @Override
337        public String toString()
338          {
339          return getClass().getSimpleName();
340          }
341    
342        public void sink( FlowProcess flowProcess, SinkCall sinkCall ) throws IOException
343          {
344          throw new UnsupportedOperationException( "sinking is not supported in the scheme" );
345          }
346        }
347    
348      /**
349       *
350       */
351      static class ProcessTap extends Tap
352        {
353        private final String token;
354    
355        ProcessTap( NullScheme scheme, String token )
356          {
357          super( scheme );
358          this.token = token;
359          }
360    
361        @Override
362        public String getIdentifier()
363          {
364          return token;
365          }
366    
367        @Override
368        public TupleEntryIterator openForRead( FlowProcess flowProcess, Object input ) throws IOException
369          {
370          return null;
371          }
372    
373        @Override
374        public TupleEntryCollector openForWrite( FlowProcess flowProcess, Object output ) throws IOException
375          {
376          return null;
377          }
378    
379        @Override
380        public boolean createResource( Object conf ) throws IOException
381          {
382          return false;
383          }
384    
385        @Override
386        public boolean deleteResource( Object conf ) throws IOException
387          {
388          return false;
389          }
390    
391        @Override
392        public boolean resourceExists( Object conf ) throws IOException
393          {
394          return false;
395          }
396    
397        @Override
398        public long getModifiedTime( Object conf ) throws IOException
399          {
400          return 0;
401          }
402    
403        @Override
404        public String toString()
405          {
406          return token;
407          }
408        }
409      }