001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Comparator;
025    import java.util.LinkedHashMap;
026    import java.util.Map;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.operation.BaseOperation;
030    import cascading.operation.Filter;
031    import cascading.operation.FilterCall;
032    import cascading.operation.OperationCall;
033    import cascading.operation.buffer.FirstNBuffer;
034    import cascading.pipe.Each;
035    import cascading.pipe.Every;
036    import cascading.pipe.GroupBy;
037    import cascading.pipe.Pipe;
038    import cascading.pipe.SubAssembly;
039    import cascading.tuple.Fields;
040    import cascading.tuple.Tuple;
041    import cascading.tuple.Tuples;
042    import cascading.tuple.util.TupleHasher;
043    
044    /**
045     * Class Unique {@link SubAssembly} is used to filter all duplicates out of a tuple stream.
046     * <p/>
047     * Typically finding unique value in a tuple stream relies on a {@link GroupBy} and a {@link FirstNBuffer}
048     * {@link cascading.operation.Buffer} operation.
049     * <p/>
050     * If the {@code include} value is set to {@link Include#NO_NULLS}, any tuple consisting of only {@code null}
051     * values will be removed from the stream.
052     * <p/>
053     * This SubAssembly uses the {@link FilterPartialDuplicates} {@link cascading.operation.Filter}
054     * to remove as many observed duplicates before the GroupBy operator to reduce IO over the network.
055     * <p/>
056     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
057     * in a much simpler mechanism.
058     * <p/>
059     * The {@code threshold} value tells the underlying FilterPartialDuplicates how many values to cache for duplicate
060     * comparison before dropping values from the LRU cache.
061     */
062    public class Unique extends SubAssembly
063      {
064      public enum Include
065        {
066          ALL,
067          NO_NULLS
068        }
069    
070      public enum Cache
071        {
072          Num_Keys_Flushed,
073          Num_Keys_Hit,
074          Num_Keys_Missed
075        }
076    
077      /**
078       * Class FilterPartialDuplicates is a {@link cascading.operation.Filter} that is used to remove observed duplicates from the tuple stream.
079       * <p/>
080       * Use this class typically in tandem with a {@link cascading.operation.aggregator.First}
081       * {@link cascading.operation.Aggregator} in order to improve de-duping performance by removing as many values
082       * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
083       * <p/>
084       * The {@code threshold} value is used to maintain a LRU of a constant size. If more than threshold unique values
085       * are seen, the oldest cached values will be removed from the cache.
086       *
087       * @see Unique
088       */
089      public static class FilterPartialDuplicates extends BaseOperation<LinkedHashMap<Tuple, Object>> implements Filter<LinkedHashMap<Tuple, Object>>
090        {
091        private int threshold = 10000;
092        private Include include = Include.ALL;
093        private TupleHasher tupleHasher;
094    
095        /** Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. */
096        public FilterPartialDuplicates()
097          {
098          }
099    
100        /**
101         * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
102         *
103         * @param threshold of type int
104         */
105        @ConstructorProperties( {"threshold"} )
106        public FilterPartialDuplicates( int threshold )
107          {
108          this.threshold = threshold;
109          }
110    
111        /**
112         * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
113         *
114         * @param include   of type Include
115         * @param threshold of type int
116         */
117        @ConstructorProperties( {"include", "threshold"} )
118        public FilterPartialDuplicates( Include include, int threshold )
119          {
120          this( include, threshold, null );
121          }
122    
123        /**
124         * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
125         *
126         * @param threshold   of type int
127         * @param include     of type Include
128         * @param tupleHasher of type TupleHasher
129         */
130        @ConstructorProperties( {"include", "threshold", "tupleHasher"} )
131        public FilterPartialDuplicates( Include include, int threshold, TupleHasher tupleHasher )
132          {
133          this.threshold = threshold;
134          this.include = include == null ? this.include : include;
135          this.tupleHasher = tupleHasher;
136          }
137    
138        @Override
139        public void prepare( final FlowProcess flowProcess, OperationCall<LinkedHashMap<Tuple, Object>> operationCall )
140          {
141          operationCall.setContext( new LinkedHashMap<Tuple, Object>( threshold, 0.75f, true )
142          {
143          @Override
144          protected boolean removeEldestEntry( Map.Entry eldest )
145            {
146            boolean doFlush = size() > threshold;
147    
148            if( doFlush )
149              flowProcess.increment( Cache.Num_Keys_Flushed, 1 );
150    
151            return doFlush;
152            }
153          } );
154          }
155    
156        @Override
157        public boolean isRemove( FlowProcess flowProcess, FilterCall<LinkedHashMap<Tuple, Object>> filterCall )
158          {
159          // we assume its more painful to create lots of tuple copies vs comparisons
160          Tuple args = TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTuple() );
161    
162          switch( include )
163            {
164            case ALL:
165              break;
166    
167            case NO_NULLS:
168              if( Tuples.frequency( args, null ) == args.size() )
169                return true;
170    
171              break;
172            }
173    
174          if( filterCall.getContext().containsKey( args ) )
175            {
176            flowProcess.increment( Cache.Num_Keys_Hit, 1 );
177            return true;
178            }
179    
180          // only do the copy here
181          filterCall.getContext().put( TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTupleCopy() ), null );
182    
183          flowProcess.increment( Cache.Num_Keys_Missed, 1 );
184    
185          return false;
186          }
187    
188        @Override
189        public void cleanup( FlowProcess flowProcess, OperationCall<LinkedHashMap<Tuple, Object>> operationCall )
190          {
191          operationCall.setContext( null );
192          }
193    
194        @Override
195        public boolean equals( Object object )
196          {
197          if( this == object )
198            return true;
199          if( !( object instanceof FilterPartialDuplicates ) )
200            return false;
201          if( !super.equals( object ) )
202            return false;
203    
204          FilterPartialDuplicates that = (FilterPartialDuplicates) object;
205    
206          if( threshold != that.threshold )
207            return false;
208    
209          return true;
210          }
211    
212        @Override
213        public int hashCode()
214          {
215          int result = super.hashCode();
216          result = 31 * result + threshold;
217          return result;
218          }
219        }
220    
221      /**
222       * Constructor Unique creates a new Unique instance.
223       *
224       * @param pipe         of type Pipe
225       * @param uniqueFields of type Fields
226       */
227      @ConstructorProperties( {"pipe", "uniqueFields"} )
228      public Unique( Pipe pipe, Fields uniqueFields )
229        {
230        this( null, pipe, uniqueFields );
231        }
232    
233      /**
234       * Constructor Unique creates a new Unique instance.
235       *
236       * @param pipe         of type Pipe
237       * @param uniqueFields of type Fields
238       * @param include      of type Include
239       */
240      @ConstructorProperties( {"pipe", "uniqueFields", "include"} )
241      public Unique( Pipe pipe, Fields uniqueFields, Include include )
242        {
243        this( null, pipe, uniqueFields, include );
244        }
245    
246      /**
247       * Constructor Unique creates a new Unique instance.
248       *
249       * @param pipe         of type Pipe
250       * @param uniqueFields of type Fields
251       * @param threshold    of type int
252       */
253      @ConstructorProperties( {"pipe", "uniqueFields", "threshold"} )
254      public Unique( Pipe pipe, Fields uniqueFields, int threshold )
255        {
256        this( null, pipe, uniqueFields, threshold );
257        }
258    
259      /**
260       * Constructor Unique creates a new Unique instance.
261       *
262       * @param pipe         of type Pipe
263       * @param uniqueFields of type Fields
264       * @param include      of type Include
265       * @param threshold    of type int
266       */
267      @ConstructorProperties( {"pipe", "uniqueFields", "include", "threshold"} )
268      public Unique( Pipe pipe, Fields uniqueFields, Include include, int threshold )
269        {
270        this( null, pipe, uniqueFields, include, threshold );
271        }
272    
273      /**
274       * Constructor Unique creates a new Unique instance.
275       *
276       * @param name         of type String
277       * @param pipe         of type Pipe
278       * @param uniqueFields of type Fields
279       */
280      @ConstructorProperties( {"name", "pipe", "uniqueFields"} )
281      public Unique( String name, Pipe pipe, Fields uniqueFields )
282        {
283        this( name, pipe, uniqueFields, 10000 );
284        }
285    
286      /**
287       * Constructor Unique creates a new Unique instance.
288       *
289       * @param name         of type String
290       * @param pipe         of type Pipe
291       * @param uniqueFields of type Fields
292       * @param include      of type Include
293       */
294      @ConstructorProperties( {"name", "pipe", "uniqueFields", "include"} )
295      public Unique( String name, Pipe pipe, Fields uniqueFields, Include include )
296        {
297        this( name, pipe, uniqueFields, include, 10000 );
298        }
299    
300      /**
301       * Constructor Unique creates a new Unique instance.
302       *
303       * @param name         of type String
304       * @param pipe         of type Pipe
305       * @param uniqueFields of type Fields
306       * @param threshold    of type int
307       */
308      @ConstructorProperties( {"name", "pipe", "uniqueFields", "threshold"} )
309      public Unique( String name, Pipe pipe, Fields uniqueFields, int threshold )
310        {
311        this( name, Pipe.pipes( pipe ), uniqueFields, threshold );
312        }
313    
314      /**
315       * Constructor Unique creates a new Unique instance.
316       *
317       * @param name         of type String
318       * @param pipe         of type Pipe
319       * @param uniqueFields of type Fields
320       * @param include      of type Include
321       * @param threshold    of type int
322       */
323      @ConstructorProperties( {"name", "pipe", "uniqueFields", "include", "threshold"} )
324      public Unique( String name, Pipe pipe, Fields uniqueFields, Include include, int threshold )
325        {
326        this( name, Pipe.pipes( pipe ), uniqueFields, include, threshold );
327        }
328    
329      /**
330       * Constructor Unique creates a new Unique instance.
331       *
332       * @param pipes        of type Pipe[]
333       * @param uniqueFields of type Fields
334       */
335      @ConstructorProperties( {"pipes", "uniqueFields"} )
336      public Unique( Pipe[] pipes, Fields uniqueFields )
337        {
338        this( null, pipes, uniqueFields, 10000 );
339        }
340    
341      /**
342       * Constructor Unique creates a new Unique instance.
343       *
344       * @param pipes        of type Pipe[]
345       * @param uniqueFields of type Fields
346       * @param include      of type Include
347       */
348      @ConstructorProperties( {"pipes", "uniqueFields", "include"} )
349      public Unique( Pipe[] pipes, Fields uniqueFields, Include include )
350        {
351        this( null, pipes, uniqueFields, include, 10000 );
352        }
353    
354      /**
355       * Constructor Unique creates a new Unique instance.
356       *
357       * @param pipes        of type Pipe[]
358       * @param uniqueFields of type Fields
359       * @param threshold    of type int
360       */
361      @ConstructorProperties( {"pipes", "uniqueFields", "threshold"} )
362      public Unique( Pipe[] pipes, Fields uniqueFields, int threshold )
363        {
364        this( null, pipes, uniqueFields, threshold );
365        }
366    
367      /**
368       * Constructor Unique creates a new Unique instance.
369       *
370       * @param pipes        of type Pipe[]
371       * @param uniqueFields of type Fields
372       * @param include      of type Include
373       * @param threshold    of type int
374       */
375      @ConstructorProperties( {"pipes", "uniqueFields", "include", "threshold"} )
376      public Unique( Pipe[] pipes, Fields uniqueFields, Include include, int threshold )
377        {
378        this( null, pipes, uniqueFields, include, threshold );
379        }
380    
381      /**
382       * Constructor Unique creates a new Unique instance.
383       *
384       * @param name         of type String
385       * @param pipes        of type Pipe[]
386       * @param uniqueFields of type Fields
387       */
388      @ConstructorProperties( {"name", "pipes", "uniqueFields"} )
389      public Unique( String name, Pipe[] pipes, Fields uniqueFields )
390        {
391        this( name, pipes, uniqueFields, 10000 );
392        }
393    
394      /**
395       * Constructor Unique creates a new Unique instance.
396       *
397       * @param name         of type String
398       * @param pipes        of type Pipe[]
399       * @param uniqueFields of type Fields
400       * @param include      of type Include
401       */
402      @ConstructorProperties( {"name", "pipes", "uniqueFields", "include"} )
403      public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include )
404        {
405        this( name, pipes, uniqueFields, include, 10000 );
406        }
407    
408      /**
409       * Constructor Unique creates a new Unique instance.
410       *
411       * @param name         of type String
412       * @param pipes        of type Pipe[]
413       * @param uniqueFields of type Fields
414       * @param threshold    of type int
415       */
416      @ConstructorProperties( {"name", "pipes", "uniqueFields", "threshold"} )
417      public Unique( String name, Pipe[] pipes, Fields uniqueFields, int threshold )
418        {
419        this( name, pipes, uniqueFields, null, threshold );
420        }
421    
422      /**
423       * Constructor Unique creates a new Unique instance.
424       *
425       * @param name         of type String
426       * @param pipes        of type Pipe[]
427       * @param uniqueFields of type Fields
428       * @param threshold    of type int
429       */
430      @ConstructorProperties( {"name", "pipes", "uniqueFields", "include", "threshold"} )
431      public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include, int threshold )
432        {
433        super( pipes );
434    
435        if( uniqueFields == null )
436          throw new IllegalArgumentException( "uniqueFields may not be null" );
437    
438        Pipe[] filters = new Pipe[ pipes.length ];
439    
440        TupleHasher tupleHasher = null;
441        Comparator[] comparators = uniqueFields.getComparators();
442    
443        if( !TupleHasher.isNull( comparators ) )
444          tupleHasher = new TupleHasher( null, comparators );
445    
446        FilterPartialDuplicates partialDuplicates = new FilterPartialDuplicates( include, threshold, tupleHasher );
447    
448        for( int i = 0; i < filters.length; i++ )
449          filters[ i ] = new Each( pipes[ i ], uniqueFields, partialDuplicates );
450    
451        Pipe pipe = new GroupBy( name, filters, uniqueFields );
452        pipe = new Every( pipe, Fields.ALL, new FirstNBuffer(), Fields.RESULTS );
453    
454        setTails( pipe );
455        }
456      }