001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe.assembly;
022
023import java.beans.ConstructorProperties;
024
025import cascading.flow.FlowProcess;
026import cascading.operation.aggregator.MaxValue;
027import cascading.pipe.Pipe;
028import cascading.tuple.Fields;
029import cascading.tuple.Tuple;
030import cascading.tuple.TupleEntry;
031
032/**
033 * Class MaxBy is used to find the maximum value in a grouping.
034 * <p/>
035 * Typically finding the max value of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a
036 * {@link cascading.operation.aggregator.MaxValue} {@link cascading.operation.Aggregator} operation.
037 * <p/>
038 * This SubAssembly also uses the {@link cascading.pipe.assembly.MaxBy.MaxPartials} {@link cascading.pipe.assembly.AggregateBy.Functor}
039 * to track the maximum value before the GroupBy operator to reduce IO over the network.
040 * <p/>
041 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
042 * in a much simpler mechanism.
043 * <p/>
 * The {@code threshold} value tells the underlying MaxPartials functions how many unique keys to accumulate
 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
 * bounded by the size of your map task JVM and the typical size of each group key.
047 * <p/>
048 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
049 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
050 *
051 * @see AggregateBy
052 */
053public class MaxBy extends AggregateBy
054  {
055  /** DEFAULT_THRESHOLD */
056  @Deprecated
057  public static final int DEFAULT_THRESHOLD = 10000;
058
059  public static class MaxPartials implements Functor
060    {
061    private final Fields declaredFields;
062
063    public MaxPartials( Fields declaredFields )
064      {
065      this.declaredFields = declaredFields;
066
067      if( declaredFields.size() != 1 )
068        throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
069      }
070
071    @Override
072    public Fields getDeclaredFields()
073      {
074      return declaredFields;
075      }
076
077    @Override
078    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
079      {
080      if( context == null )
081        return args.getTupleCopy();
082      else if( args.getObject( 0 ) == null )
083        return context;
084
085      Comparable lhs = (Comparable) context.getObject( 0 );
086      Comparable rhs = (Comparable) args.getObject( 0 );
087
088      if( ( lhs == null ) || ( lhs.compareTo( rhs ) < 0 ) )
089        context.set( 0, rhs );
090
091      return context;
092      }
093
094    @Override
095    public Tuple complete( FlowProcess flowProcess, Tuple context )
096      {
097      return context;
098      }
099    }
100
101  /**
102   * Constructor MaxBy creates a new MaxBy instance. Use this constructor when used with a {@link AggregateBy}
103   * instance.
104   *
105   * @param valueField of type Fields
106   * @param maxField   of type Fields
107   */
108  @ConstructorProperties({"valueField", "maxField"})
109  public MaxBy( Fields valueField, Fields maxField )
110    {
111    super( valueField, new MaxPartials( maxField ), new MaxValue( maxField ) );
112    }
113
114  //////////////
115
116  /**
117   * Constructor MaxBy creates a new MaxBy instance.
118   *
119   * @param pipe           of type Pipe
120   * @param groupingFields of type Fields
121   * @param valueField     of type Fields
122   * @param maxField       of type Fields
123   */
124  @ConstructorProperties({"pipe", "groupingFields", "valueField", "maxField"})
125  public MaxBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField )
126    {
127    this( null, pipe, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
128    }
129
130  /**
131   * Constructor MaxBy creates a new MaxBy instance.
132   *
133   * @param pipe           of type Pipe
134   * @param groupingFields of type Fields
135   * @param valueField     of type Fields
136   * @param maxField       of type Fields
137   * @param threshold      of type int
138   */
139  @ConstructorProperties({"pipe", "groupingFields", "valueField", "maxField", "threshold"})
140  public MaxBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
141    {
142    this( null, pipe, groupingFields, valueField, maxField, threshold );
143    }
144
145  /**
146   * Constructor MaxBy creates a new MaxBy instance.
147   *
148   * @param name           of type String
149   * @param pipe           of type Pipe
150   * @param groupingFields of type Fields
151   * @param valueField     of type Fields
152   * @param maxField       of type Fields
153   */
154  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "maxField"})
155  public MaxBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField )
156    {
157    this( name, pipe, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
158    }
159
160  /**
161   * Constructor MaxBy creates a new MaxBy instance.
162   *
163   * @param name           of type String
164   * @param pipe           of type Pipe
165   * @param groupingFields of type Fields
166   * @param valueField     of type Fields
167   * @param maxField       of type Fields
168   * @param threshold      of type int
169   */
170  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "maxField", "threshold"})
171  public MaxBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
172    {
173    this( name, Pipe.pipes( pipe ), groupingFields, valueField, maxField, threshold );
174    }
175
176  /**
177   * Constructor MaxBy creates a new MaxBy instance.
178   *
179   * @param pipes          of type Pipe[]
180   * @param groupingFields of type Fields
181   * @param valueField     of type Fields
182   * @param maxField       of type Fields
183   */
184  @ConstructorProperties({"pipes", "groupingFields", "valueField", "maxField"})
185  public MaxBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField )
186    {
187    this( null, pipes, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
188    }
189
190  /**
191   * Constructor MaxBy creates a new MaxBy instance.
192   *
193   * @param pipes          of type Pipe[]
194   * @param groupingFields of type Fields
195   * @param valueField     of type Fields
196   * @param maxField       of type Fields
197   * @param threshold      of type int
198   */
199  @ConstructorProperties({"pipes", "groupingFields", "valueField", "maxField", "threshold"})
200  public MaxBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
201    {
202    this( null, pipes, groupingFields, valueField, maxField, threshold );
203    }
204
205  /**
206   * Constructor MaxBy creates a new MaxBy instance.
207   *
208   * @param name           of type String
209   * @param pipes          of type Pipe[]
210   * @param groupingFields of type Fields
211   * @param valueField     of type Fields
212   * @param maxField       of type Fields
213   */
214  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "maxField"})
215  public MaxBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField )
216    {
217    this( name, pipes, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
218    }
219
220  /**
221   * Constructor MaxBy creates a new MaxBy instance.
222   *
223   * @param name           of type String
224   * @param pipes          of type Pipe[]
225   * @param groupingFields of type Fields
226   * @param valueField     of type Fields
227   * @param maxField       of type Fields
228   * @param threshold      of type int
229   */
230  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "maxField", "threshold"})
231  public MaxBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
232    {
233    super( name, pipes, groupingFields, valueField, new MaxPartials( maxField ), new MaxValue( maxField ), threshold );
234    }
235  }