001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.operation.buffer; 022 023 import java.beans.ConstructorProperties; 024 import java.util.Iterator; 025 026 import cascading.flow.FlowProcess; 027 import cascading.management.annotation.Property; 028 import cascading.management.annotation.PropertyDescription; 029 import cascading.management.annotation.Visibility; 030 import cascading.operation.BaseOperation; 031 import cascading.operation.Buffer; 032 import cascading.operation.BufferCall; 033 import cascading.tuple.Fields; 034 import cascading.tuple.TupleEntry; 035 036 /** 037 * Class FirstNBuffer will return the first N tuples seen in a given grouping. After the tuples 038 * are returned the Buffer stops iterating the arguments unlike the {@link cascading.operation.aggregator.First} 039 * {@link cascading.operation.Aggregator} which by contract sees all the values in the grouping. 040 * <p/> 041 * By default it returns one Tuple. 042 * <p/> 043 * Order can be controlled through the prior {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup} 044 * pipes. 045 * <p/> 046 * This class is used by {@link cascading.pipe.assembly.Unique}. 047 */ 048 public class FirstNBuffer extends BaseOperation implements Buffer 049 { 050 private final int firstN; 051 052 /** Selects and returns the first argument Tuple encountered. */ 053 public FirstNBuffer() 054 { 055 super( Fields.ARGS ); 056 057 firstN = 1; 058 } 059 060 /** 061 * Selects and returns the first N argument Tuples encountered. 062 * 063 * @param firstN of type int 064 */ 065 @ConstructorProperties({"firstN"}) 066 public FirstNBuffer( int firstN ) 067 { 068 super( Fields.ARGS ); 069 070 this.firstN = firstN; 071 } 072 073 /** 074 * Selects and returns the first argument Tuple encountered. 075 * 076 * @param fieldDeclaration of type Fields 077 */ 078 @ConstructorProperties({"fieldDeclaration"}) 079 public FirstNBuffer( Fields fieldDeclaration ) 080 { 081 super( fieldDeclaration.size(), fieldDeclaration ); 082 083 this.firstN = 1; 084 } 085 086 /** 087 * Selects and returns the first N argument Tuples encountered. 088 * 089 * @param fieldDeclaration of type Fields 090 * @param firstN of type int 091 */ 092 @ConstructorProperties({"fieldDeclaration", "firstN"}) 093 public FirstNBuffer( Fields fieldDeclaration, int firstN ) 094 { 095 super( fieldDeclaration.size(), fieldDeclaration ); 096 097 this.firstN = firstN; 098 } 099 100 @Property(name = "firstN", visibility = Visibility.PUBLIC) 101 @PropertyDescription("The number of tuples to return.") 102 public int getFirstN() 103 { 104 return firstN; 105 } 106 107 @Override 108 public void operate( FlowProcess flowProcess, BufferCall bufferCall ) 109 { 110 Iterator<TupleEntry> iterator = bufferCall.getArgumentsIterator(); 111 112 int count = 0; 113 114 while( count < firstN && iterator.hasNext() ) 115 { 116 bufferCall.getOutputCollector().add( iterator.next() ); 117 count++; 118 } 119 } 120 }