001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe.assembly; 022 023import java.beans.ConstructorProperties; 024 025import cascading.flow.FlowProcess; 026import cascading.operation.aggregator.MinValue; 027import cascading.pipe.Pipe; 028import cascading.tuple.Fields; 029import cascading.tuple.Tuple; 030import cascading.tuple.TupleEntry; 031 032/** 033 * Class MinBy is used to find the minimum value in a grouping. 034 * <p/> 035 * Typically finding the min value of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a 036 * {@link cascading.operation.aggregator.MinValue} {@link cascading.operation.Aggregator} operation. 037 * <p/> 038 * This SubAssembly also uses the {@link cascading.pipe.assembly.MinBy.MinPartials} {@link cascading.pipe.assembly.AggregateBy.Functor} 039 * to track the minimum value before the GroupBy operator to reduce IO over the network. 040 * <p/> 041 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 042 * in a much simpler mechanism. 043 * <p/> 044 * The {@code threshold} value tells the underlying MinPartials functions how many unique key sums to accumulate 045 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is 046 * bounded by the size of your map task JVM and the typical size of each group key. 047 * <p/> 048 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property 049 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used. 050 * 051 * @see cascading.pipe.assembly.AggregateBy 052 */ 053public class MinBy extends AggregateBy 054 { 055 public static class MinPartials implements Functor 056 { 057 private final Fields declaredFields; 058 059 /** Constructor MinPartials creates a new MinPartials instance. */ 060 public MinPartials( Fields declaredFields ) 061 { 062 this.declaredFields = declaredFields; 063 064 if( declaredFields.size() != 1 ) 065 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 066 } 067 068 @Override 069 public Fields getDeclaredFields() 070 { 071 return declaredFields; 072 } 073 074 @Override 075 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 076 { 077 if( context == null ) 078 return args.getTupleCopy(); 079 else if( args.getObject( 0 ) == null ) 080 return context; 081 082 Comparable lhs = (Comparable) context.getObject( 0 ); 083 Comparable rhs = (Comparable) args.getObject( 0 ); 084 085 if( ( lhs == null ) || ( lhs.compareTo( rhs ) > 0 ) ) 086 context.set( 0, rhs ); 087 088 return context; 089 } 090 091 @Override 092 public Tuple complete( FlowProcess flowProcess, Tuple context ) 093 { 094 return context; 095 } 096 } 097 098 /** 099 * Constructor MinBy creates a new MinBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 100 * instance. 101 * 102 * @param valueField of type Fields 103 * @param minField of type Fields 104 */ 105 @ConstructorProperties({"valueField", "minField"}) 106 public MinBy( Fields valueField, Fields minField ) 107 { 108 super( valueField, new MinPartials( minField ), new MinValue( minField ) ); 109 } 110 111 ////////////// 112 113 /** 114 * Constructor MinBy creates a new MinBy instance. 115 * 116 * @param pipe of type Pipe 117 * @param groupingFields of type Fields 118 * @param valueField of type Fields 119 * @param minField of type Fields 120 */ 121 @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField"}) 122 public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField ) 123 { 124 this( null, pipe, groupingFields, valueField, minField, 0 ); 125 } 126 127 /** 128 * Constructor MinBy creates a new MinBy instance. 129 * 130 * @param pipe of type Pipe 131 * @param groupingFields of type Fields 132 * @param valueField of type Fields 133 * @param minField of type Fields 134 * @param threshold of type int 135 */ 136 @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField", "threshold"}) 137 public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 138 { 139 this( null, pipe, groupingFields, valueField, minField, threshold ); 140 } 141 142 /** 143 * Constructor MinBy creates a new MinBy instance. 144 * 145 * @param name of type String 146 * @param pipe of type Pipe 147 * @param groupingFields of type Fields 148 * @param valueField of type Fields 149 * @param minField of type Fields 150 */ 151 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField"}) 152 public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField ) 153 { 154 this( name, pipe, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 155 } 156 157 /** 158 * Constructor MinBy creates a new MinBy instance. 159 * 160 * @param name of type String 161 * @param pipe of type Pipe 162 * @param groupingFields of type Fields 163 * @param valueField of type Fields 164 * @param minField of type Fields 165 * @param threshold of type int 166 */ 167 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField", "threshold"}) 168 public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 169 { 170 this( name, Pipe.pipes( pipe ), groupingFields, valueField, minField, threshold ); 171 } 172 173 /** 174 * Constructor MinBy creates a new MinBy instance. 175 * 176 * @param pipes of type Pipe[] 177 * @param groupingFields of type Fields 178 * @param valueField of type Fields 179 * @param minField of type Fields 180 */ 181 @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField"}) 182 public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField ) 183 { 184 this( null, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 185 } 186 187 /** 188 * Constructor MinBy creates a new MinBy instance. 189 * 190 * @param pipes of type Pipe[] 191 * @param groupingFields of type Fields 192 * @param valueField of type Fields 193 * @param minField of type Fields 194 * @param threshold of type int 195 */ 196 @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField", "threshold"}) 197 public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 198 { 199 this( null, pipes, groupingFields, valueField, minField, threshold ); 200 } 201 202 /** 203 * Constructor MinBy creates a new MinBy instance. 204 * 205 * @param name of type String 206 * @param pipes of type Pipe[] 207 * @param groupingFields of type Fields 208 * @param valueField of type Fields 209 * @param minField of type Fields 210 */ 211 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField"}) 212 public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField ) 213 { 214 this( name, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 215 } 216 217 /** 218 * Constructor MinBy creates a new MinBy instance. 219 * 220 * @param name of type String 221 * @param pipes of type Pipe[] 222 * @param groupingFields of type Fields 223 * @param valueField of type Fields 224 * @param minField of type Fields 225 * @param threshold of type int 226 */ 227 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField", "threshold"}) 228 public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 229 { 230 super( name, pipes, groupingFields, valueField, new MinPartials( minField ), new MinValue( minField ), threshold ); 231 } 232 }