001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.operation.aggregator.First;
027    import cascading.pipe.Pipe;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.tuple.TupleEntry;
031    
032    /**
033     * Class FirstBy is used to return the first encountered Tuple in a tuple stream grouping.
034     * <p/>
035     * Typically finding the first Tuple in a tuple stream grouping relies on a {@link cascading.pipe.GroupBy} and a
036     * {@link cascading.operation.aggregator.First} {@link cascading.operation.Aggregator} operation.
037     * <p/>
038     * If the {@code firstFields} argument has custom {@link java.util.Comparator} instances, they will be used
039     * as the GroupBy {@code sortFields}.
040     * <p/>
041     * This SubAssembly also uses the {@link cascading.pipe.assembly.FirstBy.FirstPartials}
042     * {@link cascading.pipe.assembly.AggregateBy.Functor}
043     * to collect field values before the GroupBy operator to reduce IO over the network.
044     * <p/>
045     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
046     * in a much simpler mechanism.
047     * <p/>
048     * The {@code threshold} value tells the underlying FirstPartials functions how many unique key counts to accumulate
049     * in the LRU cache, before emitting the least recently used entry.
050     * <p/>
051     * By default, either the value of {@link #AGGREGATE_BY_THRESHOLD} System property or {@link AggregateBy#DEFAULT_THRESHOLD}
052     * will be used.
053     *
054     * @see AggregateBy
055     */
056    public class FirstBy extends AggregateBy
057      {
058      /**
059       * Class CountPartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count observed duplicates from the tuple stream.
060       * <p/>
061       * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum}
062       * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values
063       * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
064       *
065       * @see cascading.pipe.assembly.FirstBy
066       */
067      public static class FirstPartials implements Functor
068        {
069        private final Fields declaredFields;
070    
071        /**
072         * Constructor FirstPartials creates a new FirstPartials instance.
073         *
074         * @param declaredFields of type Fields
075         */
076        public FirstPartials( Fields declaredFields )
077          {
078          this.declaredFields = declaredFields;
079          }
080    
081        @Override
082        public Fields getDeclaredFields()
083          {
084          return declaredFields;
085          }
086    
087        @Override
088        public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
089          {
090          if( context == null || args.getFields().compare( context, args.getTuple() ) > 0 )
091            return args.getTupleCopy();
092    
093          return context;
094          }
095    
096        @Override
097        public Tuple complete( FlowProcess flowProcess, Tuple context )
098          {
099          return context;
100          }
101        }
102    
103      /**
104       * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
105       * instance.
106       *
107       * @param firstFields of type Fields
108       */
109      @ConstructorProperties({"firstFields"})
110      public FirstBy( Fields firstFields )
111        {
112        super( firstFields, new FirstPartials( firstFields ), new First( firstFields ) );
113        }
114    
115      /**
116       * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
117       * instance.
118       *
119       * @param firstFields of type Fields
120       */
121      @ConstructorProperties({"argumentFields", "firstFields"})
122      public FirstBy( Fields argumentFields, Fields firstFields )
123        {
124        super( argumentFields, new FirstPartials( argumentFields ), new First( firstFields ) );
125        }
126    
127      ///////
128    
129      /**
130       * Constructor FirstBy creates a new FirstBy instance.
131       *
132       * @param pipe           of type Pipe
133       * @param groupingFields of type Fields
134       * @param firstFields    of type Fields
135       */
136      @ConstructorProperties({"pipe", "groupingFields", "firstFields"})
137      public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields )
138        {
139        this( null, pipe, groupingFields, firstFields );
140        }
141    
142      /**
143       * Constructor FirstBy creates a new FirstBy instance.
144       *
145       * @param pipe           of type Pipe
146       * @param groupingFields of type Fields
147       * @param firstFields    fo type Fields
148       * @param threshold      of type int
149       */
150      @ConstructorProperties({"pipe", "groupingFields", "firstFields", "threshold"})
151      public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
152        {
153        this( null, pipe, groupingFields, firstFields, threshold );
154        }
155    
156      /**
157       * Constructor FirstBy creates a new FirstBy instance.
158       *
159       * @param name           of type String
160       * @param pipe           of type Pipe
161       * @param groupingFields of type Fields
162       * @param firstFields    of type Fields
163       */
164      @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields"})
165      public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields )
166        {
167        this( name, pipe, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
168        }
169    
170      /**
171       * Constructor FirstBy creates a new FirstBy instance.
172       *
173       * @param name           of type String
174       * @param pipe           of type Pipe
175       * @param groupingFields of type Fields
176       * @param firstFields    of type Fields
177       * @param threshold      of type int
178       */
179      @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields", "threshold"})
180      public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
181        {
182        this( name, Pipe.pipes( pipe ), groupingFields, firstFields, threshold );
183        }
184    
185      /**
186       * Constructor FirstBy creates a new FirstBy instance.
187       *
188       * @param pipes          of type Pipe[]
189       * @param groupingFields of type Fields
190       * @param firstFields    of type Fields
191       */
192      @ConstructorProperties({"pipes", "groupingFields", "firstFields"})
193      public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields )
194        {
195        this( null, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
196        }
197    
198      /**
199       * Constructor FirstBy creates a new FirstBy instance.
200       *
201       * @param pipes          of type Pipe[]
202       * @param groupingFields of type Fields
203       * @param firstFields    of type Fields
204       * @param threshold      of type int
205       */
206      @ConstructorProperties({"pipes", "groupingFields", "firstFields", "threshold"})
207      public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
208        {
209        this( null, pipes, groupingFields, firstFields, threshold );
210        }
211    
212      /**
213       * Constructor FirstBy creates a new FirstBy instance.
214       *
215       * @param name           of type String
216       * @param pipes          of type Pipe[]
217       * @param groupingFields of type Fields
218       * @param firstFields    of type Fields
219       */
220      @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields"})
221      public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields )
222        {
223        this( name, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
224        }
225    
226      /**
227       * Constructor FirstBy creates a new FirstBy instance.
228       *
229       * @param name           of type String
230       * @param pipes          of type Pipe[]
231       * @param groupingFields of type Fields
232       * @param firstFields    of type Fields
233       * @param threshold      of type int
234       */
235      @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields", "threshold"})
236      public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
237        {
238        super( name, pipes, groupingFields, firstFields, new FirstPartials( firstFields ), new First( firstFields ), threshold );
239        }
240      }