001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation;
022
023import cascading.flow.FlowProcess;
024
025/**
026 * A Buffer is similar to an {@link Aggregator} in that it operates on unique groups of values. It differs
027 * in that an {@link java.util.Iterator} is provided and it is the responsibility
028 * of the {@link #operate(cascading.flow.FlowProcess, BufferCall)} method to iterate over all the input
029 * arguments returned by this Iterator, if any.
030 * <p/>
031 * For the case where a Buffer follows a CoGroup, the method {@link #operate(cascading.flow.FlowProcess, BufferCall)}
032 * will be called for every unique group whether or not there are values available to iterate over. This may be
033 * counter-intuitive for the case of an 'inner join' where the left or right stream may have a null grouping key value.
034 * Regardless, the current grouping value can be retrieved through {@link BufferCall#getGroup()}.
035 * <p/>
036 * Buffer is very useful when header or footer values need to be inserted into a grouping, or if values need to be
037 * inserted into the middle of the group values. For example, consider a stream of timestamps. A Buffer could
038 * be used to add missing entries, or to calculate running or moving averages over a smaller "window" within the grouping.
039 * <p/>
040 * By default, if a result is emitted from the Buffer before the argumentsIterator is started or after it is
041 * completed ({@code argumentsIterator.hasNext() == false}), non-grouping values are forced to null (to allow for header
042 * and footer tuple results).
043 * <p/>
044 * By setting {@link BufferCall#setRetainValues(boolean)} to {@code true} in the
045 * {@link Buffer#prepare(cascading.flow.FlowProcess, OperationCall)} method, the last seen Tuple values will not be
046 * nulled after completion and will be treated as the current incoming Tuple when merged with the Buffer result Tuple
047 * via the Every outgoing selector.
048 * <p/>
049 * There may be only one Buffer after a {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup}. And there
050 * may not be any additional {@link cascading.pipe.Every} pipes before or after the Buffer's Every pipe instance. A
051 * {@link cascading.flow.planner.PlannerException} will be thrown if these rules are violated.
052 * <p/>
053 * Buffer implementations should be re-entrant. There is no guarantee a Buffer instance will be executed in a
054 * unique vm, or by a single thread. Also, note the Iterator will return the same {@link cascading.tuple.TupleEntry}
055 * instance, but with new values in its child {@link cascading.tuple.Tuple}.
056 * <p/>
057 * As of Cascading 2.5, if the previous CoGroup uses a {@link cascading.pipe.joiner.BufferJoin} as the
058 * {@link cascading.pipe.joiner.Joiner}, a Buffer may be used to implement differing Joiner strategies.
059 * <p/>
060 * Instead of calling {@link cascading.operation.BufferCall#getArgumentsIterator()} (which will return null),
061 * {@link cascading.operation.BufferCall#getJoinerClosure()} will return an {@link cascading.pipe.joiner.JoinerClosure}
062 * instance with direct access to each CoGrouped Iterator.
063 */
064public interface Buffer<Context> extends Operation<Context>
065  {
066  /**
067   * Method operate is called once for each unique grouping, whether or not the arguments Iterator has any values.
068   * {@link BufferCall#getArgumentsIterator()} returns an argument {@link cascading.tuple.TupleEntry} for each value
069   * in the grouping defined by the argument selector on the parent Every pipe instance.
070   * <p/>
071   * TupleEntry entry, or entry.getTuple() should not be stored directly in a collection or modified, as the Iterator
072   * returns the same TupleEntry instance each time. Copy it via the {@code new Tuple( entry.getTuple() )} constructor.
073   * <p/>
074   * If the previous CoGroup uses a BufferJoin, the arguments Iterator is null; use {@link BufferCall#getJoinerClosure()}.
075   *
076   * @param flowProcess of type FlowProcess, the {@link FlowProcess} instance passed in for this invocation
077   * @param bufferCall  of type BufferCall, gives the current group ({@link BufferCall#getGroup()}) and its arguments
078   */
079  void operate( FlowProcess flowProcess, BufferCall<Context> bufferCall );
080  }