001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe;
022
023import java.beans.ConstructorProperties;
024
025import cascading.operation.Aggregator;
026import cascading.operation.Filter;
027import cascading.operation.Function;
028import cascading.tuple.Fields;
029import cascading.tuple.Tuple;
030
031/**
032 * The GroupBy pipe groups the {@link Tuple} stream by the given groupFields.
033 * </p>
034 * If more than one {@link Pipe} instance is provided on the constructor, all branches will be merged. It is required
035 * that all Pipe instances output the same field names, otherwise the {@link cascading.flow.FlowConnector} will fail to create a
036 * {@link cascading.flow.Flow} instance. Again, the Pipe instances are merged together as if one Tuple stream and not joined.
037 * See {@link CoGroup} for joining by common fields.
038 * </p>
039 * Typically an {@link Every} follows GroupBy to apply an {@link Aggregator} function to every grouping. The
040 * {@link Each} operator may also follow GroupBy to apply a {@link Function} or {@link Filter} to the resulting
041 * stream. But an Each cannot come immediately before an Every.
042 * <p/>
043 * Optionally a stream can be further sorted by providing sortFields. This allows an Aggregator to receive
044 * values in the order of the sortedFields.
045 * <p/>
046 * Note that local sorting always happens on the groupFields, sortFields are a secondary sorting on the grouped values within the
047 * current grouping. sortFields is particularly useful if the Aggregators following the GroupBy would like to see their arguments
048 * in order.
049 * <p/>
050 * For more control over sorting at the group or secondary sort level, use {@link cascading.tuple.Fields}
051 * containing {@link java.util.Comparator} instances for the appropriate fields when setting the groupFields or
052 * sortFields values. Fields allows you to set a custom {@link java.util.Comparator} instance for each field name or
053 * position. It is required that each Comparator class also be {@link java.io.Serializable}.
054 * <p/>
055 * It should be noted for MapReduce systems, distributed group sorting is not 'total'. That is groups are sorted
056 * as seen by each Reducer, but they are not sorted across Reducers. See the MapReduce algorithm for details.
057 * <p/>
058 * See the {@link cascading.tuple.Hasher} interface when a custom {@link java.util.Comparator} on the grouping keys is
059 * being provided that makes two values with differing hashCode values equal. For example,
060 * {@code new BigDecimal( 100.0D )} and {@code new Double 100.0D )} are equal using a custom Comparator, but
061 * {@link Object#hashCode()} will be different, thus forcing each value into differing partitions.
062 * <p/>
063 * Note that grouping one String key with a lowercase value with another String key with an uppercase value using a
064 * "case insensitive" Comparator will not have consistent results. The grouping will execute and be correct,
065 * but the actual values in the key columns may be replaced with "equivalent" values from other streams.
066 * <p/>
067 * That is, if two streams are merged and then grouped on a key, where one stream the key values are uppercase and the
068 * other stream values are lowercase, the resulting key value for the grouping may arbitrarily be either upper or
069 * lower case.
070 * <p/>
071 * If the original key values must be retained, consider normalizing the keys with a Function and then grouping on the
072 * resulting field.
073 */
074public class GroupBy extends Splice implements Group
075  {
076  /**
077   * Creates a new GroupBy instance that will group on {@link Fields#ALL} fields.
078   *
079   * @param pipe of type Pipe
080   */
081  @ConstructorProperties({"pipe"})
082  public GroupBy( Pipe pipe )
083    {
084    super( pipe );
085    }
086
087  /**
088   * Creates a new GroupBy instance that will group on the given groupFields field names.
089   *
090   * @param pipe        of type Pipe
091   * @param groupFields of type Fields
092   */
093  @ConstructorProperties({"pipe", "groupFields"})
094  public GroupBy( Pipe pipe, Fields groupFields )
095    {
096    super( pipe, groupFields );
097    }
098
099  /**
100   * Creates a new GroupBy instance that will group on the given groupFields field names.
101   *
102   * @param pipe         of type Pipe
103   * @param groupFields  of type Fields
104   * @param reverseOrder of type boolean
105   */
106  @ConstructorProperties({"pipe", "groupFields", "reverseOrder"})
107  public GroupBy( Pipe pipe, Fields groupFields, boolean reverseOrder )
108    {
109    super( pipe, groupFields, null, reverseOrder );
110    }
111
112  /**
113   * Creates a new GroupBy instance that will group on the given groupFields field names.
114   *
115   * @param groupName   of type String
116   * @param pipe        of type Pipe
117   * @param groupFields of type Fields
118   */
119  @ConstructorProperties({"groupName", "pipe", "groupFields"})
120  public GroupBy( String groupName, Pipe pipe, Fields groupFields )
121    {
122    super( groupName, pipe, groupFields );
123    }
124
125  /**
126   * Creates a new GroupBy instance that will group on the given groupFields field names.
127   *
128   * @param groupName    of type String
129   * @param pipe         of type Pipe
130   * @param groupFields  of type Fields
131   * @param reverseOrder of type boolean
132   */
133  @ConstructorProperties({"groupName", "pipe", "groupFields", "reverseOrder"})
134  public GroupBy( String groupName, Pipe pipe, Fields groupFields, boolean reverseOrder )
135    {
136    super( groupName, pipe, groupFields, null, reverseOrder );
137    }
138
139  /**
140   * Creates a new GroupBy instance that will group on the given groupFields field names
141   * and sorts the grouped values on the given sortFields fields names.
142   *
143   * @param pipe        of type Pipe
144   * @param groupFields of type Fields
145   * @param sortFields  of type Fields
146   */
147  @ConstructorProperties({"pipe", "groupFields", "sortFields"})
148  public GroupBy( Pipe pipe, Fields groupFields, Fields sortFields )
149    {
150    super( pipe, groupFields, sortFields );
151    }
152
153  /**
154   * Creates a new GroupBy instance that will group on the given groupFields field names
155   * and sorts the grouped values on the given sortFields fields names.
156   *
157   * @param groupName   of type String
158   * @param pipe        of type Pipe
159   * @param groupFields of type Fields
160   * @param sortFields  of type Fields
161   */
162  @ConstructorProperties({"groupName", "pipe", "groupFields", "sortFields"})
163  public GroupBy( String groupName, Pipe pipe, Fields groupFields, Fields sortFields )
164    {
165    super( groupName, pipe, groupFields, sortFields );
166    }
167
168  /**
169   * Creates a new GroupBy instance that will group on the given groupFields field names
170   * and sorts the grouped values on the given sortFields fields names.
171   *
172   * @param pipe         of type Pipe
173   * @param groupFields  of type Fields
174   * @param sortFields   of type Fields
175   * @param reverseOrder of type boolean
176   */
177  @ConstructorProperties({"pipe", "groupFields", "sortFields", "reverseOrder"})
178  public GroupBy( Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder )
179    {
180    super( pipe, groupFields, sortFields, reverseOrder );
181    }
182
183  /**
184   * Creates a new GroupBy instance that will group on the given groupFields field names
185   * and sorts the grouped values on the given sortFields fields names.
186   *
187   * @param groupName    of type String
188   * @param pipe         of type Pipe
189   * @param groupFields  of type Fields
190   * @param sortFields   of type Fields
191   * @param reverseOrder of type boolean
192   */
193  @ConstructorProperties({"groupName", "pipe", "groupFields", "sortFields", "reverseOrder"})
194  public GroupBy( String groupName, Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder )
195    {
196    super( groupName, pipe, groupFields, sortFields, reverseOrder );
197    }
198
199  //////////
200  // MERGE
201  //////////
202
203  /**
204   * Creates a new GroupBy instance that will first merge the given pipes, then group on Fields.FIRST.
205   * <p/>
206   * The assumption is that the first fields in all streams are logically the same field, which should be true
207   * as merging assumes all incoming streams have the same fields in the same order.
208   * <p/>
209   * To get the best performance, choose a field(s) that has many unique values, by using the constructor that takes
210   * a groupFields argument. If the first field has few unique values, data will only be sent to that number of reducers,
211   * or less, in the cluster, making the reduce phase a larger bottleneck.
212   *
213   * @param pipes of type Pipe
214   */
215  @ConstructorProperties({"pipes"})
216  public GroupBy( Pipe[] pipes )
217    {
218    super( pipes, Fields.FIRST );
219    }
220
221  /**
222   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
223   *
224   * @param pipes       of type Pipe
225   * @param groupFields of type Fields
226   */
227  @ConstructorProperties({"pipes", "groupFields"})
228  public GroupBy( Pipe[] pipes, Fields groupFields )
229    {
230    super( pipes, groupFields );
231    }
232
233  /**
234   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
235   *
236   * @param lhsPipe     of type Pipe
237   * @param rhsPipe     of type Pipe
238   * @param groupFields of type Fields
239   */
240  public GroupBy( Pipe lhsPipe, Pipe rhsPipe, Fields groupFields )
241    {
242    super( Pipe.pipes( lhsPipe, rhsPipe ), groupFields );
243    }
244
245  /**
246   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
247   *
248   * @param groupName   of type String
249   * @param pipes       of type Pipe
250   * @param groupFields of type Fields
251   */
252  @ConstructorProperties({"groupName", "pipes", "groupFields"})
253  public GroupBy( String groupName, Pipe[] pipes, Fields groupFields )
254    {
255    super( groupName, pipes, groupFields );
256    }
257
258  /**
259   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
260   *
261   * @param groupName   of type String
262   * @param lhsPipe     of type Pipe
263   * @param rhsPipe     of type Pipe
264   * @param groupFields of type Fields
265   */
266  public GroupBy( String groupName, Pipe lhsPipe, Pipe rhsPipe, Fields groupFields )
267    {
268    super( groupName, Pipe.pipes( lhsPipe, rhsPipe ), groupFields );
269    }
270
271  /**
272   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
273   * and sorts the grouped values on the given sortFields fields names.
274   *
275   * @param pipes       of type Pipe
276   * @param groupFields of type Fields
277   * @param sortFields  of type Fields
278   */
279  @ConstructorProperties({"pipes", "groupFields", "sortFields"})
280  public GroupBy( Pipe[] pipes, Fields groupFields, Fields sortFields )
281    {
282    super( pipes, groupFields, sortFields );
283    }
284
285  /**
286   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
287   * and sorts the grouped values on the given sortFields fields names.
288   *
289   * @param groupName   of type String
290   * @param pipes       of type Pipe
291   * @param groupFields of type Fields
292   * @param sortFields  of type Fields
293   */
294  @ConstructorProperties({"groupName", "pipes", "groupFields", "sortFields"})
295  public GroupBy( String groupName, Pipe[] pipes, Fields groupFields, Fields sortFields )
296    {
297    super( groupName, pipes, groupFields, sortFields );
298    }
299
300  /**
301   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
302   * and sorts the grouped values on the given sortFields fields names.
303   *
304   * @param pipes        of type Pipe
305   * @param groupFields  of type Fields
306   * @param sortFields   of type Fields
307   * @param reverseOrder of type boolean
308   */
309  @ConstructorProperties({"pipes", "groupFields", "sortFields", "reverseOrder"})
310  public GroupBy( Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder )
311    {
312    super( pipes, groupFields, sortFields, reverseOrder );
313    }
314
315  /**
316   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
317   * and sorts the grouped values on the given sortFields fields names.
318   *
319   * @param groupName    of type String
320   * @param pipes        of type Pipe
321   * @param groupFields  of type Fields
322   * @param sortFields   of type Fields
323   * @param reverseOrder of type boolean
324   */
325  @ConstructorProperties({"groupName", "pipes", "groupFields", "sortFields", "reverseOrder"})
326  public GroupBy( String groupName, Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder )
327    {
328    super( groupName, pipes, groupFields, sortFields, reverseOrder );
329    }
330  }