001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.buffer;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Iterator;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.management.annotation.Property;
028    import cascading.management.annotation.PropertyDescription;
029    import cascading.management.annotation.Visibility;
030    import cascading.operation.BaseOperation;
031    import cascading.operation.Buffer;
032    import cascading.operation.BufferCall;
033    import cascading.tuple.Fields;
034    import cascading.tuple.TupleEntry;
035    
036    /**
037     * Class FirstNBuffer will return the first N tuples seen in a given grouping. After the tuples
038     * are returned the Buffer stops iterating the arguments unlike the {@link cascading.operation.aggregator.First}
039     * {@link cascading.operation.Aggregator} which by contract sees all the values in the grouping.
040     * <p/>
041     * By default it returns one Tuple.
042     * <p/>
043     * Order can be controlled through the prior {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup}
044     * pipes.
045     * <p/>
046     * This class is used by {@link cascading.pipe.assembly.Unique}.
047     */
048    public class FirstNBuffer extends BaseOperation implements Buffer
049      {
050      private final int firstN;
051    
052      /** Selects and returns the first argument Tuple encountered. */
053      public FirstNBuffer()
054        {
055        super( Fields.ARGS );
056    
057        firstN = 1;
058        }
059    
060      /**
061       * Selects and returns the first N argument Tuples encountered.
062       *
063       * @param firstN of type int
064       */
065      @ConstructorProperties({"firstN"})
066      public FirstNBuffer( int firstN )
067        {
068        super( Fields.ARGS );
069    
070        this.firstN = firstN;
071        }
072    
073      /**
074       * Selects and returns the first argument Tuple encountered.
075       *
076       * @param fieldDeclaration of type Fields
077       */
078      @ConstructorProperties({"fieldDeclaration"})
079      public FirstNBuffer( Fields fieldDeclaration )
080        {
081        super( fieldDeclaration.size(), fieldDeclaration );
082    
083        this.firstN = 1;
084        }
085    
086      /**
087       * Selects and returns the first N argument Tuples encountered.
088       *
089       * @param fieldDeclaration of type Fields
090       * @param firstN           of type int
091       */
092      @ConstructorProperties({"fieldDeclaration", "firstN"})
093      public FirstNBuffer( Fields fieldDeclaration, int firstN )
094        {
095        super( fieldDeclaration.size(), fieldDeclaration );
096    
097        this.firstN = firstN;
098        }
099    
100      @Property(name = "firstN", visibility = Visibility.PUBLIC)
101      @PropertyDescription("The number of tuples to return.")
102      public int getFirstN()
103        {
104        return firstN;
105        }
106    
107      @Override
108      public void operate( FlowProcess flowProcess, BufferCall bufferCall )
109        {
110        Iterator<TupleEntry> iterator = bufferCall.getArgumentsIterator();
111    
112        int count = 0;
113    
114        while( count < firstN && iterator.hasNext() )
115          {
116          bufferCall.getOutputCollector().add( iterator.next() );
117          count++;
118          }
119        }
120      }