001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.scheme;
022
023import java.io.IOException;
024import java.io.Serializable;
025
026import cascading.flow.FlowProcess;
027import cascading.tap.Tap;
028import cascading.tuple.Fields;
029import cascading.tuple.Tuple;
030import cascading.util.TraceUtil;
031import cascading.util.Traceable;
032
033/**
034 * A Scheme defines what is stored in a {@link Tap} instance by declaring the {@link Tuple}
035 * field names, and alternately parsing or rendering the incoming or outgoing {@link Tuple}
036 * stream, respectively.
037 * <p/>
 * A Scheme defines the type of resource that data will be sourced from or sinked to.
039 * <p/>
040 * The default sourceFields are {@link Fields#UNKNOWN} and the default sinkFields are {@link Fields#ALL}.
041 * <p/>
042 * Any given sourceFields only label the values in the {@link Tuple}s as they are sourced.
043 * It does not necessarily filter the output since a given implementation may choose to
044 * collapse values and ignore keys depending on the format.
045 * <p/>
046 * If the sinkFields are {@link Fields#ALL}, the Cascading planner will attempt to resolve the actual field names
047 * and make them available via the {@link cascading.scheme.SinkCall#getOutgoingEntry()} method. Sometimes this may
 * not be possible, for example when the {@link Tap#openForWrite(cascading.flow.FlowProcess)} method is called
 * from user code directly (without planner intervention).
050 * <p/>
051 * If the sinkFields are a valid selector, the {@link #sink(cascading.flow.FlowProcess, SinkCall)} method will
052 * only see the fields expected.
053 * <p/>
054 * Setting the {@code numSinkParts} value to 1 (one) attempts to ensure the output resource has only one part.
055 * In the case of MapReduce, this is only a suggestion for the Map side, on the Reduce side it does this by
056 * setting the number of reducers to the given value. This may affect performance, so be cautioned.
 * <p/>
058 * Note that setting numSinkParts does not force the planner to insert a final Reduce operation in the job, so
059 * numSinkParts may be ignored entirely if the final job is Map only. To force the Flow to have a final Reduce,
060 * add a {@link cascading.pipe.GroupBy} to the assembly before sinking.
061 */
062public abstract class Scheme<Config, Input, Output, SourceContext, SinkContext> implements Serializable, Traceable
063  {
064  /** Field sinkFields */
065  Fields sinkFields = Fields.ALL;
066  /** Field sourceFields */
067  Fields sourceFields = Fields.UNKNOWN;
068  /** Field numSinkParts */
069  int numSinkParts;
070  /** Field trace */
071  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
072
073  /** Constructor Scheme creates a new Scheme instance. */
074  protected Scheme()
075    {
076    }
077
078  /**
079   * Constructor Scheme creates a new Scheme instance.
080   *
081   * @param sourceFields of type Fields
082   */
083  protected Scheme( Fields sourceFields )
084    {
085    setSourceFields( sourceFields );
086    }
087
088  /**
089   * Constructor Scheme creates a new Scheme instance.
090   *
091   * @param sourceFields of type Fields
092   * @param numSinkParts of type int
093   */
094  protected Scheme( Fields sourceFields, int numSinkParts )
095    {
096    setSourceFields( sourceFields );
097    this.numSinkParts = numSinkParts;
098    }
099
100  /**
101   * Constructor Scheme creates a new Scheme instance.
102   *
103   * @param sourceFields of type Fields
104   * @param sinkFields   of type Fields
105   */
106  protected Scheme( Fields sourceFields, Fields sinkFields )
107    {
108    setSourceFields( sourceFields );
109    setSinkFields( sinkFields );
110    }
111
112  /**
113   * Constructor Scheme creates a new Scheme instance.
114   *
115   * @param sourceFields of type Fields
116   * @param sinkFields   of type Fields
117   * @param numSinkParts of type int
118   */
119  protected Scheme( Fields sourceFields, Fields sinkFields, int numSinkParts )
120    {
121    setSourceFields( sourceFields );
122    setSinkFields( sinkFields );
123    this.numSinkParts = numSinkParts;
124    }
125
126  /**
127   * Method getSinkFields returns the sinkFields of this Scheme object.
128   *
129   * @return the sinkFields (type Fields) of this Scheme object.
130   */
131  public Fields getSinkFields()
132    {
133    return sinkFields;
134    }
135
136  /**
137   * Method setSinkFields sets the sinkFields of this Scheme object.
138   *
139   * @param sinkFields the sinkFields of this Scheme object.
140   */
141  public void setSinkFields( Fields sinkFields )
142    {
143    if( sinkFields.isUnknown() )
144      this.sinkFields = Fields.ALL;
145    else
146      this.sinkFields = sinkFields;
147    }
148
149  /**
150   * Method getSourceFields returns the sourceFields of this Scheme object.
151   *
152   * @return the sourceFields (type Fields) of this Scheme object.
153   */
154  public Fields getSourceFields()
155    {
156    return sourceFields;
157    }
158
159  /**
160   * Method setSourceFields sets the sourceFields of this Scheme object.
161   *
162   * @param sourceFields the sourceFields of this Scheme object.
163   */
164  public void setSourceFields( Fields sourceFields )
165    {
166    if( sourceFields.isAll() )
167      this.sourceFields = Fields.UNKNOWN;
168    else
169      this.sourceFields = sourceFields;
170    }
171
172  /**
173   * Method getNumSinkParts returns the numSinkParts of this Scheme object.
174   *
175   * @return the numSinkParts (type int) of this Scheme object.
176   */
177  public int getNumSinkParts()
178    {
179    return numSinkParts;
180    }
181
182  /**
183   * Method setNumSinkParts sets the numSinkParts of this Scheme object.
184   *
185   * @param numSinkParts the numSinkParts of this Scheme object.
186   */
187  public void setNumSinkParts( int numSinkParts )
188    {
189    this.numSinkParts = numSinkParts;
190    }
191
192  @Override
193  public String getTrace()
194    {
195    return trace;
196    }
197
198  /**
199   * Method isSymmetrical returns {@code true} if the sink fields equal the source fields. That is, this
200   * scheme sources the same fields as it sinks.
201   *
202   * @return the symmetrical (type boolean) of this Scheme object.
203   */
204  public boolean isSymmetrical()
205    {
206    return getSourceFields().equals( Fields.UNKNOWN ) && getSinkFields().equals( Fields.ALL ) || getSinkFields().equals( getSourceFields() );
207    }
208
209  /**
210   * Method isSource returns true if this Scheme instance can be used as a source.
211   *
212   * @return boolean
213   */
214  public boolean isSource()
215    {
216    return true;
217    }
218
219  /**
220   * Method isSink returns true if this Scheme instance can be used as a sink.
221   *
222   * @return boolean
223   */
224  public boolean isSink()
225    {
226    return true;
227    }
228
229  /**
230   * Method retrieveSourceFields notifies a Scheme when it is appropriate to dynamically
231   * update the fields it sources. By default the current declared fields are returned.
232   * <p/>
233   * The {@code FlowProcess} presents all known properties resolved by the current planner.
234   * <p/>
235   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
236   *
237   * @param flowProcess of type FlowProcess
238   * @param tap         of type Tap
239   * @return Fields
240   */
241  public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap )
242    {
243    return getSourceFields();
244    }
245
246  /**
247   * Method presentSourceFields is called after the planner is invoked and all fields are resolved. This
248   * method presents to the Scheme the actual source fields after any planner intervention.
249   * <p/>
250   * This method is called after {@link #retrieveSourceFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
251   *
252   * @param flowProcess of type FlowProcess
253   * @param tap         of type Tap
254   * @param fields      of type Fields
255   */
256  public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
257    {
258    presentSourceFieldsInternal( fields );
259    }
260
261  protected void presentSourceFieldsInternal( Fields fields )
262    {
263    if( getSourceFields().equals( Fields.UNKNOWN ) )
264      setSourceFields( fields );
265    }
266
267  /**
268   * Method retrieveSinkFields notifies a Scheme when it is appropriate to dynamically
269   * update the fields it sources. By default the current declared fields are returned.
270   * <p/>
271   * The {@code FlowProcess} presents all known properties resolved by the current planner.
272   * <p/>
273   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
274   *
275   * @param flowProcess of type FlowProcess
276   * @param tap         of type Tap
277   * @return Fields
278   */
279  public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap )
280    {
281    return getSinkFields();
282    }
283
284  /**
285   * Method presentSinkFields is called after the planner is invoked and all fields are resolved. This
286   * method presents to the Scheme the actual source fields after any planner intervention.
287   * <p/>
288   * This method is called after {@link #retrieveSinkFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
289   *
290   * @param flowProcess of type FlowProcess
291   * @param tap         of type Tap
292   * @param fields      of type Fields
293   */
294  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
295    {
296    presentSinkFieldsInternal( fields );
297    }
298
299  protected void presentSinkFieldsInternal( Fields fields )
300    {
301    if( getSinkFields().equals( Fields.ALL ) )
302      setSinkFields( fields );
303    }
304
305  /**
306   * Method sourceInit initializes this instance as a source.
307   * <p/>
308   * This method is executed client side as a means to provide necessary configuration parameters
309   * used by the underlying platform.
310   * <p/>
311   * It is not intended to initialize resources that would be necessary during the execution of this
312   * class, like a "formatter" or "parser".
313   * <p/>
314   * See {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)} if resources much be initialized
315   * before use. And {@link #sourceCleanup(cascading.flow.FlowProcess, SourceCall)} if resources must be
316   * destroyed after use.
317   *
318   * @param flowProcess of type FlowProcess
319   * @param tap         of type Tap
320   * @param conf        of type Config
321   */
322  public abstract void sourceConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );
323
324  /**
325   * Method sinkInit initializes this instance as a sink.
326   * <p/>
327   * This method is executed client side as a means to provide necessary configuration parameters
328   * used by the underlying platform.
329   * <p/>
330   * It is not intended to initialize resources that would be necessary during the execution of this
331   * class, like a "formatter" or "parser".
332   * <p/>
333   * See {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)} if resources much be initialized
334   * before use. And {@link #sinkCleanup(cascading.flow.FlowProcess, SinkCall)} if resources must be
335   * destroyed after use.
336   *
337   * @param flowProcess of type FlowProcess
338   * @param tap         of type Tap
339   * @param conf        of type Config
340   */
341  public abstract void sinkConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );
342
343  /**
344   * Method sourcePrepare is used to initialize resources needed during each call of
345   * {@link #source(cascading.flow.FlowProcess, SourceCall)}.
346   * <p/>
347   * Be sure to place any initialized objects in the {@code SourceContext} so each instance
348   * will remain threadsafe.
349   *
350   * @param flowProcess of type FlowProcess
351   * @param sourceCall  of type SourceCall<SourceContext, Input>
352   */
353  public void sourcePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
354    {
355    }
356
357  /**
358   * Method source will read a new "record" or value from {@link cascading.scheme.SourceCall#getInput()} and populate
359   * the available {@link Tuple} via {@link cascading.scheme.SourceCall#getIncomingEntry()} and return {@code true}
360   * on success or {@code false} if no more values available.
361   * <p/>
362   * It's ok to set a new Tuple instance on the {@code incomingEntry} {@link cascading.tuple.TupleEntry}, or
363   * to simply re-use the existing instance.
364   * <p/>
365   * Note this is only time it is safe to modify a Tuple instance handed over via a method call.
366   * <p/>
367   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
368   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
369   * any applicable failure trap Tap.
370   *
371   * @param flowProcess of type FlowProcess
372   * @param sourceCall  of SourceCall
373   * @return returns {@code true} when a Tuple was successfully read
374   */
375  public abstract boolean source( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException;
376
377  /**
378   * Method sourceCleanup is used to destroy resources created by
379   * {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)}.
380   *
381   * @param flowProcess of Process
382   * @param sourceCall  of type SourceCall<SourceContext, Input>
383   */
384  public void sourceCleanup( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
385    {
386    }
387
388  /**
389   * Method sinkPrepare is used to initialize resources needed during each call of
390   * {@link #sink(cascading.flow.FlowProcess, SinkCall)}.
391   * <p/>
392   * Be sure to place any initialized objects in the {@code SinkContext} so each instance
393   * will remain threadsafe.
394   *
395   * @param flowProcess of type FlowProcess
396   * @param sinkCall    of type SinkCall<SinkContext, Output>
397   */
398  public void sinkPrepare( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
399    {
400    }
401
402  /**
403   * Method sink writes out the given {@link Tuple} found on {@link cascading.scheme.SinkCall#getOutgoingEntry()} to
404   * the {@link cascading.scheme.SinkCall#getOutput()}.
405   * <p/>
406   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
407   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
408   * any applicable failure trap Tap. If not set, the incoming Tuple will be written instead.
409   *
410   * @param flowProcess of Process
411   * @param sinkCall    of SinkCall
412   */
413  public abstract void sink( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException;
414
415  /**
416   * Method sinkCleanup is used to destroy resources created by
417   * {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)}.
418   *
419   * @param flowProcess of type FlowProcess
420   * @param sinkCall    of type SinkCall<SinkContext, Output>
421   */
422  public void sinkCleanup( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
423    {
424    }
425
426  @Override
427  public boolean equals( Object object )
428    {
429    if( this == object )
430      return true;
431    if( object == null || getClass() != object.getClass() )
432      return false;
433
434    Scheme scheme = (Scheme) object;
435
436    if( numSinkParts != scheme.numSinkParts )
437      return false;
438    if( sinkFields != null ? !sinkFields.equals( scheme.sinkFields ) : scheme.sinkFields != null )
439      return false;
440    if( sourceFields != null ? !sourceFields.equals( scheme.sourceFields ) : scheme.sourceFields != null )
441      return false;
442
443    return true;
444    }
445
446  @Override
447  public String toString()
448    {
449    if( getSinkFields().equals( getSourceFields() ) )
450      return getClass().getSimpleName() + "[" + getSourceFields().print() + "]";
451    else
452      return getClass().getSimpleName() + "[" + getSourceFields().print() + "->" + getSinkFields().print() + "]";
453    }
454
455  public int hashCode()
456    {
457    int result;
458    result = sinkFields != null ? sinkFields.hashCode() : 0;
459    result = 31 * result + ( sourceFields != null ? sourceFields.hashCode() : 0 );
460    result = 31 * result + numSinkParts;
461    return result;
462    }
463  }