001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.aggregator;
022
023import java.beans.ConstructorProperties;
024
025import cascading.flow.FlowProcess;
026import cascading.management.annotation.Property;
027import cascading.management.annotation.PropertyDescription;
028import cascading.management.annotation.Visibility;
029import cascading.operation.Aggregator;
030import cascading.operation.AggregatorCall;
031import cascading.tuple.Fields;
032import cascading.tuple.Tuple;
033import cascading.tuple.TupleEntry;
034
035/**
036 * Class First is an {@link Aggregator} that returns the first {@link Tuple} encountered in a grouping.
037 * <p/>
038 * By default, it returns the first Tuple of {@link Fields#ARGS} found.
039 * <p/>
040 * If {@code firstN} is given, Tuples with each of the first N number of Tuples encountered are returned. That is,
041 * this Aggregator will return at maximum N tuples per grouping.
042 * <p/>
043 * Be sure to set the {@link cascading.pipe.GroupBy} {@code sortFields} to control which Tuples are seen first.
044 */
045public class First extends ExtentBase
046  {
047  private final int firstN;
048
049  /** Selects and returns the first argument Tuple encountered. */
050  public First()
051    {
052    super( Fields.ARGS );
053
054    this.firstN = 1;
055    }
056
057  /**
058   * Selects and returns the first N argument Tuples encountered.
059   *
060   * @param firstN of type int
061   */
062  @ConstructorProperties({"firstN"})
063  public First( int firstN )
064    {
065    super( Fields.ARGS );
066
067    this.firstN = firstN;
068    }
069
070  /**
071   * Selects and returns the first argument Tuple encountered.
072   *
073   * @param fieldDeclaration of type Fields
074   */
075  @ConstructorProperties({"fieldDeclaration"})
076  public First( Fields fieldDeclaration )
077    {
078    super( fieldDeclaration.size(), fieldDeclaration );
079
080    this.firstN = 1;
081    }
082
083  /**
084   * Selects and returns the first N argument Tuples encountered.
085   *
086   * @param fieldDeclaration of type Fields
087   * @param firstN           of type int
088   */
089  @ConstructorProperties({"fieldDeclaration", "firstN"})
090  public First( Fields fieldDeclaration, int firstN )
091    {
092    super( fieldDeclaration.size(), fieldDeclaration );
093
094    this.firstN = firstN;
095    }
096
097  /**
098   * Selects and returns the first argument Tuple encountered, unless the Tuple
099   * is a member of the set ignoreTuples.
100   *
101   * @param fieldDeclaration of type Fields
102   * @param ignoreTuples     of type Tuple...
103   */
104  @ConstructorProperties({"fieldDeclaration", "ignoreTuples"})
105  public First( Fields fieldDeclaration, Tuple... ignoreTuples )
106    {
107    super( fieldDeclaration, ignoreTuples );
108
109    this.firstN = 1;
110    }
111
112  @Property(name = "firstN", visibility = Visibility.PUBLIC)
113  @PropertyDescription("The number of tuples to return.")
114  public int getFirstN()
115    {
116    return firstN;
117    }
118
119  protected void performOperation( Tuple[] context, TupleEntry entry )
120    {
121    if( context[ 0 ] == null )
122      context[ 0 ] = new Tuple();
123
124    if( context[ 0 ].size() < firstN )
125      context[ 0 ].add( entry.getTupleCopy() );
126    }
127
128  @Override
129  public void complete( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall )
130    {
131    Tuple context = aggregatorCall.getContext()[ 0 ];
132
133    if( context == null )
134      return;
135
136    for( Object tuple : context )
137      aggregatorCall.getOutputCollector().add( (Tuple) tuple );
138    }
139  }