001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.LinkedHashMap;
025    import java.util.Map;
026    
027    import cascading.flow.FlowProcess;
028    import cascading.operation.BaseOperation;
029    import cascading.operation.Filter;
030    import cascading.operation.FilterCall;
031    import cascading.operation.OperationCall;
032    import cascading.operation.buffer.FirstNBuffer;
033    import cascading.pipe.Each;
034    import cascading.pipe.Every;
035    import cascading.pipe.GroupBy;
036    import cascading.pipe.Pipe;
037    import cascading.pipe.SubAssembly;
038    import cascading.tuple.Fields;
039    import cascading.tuple.Tuple;
040    import cascading.tuple.Tuples;
041    
042    /**
043     * Class Unique {@link SubAssembly} is used to filter all duplicates out of a tuple stream.
044     * <p/>
045     * Typically finding unique value in a tuple stream relies on a {@link GroupBy} and a {@link FirstNBuffer}
046     * {@link cascading.operation.Buffer} operation.
047     * <p/>
048     * If the {@code include} value is set to {@link Include#NO_NULLS}, any tuple consisting of only {@code null}
049     * values will be removed from the stream.
050     * <p/>
051     * This SubAssembly uses the {@link FilterPartialDuplicates} {@link cascading.operation.Filter}
052     * to remove as many observed duplicates before the GroupBy operator to reduce IO over the network.
053     * <p/>
054     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
055     * in a much simpler mechanism.
056     * <p/>
057     * The {@code threshold} value tells the underlying FilterPartialDuplicates how many values to cache for duplicate
058     * comparison before dropping values from the LRU cache.
059     */
060    public class Unique extends SubAssembly
061      {
062      public enum Include
063        {
064          ALL,
065          NO_NULLS
066        }
067    
068      /**
069       * Class FilterPartialDuplicates is a {@link cascading.operation.Filter} that is used to remove observed duplicates from the tuple stream.
070       * <p/>
071       * Use this class typically in tandem with a {@link cascading.operation.aggregator.First}
072       * {@link cascading.operation.Aggregator} in order to improve de-duping performance by removing as many values
073       * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
074       * <p/>
075       * The {@code threshold} value is used to maintain a LRU of a constant size. If more than threshold unique values
076       * are seen, the oldest cached values will be removed from the cache.
077       *
078       * @see Unique
079       */
080      public static class FilterPartialDuplicates extends BaseOperation<LinkedHashMap<Tuple, Object>> implements Filter<LinkedHashMap<Tuple, Object>>
081        {
082        private int threshold = 10000;
083        private Include include = Include.ALL;
084    
085        /** Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. */
086        public FilterPartialDuplicates()
087          {
088          }
089    
090        /**
091         * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
092         *
093         * @param threshold of type int
094         */
095        @ConstructorProperties({"threshold"})
096        public FilterPartialDuplicates( int threshold )
097          {
098          this.threshold = threshold;
099          }
100    
101        /**
102         * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
103         *
104         * @param threshold of type int
105         */
106        @ConstructorProperties({"include", "threshold"})
107        public FilterPartialDuplicates( Include include, int threshold )
108          {
109          this.threshold = threshold;
110          this.include = include == null ? this.include : include;
111          }
112    
113        @Override
114        public void prepare( FlowProcess flowProcess, OperationCall<LinkedHashMap<Tuple, Object>> operationCall )
115          {
116          operationCall.setContext( new LinkedHashMap<Tuple, Object>( threshold, 0.75f, true )
117          {
118          @Override
119          protected boolean removeEldestEntry( Map.Entry eldest )
120            {
121            return size() > threshold;
122            }
123          } );
124          }
125    
126        @Override
127        public boolean isRemove( FlowProcess flowProcess, FilterCall<LinkedHashMap<Tuple, Object>> filterCall )
128          {
129          // we assume its more painful to create lots of tuple copies vs comparisons
130          Tuple args = filterCall.getArguments().getTuple();
131    
132          switch( include )
133            {
134            case ALL:
135              break;
136    
137            case NO_NULLS:
138              if( Tuples.frequency( args, null ) == args.size() )
139                return true;
140    
141              break;
142            }
143    
144          if( filterCall.getContext().containsKey( args ) )
145            return true;
146    
147          filterCall.getContext().put( filterCall.getArguments().getTupleCopy(), null );
148    
149          return false;
150          }
151    
152        @Override
153        public void cleanup( FlowProcess flowProcess, OperationCall<LinkedHashMap<Tuple, Object>> operationCall )
154          {
155          operationCall.setContext( null );
156          }
157    
158        @Override
159        public boolean equals( Object object )
160          {
161          if( this == object )
162            return true;
163          if( !( object instanceof FilterPartialDuplicates ) )
164            return false;
165          if( !super.equals( object ) )
166            return false;
167    
168          FilterPartialDuplicates that = (FilterPartialDuplicates) object;
169    
170          if( threshold != that.threshold )
171            return false;
172    
173          return true;
174          }
175    
176        @Override
177        public int hashCode()
178          {
179          int result = super.hashCode();
180          result = 31 * result + threshold;
181          return result;
182          }
183        }
184    
185      /**
186       * Constructor Unique creates a new Unique instance.
187       *
188       * @param pipe         of type Pipe
189       * @param uniqueFields of type Fields
190       */
191      @ConstructorProperties({"pipe", "uniqueFields"})
192      public Unique( Pipe pipe, Fields uniqueFields )
193        {
194        this( null, pipe, uniqueFields );
195        }
196    
197      /**
198       * Constructor Unique creates a new Unique instance.
199       *
200       * @param pipe         of type Pipe
201       * @param uniqueFields of type Fields
202       * @param include      of type Include
203       */
204      @ConstructorProperties({"pipe", "uniqueFields", "include"})
205      public Unique( Pipe pipe, Fields uniqueFields, Include include )
206        {
207        this( null, pipe, uniqueFields, include );
208        }
209    
210      /**
211       * Constructor Unique creates a new Unique instance.
212       *
213       * @param pipe         of type Pipe
214       * @param uniqueFields of type Fields
215       * @param threshold    of type int
216       */
217      @ConstructorProperties({"pipe", "uniqueFields", "threshold"})
218      public Unique( Pipe pipe, Fields uniqueFields, int threshold )
219        {
220        this( null, pipe, uniqueFields, threshold );
221        }
222    
223      /**
224       * Constructor Unique creates a new Unique instance.
225       *
226       * @param pipe         of type Pipe
227       * @param uniqueFields of type Fields
228       * @param include      of type Include
229       * @param threshold    of type int
230       */
231      @ConstructorProperties({"pipe", "uniqueFields", "include", "threshold"})
232      public Unique( Pipe pipe, Fields uniqueFields, Include include, int threshold )
233        {
234        this( null, pipe, uniqueFields, include, threshold );
235        }
236    
237      /**
238       * Constructor Unique creates a new Unique instance.
239       *
240       * @param name         of type String
241       * @param pipe         of type Pipe
242       * @param uniqueFields of type Fields
243       */
244      @ConstructorProperties({"name", "pipe", "uniqueFields"})
245      public Unique( String name, Pipe pipe, Fields uniqueFields )
246        {
247        this( name, pipe, uniqueFields, 10000 );
248        }
249    
250      /**
251       * Constructor Unique creates a new Unique instance.
252       *
253       * @param name         of type String
254       * @param pipe         of type Pipe
255       * @param uniqueFields of type Fields
256       * @param include      of type Include
257       */
258      @ConstructorProperties({"name", "pipe", "uniqueFields", "include"})
259      public Unique( String name, Pipe pipe, Fields uniqueFields, Include include )
260        {
261        this( name, pipe, uniqueFields, include, 10000 );
262        }
263    
264      /**
265       * Constructor Unique creates a new Unique instance.
266       *
267       * @param name         of type String
268       * @param pipe         of type Pipe
269       * @param uniqueFields of type Fields
270       * @param threshold    of type int
271       */
272      @ConstructorProperties({"name", "pipe", "uniqueFields", "threshold"})
273      public Unique( String name, Pipe pipe, Fields uniqueFields, int threshold )
274        {
275        this( name, Pipe.pipes( pipe ), uniqueFields, threshold );
276        }
277    
278      /**
279       * Constructor Unique creates a new Unique instance.
280       *
281       * @param name         of type String
282       * @param pipe         of type Pipe
283       * @param uniqueFields of type Fields
284       * @param include      of type Include
285       * @param threshold    of type int
286       */
287      @ConstructorProperties({"name", "pipe", "uniqueFields", "include", "threshold"})
288      public Unique( String name, Pipe pipe, Fields uniqueFields, Include include, int threshold )
289        {
290        this( name, Pipe.pipes( pipe ), uniqueFields, include, threshold );
291        }
292    
293      /**
294       * Constructor Unique creates a new Unique instance.
295       *
296       * @param pipes        of type Pipe[]
297       * @param uniqueFields of type Fields
298       */
299      @ConstructorProperties({"pipes", "uniqueFields"})
300      public Unique( Pipe[] pipes, Fields uniqueFields )
301        {
302        this( null, pipes, uniqueFields, 10000 );
303        }
304    
305      /**
306       * Constructor Unique creates a new Unique instance.
307       *
308       * @param pipes        of type Pipe[]
309       * @param uniqueFields of type Fields
310       * @param include      of type Include
311       */
312      @ConstructorProperties({"pipes", "uniqueFields", "include"})
313      public Unique( Pipe[] pipes, Fields uniqueFields, Include include )
314        {
315        this( null, pipes, uniqueFields, include, 10000 );
316        }
317    
318      /**
319       * Constructor Unique creates a new Unique instance.
320       *
321       * @param pipes        of type Pipe[]
322       * @param uniqueFields of type Fields
323       * @param threshold    of type int
324       */
325      @ConstructorProperties({"pipes", "uniqueFields", "threshold"})
326      public Unique( Pipe[] pipes, Fields uniqueFields, int threshold )
327        {
328        this( null, pipes, uniqueFields, threshold );
329        }
330    
331      /**
332       * Constructor Unique creates a new Unique instance.
333       *
334       * @param pipes        of type Pipe[]
335       * @param uniqueFields of type Fields
336       * @param include      of type Include
337       * @param threshold    of type int
338       */
339      @ConstructorProperties({"pipes", "uniqueFields", "threshold"})
340      public Unique( Pipe[] pipes, Fields uniqueFields, Include include, int threshold )
341        {
342        this( null, pipes, uniqueFields, include, threshold );
343        }
344    
345      /**
346       * Constructor Unique creates a new Unique instance.
347       *
348       * @param name         of type String
349       * @param pipes        of type Pipe[]
350       * @param uniqueFields of type Fields
351       */
352      @ConstructorProperties({"name", "pipes", "uniqueFields"})
353      public Unique( String name, Pipe[] pipes, Fields uniqueFields )
354        {
355        this( name, pipes, uniqueFields, 10000 );
356        }
357    
358      /**
359       * Constructor Unique creates a new Unique instance.
360       *
361       * @param name         of type String
362       * @param pipes        of type Pipe[]
363       * @param uniqueFields of type Fields
364       * @param include      of type Include
365       */
366      @ConstructorProperties({"name", "pipes", "uniqueFields", "include"})
367      public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include )
368        {
369        this( name, pipes, uniqueFields, include, 10000 );
370        }
371    
372      /**
373       * Constructor Unique creates a new Unique instance.
374       *
375       * @param name         of type String
376       * @param pipes        of type Pipe[]
377       * @param uniqueFields of type Fields
378       * @param threshold    of type int
379       */
380      @ConstructorProperties({"name", "pipes", "uniqueFields", "threshold"})
381      public Unique( String name, Pipe[] pipes, Fields uniqueFields, int threshold )
382        {
383        this( name, pipes, uniqueFields, null, threshold );
384        }
385    
386      /**
387       * Constructor Unique creates a new Unique instance.
388       *
389       * @param name         of type String
390       * @param pipes        of type Pipe[]
391       * @param uniqueFields of type Fields
392       * @param threshold    of type int
393       */
394      @ConstructorProperties({"name", "pipes", "uniqueFields", "include", "threshold"})
395      public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include, int threshold )
396        {
397        super( pipes );
398    
399        Pipe[] filters = new Pipe[ pipes.length ];
400        FilterPartialDuplicates partialDuplicates = new FilterPartialDuplicates( include, threshold );
401    
402        for( int i = 0; i < filters.length; i++ )
403          filters[ i ] = new Each( pipes[ i ], uniqueFields, partialDuplicates );
404    
405        Pipe pipe = new GroupBy( name, filters, uniqueFields );
406        pipe = new Every( pipe, Fields.ALL, new FirstNBuffer(), Fields.RESULTS );
407    
408        setTails( pipe );
409        }
410      }