001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.io.IOException;
024    import java.util.Collection;
025    import java.util.Collections;
026    import java.util.Map;
027    
028    import cascading.tap.Tap;
029    import cascading.tuple.TupleEntryCollector;
030    import cascading.tuple.TupleEntryIterator;
031    
032    /**
033     * FlowProcess implementations provide a call-back interface into the current computing system. Each
034     * {@link cascading.operation.Operation} is given a reference to a particular implementation, allowing it
035     * to get configuration properties, send a "keep alive" ping, or to set a counter value.
036     * <p/>
037     * Depending on the underlying system, FlowProcess instances are not continuous across all operations in a {@link Flow}.
038     * Thus, a call to {@link #increment(Enum, long)} may start incrementing from zero if the operation making the call
039     * belongs to a subsequent 'job' or 'step' from any previous operations calling increment.
040     * <p/>
041     * A FlowProcess is roughly a child of {@link FlowSession}. FlowSession is roughly one to one with a particular {@link Flow}.
042     * And every FlowSession will have one or more FlowProcesses.
043     *
044     * @see FlowSession
045     */
046    public abstract class FlowProcess<Config>
047      {
048      /** Field NULL is a noop implementation of FlowSession. */
049      public static FlowProcess NULL = new NullFlowProcess();
050    
051      public static class NullFlowProcess extends FlowProcess<Object>
052        {
053        protected NullFlowProcess()
054          {
055          }
056    
057        @Override
058        public FlowProcess copyWith( Object object )
059          {
060          return new NullFlowProcess();
061          }
062    
063        public Object getProperty( String key )
064          {
065          return null;
066          }
067    
068        @Override
069        public Collection<String> getPropertyKeys()
070          {
071          return Collections.EMPTY_SET;
072          }
073    
074        @Override
075        public Object newInstance( String className )
076          {
077          return null;
078          }
079    
080        public void keepAlive()
081          {
082          }
083    
084        public void increment( Enum counter, long amount )
085          {
086          }
087    
088        public void increment( String group, String counter, long amount )
089          {
090          }
091    
092        public void setStatus( String status )
093          {
094          }
095    
096        @Override
097        public boolean isCounterStatusInitialized()
098          {
099          return true;
100          }
101    
102        @Override
103        public int getNumProcessSlices()
104          {
105          return 1;
106          }
107    
108        @Override
109        public int getCurrentSliceNum()
110          {
111          return 0;
112          }
113    
114        public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
115          {
116          return tap.openForRead( this );
117          }
118    
119        public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
120          {
121          return tap.openForWrite( this );
122          }
123    
124        @Override
125        public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
126          {
127          return trap.openForWrite( this );
128          }
129    
130        @Override
131        public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
132          {
133          return null;
134          }
135    
136        @Override
137        public Object getConfigCopy()
138          {
139          return null;
140          }
141    
142        @Override
143        public Object copyConfig( Object config )
144          {
145          return config;
146          }
147    
148        @Override
149        public Map<String, String> diffConfigIntoMap( Object defaultConfig, Object updatedConfig )
150          {
151          return null;
152          }
153    
154        @Override
155        public Object mergeMapIntoConfig( Object defaultConfig, Map<String, String> map )
156          {
157          return null;
158          }
159        }
160    
161      /** Field currentSession */
162      private FlowSession currentSession = FlowSession.NULL;
163    
164      protected FlowProcess()
165        {
166        }
167    
168      protected FlowProcess( FlowSession currentSession )
169        {
170        setCurrentSession( currentSession );
171        }
172    
173      public abstract FlowProcess copyWith( Config config );
174    
175      /**
176       * Method getID() returns the current
177       *
178       * @return of type String
179       */
180      public String getID()
181        {
182        return getStringProperty( FlowStep.CASCADING_FLOW_STEP_ID );
183        }
184    
185      /**
186       * Method getCurrentSession returns the currentSession of this FlowProcess object.
187       *
188       * @return the currentSession (type FlowSession) of this FlowProcess object.
189       */
190      public FlowSession getCurrentSession()
191        {
192        return currentSession;
193        }
194    
195      /**
196       * Method setCurrentSession sets the currentSession of this FlowProcess object.
197       *
198       * @param currentSession the currentSession of this FlowProcess object.
199       */
200      public void setCurrentSession( FlowSession currentSession )
201        {
202        this.currentSession = currentSession;
203    
204        currentSession.setCurrentProcess( this );
205        }
206    
207      /**
208       * Method getNumProcessSlices returns the number of parallel slices or tasks allocated
209       * for this process execution.
210       * <p/>
211       * For MapReduce platforms, this is the same as the number of tasks for a given MapReduce job.
212       *
213       * @return an int
214       */
215      public abstract int getNumProcessSlices();
216    
217      /**
218       * Method getCurrentSliceNum returns an integer representing which slice instance currently running.
219       * <p/>
220       * {@code 0} (zero) is the first slice instance.
221       *
222       * @return an int
223       */
224      public abstract int getCurrentSliceNum();
225    
226      /**
227       * Method getProperty should be used to return configuration parameters from the underlying system.
228       * <p/>
229       * In the case of Hadoop, the current Configuration will be queried.
230       *
231       * @param key of type String
232       * @return an Object
233       */
234      public abstract Object getProperty( String key );
235    
236      /**
237       * Method getStringProperty should be used to return configuration parameters from the underlying system.
238       * <p/>
239       * In the case of Hadoop, the current Configuration will be queried.
240       *
241       * @param key of type String, null if property is not set
242       * @return an Object
243       */
244      public String getStringProperty( String key )
245        {
246        Object value = getProperty( key );
247    
248        if( value == null )
249          return null;
250    
251        return value.toString();
252        }
253    
254      /**
255       * Method getIntegerProperty should be used to return configuration parameters from the underlying system.
256       * <p/>
257       * In the case of Hadoop, the current Configuration will be queried.
258       *
259       * @param key of type Integer, null if property is not set
260       * @return an Object
261       */
262      public Integer getIntegerProperty( String key )
263        {
264        String value = getStringProperty( key );
265    
266        if( value == null || value.isEmpty() )
267          return null;
268    
269        return Integer.valueOf( value );
270        }
271    
272      /**
273       * Method getPropertyKeys returns an immutable collection of all available property key values.
274       *
275       * @return a Collection<String>
276       */
277      public abstract Collection<String> getPropertyKeys();
278    
279      /**
280       * Method newInstance creates a new object instance from the given className argument delegating to any
281       * platform specific instantiation and configuration routines.
282       *
283       * @param className
284       * @return an instance of className
285       */
286      public abstract Object newInstance( String className );
287    
288      /**
289       * Method keepAlive notifies the system that the current process is still alive. Use this method if a particular
290       * {@link cascading.operation.Operation} takes some moments to complete. Each system is different, so calling
291       * ping every few seconds to every minute or so would be best.
292       * <p/>
293       * This method will fail silently if the underlying mechanism to notify keepAlive status are not initialized.
294       */
295      public abstract void keepAlive();
296    
297      /**
298       * Method increment is used to increment a custom counter. Counters must be of type Enum. The amount
299       * to increment must be a integer value.
300       * <p/>
301       * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
302       *
303       * @param counter of type Enum
304       * @param amount  of type int
305       */
306      public abstract void increment( Enum counter, long amount );
307    
308      /**
309       * Method increment is used to increment a custom counter. The amount to increment must be a integer value.
310       * <p/>
311       * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
312       *
313       * @param group   of type String
314       * @param counter of type String
315       * @param amount  of type int
316       */
317      public abstract void increment( String group, String counter, long amount );
318    
319      /**
320       * Method setStatus is used to set the status of the current operation.
321       * <p/>
322       * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
323       *
324       * @param status of type String
325       */
326      public abstract void setStatus( String status );
327    
328      /**
329       * Method isCounterStatusInitialized returns true if it is safe to increment a counter or set a status.
330       *
331       * @return boolean
332       */
333      public abstract boolean isCounterStatusInitialized();
334    
335      /**
336       * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
337       * <p/>
338       * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
339       * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
340       * stored in a Collection.
341       *
342       * @param tap of type Tap
343       * @return TupleIterator
344       * @throws java.io.IOException when there is a failure opening the resource
345       */
346      public abstract TupleEntryIterator openTapForRead( Tap tap ) throws IOException;
347    
348      /**
349       * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance.
350       *
351       * @param tap of type Tap
352       * @return TupleCollector
353       * @throws java.io.IOException when there is a failure opening the resource
354       */
355      public abstract TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;
356    
357      /**
358       * Method openTrapForWrite returns a (@link TupleCollector} for the given Tap instance.
359       *
360       * @param trap of type Tap
361       * @return TupleCollector
362       * @throws java.io.IOException when there is a failure opening the resource
363       */
364      public abstract TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException;
365    
366      public abstract TupleEntryCollector openSystemIntermediateForWrite() throws IOException;
367    
368      public abstract Config getConfigCopy();
369    
370      public abstract Config copyConfig( Config jobConf );
371    
372      public abstract Map<String, String> diffConfigIntoMap( Config defaultConfig, Config updatedConfig );
373    
374      public abstract Config mergeMapIntoConfig( Config defaultConfig, Map<String, String> map );
375      }