001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.operation.aggregator.MaxValue;
027    import cascading.pipe.Pipe;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.tuple.TupleEntry;
031    
032    /**
033     * Class MaxBy is used to find the maximum value in a grouping.
034     * <p/>
035     * Typically finding the max value of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a
036     * {@link cascading.operation.aggregator.MaxValue} {@link cascading.operation.Aggregator} operation.
037     * <p/>
038     * This SubAssembly also uses the {@link cascading.pipe.assembly.MaxBy.MaxPartials} {@link cascading.pipe.assembly.AggregateBy.Functor}
039     * to track the maximum value before the GroupBy operator to reduce IO over the network.
040     * <p/>
041     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
042     * in a much simpler mechanism.
043     * <p/>
044     * The {@code threshold} value tells the underlying MaxPartials functions how many unique key sums to accumulate
045     * in the LRU cache, before emitting the least recently used entry.
046     * <p/>
047     * By default, either the value of {@link #AGGREGATE_BY_THRESHOLD} System property or {@link AggregateBy#DEFAULT_THRESHOLD}
048     * will be used.
049     *
050     * @see AggregateBy
051     */
052    public class MaxBy extends AggregateBy
053      {
054      /** DEFAULT_THRESHOLD */
055      public static final int DEFAULT_THRESHOLD = 10000;
056    
057      public static class MaxPartials implements Functor
058        {
059        private final Fields declaredFields;
060    
061        public MaxPartials( Fields declaredFields )
062          {
063          this.declaredFields = declaredFields;
064    
065          if( declaredFields.size() != 1 )
066            throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
067          }
068    
069        @Override
070        public Fields getDeclaredFields()
071          {
072          return declaredFields;
073          }
074    
075        @Override
076        public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
077          {
078          if( context == null )
079            return args.getTupleCopy();
080          else if( args.getObject( 0 ) == null )
081            return context;
082    
083          Comparable lhs = (Comparable) context.getObject( 0 );
084          Comparable rhs = (Comparable) args.getObject( 0 );
085    
086          if( ( lhs == null ) || ( lhs.compareTo( rhs ) < 0 ) )
087            context.set( 0, rhs );
088    
089          return context;
090          }
091    
092        @Override
093        public Tuple complete( FlowProcess flowProcess, Tuple context )
094          {
095          return context;
096          }
097        }
098    
099      /**
100       * Constructor MaxBy creates a new MaxBy instance. Use this constructor when used with a {@link AggregateBy}
101       * instance.
102       *
103       * @param valueField of type Fields
104       * @param maxField   of type Fields
105       */
106      @ConstructorProperties({"valueField", "maxField"})
107      public MaxBy( Fields valueField, Fields maxField )
108        {
109        super( valueField, new MaxPartials( maxField ), new MaxValue( maxField ) );
110        }
111    
112      //////////////
113    
114      /**
115       * Constructor MaxBy creates a new MaxBy instance.
116       *
117       * @param pipe           of type Pipe
118       * @param groupingFields of type Fields
119       * @param valueField     of type Fields
120       * @param maxField       of type Fields
121       */
122      @ConstructorProperties({"pipe", "groupingFields", "valueField", "maxField"})
123      public MaxBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField )
124        {
125        this( null, pipe, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
126        }
127    
128      /**
129       * Constructor MaxBy creates a new MaxBy instance.
130       *
131       * @param pipe           of type Pipe
132       * @param groupingFields of type Fields
133       * @param valueField     of type Fields
134       * @param maxField       of type Fields
135       * @param threshold      of type int
136       */
137      @ConstructorProperties({"pipe", "groupingFields", "valueField", "maxField", "threshold"})
138      public MaxBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
139        {
140        this( null, pipe, groupingFields, valueField, maxField, threshold );
141        }
142    
143      /**
144       * Constructor MaxBy creates a new MaxBy instance.
145       *
146       * @param name           of type String
147       * @param pipe           of type Pipe
148       * @param groupingFields of type Fields
149       * @param valueField     of type Fields
150       * @param maxField       of type Fields
151       */
152      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "maxField"})
153      public MaxBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField )
154        {
155        this( name, pipe, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
156        }
157    
158      /**
159       * Constructor MaxBy creates a new MaxBy instance.
160       *
161       * @param name           of type String
162       * @param pipe           of type Pipe
163       * @param groupingFields of type Fields
164       * @param valueField     of type Fields
165       * @param maxField       of type Fields
166       * @param threshold      of type int
167       */
168      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "maxField", "threshold"})
169      public MaxBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
170        {
171        this( name, Pipe.pipes( pipe ), groupingFields, valueField, maxField, threshold );
172        }
173    
174      /**
175       * Constructor MaxBy creates a new MaxBy instance.
176       *
177       * @param pipes          of type Pipe[]
178       * @param groupingFields of type Fields
179       * @param valueField     of type Fields
180       * @param maxField       of type Fields
181       */
182      @ConstructorProperties({"pipes", "groupingFields", "valueField", "maxField"})
183      public MaxBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField )
184        {
185        this( null, pipes, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
186        }
187    
188      /**
189       * Constructor MaxBy creates a new MaxBy instance.
190       *
191       * @param pipes          of type Pipe[]
192       * @param groupingFields of type Fields
193       * @param valueField     of type Fields
194       * @param maxField       of type Fields
195       * @param threshold      of type int
196       */
197      @ConstructorProperties({"pipes", "groupingFields", "valueField", "maxField", "threshold"})
198      public MaxBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
199        {
200        this( null, pipes, groupingFields, valueField, maxField, threshold );
201        }
202    
203      /**
204       * Constructor MaxBy creates a new MaxBy instance.
205       *
206       * @param name           of type String
207       * @param pipes          of type Pipe[]
208       * @param groupingFields of type Fields
209       * @param valueField     of type Fields
210       * @param maxField       of type Fields
211       */
212      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "maxField"})
213      public MaxBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField )
214        {
215        this( name, pipes, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
216        }
217    
218      /**
219       * Constructor MaxBy creates a new MaxBy instance.
220       *
221       * @param name           of type String
222       * @param pipes          of type Pipe[]
223       * @param groupingFields of type Fields
224       * @param valueField     of type Fields
225       * @param maxField       of type Fields
226       * @param threshold      of type int
227       */
228      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "maxField", "threshold"})
229      public MaxBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
230        {
231        super( name, pipes, groupingFields, valueField, new MaxPartials( maxField ), new MaxValue( maxField ), threshold );
232        }
233      }