001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.operation.aggregator.First;
027    import cascading.pipe.Pipe;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.tuple.TupleEntry;
031    
032    /**
033     * Class FirstBy is used to return the first encountered Tuple in a tuple stream grouping.
034     * <p/>
035     * Typically finding the first Tuple in a tuple stream grouping relies on a {@link cascading.pipe.GroupBy} and a
036     * {@link cascading.operation.aggregator.First} {@link cascading.operation.Aggregator} operation.
037     * <p/>
038     * If the {@code firstFields} argument has custom {@link java.util.Comparator} instances, they will be used
039     * as the GroupBy {@code sortFields}.
040     * <p/>
041     * This SubAssembly also uses the {@link cascading.pipe.assembly.FirstBy.FirstPartials}
042     * {@link cascading.pipe.assembly.AggregateBy.Functor}
043     * to collect field values before the GroupBy operator to reduce IO over the network.
044     * <p/>
045     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
046     * in a much simpler mechanism.
047     * <p/>
048     * The {@code threshold} value tells the underlying FirstPartials functions how many unique key counts to accumulate
049     * in the LRU cache, before emitting the least recently used entry.
050     * <p/>
051     * By default, either the value of {@link #AGGREGATE_BY_THRESHOLD} System property or {@link AggregateBy#DEFAULT_THRESHOLD}
052     * will be used.
053     *
054     * @see AggregateBy
055     */
056    public class FirstBy extends AggregateBy
057      {
058      /**
059       * Class CountPartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count observed duplicates from the tuple stream.
060       * <p/>
061       * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum}
062       * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values
063       * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
064       *
065       * @see cascading.pipe.assembly.FirstBy
066       */
067      public static class FirstPartials implements Functor
068        {
069        private final Fields declaredFields;
070        private Boolean doComparison;
071    
072        /**
073         * Constructor FirstPartials creates a new FirstPartials instance.
074         *
075         * @param declaredFields of type Fields
076         */
077        public FirstPartials( Fields declaredFields )
078          {
079          this.declaredFields = declaredFields;
080          }
081    
082        @Override
083        public Fields getDeclaredFields()
084          {
085          return declaredFields;
086          }
087    
088        @Override
089        public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
090          {
091          if( doComparison == null )
092            doComparison = args.getFields().hasComparators(); // ensure we use resolved fields
093    
094          if( context == null || ( doComparison && args.getFields().compare( context, args.getTuple() ) > 0 ) )
095            return args.getTupleCopy();
096    
097          return context;
098          }
099    
100        @Override
101        public Tuple complete( FlowProcess flowProcess, Tuple context )
102          {
103          return context;
104          }
105        }
106    
107      /**
108       * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
109       * instance.
110       *
111       * @param firstFields of type Fields
112       */
113      @ConstructorProperties( {"firstFields"} )
114      public FirstBy( Fields firstFields )
115        {
116        super( firstFields, new FirstPartials( firstFields ), new First( firstFields ) );
117        }
118    
119      /**
120       * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
121       * instance.
122       *
123       * @param firstFields of type Fields
124       */
125      @ConstructorProperties( {"argumentFields", "firstFields"} )
126      public FirstBy( Fields argumentFields, Fields firstFields )
127        {
128        super( argumentFields, new FirstPartials( argumentFields ), new First( firstFields ) );
129        }
130    
131      ///////
132    
133      /**
134       * Constructor FirstBy creates a new FirstBy instance.
135       *
136       * @param pipe           of type Pipe
137       * @param groupingFields of type Fields
138       * @param firstFields    of type Fields
139       */
140      @ConstructorProperties( {"pipe", "groupingFields", "firstFields"} )
141      public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields )
142        {
143        this( null, pipe, groupingFields, firstFields );
144        }
145    
146      /**
147       * Constructor FirstBy creates a new FirstBy instance.
148       *
149       * @param pipe           of type Pipe
150       * @param groupingFields of type Fields
151       * @param firstFields    fo type Fields
152       * @param threshold      of type int
153       */
154      @ConstructorProperties( {"pipe", "groupingFields", "firstFields", "threshold"} )
155      public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
156        {
157        this( null, pipe, groupingFields, firstFields, threshold );
158        }
159    
160      /**
161       * Constructor FirstBy creates a new FirstBy instance.
162       *
163       * @param name           of type String
164       * @param pipe           of type Pipe
165       * @param groupingFields of type Fields
166       * @param firstFields    of type Fields
167       */
168      @ConstructorProperties( {"name", "pipe", "groupingFields", "firstFields"} )
169      public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields )
170        {
171        this( name, pipe, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
172        }
173    
174      /**
175       * Constructor FirstBy creates a new FirstBy instance.
176       *
177       * @param name           of type String
178       * @param pipe           of type Pipe
179       * @param groupingFields of type Fields
180       * @param firstFields    of type Fields
181       * @param threshold      of type int
182       */
183      @ConstructorProperties( {"name", "pipe", "groupingFields", "firstFields", "threshold"} )
184      public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
185        {
186        this( name, Pipe.pipes( pipe ), groupingFields, firstFields, threshold );
187        }
188    
189      /**
190       * Constructor FirstBy creates a new FirstBy instance.
191       *
192       * @param pipes          of type Pipe[]
193       * @param groupingFields of type Fields
194       * @param firstFields    of type Fields
195       */
196      @ConstructorProperties( {"pipes", "groupingFields", "firstFields"} )
197      public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields )
198        {
199        this( null, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
200        }
201    
202      /**
203       * Constructor FirstBy creates a new FirstBy instance.
204       *
205       * @param pipes          of type Pipe[]
206       * @param groupingFields of type Fields
207       * @param firstFields    of type Fields
208       * @param threshold      of type int
209       */
210      @ConstructorProperties( {"pipes", "groupingFields", "firstFields", "threshold"} )
211      public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
212        {
213        this( null, pipes, groupingFields, firstFields, threshold );
214        }
215    
216      /**
217       * Constructor FirstBy creates a new FirstBy instance.
218       *
219       * @param name           of type String
220       * @param pipes          of type Pipe[]
221       * @param groupingFields of type Fields
222       * @param firstFields    of type Fields
223       */
224      @ConstructorProperties( {"name", "pipes", "groupingFields", "firstFields"} )
225      public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields )
226        {
227        this( name, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
228        }
229    
230      /**
231       * Constructor FirstBy creates a new FirstBy instance.
232       *
233       * @param name           of type String
234       * @param pipes          of type Pipe[]
235       * @param groupingFields of type Fields
236       * @param firstFields    of type Fields
237       * @param threshold      of type int
238       */
239      @ConstructorProperties( {"name", "pipes", "groupingFields", "firstFields", "threshold"} )
240      public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
241        {
242        super( name, pipes, groupingFields, firstFields, new FirstPartials( firstFields ), new First( firstFields ), threshold );
243        }
244      }