001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe.assembly;
022
023import java.beans.ConstructorProperties;
024import java.util.Comparator;
025import java.util.Map;
026
027import cascading.CascadingException;
028import cascading.flow.FlowProcess;
029import cascading.operation.BaseOperation;
030import cascading.operation.Filter;
031import cascading.operation.FilterCall;
032import cascading.operation.OperationCall;
033import cascading.operation.buffer.FirstNBuffer;
034import cascading.pipe.Each;
035import cascading.pipe.Every;
036import cascading.pipe.GroupBy;
037import cascading.pipe.Pipe;
038import cascading.pipe.SubAssembly;
039import cascading.provider.FactoryLoader;
040import cascading.tuple.Fields;
041import cascading.tuple.Tuple;
042import cascading.tuple.Tuples;
043import cascading.tuple.util.TupleHasher;
044import cascading.util.cache.BaseCacheFactory;
045import cascading.util.cache.CacheEvictionCallback;
046import cascading.util.cache.CascadingCache;
047
048/**
049 * Class Unique {@link SubAssembly} is used to filter all duplicates out of a tuple stream.
050 * <p/>
 * Typically finding unique values in a tuple stream relies on a {@link GroupBy} and a {@link FirstNBuffer}
052 * {@link cascading.operation.Buffer} operation.
053 * <p/>
054 * If the {@code include} value is set to {@link Include#NO_NULLS}, any tuple consisting of only {@code null}
055 * values will be removed from the stream.
056 * <p/>
057 * This SubAssembly uses the {@link FilterPartialDuplicates} {@link cascading.operation.Filter}
058 * to remove as many observed duplicates before the GroupBy operator to reduce IO over the network.
059 * <p/>
060 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
061 * in a much simpler mechanism.
062 * <p/>
063 * Unique uses a {@link cascading.util.cache.CascadingCache} or LRU to do the filtering. To tune the cache, set the
064 * {@code capacity} value to a high enough value to utilize available memory. Or set a default value via the
 * {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_CAPACITY} property. The current default is {@code 10,000} unique keys.
066 * <p/>
067 * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed
068 * by setting {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_FACTORY} property to the name of a sub-class of
069 * {@link cascading.util.cache.BaseCacheFactory}.
070 * <p/>
071 * The {@code capacity} value tells the underlying FilterPartialDuplicates how many values to cache for duplicate
072 * comparison before dropping values from the LRU cache.
073 *
074 * @see cascading.util.cache.LRUHashMapCacheFactory
075 * @see cascading.util.cache.DirectMappedCacheFactory
076 * @see cascading.util.cache.LRUHashMapCache
077 * @see cascading.util.cache.DirectMappedCache
078 */
079public class Unique extends SubAssembly
080  {
081
082  public enum Include
083    {
084      ALL,
085      NO_NULLS
086    }
087
088  public enum Cache
089    {
090      Num_Keys_Flushed,
091      Num_Keys_Hit,
092      Num_Keys_Missed
093    }
094
095  /**
096   * Class FilterPartialDuplicates is a {@link cascading.operation.Filter} that is used to remove observed duplicates from the tuple stream.
097   * <p/>
098   * Use this class typically in tandem with a {@link cascading.operation.aggregator.First}
099   * {@link cascading.operation.Aggregator} in order to improve de-duping performance by removing as many values
100   * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
101   * <p/>
102   * The {@code capacity} value is used to maintain a LRU of a constant size. If more than capacity unique values
103   * are seen, the oldest cached values will be removed from the cache.
104   *
105   * @see Unique
106   */
107  public static class FilterPartialDuplicates extends BaseOperation<CascadingCache<Tuple, Object>> implements Filter<CascadingCache<Tuple, Object>>
108    {
109    /** special null value for the caches, since a cache might not permit 'null' as a value */
110    private final static Object NULL_VALUE = new Object();
111
112    private int capacity = 0;
113    private Include include = Include.ALL;
114    private TupleHasher tupleHasher;
115
116    /** Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. */
117    public FilterPartialDuplicates()
118      {
119      }
120
121    /**
122     * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
123     *
124     * @param capacity of type int
125     */
126    @ConstructorProperties({"capacity"})
127    public FilterPartialDuplicates( int capacity )
128      {
129      this.capacity = capacity;
130      }
131
132    /**
133     * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
134     *
135     * @param include  of type Include
136     * @param capacity of type int
137     */
138    @ConstructorProperties({"include", "capacity"})
139    public FilterPartialDuplicates( Include include, int capacity )
140      {
141      this( include, capacity, null );
142      }
143
144    /**
145     * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
146     *
147     * @param capacity    of type int
148     * @param include     of type Include
149     * @param tupleHasher of type TupleHasher
150     */
151    @ConstructorProperties({"include", "capacity", "tupleHasher"})
152    public FilterPartialDuplicates( Include include, int capacity, TupleHasher tupleHasher )
153      {
154      this.capacity = capacity;
155      this.include = include == null ? this.include : include;
156      this.tupleHasher = tupleHasher;
157      }
158
159    @Override
160    public void prepare( final FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall )
161      {
162      CacheEvictionCallback callback = new CacheEvictionCallback()
163      {
164      @Override
165      public void evict( Map.Entry entry )
166        {
167        flowProcess.increment( Cache.Num_Keys_Flushed, 1 );
168        }
169      };
170      FactoryLoader loader = FactoryLoader.getInstance();
171      BaseCacheFactory cacheFactory = loader.loadFactoryFrom( flowProcess, UniqueProps.UNIQUE_CACHE_FACTORY, UniqueProps.DEFAULT_CACHE_FACTORY_CLASS );
172
173      if( cacheFactory == null )
174        throw new CascadingException( "unable to load cache factory, please check your '" + UniqueProps.UNIQUE_CACHE_FACTORY + "' setting." );
175
176      CascadingCache cache = cacheFactory.create( flowProcess );
177      cache.setCacheEvictionCallback( callback );
178      Integer cacheCapacity = capacity;
179
180      if( capacity == 0 )
181        {
182        cacheCapacity = flowProcess.getIntegerProperty( UniqueProps.UNIQUE_CACHE_CAPACITY );
183
184        if( cacheCapacity == null )
185          cacheCapacity = UniqueProps.UNIQUE_DEFAULT_CAPACITY;
186        }
187
188      cache.setCapacity( cacheCapacity.intValue() );
189      cache.initialize();
190
191      operationCall.setContext( cache );
192      }
193
194    @Override
195    public boolean isRemove( FlowProcess flowProcess, FilterCall<CascadingCache<Tuple, Object>> filterCall )
196      {
197      // we assume its more painful to create lots of tuple copies vs comparisons
198      Tuple args = TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTuple() );
199
200      switch( include )
201        {
202        case ALL:
203          break;
204
205        case NO_NULLS:
206          if( Tuples.frequency( args, null ) == args.size() )
207            return true;
208
209          break;
210        }
211
212      if( filterCall.getContext().containsKey( args ) )
213        {
214        flowProcess.increment( Cache.Num_Keys_Hit, 1 );
215        return true;
216        }
217
218      // only do the copy here
219      filterCall.getContext().put( TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTupleCopy() ), NULL_VALUE );
220
221      flowProcess.increment( Cache.Num_Keys_Missed, 1 );
222
223      return false;
224      }
225
226    @Override
227    public void cleanup( FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall )
228      {
229      operationCall.setContext( null );
230      }
231
232    @Override
233    public boolean equals( Object object )
234      {
235      if( this == object )
236        return true;
237      if( !( object instanceof FilterPartialDuplicates ) )
238        return false;
239      if( !super.equals( object ) )
240        return false;
241
242      FilterPartialDuplicates that = (FilterPartialDuplicates) object;
243
244      if( capacity != that.capacity )
245        return false;
246
247      return true;
248      }
249
250    @Override
251    public int hashCode()
252      {
253      int result = super.hashCode();
254      result = 31 * result + capacity;
255      return result;
256      }
257    }
258
259  /**
260   * Constructor Unique creates a new Unique instance.
261   *
262   * @param pipe         of type Pipe
263   * @param uniqueFields of type Fields
264   */
265  @ConstructorProperties({"pipe", "uniqueFields"})
266  public Unique( Pipe pipe, Fields uniqueFields )
267    {
268    this( null, pipe, uniqueFields );
269    }
270
271  /**
272   * Constructor Unique creates a new Unique instance.
273   *
274   * @param pipe         of type Pipe
275   * @param uniqueFields of type Fields
276   * @param include      of type Include
277   */
278  @ConstructorProperties({"pipe", "uniqueFields", "include"})
279  public Unique( Pipe pipe, Fields uniqueFields, Include include )
280    {
281    this( null, pipe, uniqueFields, include );
282    }
283
284  /**
285   * Constructor Unique creates a new Unique instance.
286   *
287   * @param pipe         of type Pipe
288   * @param uniqueFields of type Fields
289   * @param capacity     of type int
290   */
291  @ConstructorProperties({"pipe", "uniqueFields", "capacity"})
292  public Unique( Pipe pipe, Fields uniqueFields, int capacity )
293    {
294    this( null, pipe, uniqueFields, capacity );
295    }
296
297  /**
298   * Constructor Unique creates a new Unique instance.
299   *
300   * @param pipe         of type Pipe
301   * @param uniqueFields of type Fields
302   * @param include      of type Include
303   * @param capacity     of type int
304   */
305  @ConstructorProperties({"pipe", "uniqueFields", "include", "capacity"})
306  public Unique( Pipe pipe, Fields uniqueFields, Include include, int capacity )
307    {
308    this( null, pipe, uniqueFields, include, capacity );
309    }
310
311  /**
312   * Constructor Unique creates a new Unique instance.
313   *
314   * @param name         of type String
315   * @param pipe         of type Pipe
316   * @param uniqueFields of type Fields
317   */
318  @ConstructorProperties({"name", "pipe", "uniqueFields"})
319  public Unique( String name, Pipe pipe, Fields uniqueFields )
320    {
321    this( name, pipe, uniqueFields, null );
322    }
323
324  /**
325   * Constructor Unique creates a new Unique instance.
326   *
327   * @param name         of type String
328   * @param pipe         of type Pipe
329   * @param uniqueFields of type Fields
330   * @param include      of type Include
331   */
332  @ConstructorProperties({"name", "pipe", "uniqueFields", "include"})
333  public Unique( String name, Pipe pipe, Fields uniqueFields, Include include )
334    {
335    this( name, pipe, uniqueFields, include, 0 );
336    }
337
338  /**
339   * Constructor Unique creates a new Unique instance.
340   *
341   * @param name         of type String
342   * @param pipe         of type Pipe
343   * @param uniqueFields of type Fields
344   * @param capacity     of type int
345   */
346  @ConstructorProperties({"name", "pipe", "uniqueFields", "capacity"})
347  public Unique( String name, Pipe pipe, Fields uniqueFields, int capacity )
348    {
349    this( name, Pipe.pipes( pipe ), uniqueFields, capacity );
350    }
351
352  /**
353   * Constructor Unique creates a new Unique instance.
354   *
355   * @param name         of type String
356   * @param pipe         of type Pipe
357   * @param uniqueFields of type Fields
358   * @param include      of type Include
359   * @param capacity     of type int
360   */
361  @ConstructorProperties({"name", "pipe", "uniqueFields", "include", "capacity"})
362  public Unique( String name, Pipe pipe, Fields uniqueFields, Include include, int capacity )
363    {
364    this( name, Pipe.pipes( pipe ), uniqueFields, include, capacity );
365    }
366
367  /**
368   * Constructor Unique creates a new Unique instance.
369   *
370   * @param pipes        of type Pipe[]
371   * @param uniqueFields of type Fields
372   */
373  @ConstructorProperties({"pipes", "uniqueFields"})
374  public Unique( Pipe[] pipes, Fields uniqueFields )
375    {
376    this( null, pipes, uniqueFields );
377    }
378
379  /**
380   * Constructor Unique creates a new Unique instance.
381   *
382   * @param pipes        of type Pipe[]
383   * @param uniqueFields of type Fields
384   * @param include      of type Include
385   */
386  @ConstructorProperties({"pipes", "uniqueFields", "include"})
387  public Unique( Pipe[] pipes, Fields uniqueFields, Include include )
388    {
389    this( null, pipes, uniqueFields, include );
390    }
391
392  /**
393   * Constructor Unique creates a new Unique instance.
394   *
395   * @param pipes        of type Pipe[]
396   * @param uniqueFields of type Fields
397   * @param capacity     of type int
398   */
399  @ConstructorProperties({"pipes", "uniqueFields", "capacity"})
400  public Unique( Pipe[] pipes, Fields uniqueFields, int capacity )
401    {
402    this( null, pipes, uniqueFields, capacity );
403    }
404
405  /**
406   * Constructor Unique creates a new Unique instance.
407   *
408   * @param pipes        of type Pipe[]
409   * @param uniqueFields of type Fields
410   * @param include      of type Include
411   * @param capacity     of type int
412   */
413  @ConstructorProperties({"pipes", "uniqueFields", "include", "capacity"})
414  public Unique( Pipe[] pipes, Fields uniqueFields, Include include, int capacity )
415    {
416    this( null, pipes, uniqueFields, include, capacity );
417    }
418
419  /**
420   * Constructor Unique creates a new Unique instance.
421   *
422   * @param name         of type String
423   * @param pipes        of type Pipe[]
424   * @param uniqueFields of type Fields
425   */
426  @ConstructorProperties({"name", "pipes", "uniqueFields"})
427  public Unique( String name, Pipe[] pipes, Fields uniqueFields )
428    {
429    this( name, pipes, uniqueFields, null );
430    }
431
432  /**
433   * Constructor Unique creates a new Unique instance.
434   *
435   * @param name         of type String
436   * @param pipes        of type Pipe[]
437   * @param uniqueFields of type Fields
438   * @param include      of type Include
439   */
440  @ConstructorProperties({"name", "pipes", "uniqueFields", "include"})
441  public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include )
442    {
443    this( name, pipes, uniqueFields, include, 0 );
444    }
445
446  /**
447   * Constructor Unique creates a new Unique instance.
448   *
449   * @param name         of type String
450   * @param pipes        of type Pipe[]
451   * @param uniqueFields of type Fields
452   * @param capacity     of type int
453   */
454  @ConstructorProperties({"name", "pipes", "uniqueFields", "capacity"})
455  public Unique( String name, Pipe[] pipes, Fields uniqueFields, int capacity )
456    {
457    this( name, pipes, uniqueFields, null, capacity );
458    }
459
460  /**
461   * Constructor Unique creates a new Unique instance.
462   *
463   * @param name         of type String
464   * @param pipes        of type Pipe[]
465   * @param uniqueFields of type Fields
466   * @param capacity     of type int
467   */
468  @ConstructorProperties({"name", "pipes", "uniqueFields", "include", "capacity"})
469  public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include, int capacity )
470    {
471    super( pipes );
472
473    if( uniqueFields == null )
474      throw new IllegalArgumentException( "uniqueFields may not be null" );
475
476    Pipe[] filters = new Pipe[ pipes.length ];
477
478    TupleHasher tupleHasher = null;
479    Comparator[] comparators = uniqueFields.getComparators();
480
481    if( !TupleHasher.isNull( comparators ) )
482      tupleHasher = new TupleHasher( null, comparators );
483
484    FilterPartialDuplicates partialDuplicates = new FilterPartialDuplicates( include, capacity, tupleHasher );
485
486    for( int i = 0; i < filters.length; i++ )
487      filters[ i ] = new Each( pipes[ i ], uniqueFields, partialDuplicates );
488
489    Pipe pipe = new GroupBy( name, filters, uniqueFields );
490    pipe = new Every( pipe, Fields.ALL, new FirstNBuffer(), Fields.RESULTS );
491
492    setTails( pipe );
493    }
494  }