001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.buffer;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Iterator;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.operation.BaseOperation;
028    import cascading.operation.Buffer;
029    import cascading.operation.BufferCall;
030    import cascading.tuple.Fields;
031    import cascading.tuple.TupleEntry;
032    
033    /**
034     * Class FirstNBuffer will return the first N tuples seen in a given grouping. After the tuples
035     * are returned the Buffer stops iterating the arguments unlike the {@link cascading.operation.aggregator.First}
036     * {@link cascading.operation.Aggregator} which by contract sees all the values in the grouping.
037     * <p/>
038     * By default it returns one Tuple.
039     * <p/>
040     * Order can be controlled through the prior {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup}
041     * pipes.
042     * <p/>
043     * This class is used by {@link cascading.pipe.assembly.Unique}.
044     */
045    public class FirstNBuffer extends BaseOperation implements Buffer
046      {
047      private final int firstN;
048    
049      /** Selects and returns the first argument Tuple encountered. */
050      public FirstNBuffer()
051        {
052        super( Fields.ARGS );
053    
054        firstN = 1;
055        }
056    
057      /**
058       * Selects and returns the first N argument Tuples encountered.
059       *
060       * @param firstN of type int
061       */
062      @ConstructorProperties({"firstN"})
063      public FirstNBuffer( int firstN )
064        {
065        super( Fields.ARGS );
066    
067        this.firstN = firstN;
068        }
069    
070      /**
071       * Selects and returns the first argument Tuple encountered.
072       *
073       * @param fieldDeclaration of type Fields
074       */
075      @ConstructorProperties({"fieldDeclaration"})
076      public FirstNBuffer( Fields fieldDeclaration )
077        {
078        super( fieldDeclaration.size(), fieldDeclaration );
079    
080        this.firstN = 1;
081        }
082    
083      /**
084       * Selects and returns the first N argument Tuples encountered.
085       *
086       * @param fieldDeclaration of type Fields
087       * @param firstN           of type int
088       */
089      @ConstructorProperties({"fieldDeclaration", "firstN"})
090      public FirstNBuffer( Fields fieldDeclaration, int firstN )
091        {
092        super( fieldDeclaration.size(), fieldDeclaration );
093    
094        this.firstN = firstN;
095        }
096    
097      public int getFirstN()
098        {
099        return firstN;
100        }
101    
102      @Override
103      public void operate( FlowProcess flowProcess, BufferCall bufferCall )
104        {
105        Iterator<TupleEntry> iterator = bufferCall.getArgumentsIterator();
106    
107        int count = 0;
108    
109        while( count < firstN && iterator.hasNext() )
110          {
111          bufferCall.getOutputCollector().add( iterator.next() );
112          count++;
113          }
114        }
115      }