001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.operation.filter; 022 023 import java.beans.ConstructorProperties; 024 025 import cascading.flow.FlowProcess; 026 import cascading.operation.BaseOperation; 027 import cascading.operation.Filter; 028 import cascading.operation.FilterCall; 029 import cascading.operation.OperationCall; 030 031 /** 032 * Class Limit is a {@link Filter} that will limit the number of {@link cascading.tuple.Tuple} instances that it will 033 * allow to pass. 034 * <br/> 035 * Note that the limit value is roughly a suggestion. It attempts to divide the limit number by the number of concurrent 036 * tasks, but knowing the number of tasks isn't easily available from configuration information provided to individual 037 * tasks. Further, the number of records/lines available to a task may be less than the limit amount. 038 * <br/> 039 * More consistent results will be received from using {@link Sample}. 040 * 041 * @see Sample 042 */ 043 public class Limit extends BaseOperation<Limit.Context> implements Filter<Limit.Context> 044 { 045 private long limit = 0; 046 047 public static class Context 048 { 049 public long limit = 0; 050 public long count = 0; 051 052 public boolean increment() 053 { 054 if( limit == count ) 055 return true; 056 057 count++; 058 059 return false; 060 } 061 } 062 063 /** 064 * Creates a new Limit class that only allows limit number of Tuple instances to pass. 065 * 066 * @param limit the number of tuples to let pass 067 */ 068 @ConstructorProperties({"limit"}) 069 public Limit( long limit ) 070 { 071 this.limit = limit; 072 } 073 074 public long getLimit() 075 { 076 return limit; 077 } 078 079 @Override 080 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 081 { 082 super.prepare( flowProcess, operationCall ); 083 084 Context context = new Context(); 085 086 operationCall.setContext( context ); 087 088 int numTasks = flowProcess.getNumProcessSlices(); 089 int taskNum = flowProcess.getCurrentSliceNum(); 090 091 context.limit = (long) Math.floor( (double) limit / (double) numTasks ); 092 093 long remainingLimit = limit % numTasks; 094 095 // evenly divide limits across tasks 096 context.limit += taskNum < remainingLimit ? 1 : 0; 097 } 098 099 @Override 100 public boolean isRemove( FlowProcess flowProcess, FilterCall<Context> filterCall ) 101 { 102 return filterCall.getContext().increment(); 103 } 104 105 @Override 106 public boolean equals( Object object ) 107 { 108 if( this == object ) 109 return true; 110 if( !( object instanceof Limit ) ) 111 return false; 112 if( !super.equals( object ) ) 113 return false; 114 115 Limit limit1 = (Limit) object; 116 117 if( limit != limit1.limit ) 118 return false; 119 120 return true; 121 } 122 123 @Override 124 public int hashCode() 125 { 126 int result = super.hashCode(); 127 result = 31 * result + (int) ( limit ^ limit >>> 32 ); 128 return result; 129 } 130 }