001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe;
022    
023    import java.beans.ConstructorProperties;
024    
025    import cascading.operation.Aggregator;
026    import cascading.operation.Filter;
027    import cascading.operation.Function;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    
031    /**
032     * The GroupBy pipe groups the {@link Tuple} stream by the given groupFields.
033     * <p/>
034     * If more than one {@link Pipe} instance is provided on the constructor, all branches will be merged. It is required
035     * that all Pipe instances output the same field names, otherwise the {@link cascading.flow.FlowConnector} will fail to create a
036     * {@link cascading.flow.Flow} instance. Again, the Pipe instances are merged together as if one Tuple stream and not joined.
037     * See {@link CoGroup} for joining by common fields.
038     * <p/>
039     * Typically an {@link Every} follows GroupBy to apply an {@link Aggregator} function to every grouping. The
040     * {@link Each} operator may also follow GroupBy to apply a {@link Function} or {@link Filter} to the resulting
041     * stream. But an Each cannot come immediately before an Every.
042     * <p/>
043     * Optionally a stream can be further sorted by providing sortFields. This allows an Aggregator to receive
044     * values in the order of the sortFields.
045     * <p/>
046     * Note that local sorting always happens on the groupFields, sortFields are a secondary sorting on the grouped values within the
047     * current grouping. sortFields is particularly useful if the Aggregators following the GroupBy would like to see their arguments
048     * in order.
049     * <p/>
050     * For more control over sorting at the group or secondary sort level, use {@link cascading.tuple.Fields}
051     * containing {@link java.util.Comparator} instances for the appropriate fields when setting the groupFields or
052     * sortFields values. Fields allows you to set a custom {@link java.util.Comparator} instance for each field name or
053     * position. It is required that each Comparator class also be {@link java.io.Serializable}.
054     * <p/>
055     * It should be noted for MapReduce systems, distributed group sorting is not 'total'. That is, groups are sorted
056     * as seen by each Reducer, but they are not sorted across Reducers. See the MapReduce algorithm for details.
057     * <p/>
058     * See the {@link cascading.tuple.Hasher} interface when a custom {@link java.util.Comparator} on the grouping keys is
059     * being provided that makes two values with differing hashCode values equal. For example,
060     * {@code new BigDecimal( 100.0D )} and {@code new Double( 100.0D )} are equal using a custom Comparator, but
061     * {@link Object#hashCode()} will be different, thus forcing each value into differing partitions.
062     * <p/>
063     * Note that grouping one String key with a lowercase value with another String key with an uppercase value using a
064     * "case insensitive" Comparator will not have consistent results. The grouping will execute and be correct,
065     * but the actual values in the key columns may be replaced with "equivalent" values from other streams.
066     * <p/>
067     * That is, if two streams are merged and then grouped on a key, where one stream the key values are uppercase and the
068     * other stream values are lowercase, the resulting key value for the grouping may arbitrarily be either upper or
069     * lower case.
070     * <p/>
071     * If the original key values must be retained, consider normalizing the keys with a Function and then grouping on the
072     * resulting field.
073     */
074    public class GroupBy extends Splice implements Group
075      {
076      /**
077       * Creates a new GroupBy instance that will group on {@link Fields#ALL} fields.
078       *
079       * @param pipe of type Pipe
080       */
081      @ConstructorProperties({"pipe"})
082      public GroupBy( Pipe pipe )
083        {
084        super( pipe );
085        }
086    
087      /**
088       * Creates a new GroupBy instance that will group on the given groupFields field names.
089       *
090       * @param pipe        of type Pipe
091       * @param groupFields of type Fields
092       */
093      @ConstructorProperties({"pipe", "groupFields"})
094      public GroupBy( Pipe pipe, Fields groupFields )
095        {
096        super( pipe, groupFields );
097        }
098    
099      /**
100       * Creates a new GroupBy instance that will group on the given groupFields field names.
101       *
102       * @param pipe         of type Pipe
103       * @param groupFields  of type Fields
104       * @param reverseOrder of type boolean
105       */
106      @ConstructorProperties({"pipe", "groupFields", "reverseOrder"})
107      public GroupBy( Pipe pipe, Fields groupFields, boolean reverseOrder )
108        {
109        super( pipe, groupFields, null, reverseOrder );
110        }
111    
112      /**
113       * Creates a new GroupBy instance that will group on the given groupFields field names.
114       *
115       * @param groupName   of type String
116       * @param pipe        of type Pipe
117       * @param groupFields of type Fields
118       */
119      @ConstructorProperties({"groupName", "pipe", "groupFields"})
120      public GroupBy( String groupName, Pipe pipe, Fields groupFields )
121        {
122        super( groupName, pipe, groupFields );
123        }
124    
125      /**
126       * Creates a new GroupBy instance that will group on the given groupFields field names.
127       *
128       * @param groupName    of type String
129       * @param pipe         of type Pipe
130       * @param groupFields  of type Fields
131       * @param reverseOrder of type boolean
132       */
133      @ConstructorProperties({"groupName", "pipe", "groupFields", "reverseOrder"})
134      public GroupBy( String groupName, Pipe pipe, Fields groupFields, boolean reverseOrder )
135        {
136        super( groupName, pipe, groupFields, null, reverseOrder );
137        }
138    
139      /**
140       * Creates a new GroupBy instance that will group on the given groupFields field names
141       * and sorts the grouped values on the given sortFields fields names.
142       *
143       * @param pipe        of type Pipe
144       * @param groupFields of type Fields
145       * @param sortFields  of type Fields
146       */
147      @ConstructorProperties({"pipe", "groupFields", "sortFields"})
148      public GroupBy( Pipe pipe, Fields groupFields, Fields sortFields )
149        {
150        super( pipe, groupFields, sortFields );
151        }
152    
153      /**
154       * Creates a new GroupBy instance that will group on the given groupFields field names
155       * and sorts the grouped values on the given sortFields fields names.
156       *
157       * @param groupName   of type String
158       * @param pipe        of type Pipe
159       * @param groupFields of type Fields
160       * @param sortFields  of type Fields
161       */
162      @ConstructorProperties({"groupName", "pipe", "groupFields", "sortFields"})
163      public GroupBy( String groupName, Pipe pipe, Fields groupFields, Fields sortFields )
164        {
165        super( groupName, pipe, groupFields, sortFields );
166        }
167    
168      /**
169       * Creates a new GroupBy instance that will group on the given groupFields field names
170       * and sorts the grouped values on the given sortFields fields names.
171       *
172       * @param pipe         of type Pipe
173       * @param groupFields  of type Fields
174       * @param sortFields   of type Fields
175       * @param reverseOrder of type boolean
176       */
177      @ConstructorProperties({"pipe", "groupFields", "sortFields", "reverseOrder"})
178      public GroupBy( Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder )
179        {
180        super( pipe, groupFields, sortFields, reverseOrder );
181        }
182    
183      /**
184       * Creates a new GroupBy instance that will group on the given groupFields field names
185       * and sorts the grouped values on the given sortFields fields names.
186       *
187       * @param groupName    of type String
188       * @param pipe         of type Pipe
189       * @param groupFields  of type Fields
190       * @param sortFields   of type Fields
191       * @param reverseOrder of type boolean
192       */
193      @ConstructorProperties({"groupName", "pipe", "groupFields", "sortFields", "reverseOrder"})
194      public GroupBy( String groupName, Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder )
195        {
196        super( groupName, pipe, groupFields, sortFields, reverseOrder );
197        }
198    
199      //////////
200      // MERGE
201      //////////
202    
203      /**
204       * Creates a new GroupBy instance that will first merge the given pipes, then group on Fields.FIRST.
205       * <p/>
206       * The assumption is that the first fields in all streams are logically the same field, which should be true
207       * as merging assumes all incoming streams have the same fields in the same order.
208       * <p/>
209       * To get the best performance, choose a field(s) that has many unique values, by using the constructor that takes
210       * a groupFields argument. If the first field has few unique values, data will only be sent to that number of reducers,
211       * or less, in the cluster, making the reduce phase a larger bottleneck.
212       *
213       * @param pipes of type Pipe
214       */
215      @ConstructorProperties({"pipes"})
216      public GroupBy( Pipe[] pipes )
217        {
218        super( pipes, Fields.FIRST );
219        }
220    
221      /**
222       * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
223       *
224       * @param pipes       of type Pipe
225       * @param groupFields of type Fields
226       */
227      @ConstructorProperties({"pipes", "groupFields"})
228      public GroupBy( Pipe[] pipes, Fields groupFields )
229        {
230        super( pipes, groupFields );
231        }
232    
233      /**
234       * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
235       *
236       * @param lhsPipe     of type Pipe
237       * @param rhsPipe     of type Pipe
238       * @param groupFields of type Fields
239       */
240      public GroupBy( Pipe lhsPipe, Pipe rhsPipe, Fields groupFields )
241        {
242        super( Pipe.pipes( lhsPipe, rhsPipe ), groupFields );
243        }
244    
245      /**
246       * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
247       *
248       * @param groupName   of type String
249       * @param pipes       of type Pipe
250       * @param groupFields of type Fields
251       */
252      @ConstructorProperties({"groupName", "pipes", "groupFields"})
253      public GroupBy( String groupName, Pipe[] pipes, Fields groupFields )
254        {
255        super( groupName, pipes, groupFields );
256        }
257    
258      /**
259       * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
260       *
261       * @param groupName   of type String
262       * @param lhsPipe     of type Pipe
263       * @param rhsPipe     of type Pipe
264       * @param groupFields of type Fields
265       */
266      public GroupBy( String groupName, Pipe lhsPipe, Pipe rhsPipe, Fields groupFields )
267        {
268        super( groupName, Pipe.pipes( lhsPipe, rhsPipe ), groupFields );
269        }
270    
271      /**
272       * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
273       * and sorts the grouped values on the given sortFields fields names.
274       *
275       * @param pipes       of type Pipe
276       * @param groupFields of type Fields
277       * @param sortFields  of type Fields
278       */
279      @ConstructorProperties({"pipes", "groupFields", "sortFields"})
280      public GroupBy( Pipe[] pipes, Fields groupFields, Fields sortFields )
281        {
282        super( pipes, groupFields, sortFields );
283        }
284    
285      /**
286       * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
287       * and sorts the grouped values on the given sortFields fields names.
288       *
289       * @param groupName   of type String
290       * @param pipes       of type Pipe
291       * @param groupFields of type Fields
292       * @param sortFields  of type Fields
293       */
294      @ConstructorProperties({"groupName", "pipes", "groupFields", "sortFields"})
295      public GroupBy( String groupName, Pipe[] pipes, Fields groupFields, Fields sortFields )
296        {
297        super( groupName, pipes, groupFields, sortFields );
298        }
299    
300      /**
301       * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
302       * and sorts the grouped values on the given sortFields fields names.
303       *
304       * @param pipes        of type Pipe
305       * @param groupFields  of type Fields
306       * @param sortFields   of type Fields
307       * @param reverseOrder of type boolean
308       */
309      @ConstructorProperties({"pipes", "groupFields", "sortFields", "reverseOrder"})
310      public GroupBy( Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder )
311        {
312        super( pipes, groupFields, sortFields, reverseOrder );
313        }
314    
315      /**
316       * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
317       * and sorts the grouped values on the given sortFields fields names.
318       *
319       * @param groupName    of type String
320       * @param pipes        of type Pipe
321       * @param groupFields  of type Fields
322       * @param sortFields   of type Fields
323       * @param reverseOrder of type boolean
324       */
325      @ConstructorProperties({"groupName", "pipes", "groupFields", "sortFields", "reverseOrder"})
326      public GroupBy( String groupName, Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder )
327        {
328        super( groupName, pipes, groupFields, sortFields, reverseOrder );
329        }
330      }