001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 025 import cascading.flow.FlowProcess; 026 import cascading.operation.aggregator.First; 027 import cascading.pipe.Pipe; 028 import cascading.tuple.Fields; 029 import cascading.tuple.Tuple; 030 import cascading.tuple.TupleEntry; 031 032 /** 033 * Class FirstBy is used to return the first encountered Tuple in a tuple stream grouping. 034 * <p/> 035 * Typically finding the first Tuple in a tuple stream grouping relies on a {@link cascading.pipe.GroupBy} and a 036 * {@link cascading.operation.aggregator.First} {@link cascading.operation.Aggregator} operation. 037 * <p/> 038 * If the {@code firstFields} argument has custom {@link java.util.Comparator} instances, they will be used 039 * as the GroupBy {@code sortFields}. 040 * <p/> 041 * This SubAssembly also uses the {@link cascading.pipe.assembly.FirstBy.FirstPartials} 042 * {@link cascading.pipe.assembly.AggregateBy.Functor} 043 * to collect field values before the GroupBy operator to reduce IO over the network. 044 * <p/> 045 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 046 * in a much simpler mechanism. 047 * <p/> 048 * The {@code threshold} value tells the underlying FirstPartials functions how many unique key counts to accumulate 049 * in the LRU cache, before emitting the least recently used entry. 050 * <p/> 051 * By default, either the value of {@link #AGGREGATE_BY_THRESHOLD} System property or {@link AggregateBy#DEFAULT_THRESHOLD} 052 * will be used. 053 * 054 * @see AggregateBy 055 */ 056 public class FirstBy extends AggregateBy 057 { 058 /** 059 * Class CountPartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count observed duplicates from the tuple stream. 060 * <p/> 061 * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum} 062 * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values 063 * as possible before the intermediate {@link cascading.pipe.GroupBy} operator. 064 * 065 * @see cascading.pipe.assembly.FirstBy 066 */ 067 public static class FirstPartials implements Functor 068 { 069 private final Fields declaredFields; 070 private Boolean doComparison; 071 072 /** 073 * Constructor FirstPartials creates a new FirstPartials instance. 074 * 075 * @param declaredFields of type Fields 076 */ 077 public FirstPartials( Fields declaredFields ) 078 { 079 this.declaredFields = declaredFields; 080 } 081 082 @Override 083 public Fields getDeclaredFields() 084 { 085 return declaredFields; 086 } 087 088 @Override 089 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 090 { 091 if( doComparison == null ) 092 doComparison = args.getFields().hasComparators(); // ensure we use resolved fields 093 094 if( context == null || ( doComparison && args.getFields().compare( context, args.getTuple() ) > 0 ) ) 095 return args.getTupleCopy(); 096 097 return context; 098 } 099 100 @Override 101 public Tuple complete( FlowProcess flowProcess, Tuple context ) 102 { 103 return context; 104 } 105 } 106 107 /** 108 * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy} 109 * instance. 110 * 111 * @param firstFields of type Fields 112 */ 113 @ConstructorProperties( {"firstFields"} ) 114 public FirstBy( Fields firstFields ) 115 { 116 super( firstFields, new FirstPartials( firstFields ), new First( firstFields ) ); 117 } 118 119 /** 120 * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy} 121 * instance. 122 * 123 * @param firstFields of type Fields 124 */ 125 @ConstructorProperties( {"argumentFields", "firstFields"} ) 126 public FirstBy( Fields argumentFields, Fields firstFields ) 127 { 128 super( argumentFields, new FirstPartials( argumentFields ), new First( firstFields ) ); 129 } 130 131 /////// 132 133 /** 134 * Constructor FirstBy creates a new FirstBy instance. 135 * 136 * @param pipe of type Pipe 137 * @param groupingFields of type Fields 138 * @param firstFields of type Fields 139 */ 140 @ConstructorProperties( {"pipe", "groupingFields", "firstFields"} ) 141 public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields ) 142 { 143 this( null, pipe, groupingFields, firstFields ); 144 } 145 146 /** 147 * Constructor FirstBy creates a new FirstBy instance. 148 * 149 * @param pipe of type Pipe 150 * @param groupingFields of type Fields 151 * @param firstFields fo type Fields 152 * @param threshold of type int 153 */ 154 @ConstructorProperties( {"pipe", "groupingFields", "firstFields", "threshold"} ) 155 public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields, int threshold ) 156 { 157 this( null, pipe, groupingFields, firstFields, threshold ); 158 } 159 160 /** 161 * Constructor FirstBy creates a new FirstBy instance. 162 * 163 * @param name of type String 164 * @param pipe of type Pipe 165 * @param groupingFields of type Fields 166 * @param firstFields of type Fields 167 */ 168 @ConstructorProperties( {"name", "pipe", "groupingFields", "firstFields"} ) 169 public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields ) 170 { 171 this( name, pipe, groupingFields, firstFields, USE_DEFAULT_THRESHOLD ); 172 } 173 174 /** 175 * Constructor FirstBy creates a new FirstBy instance. 176 * 177 * @param name of type String 178 * @param pipe of type Pipe 179 * @param groupingFields of type Fields 180 * @param firstFields of type Fields 181 * @param threshold of type int 182 */ 183 @ConstructorProperties( {"name", "pipe", "groupingFields", "firstFields", "threshold"} ) 184 public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields, int threshold ) 185 { 186 this( name, Pipe.pipes( pipe ), groupingFields, firstFields, threshold ); 187 } 188 189 /** 190 * Constructor FirstBy creates a new FirstBy instance. 191 * 192 * @param pipes of type Pipe[] 193 * @param groupingFields of type Fields 194 * @param firstFields of type Fields 195 */ 196 @ConstructorProperties( {"pipes", "groupingFields", "firstFields"} ) 197 public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields ) 198 { 199 this( null, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD ); 200 } 201 202 /** 203 * Constructor FirstBy creates a new FirstBy instance. 204 * 205 * @param pipes of type Pipe[] 206 * @param groupingFields of type Fields 207 * @param firstFields of type Fields 208 * @param threshold of type int 209 */ 210 @ConstructorProperties( {"pipes", "groupingFields", "firstFields", "threshold"} ) 211 public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold ) 212 { 213 this( null, pipes, groupingFields, firstFields, threshold ); 214 } 215 216 /** 217 * Constructor FirstBy creates a new FirstBy instance. 218 * 219 * @param name of type String 220 * @param pipes of type Pipe[] 221 * @param groupingFields of type Fields 222 * @param firstFields of type Fields 223 */ 224 @ConstructorProperties( {"name", "pipes", "groupingFields", "firstFields"} ) 225 public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields ) 226 { 227 this( name, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD ); 228 } 229 230 /** 231 * Constructor FirstBy creates a new FirstBy instance. 232 * 233 * @param name of type String 234 * @param pipes of type Pipe[] 235 * @param groupingFields of type Fields 236 * @param firstFields of type Fields 237 * @param threshold of type int 238 */ 239 @ConstructorProperties( {"name", "pipes", "groupingFields", "firstFields", "threshold"} ) 240 public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold ) 241 { 242 super( name, pipes, groupingFields, firstFields, new FirstPartials( firstFields ), new First( firstFields ), threshold ); 243 } 244 }