001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.aggregator;
022    
023    import java.beans.ConstructorProperties;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.management.annotation.Property;
027    import cascading.management.annotation.PropertyDescription;
028    import cascading.management.annotation.Visibility;
029    import cascading.operation.Aggregator;
030    import cascading.operation.AggregatorCall;
031    import cascading.tuple.Fields;
032    import cascading.tuple.Tuple;
033    import cascading.tuple.TupleEntry;
034    
035    /**
036     * Class First is an {@link Aggregator} that returns the first {@link Tuple} encountered in a grouping.
037     * <p/>
038     * By default, it returns the first Tuple of {@link Fields#ARGS} found.
039     * <p/>
040     * If {@code firstN} is given, Tuples with each of the first N number of Tuples encountered are returned. That is,
041     * this Aggregator will return at maximum N tuples per grouping.
042     * <p/>
043     * Be sure to set the {@link cascading.pipe.GroupBy} {@code sortFields} to control which Tuples are seen first.
044     */
045    public class First extends ExtentBase
046      {
047      private final int firstN;
048    
049      /** Selects and returns the first argument Tuple encountered. */
050      public First()
051        {
052        super( Fields.ARGS );
053    
054        this.firstN = 1;
055        }
056    
057      /**
058       * Selects and returns the first N argument Tuples encountered.
059       *
060       * @param firstN of type int
061       */
062      @ConstructorProperties({"firstN"})
063      public First( int firstN )
064        {
065        super( Fields.ARGS );
066    
067        this.firstN = firstN;
068        }
069    
070      /**
071       * Selects and returns the first argument Tuple encountered.
072       *
073       * @param fieldDeclaration of type Fields
074       */
075      @ConstructorProperties({"fieldDeclaration"})
076      public First( Fields fieldDeclaration )
077        {
078        super( fieldDeclaration.size(), fieldDeclaration );
079    
080        this.firstN = 1;
081        }
082    
083      /**
084       * Selects and returns the first N argument Tuples encountered.
085       *
086       * @param fieldDeclaration of type Fields
087       * @param firstN           of type int
088       */
089      @ConstructorProperties({"fieldDeclaration", "firstN"})
090      public First( Fields fieldDeclaration, int firstN )
091        {
092        super( fieldDeclaration.size(), fieldDeclaration );
093    
094        this.firstN = firstN;
095        }
096    
097      /**
098       * Selects and returns the first argument Tuple encountered, unless the Tuple
099       * is a member of the set ignoreTuples.
100       *
101       * @param fieldDeclaration of type Fields
102       * @param ignoreTuples     of type Tuple...
103       */
104      @ConstructorProperties({"fieldDeclaration", "ignoreTuples"})
105      public First( Fields fieldDeclaration, Tuple... ignoreTuples )
106        {
107        super( fieldDeclaration, ignoreTuples );
108    
109        this.firstN = 1;
110        }
111    
112      @Property(name = "firstN", visibility = Visibility.PUBLIC)
113      @PropertyDescription("The number of tuples to return.")
114      public int getFirstN()
115        {
116        return firstN;
117        }
118    
119      protected void performOperation( Tuple[] context, TupleEntry entry )
120        {
121        if( context[ 0 ] == null )
122          context[ 0 ] = new Tuple();
123    
124        if( context[ 0 ].size() < firstN )
125          context[ 0 ].add( entry.getTupleCopy() );
126        }
127    
128      @Override
129      public void complete( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall )
130        {
131        Tuple context = aggregatorCall.getContext()[ 0 ];
132    
133        if( context == null )
134          return;
135    
136        for( Object tuple : context )
137          aggregatorCall.getOutputCollector().add( (Tuple) tuple );
138        }
139      }