001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.operation.aggregator; 022 023 import java.beans.ConstructorProperties; 024 025 import cascading.flow.FlowProcess; 026 import cascading.operation.Aggregator; 027 import cascading.operation.AggregatorCall; 028 import cascading.tuple.Fields; 029 import cascading.tuple.Tuple; 030 import cascading.tuple.TupleEntry; 031 032 /** 033 * Class First is an {@link Aggregator} that returns the first {@link Tuple} encountered in a grouping. 034 * <p/> 035 * By default, it returns the first Tuple of {@link Fields#ARGS} found. 036 * <p/> 037 * If {@code firstN} is given, Tuples with each of the first N number of Tuples encountered are returned. That is, 038 * this Aggregator will return at maximum N tuples per grouping. 039 * <p/> 040 * Be sure to set the {@link cascading.pipe.GroupBy} {@code sortFields} to control which Tuples are seen first. 041 */ 042 public class First extends ExtentBase 043 { 044 private final int firstN; 045 046 /** Selects and returns the first argument Tuple encountered. */ 047 public First() 048 { 049 super( Fields.ARGS ); 050 051 this.firstN = 1; 052 } 053 054 /** 055 * Selects and returns the first N argument Tuples encountered. 056 * 057 * @param firstN of type int 058 */ 059 @ConstructorProperties({"firstN"}) 060 public First( int firstN ) 061 { 062 super( Fields.ARGS ); 063 064 this.firstN = firstN; 065 } 066 067 /** 068 * Selects and returns the first argument Tuple encountered. 069 * 070 * @param fieldDeclaration of type Fields 071 */ 072 @ConstructorProperties({"fieldDeclaration"}) 073 public First( Fields fieldDeclaration ) 074 { 075 super( fieldDeclaration.size(), fieldDeclaration ); 076 077 this.firstN = 1; 078 } 079 080 /** 081 * Selects and returns the first N argument Tuples encountered. 082 * 083 * @param fieldDeclaration of type Fields 084 * @param firstN of type int 085 */ 086 @ConstructorProperties({"fieldDeclaration", "firstN"}) 087 public First( Fields fieldDeclaration, int firstN ) 088 { 089 super( fieldDeclaration.size(), fieldDeclaration ); 090 091 this.firstN = firstN; 092 } 093 094 /** 095 * Selects and returns the first argument Tuple encountered, unless the Tuple 096 * is a member of the set ignoreTuples. 097 * 098 * @param fieldDeclaration of type Fields 099 * @param ignoreTuples of type Tuple... 100 */ 101 @ConstructorProperties({"fieldDeclaration", "ignoreTuples"}) 102 public First( Fields fieldDeclaration, Tuple... ignoreTuples ) 103 { 104 super( fieldDeclaration, ignoreTuples ); 105 106 this.firstN = 1; 107 } 108 109 public int getFirstN() 110 { 111 return firstN; 112 } 113 114 protected void performOperation( Tuple[] context, TupleEntry entry ) 115 { 116 if( context[ 0 ] == null ) 117 context[ 0 ] = new Tuple(); 118 119 if( context[ 0 ].size() < firstN ) 120 context[ 0 ].add( entry.getTupleCopy() ); 121 } 122 123 @Override 124 public void complete( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall ) 125 { 126 Tuple context = aggregatorCall.getContext()[ 0 ]; 127 128 if( context == null ) 129 return; 130 131 for( Object tuple : context ) 132 aggregatorCall.getOutputCollector().add( (Tuple) tuple ); 133 } 134 }