001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 025 import cascading.flow.FlowProcess; 026 import cascading.operation.aggregator.First; 027 import cascading.pipe.Pipe; 028 import cascading.tuple.Fields; 029 import cascading.tuple.Tuple; 030 import cascading.tuple.TupleEntry; 031 032 /** 033 * Class FirstBy is used to return the first encountered Tuple in a tuple stream grouping. 034 * <p/> 035 * Typically finding the first Tuple in a tuple stream grouping relies on a {@link cascading.pipe.GroupBy} and a 036 * {@link cascading.operation.aggregator.First} {@link cascading.operation.Aggregator} operation. 037 * <p/> 038 * If the {@code firstFields} argument has custom {@link java.util.Comparator} instances, they will be used 039 * as the GroupBy {@code sortFields}. 040 * <p/> 041 * This SubAssembly also uses the {@link cascading.pipe.assembly.FirstBy.FirstPartials} 042 * {@link cascading.pipe.assembly.AggregateBy.Functor} 043 * to collect field values before the GroupBy operator to reduce IO over the network. 044 * <p/> 045 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 046 * in a much simpler mechanism. 047 * <p/> 048 * The {@code threshold} value tells the underlying FirstPartials functions how many unique key counts to accumulate 049 * in the LRU cache, before emitting the least recently used entry. 050 * <p/> 051 * By default, either the value of {@link #AGGREGATE_BY_THRESHOLD} System property or {@link AggregateBy#DEFAULT_THRESHOLD} 052 * will be used. 053 * 054 * @see AggregateBy 055 */ 056 public class FirstBy extends AggregateBy 057 { 058 /** 059 * Class CountPartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count observed duplicates from the tuple stream. 060 * <p/> 061 * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum} 062 * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values 063 * as possible before the intermediate {@link cascading.pipe.GroupBy} operator. 064 * 065 * @see cascading.pipe.assembly.FirstBy 066 */ 067 public static class FirstPartials implements Functor 068 { 069 private final Fields declaredFields; 070 071 /** 072 * Constructor FirstPartials creates a new FirstPartials instance. 073 * 074 * @param declaredFields of type Fields 075 */ 076 public FirstPartials( Fields declaredFields ) 077 { 078 this.declaredFields = declaredFields; 079 } 080 081 @Override 082 public Fields getDeclaredFields() 083 { 084 return declaredFields; 085 } 086 087 @Override 088 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 089 { 090 if( context == null || args.getFields().compare( context, args.getTuple() ) > 0 ) 091 return args.getTupleCopy(); 092 093 return context; 094 } 095 096 @Override 097 public Tuple complete( FlowProcess flowProcess, Tuple context ) 098 { 099 return context; 100 } 101 } 102 103 /** 104 * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy} 105 * instance. 106 * 107 * @param firstFields of type Fields 108 */ 109 @ConstructorProperties({"firstFields"}) 110 public FirstBy( Fields firstFields ) 111 { 112 super( firstFields, new FirstPartials( firstFields ), new First( firstFields ) ); 113 } 114 115 /** 116 * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy} 117 * instance. 118 * 119 * @param firstFields of type Fields 120 */ 121 @ConstructorProperties({"argumentFields", "firstFields"}) 122 public FirstBy( Fields argumentFields, Fields firstFields ) 123 { 124 super( argumentFields, new FirstPartials( argumentFields ), new First( firstFields ) ); 125 } 126 127 /////// 128 129 /** 130 * Constructor FirstBy creates a new FirstBy instance. 131 * 132 * @param pipe of type Pipe 133 * @param groupingFields of type Fields 134 * @param firstFields of type Fields 135 */ 136 @ConstructorProperties({"pipe", "groupingFields", "firstFields"}) 137 public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields ) 138 { 139 this( null, pipe, groupingFields, firstFields ); 140 } 141 142 /** 143 * Constructor FirstBy creates a new FirstBy instance. 144 * 145 * @param pipe of type Pipe 146 * @param groupingFields of type Fields 147 * @param firstFields fo type Fields 148 * @param threshold of type int 149 */ 150 @ConstructorProperties({"pipe", "groupingFields", "firstFields", "threshold"}) 151 public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields, int threshold ) 152 { 153 this( null, pipe, groupingFields, firstFields, threshold ); 154 } 155 156 /** 157 * Constructor FirstBy creates a new FirstBy instance. 158 * 159 * @param name of type String 160 * @param pipe of type Pipe 161 * @param groupingFields of type Fields 162 * @param firstFields of type Fields 163 */ 164 @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields"}) 165 public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields ) 166 { 167 this( name, pipe, groupingFields, firstFields, USE_DEFAULT_THRESHOLD ); 168 } 169 170 /** 171 * Constructor FirstBy creates a new FirstBy instance. 172 * 173 * @param name of type String 174 * @param pipe of type Pipe 175 * @param groupingFields of type Fields 176 * @param firstFields of type Fields 177 * @param threshold of type int 178 */ 179 @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields", "threshold"}) 180 public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields, int threshold ) 181 { 182 this( name, Pipe.pipes( pipe ), groupingFields, firstFields, threshold ); 183 } 184 185 /** 186 * Constructor FirstBy creates a new FirstBy instance. 187 * 188 * @param pipes of type Pipe[] 189 * @param groupingFields of type Fields 190 * @param firstFields of type Fields 191 */ 192 @ConstructorProperties({"pipes", "groupingFields", "firstFields"}) 193 public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields ) 194 { 195 this( null, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD ); 196 } 197 198 /** 199 * Constructor FirstBy creates a new FirstBy instance. 200 * 201 * @param pipes of type Pipe[] 202 * @param groupingFields of type Fields 203 * @param firstFields of type Fields 204 * @param threshold of type int 205 */ 206 @ConstructorProperties({"pipes", "groupingFields", "firstFields", "threshold"}) 207 public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold ) 208 { 209 this( null, pipes, groupingFields, firstFields, threshold ); 210 } 211 212 /** 213 * Constructor FirstBy creates a new FirstBy instance. 214 * 215 * @param name of type String 216 * @param pipes of type Pipe[] 217 * @param groupingFields of type Fields 218 * @param firstFields of type Fields 219 */ 220 @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields"}) 221 public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields ) 222 { 223 this( name, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD ); 224 } 225 226 /** 227 * Constructor FirstBy creates a new FirstBy instance. 228 * 229 * @param name of type String 230 * @param pipes of type Pipe[] 231 * @param groupingFields of type Fields 232 * @param firstFields of type Fields 233 * @param threshold of type int 234 */ 235 @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields", "threshold"}) 236 public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold ) 237 { 238 super( name, pipes, groupingFields, firstFields, new FirstPartials( firstFields ), new First( firstFields ), threshold ); 239 } 240 }