001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe.assembly;
022
023import java.beans.ConstructorProperties;
024
025import cascading.flow.FlowProcess;
026import cascading.operation.aggregator.MinValue;
027import cascading.pipe.Pipe;
028import cascading.tuple.Fields;
029import cascading.tuple.Tuple;
030import cascading.tuple.TupleEntry;
031
032/**
033 * Class MinBy is used to find the minimum value in a grouping.
034 * <p/>
035 * Typically finding the min value of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a
036 * {@link cascading.operation.aggregator.MinValue} {@link cascading.operation.Aggregator} operation.
037 * <p/>
038 * This SubAssembly also uses the {@link cascading.pipe.assembly.MinBy.MinPartials} {@link cascading.pipe.assembly.AggregateBy.Functor}
039 * to track the minimum value before the GroupBy operator to reduce IO over the network.
040 * <p/>
041 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
042 * in a much simpler mechanism.
043 * <p/>
 * The {@code threshold} value tells the underlying MinPartials functions how many unique keys (each with its running
 * minimum) to accumulate in the LRU cache, before emitting the least recently used entry. This accumulation happens
 * map-side, and thus is bounded by the size of your map task JVM and the typical size of each group key.
047 * <p/>
048 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
049 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
050 *
051 * @see cascading.pipe.assembly.AggregateBy
052 */
053public class MinBy extends AggregateBy
054  {
055  public static class MinPartials implements Functor
056    {
057    private final Fields declaredFields;
058
059    /** Constructor MinPartials creates a new MinPartials instance. */
060    public MinPartials( Fields declaredFields )
061      {
062      this.declaredFields = declaredFields;
063
064      if( declaredFields.size() != 1 )
065        throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
066      }
067
068    @Override
069    public Fields getDeclaredFields()
070      {
071      return declaredFields;
072      }
073
074    @Override
075    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
076      {
077      if( context == null )
078        return args.getTupleCopy();
079      else if( args.getObject( 0 ) == null )
080        return context;
081
082      Comparable lhs = (Comparable) context.getObject( 0 );
083      Comparable rhs = (Comparable) args.getObject( 0 );
084
085      if( ( lhs == null ) || ( lhs.compareTo( rhs ) > 0 ) )
086        context.set( 0, rhs );
087
088      return context;
089      }
090
091    @Override
092    public Tuple complete( FlowProcess flowProcess, Tuple context )
093      {
094      return context;
095      }
096    }
097
098  /**
099   * Constructor MinBy creates a new MinBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy}
100   * instance.
101   *
102   * @param valueField of type Fields
103   * @param minField   of type Fields
104   */
105  @ConstructorProperties({"valueField", "minField"})
106  public MinBy( Fields valueField, Fields minField )
107    {
108    super( valueField, new MinPartials( minField ), new MinValue( minField ) );
109    }
110
111  //////////////
112
113  /**
114   * Constructor MinBy creates a new MinBy instance.
115   *
116   * @param pipe           of type Pipe
117   * @param groupingFields of type Fields
118   * @param valueField     of type Fields
119   * @param minField       of type Fields
120   */
121  @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField"})
122  public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField )
123    {
124    this( null, pipe, groupingFields, valueField, minField, 0 );
125    }
126
127  /**
128   * Constructor MinBy creates a new MinBy instance.
129   *
130   * @param pipe           of type Pipe
131   * @param groupingFields of type Fields
132   * @param valueField     of type Fields
133   * @param minField       of type Fields
134   * @param threshold      of type int
135   */
136  @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField", "threshold"})
137  public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold )
138    {
139    this( null, pipe, groupingFields, valueField, minField, threshold );
140    }
141
142  /**
143   * Constructor MinBy creates a new MinBy instance.
144   *
145   * @param name           of type String
146   * @param pipe           of type Pipe
147   * @param groupingFields of type Fields
148   * @param valueField     of type Fields
149   * @param minField       of type Fields
150   */
151  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField"})
152  public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField )
153    {
154    this( name, pipe, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD );
155    }
156
157  /**
158   * Constructor MinBy creates a new MinBy instance.
159   *
160   * @param name           of type String
161   * @param pipe           of type Pipe
162   * @param groupingFields of type Fields
163   * @param valueField     of type Fields
164   * @param minField       of type Fields
165   * @param threshold      of type int
166   */
167  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField", "threshold"})
168  public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold )
169    {
170    this( name, Pipe.pipes( pipe ), groupingFields, valueField, minField, threshold );
171    }
172
173  /**
174   * Constructor MinBy creates a new MinBy instance.
175   *
176   * @param pipes          of type Pipe[]
177   * @param groupingFields of type Fields
178   * @param valueField     of type Fields
179   * @param minField       of type Fields
180   */
181  @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField"})
182  public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField )
183    {
184    this( null, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD );
185    }
186
187  /**
188   * Constructor MinBy creates a new MinBy instance.
189   *
190   * @param pipes          of type Pipe[]
191   * @param groupingFields of type Fields
192   * @param valueField     of type Fields
193   * @param minField       of type Fields
194   * @param threshold      of type int
195   */
196  @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField", "threshold"})
197  public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold )
198    {
199    this( null, pipes, groupingFields, valueField, minField, threshold );
200    }
201
202  /**
203   * Constructor MinBy creates a new MinBy instance.
204   *
205   * @param name           of type String
206   * @param pipes          of type Pipe[]
207   * @param groupingFields of type Fields
208   * @param valueField     of type Fields
209   * @param minField       of type Fields
210   */
211  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField"})
212  public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField )
213    {
214    this( name, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD );
215    }
216
217  /**
218   * Constructor MinBy creates a new MinBy instance.
219   *
220   * @param name           of type String
221   * @param pipes          of type Pipe[]
222   * @param groupingFields of type Fields
223   * @param valueField     of type Fields
224   * @param minField       of type Fields
225   * @param threshold      of type int
226   */
227  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField", "threshold"})
228  public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold )
229    {
230    super( name, pipes, groupingFields, valueField, new MinPartials( minField ), new MinValue( minField ), threshold );
231    }
232  }