001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.util.Arrays;
026    import java.util.Collection;
027    import java.util.HashMap;
028    import java.util.Map;
029    import java.util.Properties;
030    
031    import cascading.flow.FlowException;
032    import cascading.flow.FlowProcess;
033    import cascading.flow.hadoop.util.HadoopUtil;
034    import cascading.scheme.Scheme;
035    import cascading.scheme.SinkCall;
036    import cascading.scheme.SourceCall;
037    import cascading.tap.Tap;
038    import cascading.tuple.TupleEntryCollector;
039    import cascading.tuple.TupleEntryIterator;
040    import riffle.process.scheduler.ProcessException;
041    import riffle.process.scheduler.ProcessWrapper;
042    
043    /**
044     * Class ProcessFlow is a {@link cascading.flow.Flow} subclass that supports custom Riffle jobs.
045     * <p/>
046     * Use this class to allow custom Riffle jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
047     * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
048     * according to their dependencies (topologically).
049     * <p/>
050     * Though this class sub-classes {@link HadoopFlow}, it does not support all the methods available or features.
051     * <p/>
052     * Currently {@link cascading.flow.FlowListener}s are supported but the
053     * {@link cascading.flow.FlowListener#onThrowable(cascading.flow.Flow, Throwable)} event is not.
054     */
055    public class ProcessFlow<P> extends HadoopFlow
056      {
057      /** Field process */
058      private final P process;
059      /** Field processWrapper */
060      private final ProcessWrapper processWrapper;
061    
062      private boolean isStarted = false; // only used for event handling
063    
064      /**
065       * Constructor ProcessFlow creates a new ProcessFlow instance.
066       *
067       * @param name    of type String
068       * @param process of type JobConf
069       */
070      @ConstructorProperties({"name", "process"})
071      public ProcessFlow( String name, P process )
072        {
073        this( new Properties(), name, process );
074        }
075    
076      /**
077       * Constructor ProcessFlow creates a new ProcessFlow instance.
078       *
079       * @param properties of type Map<Object, Object>
080       * @param name       of type String
081       * @param process    of type P
082       */
083      @ConstructorProperties({"properties", "name", "process"})
084      public ProcessFlow( Map<Object, Object> properties, String name, P process )
085        {
086        this( properties, name, process, null );
087        }
088    
089      /**
090       * Constructor ProcessFlow creates a new ProcessFlow instance.
091       *
092       * @param properties     of type Map<Object, Object>
093       * @param name           of type String
094       * @param process        of type P
095       * @param flowDescriptor pf type LinkedHashMap<String, String>
096       */
097      @ConstructorProperties({"properties", "name", "process", "flowDescriptor"})
098      public ProcessFlow( Map<Object, Object> properties, String name, P process, Map<String, String> flowDescriptor )
099        {
100        super( HadoopUtil.getPlatformInfo(), properties, null, name, flowDescriptor );
101        this.process = process;
102        this.processWrapper = new ProcessWrapper( this.process );
103    
104        setName( name );
105        setTapFromProcess();
106        }
107    
108      /**
109       * Method setTapFromProcess build {@link Tap} instance for the give process incoming and outgoing dependencies.
110       * <p/>
111       * This method may be called repeatedly to re-configure the source and sink taps.
112       */
113      public void setTapFromProcess()
114        {
115        setSources( createSources( this.processWrapper ) );
116        setSinks( createSinks( this.processWrapper ) );
117        setTraps( createTraps( this.processWrapper ) );
118        }
119    
120      /**
121       * Method getProcess returns the process of this ProcessFlow object.
122       *
123       * @return the process (type P) of this ProcessFlow object.
124       */
125      public P getProcess()
126        {
127        return process;
128        }
129    
130      @Override
131      public void prepare()
132        {
133        try
134          {
135          processWrapper.prepare();
136          }
137        catch( ProcessException exception )
138          {
139          if( exception.getCause() instanceof RuntimeException )
140            throw (RuntimeException) exception.getCause();
141    
142          throw new FlowException( "could not call prepare on process", exception.getCause() );
143          }
144        }
145    
146      @Override
147      public void start()
148        {
149        try
150          {
151          fireOnStarting();
152          processWrapper.start();
153          flowStats.markStarted();
154          isStarted = true;
155          }
156        catch( ProcessException exception )
157          {
158          if( exception.getCause() instanceof RuntimeException )
159            throw (RuntimeException) exception.getCause();
160    
161          throw new FlowException( "could not call start on process", exception.getCause() );
162          }
163        }
164    
165      @Override
166      public void stop()
167        {
168        try
169          {
170          fireOnStopping();
171          processWrapper.stop();
172    
173          if( !flowStats.isFinished() )
174            flowStats.markStopped();
175          }
176        catch( ProcessException exception )
177          {
178          flowStats.markFailed( exception );
179          if( exception.getCause() instanceof RuntimeException )
180            throw (RuntimeException) exception.getCause();
181    
182          throw new FlowException( "could not call stop on process", exception.getCause() );
183          }
184        }
185    
186      @Override
187      public void complete()
188        {
189        try
190          {
191          if( !isStarted )
192            {
193            fireOnStarting();
194            isStarted = true;
195            flowStats.markStarted();
196            }
197    
198          flowStats.markRunning();
199          processWrapper.complete();
200          fireOnCompleted();
201          flowStats.markSuccessful();
202          }
203        catch( ProcessException exception )
204          {
205          flowStats.markFailed( exception );
206          if( exception.getCause() instanceof RuntimeException )
207            throw (RuntimeException) exception.getCause();
208    
209          throw new FlowException( "could not call complete on process", exception.getCause() );
210          }
211        }
212    
213      @Override
214      public void cleanup()
215        {
216        try
217          {
218          processWrapper.cleanup();
219          }
220        catch( ProcessException exception )
221          {
222          if( exception.getCause() instanceof RuntimeException )
223            throw (RuntimeException) exception.getCause();
224    
225          throw new FlowException( "could not call cleanup on process", exception.getCause() );
226          }
227        }
228    
229      private Map<String, Tap> createSources( ProcessWrapper processParent )
230        {
231        try
232          {
233          return makeTapMap( processParent.getDependencyIncoming() );
234          }
235        catch( ProcessException exception )
236          {
237          if( exception.getCause() instanceof RuntimeException )
238            throw (RuntimeException) exception.getCause();
239    
240          throw new FlowException( "could not get process incoming dependency", exception.getCause() );
241          }
242        }
243    
244      private Map<String, Tap> createSinks( ProcessWrapper processParent )
245        {
246        try
247          {
248          return makeTapMap( processParent.getDependencyOutgoing() );
249          }
250        catch( ProcessException exception )
251          {
252          if( exception.getCause() instanceof RuntimeException )
253            throw (RuntimeException) exception.getCause();
254    
255          throw new FlowException( "could not get process outgoing dependency", exception.getCause() );
256          }
257        }
258    
259      private Map<String, Tap> makeTapMap( Object resource )
260        {
261        Collection paths = makeCollection( resource );
262    
263        Map<String, Tap> taps = new HashMap<String, Tap>();
264    
265        for( Object path : paths )
266          {
267          if( path instanceof Tap )
268            taps.put( ( (Tap) path ).getIdentifier(), (Tap) path );
269          else
270            taps.put( path.toString(), new ProcessTap( new NullScheme(), path.toString() ) );
271          }
272        return taps;
273        }
274    
275      private Collection makeCollection( Object resource )
276        {
277        if( resource instanceof Collection )
278          return (Collection) resource;
279        else if( resource instanceof Object[] )
280          return Arrays.asList( (Object[]) resource );
281        else
282          return Arrays.asList( resource );
283        }
284    
285      private Map<String, Tap> createTraps( ProcessWrapper processParent )
286        {
287        return new HashMap<String, Tap>();
288        }
289    
290      @Override
291      public String toString()
292        {
293        return getName() + ":" + process;
294        }
295    
296      static class NullScheme extends Scheme
297        {
298        public void sourceConfInit( FlowProcess flowProcess, Tap tap, Object conf )
299          {
300          }
301    
302        public void sinkConfInit( FlowProcess flowProcess, Tap tap, Object conf )
303          {
304          }
305    
306        public boolean source( FlowProcess flowProcess, SourceCall sourceCall ) throws IOException
307          {
308          throw new UnsupportedOperationException( "sourcing is not supported in the scheme" );
309          }
310    
311        @Override
312        public String toString()
313          {
314          return getClass().getSimpleName();
315          }
316    
317        public void sink( FlowProcess flowProcess, SinkCall sinkCall ) throws IOException
318          {
319          throw new UnsupportedOperationException( "sinking is not supported in the scheme" );
320          }
321        }
322    
323      /**
324       *
325       */
326      static class ProcessTap extends Tap
327        {
328        private final String token;
329    
330        ProcessTap( NullScheme scheme, String token )
331          {
332          super( scheme );
333          this.token = token;
334          }
335    
336        @Override
337        public String getIdentifier()
338          {
339          return token;
340          }
341    
342        @Override
343        public TupleEntryIterator openForRead( FlowProcess flowProcess, Object input ) throws IOException
344          {
345          return null;
346          }
347    
348        @Override
349        public TupleEntryCollector openForWrite( FlowProcess flowProcess, Object output ) throws IOException
350          {
351          return null;
352          }
353    
354        @Override
355        public boolean createResource( Object conf ) throws IOException
356          {
357          return false;
358          }
359    
360        @Override
361        public boolean deleteResource( Object conf ) throws IOException
362          {
363          return false;
364          }
365    
366        @Override
367        public boolean resourceExists( Object conf ) throws IOException
368          {
369          return false;
370          }
371    
372        @Override
373        public long getModifiedTime( Object conf ) throws IOException
374          {
375          return 0;
376          }
377    
378        @Override
379        public String toString()
380          {
381          return token;
382          }
383        }
384      }