/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.operation.filter;

import java.beans.ConstructorProperties;
import java.util.Random;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.operation.OperationCall;

/**
 * Class Sample is a {@link Filter} that lets only a given fraction of the {@link cascading.tuple.Tuple}
 * instances in a stream pass.
 * <p/>
 * The fraction is expected to lie between zero and one, inclusive; to keep roughly {@code 50%} of the
 * tuples, use the fraction {@code 0.5}. The value is not validated by this class.
 * <p/>
 * Unless a seed is supplied, one is generated once, in the constructor. Every call to
 * {@link #prepare(FlowProcess, OperationCall)} then creates a new {@link Random} from that same seed, so
 * every branch using this Sample instance draws the same pseudo-random sequence. Consequently, if this
 * instance is distributed to multiple systems and run against the same data, the resulting tuple stream
 * will be the same. The alternative would be to make this Operation "not safe", see
 * {@link cascading.operation.Operation#isSafe()}.
 * <p/>
 * Conversely, if the same stream of random data is required across application executions, set the seed
 * manually using {@link #Sample(long, double)}.
 * <p/>
 * The default seed is generated by the following code:
 * <p/>
 * {@code System.identityHashCode(this) * 2654435761L ^ System.currentTimeMillis()}
 * <p/>
 * Override {@link #makeSeed()} to customize.
 */
public class Sample extends BaseOperation<Random> implements Filter<Random>
{
  private long seed = 0;
  private double fraction = 1.0d;

  /**
   * Creates a new Sample that lets the given fraction of Tuples pass, using a seed obtained
   * from {@link #makeSeed()}.
   *
   * @param fraction of type double
   */
  @ConstructorProperties({"fraction"})
  public Sample( double fraction )
  {
    // the seed is generated before the fraction is stored; an overriding makeSeed() still sees the default fraction
    this.seed = makeSeed();
    this.fraction = fraction;
  }

  /**
   * Creates a new Sample that lets the given fraction of Tuples pass. The given seed value is used to
   * seed the random number generator.
   *
   * @param seed     of type long
   * @param fraction of type double
   */
  @ConstructorProperties({"seed", "fraction"})
  public Sample( long seed, double fraction )
  {
    this.seed = seed;
    this.fraction = fraction;
  }

  /**
   * Returns the seed handed to each {@link Random} created by this operation.
   *
   * @return the seed
   */
  public long getSeed()
  {
    return seed;
  }

  /**
   * Returns the fraction of Tuples that should pass this filter.
   *
   * @return the fraction
   */
  public double getFraction()
  {
    return fraction;
  }

  /**
   * Generates the seed used when none is given to the constructor. The value mixes the identity hash code
   * of this instance, multiplied by {@code 2654435761L}, with the current time in milliseconds.
   * <p/>
   * This method is invoked from the single argument constructor, so an override runs before the fraction
   * has been assigned and before any subclass state has been initialized.
   *
   * @return the generated seed
   */
  protected long makeSeed()
  {
    long scaledIdentity = System.identityHashCode( this ) * 2654435761L;

    return scaledIdentity ^ System.currentTimeMillis();
  }

  /**
   * Stores a new {@link Random}, seeded with {@link #getSeed()}, as the context of the given operation call.
   */
  @Override
  public void prepare( FlowProcess flowProcess, OperationCall<Random> operationCall )
  {
    super.prepare( flowProcess, operationCall );

    Random random = new Random( seed );

    operationCall.setContext( random );
  }

  /**
   * Draws the next double from the context {@link Random} and removes the current Tuple unless that value
   * is strictly less than the fraction.
   *
   * @return true if the current Tuple should be removed from the stream
   */
  @Override
  public boolean isRemove( FlowProcess flowProcess, FilterCall<Random> filterCall )
  {
    Random random = filterCall.getContext();
    boolean passes = random.nextDouble() < fraction;

    // kept as a negated "less than" so the result for a NaN fraction is unchanged
    return !passes;
  }

  /**
   * Two Sample instances are equal when the parent class considers them equal and both their
   * seed and fraction match.
   */
  @Override
  public boolean equals( Object object )
  {
    if( object == this )
    {
      return true;
    }

    if( !( object instanceof Sample ) || !super.equals( object ) )
    {
      return false;
    }

    Sample that = (Sample) object;

    return Double.compare( that.fraction, fraction ) == 0 && seed == that.seed;
  }

  /**
   * Combines the parent hash code with the seed and the bit pattern of the fraction.
   */
  @Override
  public int hashCode()
  {
    int parentHash = super.hashCode();

    // positive and negative zero both contribute zero bits, any other value contributes its long bit pattern
    long fractionBits = fraction == 0.0d ? 0L : Double.doubleToLongBits( fraction );

    // fold each 64 bit value into 32 bits by xor-ing its upper and lower halves
    int seedHash = (int) ( seed ^ ( seed >>> 32 ) );
    int fractionHash = (int) ( fractionBits ^ ( fractionBits >>> 32 ) );

    return 31 * ( 31 * parentHash + seedHash ) + fractionHash;
  }
}