001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation;
022    
023    import cascading.flow.FlowProcess;
024    
025    /**
026     * A Buffer is similar to an {@link Aggregator} by the fact that it operates on unique groups of values. It differs
027     * by the fact that an {@link java.util.Iterator} is provided and it is the responsibility
028     * of the {@link #operate(cascading.flow.FlowProcess, BufferCall)} method to iterate over all the input
029     * arguments returned by this Iterator, if any.
030     * <p/>
031     * For the case where a Buffer follows a CoGroup, the method {@link #operate(cascading.flow.FlowProcess, BufferCall)}
032     * will be called for every unique group whether or not there are values available to iterate over. This may be
033     * counter-intuitive for the case of an 'inner join' where the left or right stream may have a null grouping key value.
034     * Regardless, the current grouping value can be retrieved through {@link BufferCall#getGroup()}.
035     * <p/>
036     * Buffer is very useful when header or footer values need to be inserted into a grouping, or if values need to be
037     * inserted into the middle of the group values. For example, consider a stream of timestamps. A Buffer could
038     * be used to add missing entries, or to calculate running or moving averages over a smaller "window" within the grouping.
039     * <p/>
040     * By default, if a result is emitted from the Buffer before the argumentsIterator is started or after it is
041     * completed ({@code argumentsIterator.hasNext() == false}), non-grouping values are forced to null (to allow for header
042     * and footer tuple results).
043     * <p/>
044     * By setting {@link BufferCall#setRetainValues(boolean)} to {@code true} in the
045     * {@link Buffer#prepare(cascading.flow.FlowProcess, OperationCall)} method, the last seen Tuple values will not be
046     * nulled after completion and will be treated as the current incoming Tuple when merged with the Buffer result Tuple
047     * via the Every outgoing selector.
048     * <p/>
049     * There may be only one Buffer after a {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup}. And there
050     * may not be any additional {@link cascading.pipe.Every} pipes before or after the Buffer's Every pipe instance. A
051     * {@link cascading.flow.planner.PlannerException} will be thrown if these rules are violated.
052     * <p/>
053     * Buffer implementations should be re-entrant. There is no guarantee a Buffer instance will be executed in a
054     * unique vm, or by a single thread. Also, note the Iterator will return the same {@link cascading.tuple.TupleEntry}
055     * instance, but with new values in its child {@link cascading.tuple.Tuple}.
056     * <p/>
057     * As of Cascading 2.5, if the previous CoGroup uses a {@link cascading.pipe.joiner.BufferJoin} as the
058     * {@link cascading.pipe.joiner.Joiner}, a Buffer may be used to implement differing Joiner strategies.
059     * <p/>
060     * Instead of calling {@link cascading.operation.BufferCall#getArgumentsIterator()} (which will return null),
061     * {@link cascading.operation.BufferCall#getJoinerClosure()} will return an {@link cascading.pipe.joiner.JoinerClosure}
062     * instance with direct access to each CoGrouped Iterator.
063     */
064    public interface Buffer<Context> extends Operation<Context>
065      {
066      /**
067       * Method operate is called once for each grouping. {@link BufferCall} passes in an {@link java.util.Iterator}
068       * that returns an argument {@link cascading.tuple.TupleEntry} for each value in the grouping defined by the
069       * argument selector on the parent Every pipe instance.
070       * <p/>
071       * Neither the TupleEntry {@code entry} nor {@code entry.getTuple()} should be stored directly in a collection
072       * or modified. Make a copy of the tuple via the {@code new Tuple( entry.getTuple() )} copy constructor instead.
073       * <p/>
074       * This method is called for every unique group, whether or not there are values in the arguments Iterator.
075       *
076       * @param flowProcess the {@link FlowProcess} passed to this invocation
077       * @param bufferCall  the {@link BufferCall} supplying the arguments Iterator for the current grouping
078       */
079      void operate( FlowProcess flowProcess, BufferCall<Context> bufferCall );
080      }