001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.operation.buffer; 022 023 import java.beans.ConstructorProperties; 024 import java.util.Iterator; 025 026 import cascading.flow.FlowProcess; 027 import cascading.operation.BaseOperation; 028 import cascading.operation.Buffer; 029 import cascading.operation.BufferCall; 030 import cascading.tuple.Fields; 031 import cascading.tuple.TupleEntry; 032 033 /** 034 * Class FirstNBuffer will return the first N tuples seen in a given grouping. After the tuples 035 * are returned the Buffer stops iterating the arguments unlike the {@link cascading.operation.aggregator.First} 036 * {@link cascading.operation.Aggregator} which by contract sees all the values in the grouping. 037 * <p/> 038 * By default it returns one Tuple. 039 * <p/> 040 * Order can be controlled through the prior {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup} 041 * pipes. 042 * <p/> 043 * This class is used by {@link cascading.pipe.assembly.Unique}. 044 */ 045 public class FirstNBuffer extends BaseOperation implements Buffer 046 { 047 private final int firstN; 048 049 /** Selects and returns the first argument Tuple encountered. */ 050 public FirstNBuffer() 051 { 052 super( Fields.ARGS ); 053 054 firstN = 1; 055 } 056 057 /** 058 * Selects and returns the first N argument Tuples encountered. 059 * 060 * @param firstN of type int 061 */ 062 @ConstructorProperties({"firstN"}) 063 public FirstNBuffer( int firstN ) 064 { 065 super( Fields.ARGS ); 066 067 this.firstN = firstN; 068 } 069 070 /** 071 * Selects and returns the first argument Tuple encountered. 072 * 073 * @param fieldDeclaration of type Fields 074 */ 075 @ConstructorProperties({"fieldDeclaration"}) 076 public FirstNBuffer( Fields fieldDeclaration ) 077 { 078 super( fieldDeclaration.size(), fieldDeclaration ); 079 080 this.firstN = 1; 081 } 082 083 /** 084 * Selects and returns the first N argument Tuples encountered. 085 * 086 * @param fieldDeclaration of type Fields 087 * @param firstN of type int 088 */ 089 @ConstructorProperties({"fieldDeclaration", "firstN"}) 090 public FirstNBuffer( Fields fieldDeclaration, int firstN ) 091 { 092 super( fieldDeclaration.size(), fieldDeclaration ); 093 094 this.firstN = firstN; 095 } 096 097 public int getFirstN() 098 { 099 return firstN; 100 } 101 102 @Override 103 public void operate( FlowProcess flowProcess, BufferCall bufferCall ) 104 { 105 Iterator<TupleEntry> iterator = bufferCall.getArgumentsIterator(); 106 107 int count = 0; 108 109 while( count < firstN && iterator.hasNext() ) 110 { 111 bufferCall.getOutputCollector().add( iterator.next() ); 112 count++; 113 } 114 } 115 }