001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe.assembly;
022
023import java.beans.ConstructorProperties;
024
025import cascading.flow.FlowProcess;
026import cascading.operation.aggregator.First;
027import cascading.pipe.Pipe;
028import cascading.tuple.Fields;
029import cascading.tuple.Tuple;
030import cascading.tuple.TupleEntry;
031
032/**
033 * Class FirstBy is used to return the first encountered Tuple in a tuple stream grouping.
034 * <p/>
035 * Typically finding the first Tuple in a tuple stream grouping relies on a {@link cascading.pipe.GroupBy} and a
036 * {@link cascading.operation.aggregator.First} {@link cascading.operation.Aggregator} operation.
037 * <p/>
038 * If the {@code firstFields} argument has custom {@link java.util.Comparator} instances, they will be used
039 * as the GroupBy {@code sortFields}.
040 * <p/>
041 * This SubAssembly also uses the {@link cascading.pipe.assembly.FirstBy.FirstPartials}
042 * {@link cascading.pipe.assembly.AggregateBy.Functor}
043 * to collect field values before the GroupBy operator to reduce IO over the network.
044 * <p/>
045 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
046 * in a much simpler mechanism.
047 * <p/>
048 * The {@code threshold} value tells the underlying FirstPartials functions how many unique key counts to accumulate
049 * in the LRU cache, before emitting the least recently used entry.  This accumulation happens map-side, and thus is
050 * bounded by the size of your map task JVM and the typical size of each group key.
051 * <p/>
052 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
053 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
054 *
055 * @see AggregateBy
056 */
057public class FirstBy extends AggregateBy
058  {
059  /**
060   * Class CountPartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count observed duplicates from the tuple stream.
061   * <p/>
062   * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum}
063   * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values
064   * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
065   *
066   * @see cascading.pipe.assembly.FirstBy
067   */
068  public static class FirstPartials implements Functor
069    {
070    private final Fields declaredFields;
071    private Boolean doComparison;
072
073    /**
074     * Constructor FirstPartials creates a new FirstPartials instance.
075     *
076     * @param declaredFields of type Fields
077     */
078    public FirstPartials( Fields declaredFields )
079      {
080      this.declaredFields = declaredFields;
081      }
082
083    @Override
084    public Fields getDeclaredFields()
085      {
086      return declaredFields;
087      }
088
089    @Override
090    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
091      {
092      if( doComparison == null )
093        doComparison = args.getFields().hasComparators(); // ensure we use resolved fields
094
095      if( context == null || ( doComparison && args.getFields().compare( context, args.getTuple() ) > 0 ) )
096        return args.getTupleCopy();
097
098      return context;
099      }
100
101    @Override
102    public Tuple complete( FlowProcess flowProcess, Tuple context )
103      {
104      return context;
105      }
106    }
107
108  /**
109   * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
110   * instance.
111   *
112   * @param firstFields of type Fields
113   */
114  @ConstructorProperties({"firstFields"})
115  public FirstBy( Fields firstFields )
116    {
117    super( firstFields, new FirstPartials( firstFields ), new First( firstFields ) );
118    }
119
120  /**
121   * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
122   * instance.
123   *
124   * @param firstFields of type Fields
125   */
126  @ConstructorProperties({"argumentFields", "firstFields"})
127  public FirstBy( Fields argumentFields, Fields firstFields )
128    {
129    super( argumentFields, new FirstPartials( argumentFields ), new First( firstFields ) );
130    }
131
132  ///////
133
134  /**
135   * Constructor FirstBy creates a new FirstBy instance.
136   *
137   * @param pipe           of type Pipe
138   * @param groupingFields of type Fields
139   * @param firstFields    of type Fields
140   */
141  @ConstructorProperties({"pipe", "groupingFields", "firstFields"})
142  public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields )
143    {
144    this( null, pipe, groupingFields, firstFields );
145    }
146
147  /**
148   * Constructor FirstBy creates a new FirstBy instance.
149   *
150   * @param pipe           of type Pipe
151   * @param groupingFields of type Fields
152   * @param firstFields    fo type Fields
153   * @param threshold      of type int
154   */
155  @ConstructorProperties({"pipe", "groupingFields", "firstFields", "threshold"})
156  public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
157    {
158    this( null, pipe, groupingFields, firstFields, threshold );
159    }
160
161  /**
162   * Constructor FirstBy creates a new FirstBy instance.
163   *
164   * @param name           of type String
165   * @param pipe           of type Pipe
166   * @param groupingFields of type Fields
167   * @param firstFields    of type Fields
168   */
169  @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields"})
170  public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields )
171    {
172    this( name, pipe, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
173    }
174
175  /**
176   * Constructor FirstBy creates a new FirstBy instance.
177   *
178   * @param name           of type String
179   * @param pipe           of type Pipe
180   * @param groupingFields of type Fields
181   * @param firstFields    of type Fields
182   * @param threshold      of type int
183   */
184  @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields", "threshold"})
185  public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
186    {
187    this( name, Pipe.pipes( pipe ), groupingFields, firstFields, threshold );
188    }
189
190  /**
191   * Constructor FirstBy creates a new FirstBy instance.
192   *
193   * @param pipes          of type Pipe[]
194   * @param groupingFields of type Fields
195   * @param firstFields    of type Fields
196   */
197  @ConstructorProperties({"pipes", "groupingFields", "firstFields"})
198  public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields )
199    {
200    this( null, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
201    }
202
203  /**
204   * Constructor FirstBy creates a new FirstBy instance.
205   *
206   * @param pipes          of type Pipe[]
207   * @param groupingFields of type Fields
208   * @param firstFields    of type Fields
209   * @param threshold      of type int
210   */
211  @ConstructorProperties({"pipes", "groupingFields", "firstFields", "threshold"})
212  public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
213    {
214    this( null, pipes, groupingFields, firstFields, threshold );
215    }
216
217  /**
218   * Constructor FirstBy creates a new FirstBy instance.
219   *
220   * @param name           of type String
221   * @param pipes          of type Pipe[]
222   * @param groupingFields of type Fields
223   * @param firstFields    of type Fields
224   */
225  @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields"})
226  public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields )
227    {
228    this( name, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
229    }
230
231  /**
232   * Constructor FirstBy creates a new FirstBy instance.
233   *
234   * @param name           of type String
235   * @param pipes          of type Pipe[]
236   * @param groupingFields of type Fields
237   * @param firstFields    of type Fields
238   * @param threshold      of type int
239   */
240  @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields", "threshold"})
241  public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
242    {
243    super( name, pipes, groupingFields, firstFields, new FirstPartials( firstFields ), new First( firstFields ), threshold );
244    }
245  }