001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.filter;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Random;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.operation.BaseOperation;
028    import cascading.operation.Filter;
029    import cascading.operation.FilterCall;
030    import cascading.operation.OperationCall;
031    
032    /**
033     * Class Sample is a {@link Filter} that only allows the given fraction of {@link cascading.tuple.Tuple} instances to pass.
034     * <p/>
035     * Where fraction is between 1 and zero, inclusive. Thus to sample {@code 50%} of the tuples in a stream, use the
036     * fraction {@code 0.5}.
037     * <p/>
038     * By default, the seed is created at random on the constructor. This implies every branch using the Sample
039     * filter will return the same random stream based on that seed. So if this Sample instance is distributed
040     * into multiple systems against the same data, the result will be the same tuple stream. The alternative
041     * would be to make this Operation "not safe". See {@link cascading.operation.Operation#isSafe()}.
042     * <p/>
043     * Conversely, if the same stream of random data is require across application executions, set the seed manually.
044     * <p/>
045     * The seed is generated from the following code:
046     * <p/>
047     * {@code System.identityHashCode(this) * 2654435761L ^ System.currentTimeMillis()}
048     * <p/>
049     * Override {@link #makeSeed()} to customize.
050     */
051    public class Sample extends BaseOperation<Random> implements Filter<Random>
052      {
053      private long seed = 0;
054      private double fraction = 1.0d;
055    
056      /**
057       * Creates a new Sample that permits percent Tuples to pass.
058       *
059       * @param fraction of type double
060       */
061      @ConstructorProperties({"fraction"})
062      public Sample( double fraction )
063        {
064        this.seed = makeSeed();
065        this.fraction = fraction;
066        }
067    
068      /**
069       * Creates a new Sample that permits percent Tuples to pass. The given seed value seeds the random number generator.
070       *
071       * @param seed     of type long
072       * @param fraction of type double
073       */
074      @ConstructorProperties({"seed", "fraction"})
075      public Sample( long seed, double fraction )
076        {
077        this.seed = seed;
078        this.fraction = fraction;
079        }
080    
081      public long getSeed()
082        {
083        return seed;
084        }
085    
086      public double getFraction()
087        {
088        return fraction;
089        }
090    
091      protected long makeSeed()
092        {
093        return System.identityHashCode( this ) * 2654435761L ^ System.currentTimeMillis();
094        }
095    
096      @Override
097      public void prepare( FlowProcess flowProcess, OperationCall<Random> operationCall )
098        {
099        super.prepare( flowProcess, operationCall );
100    
101        operationCall.setContext( new Random( seed ) );
102        }
103    
104      @Override
105      public boolean isRemove( FlowProcess flowProcess, FilterCall<Random> filterCall )
106        {
107        return !( filterCall.getContext().nextDouble() < fraction );
108        }
109    
110      @Override
111      public boolean equals( Object object )
112        {
113        if( this == object )
114          return true;
115        if( !( object instanceof Sample ) )
116          return false;
117        if( !super.equals( object ) )
118          return false;
119    
120        Sample sample = (Sample) object;
121    
122        if( Double.compare( sample.fraction, fraction ) != 0 )
123          return false;
124        if( seed != sample.seed )
125          return false;
126    
127        return true;
128        }
129    
130      @Override
131      public int hashCode()
132        {
133        int result = super.hashCode();
134        long temp;
135        result = 31 * result + (int) ( seed ^ seed >>> 32 );
136        temp = fraction != +0.0d ? Double.doubleToLongBits( fraction ) : 0L;
137        result = 31 * result + (int) ( temp ^ temp >>> 32 );
138        return result;
139        }
140      }