/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.operation;

import cascading.flow.FlowProcess;

/**
 * A Buffer is similar to an {@link Aggregator} in that it operates on unique groups of values. It differs
 * in that an {@link java.util.Iterator} is provided, and it is the responsibility
 * of the {@link #operate(cascading.flow.FlowProcess, BufferCall)} method to iterate over all the input
 * arguments returned by this Iterator, if any.
 * <p/>
 * When a Buffer follows a CoGroup, the method {@link #operate(cascading.flow.FlowProcess, BufferCall)}
 * will be called for every unique group whether or not there are values available to iterate over. This may be
 * counter-intuitive in the case of an 'inner join', where the left or right stream may have a null grouping key value.
 * Regardless, the current grouping value can be retrieved through {@link BufferCall#getGroup()}.
 * <p/>
 * Buffer is very useful when header or footer values need to be inserted into a grouping, or when values need to be
 * inserted into the middle of the group values. For example, consider a stream of timestamps. A Buffer could
 * be used to add missing entries, or to calculate running or moving averages over a smaller "window" within the grouping.
 * <p/>
 * By default, if a result is emitted from the Buffer before the argumentsIterator is started or after it is
 * completed ({@code argumentsIterator.hasNext() == false}), non-grouping values are forced to null (to allow for header
 * and footer tuple results).
 * <p/>
 * By setting {@link BufferCall#setRetainValues(boolean)} to {@code true} in the
 * {@link Buffer#prepare(cascading.flow.FlowProcess, OperationCall)} method, the last seen Tuple values will not be
 * nulled after completion and will be treated as the current incoming Tuple when merged with the Buffer result Tuple
 * via the Every outgoing selector.
 * <p/>
 * There may be only one Buffer after a {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup}, and there
 * may not be any additional {@link cascading.pipe.Every} pipes before or after the Buffer's Every pipe instance. A
 * {@link cascading.flow.planner.PlannerException} will be thrown if these rules are violated.
 * <p/>
 * Buffer implementations should be re-entrant. There is no guarantee a Buffer instance will be executed in a
 * unique JVM, or by a single thread. Also, note that the Iterator will return the same {@link cascading.tuple.TupleEntry}
 * instance on every call, but with new values in its child {@link cascading.tuple.Tuple}.
 * <p/>
 * As of Cascading 2.5, if the preceding CoGroup uses a {@link cascading.pipe.joiner.BufferJoin} as the
 * {@link cascading.pipe.joiner.Joiner}, a Buffer may be used to implement differing Joiner strategies.
 * <p/>
 * In that case, instead of calling {@link cascading.operation.BufferCall#getArgumentsIterator()} (which will return null),
 * call {@link cascading.operation.BufferCall#getJoinerClosure()}, which will return a {@link cascading.pipe.joiner.JoinerClosure}
 * instance with direct access to each CoGrouped Iterator.
 */
public interface Buffer<Context> extends Operation<Context>
  {
  /**
   * Method operate is called once for each grouping. {@link BufferCall} passes in an {@link java.util.Iterator}
   * that returns an argument {@link cascading.tuple.TupleEntry} for each value in the grouping defined by the
   * argument selector on the parent Every pipe instance.
   * <p/>
   * Neither the TupleEntry entry nor entry.getTuple() should be stored directly in a collection or modified.
   * A copy of the tuple should be made via the {@code new Tuple( entry.getTuple() )} copy constructor.
   * <p/>
   * This method is called for every unique group, whether or not there are values in the arguments Iterator.
   *
   * @param flowProcess of type FlowProcess
   * @param bufferCall  of type BufferCall
   */
  void operate( FlowProcess flowProcess, BufferCall<Context> bufferCall );
  }
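
// Illustrative sketch, NOT part of the Cascading API: a plain-Java analogue of the reuse
// behavior documented on Buffer#operate. ReusingIteratorSketch and MutableEntry are
// hypothetical names; MutableEntry stands in for the TupleEntry that the arguments Iterator
// refills and returns on every call to next(). Keeping the returned reference leaves a
// collection of aliases that all show the last value, so the values must be copied out
// first (with Cascading itself, via new Tuple( entry.getTuple() )).
final class ReusingIteratorSketch
  {
  /** Hypothetical stand-in for a reused TupleEntry: a single mutable slot. */
  static final class MutableEntry
    {
    long value;
    }

  /** Returns an Iterator that refills and returns the SAME MutableEntry on every call to next(). */
  static java.util.Iterator<MutableEntry> reusingIterator( final long[] values )
    {
    final MutableEntry shared = new MutableEntry();

    return new java.util.Iterator<MutableEntry>()
      {
      int index = 0;

      public boolean hasNext()
        {
        return index < values.length;
        }

      public MutableEntry next()
        {
        if( !hasNext() )
          throw new java.util.NoSuchElementException();

        shared.value = values[ index++ ]; // overwrite the shared instance in place
        return shared;
        }

      public void remove()
        {
        throw new UnsupportedOperationException();
        }
      };
    }

  /** Wrong: keeps references to the shared instance, so every element ends up showing the last value. */
  static java.util.List<Long> storeReferences( java.util.Iterator<MutableEntry> iterator )
    {
    java.util.List<MutableEntry> kept = new java.util.ArrayList<MutableEntry>();

    while( iterator.hasNext() )
      kept.add( iterator.next() );

    java.util.List<Long> result = new java.util.ArrayList<Long>();

    for( MutableEntry entry : kept )
      result.add( entry.value );

    return result;
    }

  /** Right: copies the current value out before the iterator advances and overwrites it. */
  static java.util.List<Long> storeCopies( java.util.Iterator<MutableEntry> iterator )
    {
    java.util.List<Long> result = new java.util.ArrayList<Long>();

    while( iterator.hasNext() )
      result.add( iterator.next().value ); // copy the value, not the entry reference

    return result;
    }

  public static void main( String[] args )
    {
    long[] group = {10L, 20L, 30L};
    System.out.println( storeReferences( reusingIterator( group ) ) ); // [30, 30, 30]
    System.out.println( storeCopies( reusingIterator( group ) ) ); // [10, 20, 30]
    }
  }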
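
// Illustrative sketch, NOT part of the Cascading API: a plain-Java analogue of the header,
// footer and moving-average use cases described in the Buffer documentation. For one group it
// emits a header before the Iterator is started, one trailing moving average per value over
// the last `window` values, and a footer (carrying the value count) once hasNext() returns
// false, including for an empty group. MovingAverageSketch, the "HEADER"/"FOOTER" markers and
// the String results are hypothetical; a real Buffer would instead emit Tuple results through
// its BufferCall output collector.
final class MovingAverageSketch
  {
  static java.util.List<String> operate( String group, java.util.Iterator<Double> arguments, int window )
    {
    if( window < 1 )
      throw new IllegalArgumentException( "window must be >= 1" );

    java.util.List<String> results = new java.util.ArrayList<String>();
    java.util.ArrayDeque<Double> recent = new java.util.ArrayDeque<Double>();
    double sum = 0.0;
    int count = 0;

    results.add( group + ",HEADER" ); // emitted before the iterator is started

    while( arguments.hasNext() )
      {
      double value = arguments.next();
      recent.addLast( value );
      sum += value;

      if( recent.size() > window )
        sum -= recent.removeFirst(); // drop the value that slid out of the window

      results.add( group + "," + ( sum / recent.size() ) );
      count++;
      }

    results.add( group + ",FOOTER," + count ); // emitted after hasNext() returned false

    return results;
    }

  public static void main( String[] args )
    {
    java.util.List<Double> values = java.util.Arrays.asList( 1.0, 2.0, 3.0, 4.0 );
    System.out.println( operate( "a", values.iterator(), 2 ) );
    // [a,HEADER, a,1.0, a,1.5, a,2.5, a,3.5, a,FOOTER,4]
    System.out.println( operate( "b", java.util.Collections.<Double>emptyList().iterator(), 2 ) );
    // [b,HEADER, b,FOOTER,0]
    }
  }

// Keeping a running sum plus a deque of the values currently in the window makes each update
// constant time, rather than re-summing the whole window for every incoming value.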