001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.scheme;
022    
023    import java.io.IOException;
024    import java.io.Serializable;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.tap.Tap;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.util.Util;
031    
032    /**
033     * A Scheme defines what is stored in a {@link Tap} instance by declaring the {@link Tuple}
034     * field names, and alternately parsing or rendering the incoming or outgoing {@link Tuple}
035     * stream, respectively.
036     * <p/>
037     * A Scheme defines the type of resource data will be sourced from or sinked to.
038     * <p/>
039     * The default sourceFields are {@link Fields#UNKNOWN} and the default sinkFields are {@link Fields#ALL}.
040     * <p/>
041     * Any given sourceFields only label the values in the {@link Tuple}s as they are sourced.
042     * It does not necessarily filter the output since a given implementation may choose to
043     * collapse values and ignore keys depending on the format.
044     * <p/>
045     * If the sinkFields are {@link Fields#ALL}, the Cascading planner will attempt to resolve the actual field names
046     * and make them available via the {@link cascading.scheme.SinkCall#getOutgoingEntry()} method. Sometimes this may
047     * not be possible (in the case the {@link Tap#openForWrite(cascading.flow.FlowProcess)} method is called from user
048     * code directly (without planner intervention).
049     * <p/>
050     * If the sinkFields are a valid selector, the {@link #sink(cascading.flow.FlowProcess, SinkCall)} method will
051     * only see the fields expected.
052     * <p/>
053     * Setting the {@code numSinkParts} value to 1 (one) attempts to ensure the output resource has only one part.
054     * In the case of MapReduce, this is only a suggestion for the Map side, on the Reduce side it does this by
055     * setting the number of reducers to the given value. This may affect performance, so be cautioned.
056     * </p>
057     * Note that setting numSinkParts does not force the planner to insert a final Reduce operation in the job, so
058     * numSinkParts may be ignored entirely if the final job is Map only. To force the Flow to have a final Reduce,
059     * add a {@link cascading.pipe.GroupBy} to the assembly before sinking.
060     */
061    public abstract class Scheme<Config, Input, Output, SourceContext, SinkContext> implements Serializable
062      {
063      /** Field sinkFields */
064      Fields sinkFields = Fields.ALL;
065      /** Field sourceFields */
066      Fields sourceFields = Fields.UNKNOWN;
067      /** Field numSinkParts */
068      int numSinkParts;
069      /** Field trace */
070      private final String trace = Util.captureDebugTrace( getClass() );
071    
072      /** Constructor Scheme creates a new Scheme instance. */
073      protected Scheme()
074        {
075        }
076    
077      /**
078       * Constructor Scheme creates a new Scheme instance.
079       *
080       * @param sourceFields of type Fields
081       */
082      protected Scheme( Fields sourceFields )
083        {
084        setSourceFields( sourceFields );
085        }
086    
087      /**
088       * Constructor Scheme creates a new Scheme instance.
089       *
090       * @param sourceFields of type Fields
091       * @param numSinkParts of type int
092       */
093      protected Scheme( Fields sourceFields, int numSinkParts )
094        {
095        setSourceFields( sourceFields );
096        this.numSinkParts = numSinkParts;
097        }
098    
099      /**
100       * Constructor Scheme creates a new Scheme instance.
101       *
102       * @param sourceFields of type Fields
103       * @param sinkFields   of type Fields
104       */
105      protected Scheme( Fields sourceFields, Fields sinkFields )
106        {
107        setSourceFields( sourceFields );
108        setSinkFields( sinkFields );
109        }
110    
111      /**
112       * Constructor Scheme creates a new Scheme instance.
113       *
114       * @param sourceFields of type Fields
115       * @param sinkFields   of type Fields
116       * @param numSinkParts of type int
117       */
118      protected Scheme( Fields sourceFields, Fields sinkFields, int numSinkParts )
119        {
120        setSourceFields( sourceFields );
121        setSinkFields( sinkFields );
122        this.numSinkParts = numSinkParts;
123        }
124    
125      /**
126       * Method getSinkFields returns the sinkFields of this Scheme object.
127       *
128       * @return the sinkFields (type Fields) of this Scheme object.
129       */
130      public Fields getSinkFields()
131        {
132        return sinkFields;
133        }
134    
135      /**
136       * Method setSinkFields sets the sinkFields of this Scheme object.
137       *
138       * @param sinkFields the sinkFields of this Scheme object.
139       */
140      public void setSinkFields( Fields sinkFields )
141        {
142        if( sinkFields.isUnknown() )
143          this.sinkFields = Fields.ALL;
144        else
145          this.sinkFields = sinkFields;
146        }
147    
148      /**
149       * Method getSourceFields returns the sourceFields of this Scheme object.
150       *
151       * @return the sourceFields (type Fields) of this Scheme object.
152       */
153      public Fields getSourceFields()
154        {
155        return sourceFields;
156        }
157    
158      /**
159       * Method setSourceFields sets the sourceFields of this Scheme object.
160       *
161       * @param sourceFields the sourceFields of this Scheme object.
162       */
163      public void setSourceFields( Fields sourceFields )
164        {
165        if( sourceFields.isAll() )
166          this.sourceFields = Fields.UNKNOWN;
167        else
168          this.sourceFields = sourceFields;
169        }
170    
171      /**
172       * Method getNumSinkParts returns the numSinkParts of this Scheme object.
173       *
174       * @return the numSinkParts (type int) of this Scheme object.
175       */
176      public int getNumSinkParts()
177        {
178        return numSinkParts;
179        }
180    
181      /**
182       * Method setNumSinkParts sets the numSinkParts of this Scheme object.
183       *
184       * @param numSinkParts the numSinkParts of this Scheme object.
185       */
186      public void setNumSinkParts( int numSinkParts )
187        {
188        this.numSinkParts = numSinkParts;
189        }
190    
191      /**
192       * Method getTrace returns a String that pinpoint where this instance was created for debugging.
193       *
194       * @return String
195       */
196      public String getTrace()
197        {
198        return trace;
199        }
200    
201      /**
202       * Method isSymmetrical returns {@code true} if the sink fields equal the source fields. That is, this
203       * scheme sources the same fields as it sinks.
204       *
205       * @return the symmetrical (type boolean) of this Scheme object.
206       */
207      public boolean isSymmetrical()
208        {
209        return getSourceFields().equals( Fields.UNKNOWN ) && getSinkFields().equals( Fields.ALL ) || getSinkFields().equals( getSourceFields() );
210        }
211    
212      /**
213       * Method isSource returns true if this Scheme instance can be used as a source.
214       *
215       * @return boolean
216       */
217      public boolean isSource()
218        {
219        return true;
220        }
221    
222      /**
223       * Method isSink returns true if this Scheme instance can be used as a sink.
224       *
225       * @return boolean
226       */
227      public boolean isSink()
228        {
229        return true;
230        }
231    
232      /**
233       * Method retrieveSourceFields notifies a Scheme when it is appropriate to dynamically
234       * update the fields it sources. By default the current declared fields are returned.
235       * <p/>
236       * The {@code FlowProcess} presents all known properties resolved by the current planner.
237       * <p/>
238       * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
239       *
240       * @param flowProcess of type FlowProcess
241       * @param tap         of type Tap
242       * @return Fields
243       */
244      public Fields retrieveSourceFields( FlowProcess<Config> flowProcess, Tap tap )
245        {
246        return getSourceFields();
247        }
248    
249      /**
250       * Method presentSourceFields is called after the planner is invoked and all fields are resolved. This
251       * method presents to the Scheme the actual source fields after any planner intervention.
252       * <p/>
253       * This method is called after {@link #retrieveSourceFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
254       *
255       * @param flowProcess of type FlowProcess
256       * @param tap         of type Tap
257       * @param fields      of type Fields
258       */
259      public void presentSourceFields( FlowProcess<Config> flowProcess, Tap tap, Fields fields )
260        {
261        presentSourceFieldsInternal( fields );
262        }
263    
264      protected void presentSourceFieldsInternal( Fields fields )
265        {
266        if( getSourceFields().equals( Fields.UNKNOWN ) )
267          setSourceFields( fields );
268        }
269    
270      /**
271       * Method retrieveSinkFields notifies a Scheme when it is appropriate to dynamically
272       * update the fields it sources. By default the current declared fields are returned.
273       * <p/>
274       * The {@code FlowProcess} presents all known properties resolved by the current planner.
275       * <p/>
276       * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
277       *
278       * @param flowProcess of type FlowProcess
279       * @param tap         of type Tap
280       * @return Fields
281       */
282      public Fields retrieveSinkFields( FlowProcess<Config> flowProcess, Tap tap )
283        {
284        return getSinkFields();
285        }
286    
287      /**
288       * Method presentSinkFields is called after the planner is invoked and all fields are resolved. This
289       * method presents to the Scheme the actual source fields after any planner intervention.
290       * <p/>
291       * This method is called after {@link #retrieveSinkFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
292       *
293       * @param flowProcess of type FlowProcess
294       * @param tap         of type Tap
295       * @param fields      of type Fields
296       */
297      public void presentSinkFields( FlowProcess<Config> flowProcess, Tap tap, Fields fields )
298        {
299        presentSinkFieldsInternal( fields );
300        }
301    
302      protected void presentSinkFieldsInternal( Fields fields )
303        {
304        if( getSinkFields().equals( Fields.ALL ) )
305          setSinkFields( fields );
306        }
307    
308      /**
309       * Method sourceInit initializes this instance as a source.
310       * <p/>
311       * This method is executed client side as a means to provide necessary configuration parameters
312       * used by the underlying platform.
313       * <p/>
314       * It is not intended to initialize resources that would be necessary during the execution of this
315       * class, like a "formatter" or "parser".
316       * <p/>
317       * See {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)} if resources much be initialized
318       * before use. And {@link #sourceCleanup(cascading.flow.FlowProcess, SourceCall)} if resources must be
319       * destroyed after use.
320       *
321       * @param flowProcess of type FlowProcess
322       * @param tap         of type Tap
323       * @param conf        of type Config
324       */
325      public abstract void sourceConfInit( FlowProcess<Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );
326    
327      /**
328       * Method sinkInit initializes this instance as a sink.
329       * <p/>
330       * This method is executed client side as a means to provide necessary configuration parameters
331       * used by the underlying platform.
332       * <p/>
333       * It is not intended to initialize resources that would be necessary during the execution of this
334       * class, like a "formatter" or "parser".
335       * <p/>
336       * See {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)} if resources much be initialized
337       * before use. And {@link #sinkCleanup(cascading.flow.FlowProcess, SinkCall)} if resources must be
338       * destroyed after use.
339       *
340       * @param flowProcess of type FlowProcess
341       * @param tap         of type Tap
342       * @param conf        of type Config
343       */
344      public abstract void sinkConfInit( FlowProcess<Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );
345    
346      /**
347       * Method sourcePrepare is used to initialize resources needed during each call of
348       * {@link #source(cascading.flow.FlowProcess, SourceCall)}.
349       * <p/>
350       * Be sure to place any initialized objects in the {@code SourceContext} so each instance
351       * will remain threadsafe.
352       *
353       * @param flowProcess of type FlowProcess
354       * @param sourceCall  of type SourceCall<SourceContext, Input>
355       */
356      public void sourcePrepare( FlowProcess<Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
357        {
358        }
359    
360      /**
361       * Method source will read a new "record" or value from {@link cascading.scheme.SourceCall#getInput()} and populate
362       * the available {@link Tuple} via {@link cascading.scheme.SourceCall#getIncomingEntry()} and return {@code true}
363       * on success or {@code false} if no more values available.
364       * <p/>
365       * It's ok to set a new Tuple instance on the {@code incomingEntry} {@link cascading.tuple.TupleEntry}, or
366       * to simply re-use the existing instance.
367       * <p/>
368       * Note this is only time it is safe to modify a Tuple instance handed over via a method call.
369       * <p/>
370       * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
371       * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
372       * any applicable failure trap Tap.
373       *
374       * @param flowProcess of type FlowProcess
375       * @param sourceCall  of SourceCall
376       * @return returns {@code true} when a Tuple was successfully read
377       */
378      public abstract boolean source( FlowProcess<Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException;
379    
380      /**
381       * Method sourceCleanup is used to destroy resources created by
382       * {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)}.
383       *
384       * @param flowProcess of Process
385       * @param sourceCall  of type SourceCall<SourceContext, Input>
386       */
387      public void sourceCleanup( FlowProcess<Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
388        {
389        }
390    
391      /**
392       * Method sinkPrepare is used to initialize resources needed during each call of
393       * {@link #sink(cascading.flow.FlowProcess, SinkCall)}.
394       * <p/>
395       * Be sure to place any initialized objects in the {@code SinkContext} so each instance
396       * will remain threadsafe.
397       *
398       * @param flowProcess of type FlowProcess
399       * @param sinkCall    of type SinkCall<SinkContext, Output>
400       */
401      public void sinkPrepare( FlowProcess<Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
402        {
403        }
404    
405      /**
406       * Method sink writes out the given {@link Tuple} found on {@link cascading.scheme.SinkCall#getOutgoingEntry()} to
407       * the {@link cascading.scheme.SinkCall#getOutput()}.
408       * <p/>
409       * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
410       * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
411       * any applicable failure trap Tap. If not set, the incoming Tuple will be written instead.
412       *
413       * @param flowProcess of Process
414       * @param sinkCall    of SinkCall
415       */
416      public abstract void sink( FlowProcess<Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException;
417    
418      /**
419       * Method sinkCleanup is used to destroy resources created by
420       * {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)}.
421       *
422       * @param flowProcess of type FlowProcess
423       * @param sinkCall    of type SinkCall<SinkContext, Output>
424       */
425      public void sinkCleanup( FlowProcess<Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
426        {
427        }
428    
429      @Override
430      public boolean equals( Object object )
431        {
432        if( this == object )
433          return true;
434        if( object == null || getClass() != object.getClass() )
435          return false;
436    
437        Scheme scheme = (Scheme) object;
438    
439        if( numSinkParts != scheme.numSinkParts )
440          return false;
441        if( sinkFields != null ? !sinkFields.equals( scheme.sinkFields ) : scheme.sinkFields != null )
442          return false;
443        if( sourceFields != null ? !sourceFields.equals( scheme.sourceFields ) : scheme.sourceFields != null )
444          return false;
445    
446        return true;
447        }
448    
449      @Override
450      public String toString()
451        {
452        if( getSinkFields().equals( getSourceFields() ) )
453          return getClass().getSimpleName() + "[" + getSourceFields().print() + "]";
454        else
455          return getClass().getSimpleName() + "[" + getSourceFields().print() + "->" + getSinkFields().print() + "]";
456        }
457    
458      public int hashCode()
459        {
460        int result;
461        result = sinkFields != null ? sinkFields.hashCode() : 0;
462        result = 31 * result + ( sourceFields != null ? sourceFields.hashCode() : 0 );
463        result = 31 * result + numSinkParts;
464        return result;
465        }
466      }