001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.aggregator;
022    
023    import java.beans.ConstructorProperties;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.operation.Aggregator;
027    import cascading.operation.AggregatorCall;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.tuple.TupleEntry;
031    
032    /**
033     * Class First is an {@link Aggregator} that returns the first {@link Tuple} encountered in a grouping.
034     * <p/>
035     * By default, it returns the first Tuple of {@link Fields#ARGS} found.
036     * <p/>
037     * If {@code firstN} is given, Tuples with each of the first N number of Tuples encountered are returned. That is,
038     * this Aggregator will return at maximum N tuples per grouping.
039     * <p/>
040     * Be sure to set the {@link cascading.pipe.GroupBy} {@code sortFields} to control which Tuples are seen first.
041     */
042    public class First extends ExtentBase
043      {
044      private final int firstN;
045    
046      /** Selects and returns the first argument Tuple encountered. */
047      public First()
048        {
049        super( Fields.ARGS );
050    
051        this.firstN = 1;
052        }
053    
054      /**
055       * Selects and returns the first N argument Tuples encountered.
056       *
057       * @param firstN of type int
058       */
059      @ConstructorProperties({"firstN"})
060      public First( int firstN )
061        {
062        super( Fields.ARGS );
063    
064        this.firstN = firstN;
065        }
066    
067      /**
068       * Selects and returns the first argument Tuple encountered.
069       *
070       * @param fieldDeclaration of type Fields
071       */
072      @ConstructorProperties({"fieldDeclaration"})
073      public First( Fields fieldDeclaration )
074        {
075        super( fieldDeclaration.size(), fieldDeclaration );
076    
077        this.firstN = 1;
078        }
079    
080      /**
081       * Selects and returns the first N argument Tuples encountered.
082       *
083       * @param fieldDeclaration of type Fields
084       * @param firstN           of type int
085       */
086      @ConstructorProperties({"fieldDeclaration", "firstN"})
087      public First( Fields fieldDeclaration, int firstN )
088        {
089        super( fieldDeclaration.size(), fieldDeclaration );
090    
091        this.firstN = firstN;
092        }
093    
094      /**
095       * Selects and returns the first argument Tuple encountered, unless the Tuple
096       * is a member of the set ignoreTuples.
097       *
098       * @param fieldDeclaration of type Fields
099       * @param ignoreTuples     of type Tuple...
100       */
101      @ConstructorProperties({"fieldDeclaration", "ignoreTuples"})
102      public First( Fields fieldDeclaration, Tuple... ignoreTuples )
103        {
104        super( fieldDeclaration, ignoreTuples );
105    
106        this.firstN = 1;
107        }
108    
109      public int getFirstN()
110        {
111        return firstN;
112        }
113    
114      protected void performOperation( Tuple[] context, TupleEntry entry )
115        {
116        if( context[ 0 ] == null )
117          context[ 0 ] = new Tuple();
118    
119        if( context[ 0 ].size() < firstN )
120          context[ 0 ].add( entry.getTupleCopy() );
121        }
122    
123      @Override
124      public void complete( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall )
125        {
126        Tuple context = aggregatorCall.getContext()[ 0 ];
127    
128        if( context == null )
129          return;
130    
131        for( Object tuple : context )
132          aggregatorCall.getOutputCollector().add( (Tuple) tuple );
133        }
134      }