001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.filter;
022
023import java.beans.ConstructorProperties;
024
025import cascading.flow.FlowProcess;
026import cascading.management.annotation.Property;
027import cascading.management.annotation.PropertyDescription;
028import cascading.management.annotation.Visibility;
029import cascading.operation.BaseOperation;
030import cascading.operation.Filter;
031import cascading.operation.FilterCall;
032import cascading.operation.OperationCall;
033
034/**
035 * Class Limit is a {@link Filter} that will limit the number of {@link cascading.tuple.Tuple} instances that it will
036 * allow to pass.
037 * <br/>
038 * Note that the limit value is roughly a suggestion. It attempts to divide the limit number by the number of concurrent
039 * tasks, but knowing the number of tasks isn't easily available from configuration information provided to individual
040 * tasks. Further, the number of records/lines available to a task may be less than the limit amount.
041 * <br/>
042 * More consistent results will be received from using {@link Sample}.
043 *
044 * @see Sample
045 */
046public class Limit extends BaseOperation<Limit.Context> implements Filter<Limit.Context>
047  {
048  private long limit = 0;
049
050  public static class Context
051    {
052    public long limit = 0;
053    public long count = 0;
054
055    public boolean increment()
056      {
057      if( limit == count )
058        return true;
059
060      count++;
061
062      return false;
063      }
064    }
065
066  /**
067   * Creates a new Limit class that only allows limit number of Tuple instances to pass.
068   *
069   * @param limit the number of tuples to let pass
070   */
071  @ConstructorProperties({"limit"})
072  public Limit( long limit )
073    {
074    this.limit = limit;
075    }
076
077  @Property(name = "limit", visibility = Visibility.PUBLIC)
078  @PropertyDescription("The upper limit.")
079  public long getLimit()
080    {
081    return limit;
082    }
083
084  @Override
085  public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
086    {
087    super.prepare( flowProcess, operationCall );
088
089    Context context = new Context();
090
091    operationCall.setContext( context );
092
093    int numTasks = flowProcess.getNumProcessSlices();
094    int taskNum = flowProcess.getCurrentSliceNum();
095
096    context.limit = (long) Math.floor( (double) limit / (double) numTasks );
097
098    long remainingLimit = limit % numTasks;
099
100    // evenly divide limits across tasks
101    context.limit += taskNum < remainingLimit ? 1 : 0;
102    }
103
104  @Override
105  public boolean isRemove( FlowProcess flowProcess, FilterCall<Context> filterCall )
106    {
107    return filterCall.getContext().increment();
108    }
109
110  @Override
111  public boolean equals( Object object )
112    {
113    if( this == object )
114      return true;
115    if( !( object instanceof Limit ) )
116      return false;
117    if( !super.equals( object ) )
118      return false;
119
120    Limit limit1 = (Limit) object;
121
122    if( limit != limit1.limit )
123      return false;
124
125    return true;
126    }
127
128  @Override
129  public int hashCode()
130    {
131    int result = super.hashCode();
132    result = 31 * result + (int) ( limit ^ limit >>> 32 );
133    return result;
134    }
135  }