001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.operation.aggregator; 022 023import java.beans.ConstructorProperties; 024 025import cascading.flow.FlowProcess; 026import cascading.management.annotation.Property; 027import cascading.management.annotation.PropertyDescription; 028import cascading.management.annotation.Visibility; 029import cascading.operation.Aggregator; 030import cascading.operation.AggregatorCall; 031import cascading.tuple.Fields; 032import cascading.tuple.Tuple; 033import cascading.tuple.TupleEntry; 034 035/** 036 * Class First is an {@link Aggregator} that returns the first {@link Tuple} encountered in a grouping. 037 * <p/> 038 * By default, it returns the first Tuple of {@link Fields#ARGS} found. 039 * <p/> 040 * If {@code firstN} is given, Tuples with each of the first N number of Tuples encountered are returned. That is, 041 * this Aggregator will return at maximum N tuples per grouping. 042 * <p/> 043 * Be sure to set the {@link cascading.pipe.GroupBy} {@code sortFields} to control which Tuples are seen first. 044 */ 045public class First extends ExtentBase 046 { 047 private final int firstN; 048 049 /** Selects and returns the first argument Tuple encountered. */ 050 public First() 051 { 052 super( Fields.ARGS ); 053 054 this.firstN = 1; 055 } 056 057 /** 058 * Selects and returns the first N argument Tuples encountered. 059 * 060 * @param firstN of type int 061 */ 062 @ConstructorProperties({"firstN"}) 063 public First( int firstN ) 064 { 065 super( Fields.ARGS ); 066 067 this.firstN = firstN; 068 } 069 070 /** 071 * Selects and returns the first argument Tuple encountered. 072 * 073 * @param fieldDeclaration of type Fields 074 */ 075 @ConstructorProperties({"fieldDeclaration"}) 076 public First( Fields fieldDeclaration ) 077 { 078 super( fieldDeclaration.size(), fieldDeclaration ); 079 080 this.firstN = 1; 081 } 082 083 /** 084 * Selects and returns the first N argument Tuples encountered. 085 * 086 * @param fieldDeclaration of type Fields 087 * @param firstN of type int 088 */ 089 @ConstructorProperties({"fieldDeclaration", "firstN"}) 090 public First( Fields fieldDeclaration, int firstN ) 091 { 092 super( fieldDeclaration.size(), fieldDeclaration ); 093 094 this.firstN = firstN; 095 } 096 097 /** 098 * Selects and returns the first argument Tuple encountered, unless the Tuple 099 * is a member of the set ignoreTuples. 100 * 101 * @param fieldDeclaration of type Fields 102 * @param ignoreTuples of type Tuple... 103 */ 104 @ConstructorProperties({"fieldDeclaration", "ignoreTuples"}) 105 public First( Fields fieldDeclaration, Tuple... ignoreTuples ) 106 { 107 super( fieldDeclaration, ignoreTuples ); 108 109 this.firstN = 1; 110 } 111 112 @Property(name = "firstN", visibility = Visibility.PUBLIC) 113 @PropertyDescription("The number of tuples to return.") 114 public int getFirstN() 115 { 116 return firstN; 117 } 118 119 protected void performOperation( Tuple[] context, TupleEntry entry ) 120 { 121 if( context[ 0 ] == null ) 122 context[ 0 ] = new Tuple(); 123 124 if( context[ 0 ].size() < firstN ) 125 context[ 0 ].add( entry.getTupleCopy() ); 126 } 127 128 @Override 129 public void complete( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall ) 130 { 131 Tuple context = aggregatorCall.getContext()[ 0 ]; 132 133 if( context == null ) 134 return; 135 136 for( Object tuple : context ) 137 aggregatorCall.getOutputCollector().add( (Tuple) tuple ); 138 } 139 }