001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 import java.util.Comparator; 025 import java.util.Map; 026 027 import cascading.CascadingException; 028 import cascading.flow.FlowProcess; 029 import cascading.operation.BaseOperation; 030 import cascading.operation.Filter; 031 import cascading.operation.FilterCall; 032 import cascading.operation.OperationCall; 033 import cascading.operation.buffer.FirstNBuffer; 034 import cascading.pipe.Each; 035 import cascading.pipe.Every; 036 import cascading.pipe.GroupBy; 037 import cascading.pipe.Pipe; 038 import cascading.pipe.SubAssembly; 039 import cascading.provider.FactoryLoader; 040 import cascading.tuple.Fields; 041 import cascading.tuple.Tuple; 042 import cascading.tuple.Tuples; 043 import cascading.tuple.util.TupleHasher; 044 import cascading.util.cache.BaseCacheFactory; 045 import cascading.util.cache.CacheEvictionCallback; 046 import cascading.util.cache.CascadingCache; 047 048 /** 049 * Class Unique {@link SubAssembly} is used to filter all duplicates out of a tuple stream. 050 * <p/> 051 * Typically finding unique value in a tuple stream relies on a {@link GroupBy} and a {@link FirstNBuffer} 052 * {@link cascading.operation.Buffer} operation. 053 * <p/> 054 * If the {@code include} value is set to {@link Include#NO_NULLS}, any tuple consisting of only {@code null} 055 * values will be removed from the stream. 056 * <p/> 057 * This SubAssembly uses the {@link FilterPartialDuplicates} {@link cascading.operation.Filter} 058 * to remove as many observed duplicates before the GroupBy operator to reduce IO over the network. 059 * <p/> 060 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 061 * in a much simpler mechanism. 062 * <p/> 063 * Unique uses a {@link cascading.util.cache.CascadingCache} or LRU to do the filtering. To tune the cache, set the 064 * {@code capacity} value to a high enough value to utilize available memory. Or set a default value via the 065 * {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_CAPACITY} property. The current default is {@code 10, 000} unique keys. 066 * <p/> 067 * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed 068 * by setting {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_FACTORY} property to the name of a sub-class of 069 * {@link cascading.util.cache.BaseCacheFactory}. 070 * <p/> 071 * The {@code capacity} value tells the underlying FilterPartialDuplicates how many values to cache for duplicate 072 * comparison before dropping values from the LRU cache. 073 * 074 * @see cascading.util.cache.LRUHashMapCacheFactory 075 * @see cascading.util.cache.DirectMappedCacheFactory 076 * @see cascading.util.cache.LRUHashMapCache 077 * @see cascading.util.cache.DirectMappedCache 078 */ 079 public class Unique extends SubAssembly 080 { 081 082 public enum Include 083 { 084 ALL, 085 NO_NULLS 086 } 087 088 public enum Cache 089 { 090 Num_Keys_Flushed, 091 Num_Keys_Hit, 092 Num_Keys_Missed 093 } 094 095 /** 096 * Class FilterPartialDuplicates is a {@link cascading.operation.Filter} that is used to remove observed duplicates from the tuple stream. 097 * <p/> 098 * Use this class typically in tandem with a {@link cascading.operation.aggregator.First} 099 * {@link cascading.operation.Aggregator} in order to improve de-duping performance by removing as many values 100 * as possible before the intermediate {@link cascading.pipe.GroupBy} operator. 101 * <p/> 102 * The {@code capacity} value is used to maintain a LRU of a constant size. If more than capacity unique values 103 * are seen, the oldest cached values will be removed from the cache. 104 * 105 * @see Unique 106 */ 107 public static class FilterPartialDuplicates extends BaseOperation<CascadingCache<Tuple, Object>> implements Filter<CascadingCache<Tuple, Object>> 108 { 109 /** special null value for the caches, since a cache might not permit 'null' as a value */ 110 private final static Object NULL_VALUE = new Object(); 111 112 private int capacity = 0; 113 private Include include = Include.ALL; 114 private TupleHasher tupleHasher; 115 116 /** Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. */ 117 public FilterPartialDuplicates() 118 { 119 } 120 121 /** 122 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 123 * 124 * @param capacity of type int 125 */ 126 @ConstructorProperties({"capacity"}) 127 public FilterPartialDuplicates( int capacity ) 128 { 129 this.capacity = capacity; 130 } 131 132 /** 133 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 134 * 135 * @param include of type Include 136 * @param capacity of type int 137 */ 138 @ConstructorProperties({"include", "capacity"}) 139 public FilterPartialDuplicates( Include include, int capacity ) 140 { 141 this( include, capacity, null ); 142 } 143 144 /** 145 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 146 * 147 * @param capacity of type int 148 * @param include of type Include 149 * @param tupleHasher of type TupleHasher 150 */ 151 @ConstructorProperties({"include", "capacity", "tupleHasher"}) 152 public FilterPartialDuplicates( Include include, int capacity, TupleHasher tupleHasher ) 153 { 154 this.capacity = capacity; 155 this.include = include == null ? this.include : include; 156 this.tupleHasher = tupleHasher; 157 } 158 159 @Override 160 public void prepare( final FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall ) 161 { 162 CacheEvictionCallback callback = new CacheEvictionCallback() 163 { 164 @Override 165 public void evict( Map.Entry entry ) 166 { 167 flowProcess.increment( Cache.Num_Keys_Flushed, 1 ); 168 } 169 }; 170 FactoryLoader loader = FactoryLoader.getInstance(); 171 BaseCacheFactory cacheFactory = loader.loadFactoryFrom( flowProcess, UniqueProps.UNIQUE_CACHE_FACTORY, UniqueProps.DEFAULT_CACHE_FACTORY_CLASS ); 172 if( cacheFactory == null ) 173 throw new CascadingException( "unable to load cache factory, please check your '" + UniqueProps.UNIQUE_CACHE_FACTORY + "' setting." ); 174 175 CascadingCache cache = cacheFactory.create( flowProcess ); 176 cache.setCacheEvictionCallback( callback ); 177 Integer cacheCapacity = capacity; 178 if( capacity == 0 ) 179 { 180 cacheCapacity = flowProcess.getIntegerProperty( UniqueProps.UNIQUE_CACHE_CAPACITY ); 181 if( cacheCapacity == null ) 182 cacheCapacity = UniqueProps.UNIQUE_DEFAULT_CAPACITY; 183 } 184 cache.setCapacity( cacheCapacity.intValue() ); 185 cache.initialize(); 186 187 operationCall.setContext( cache ); 188 } 189 190 @Override 191 public boolean isRemove( FlowProcess flowProcess, FilterCall<CascadingCache<Tuple, Object>> filterCall ) 192 { 193 // we assume its more painful to create lots of tuple copies vs comparisons 194 Tuple args = TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTuple() ); 195 196 switch( include ) 197 { 198 case ALL: 199 break; 200 201 case NO_NULLS: 202 if( Tuples.frequency( args, null ) == args.size() ) 203 return true; 204 205 break; 206 } 207 208 if( filterCall.getContext().containsKey( args ) ) 209 { 210 flowProcess.increment( Cache.Num_Keys_Hit, 1 ); 211 return true; 212 } 213 214 // only do the copy here 215 filterCall.getContext().put( TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTupleCopy() ), NULL_VALUE ); 216 217 flowProcess.increment( Cache.Num_Keys_Missed, 1 ); 218 219 return false; 220 } 221 222 @Override 223 public void cleanup( FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall ) 224 { 225 operationCall.setContext( null ); 226 } 227 228 @Override 229 public boolean equals( Object object ) 230 { 231 if( this == object ) 232 return true; 233 if( !( object instanceof FilterPartialDuplicates ) ) 234 return false; 235 if( !super.equals( object ) ) 236 return false; 237 238 FilterPartialDuplicates that = (FilterPartialDuplicates) object; 239 240 if( capacity != that.capacity ) 241 return false; 242 243 return true; 244 } 245 246 @Override 247 public int hashCode() 248 { 249 int result = super.hashCode(); 250 result = 31 * result + capacity; 251 return result; 252 } 253 } 254 255 /** 256 * Constructor Unique creates a new Unique instance. 257 * 258 * @param pipe of type Pipe 259 * @param uniqueFields of type Fields 260 */ 261 @ConstructorProperties({"pipe", "uniqueFields"}) 262 public Unique( Pipe pipe, Fields uniqueFields ) 263 { 264 this( null, pipe, uniqueFields ); 265 } 266 267 /** 268 * Constructor Unique creates a new Unique instance. 269 * 270 * @param pipe of type Pipe 271 * @param uniqueFields of type Fields 272 * @param include of type Include 273 */ 274 @ConstructorProperties({"pipe", "uniqueFields", "include"}) 275 public Unique( Pipe pipe, Fields uniqueFields, Include include ) 276 { 277 this( null, pipe, uniqueFields, include ); 278 } 279 280 /** 281 * Constructor Unique creates a new Unique instance. 282 * 283 * @param pipe of type Pipe 284 * @param uniqueFields of type Fields 285 * @param capacity of type int 286 */ 287 @ConstructorProperties({"pipe", "uniqueFields", "capacity"}) 288 public Unique( Pipe pipe, Fields uniqueFields, int capacity ) 289 { 290 this( null, pipe, uniqueFields, capacity ); 291 } 292 293 /** 294 * Constructor Unique creates a new Unique instance. 295 * 296 * @param pipe of type Pipe 297 * @param uniqueFields of type Fields 298 * @param include of type Include 299 * @param capacity of type int 300 */ 301 @ConstructorProperties({"pipe", "uniqueFields", "include", "capacity"}) 302 public Unique( Pipe pipe, Fields uniqueFields, Include include, int capacity ) 303 { 304 this( null, pipe, uniqueFields, include, capacity ); 305 } 306 307 /** 308 * Constructor Unique creates a new Unique instance. 309 * 310 * @param name of type String 311 * @param pipe of type Pipe 312 * @param uniqueFields of type Fields 313 */ 314 @ConstructorProperties({"name", "pipe", "uniqueFields"}) 315 public Unique( String name, Pipe pipe, Fields uniqueFields ) 316 { 317 this( name, pipe, uniqueFields, null ); 318 } 319 320 /** 321 * Constructor Unique creates a new Unique instance. 322 * 323 * @param name of type String 324 * @param pipe of type Pipe 325 * @param uniqueFields of type Fields 326 * @param include of type Include 327 */ 328 @ConstructorProperties({"name", "pipe", "uniqueFields", "include"}) 329 public Unique( String name, Pipe pipe, Fields uniqueFields, Include include ) 330 { 331 this( name, pipe, uniqueFields, include, 0 ); 332 } 333 334 /** 335 * Constructor Unique creates a new Unique instance. 336 * 337 * @param name of type String 338 * @param pipe of type Pipe 339 * @param uniqueFields of type Fields 340 * @param capacity of type int 341 */ 342 @ConstructorProperties({"name", "pipe", "uniqueFields", "capacity"}) 343 public Unique( String name, Pipe pipe, Fields uniqueFields, int capacity ) 344 { 345 this( name, Pipe.pipes( pipe ), uniqueFields, capacity ); 346 } 347 348 /** 349 * Constructor Unique creates a new Unique instance. 350 * 351 * @param name of type String 352 * @param pipe of type Pipe 353 * @param uniqueFields of type Fields 354 * @param include of type Include 355 * @param capacity of type int 356 */ 357 @ConstructorProperties({"name", "pipe", "uniqueFields", "include", "capacity"}) 358 public Unique( String name, Pipe pipe, Fields uniqueFields, Include include, int capacity ) 359 { 360 this( name, Pipe.pipes( pipe ), uniqueFields, include, capacity ); 361 } 362 363 /** 364 * Constructor Unique creates a new Unique instance. 365 * 366 * @param pipes of type Pipe[] 367 * @param uniqueFields of type Fields 368 */ 369 @ConstructorProperties({"pipes", "uniqueFields"}) 370 public Unique( Pipe[] pipes, Fields uniqueFields ) 371 { 372 this( null, pipes, uniqueFields ); 373 } 374 375 /** 376 * Constructor Unique creates a new Unique instance. 377 * 378 * @param pipes of type Pipe[] 379 * @param uniqueFields of type Fields 380 * @param include of type Include 381 */ 382 @ConstructorProperties({"pipes", "uniqueFields", "include"}) 383 public Unique( Pipe[] pipes, Fields uniqueFields, Include include ) 384 { 385 this( null, pipes, uniqueFields, include ); 386 } 387 388 /** 389 * Constructor Unique creates a new Unique instance. 390 * 391 * @param pipes of type Pipe[] 392 * @param uniqueFields of type Fields 393 * @param capacity of type int 394 */ 395 @ConstructorProperties({"pipes", "uniqueFields", "capacity"}) 396 public Unique( Pipe[] pipes, Fields uniqueFields, int capacity ) 397 { 398 this( null, pipes, uniqueFields, capacity ); 399 } 400 401 /** 402 * Constructor Unique creates a new Unique instance. 403 * 404 * @param pipes of type Pipe[] 405 * @param uniqueFields of type Fields 406 * @param include of type Include 407 * @param capacity of type int 408 */ 409 @ConstructorProperties({"pipes", "uniqueFields", "include", "capacity"}) 410 public Unique( Pipe[] pipes, Fields uniqueFields, Include include, int capacity ) 411 { 412 this( null, pipes, uniqueFields, include, capacity ); 413 } 414 415 /** 416 * Constructor Unique creates a new Unique instance. 417 * 418 * @param name of type String 419 * @param pipes of type Pipe[] 420 * @param uniqueFields of type Fields 421 */ 422 @ConstructorProperties({"name", "pipes", "uniqueFields"}) 423 public Unique( String name, Pipe[] pipes, Fields uniqueFields ) 424 { 425 this( name, pipes, uniqueFields, null ); 426 } 427 428 /** 429 * Constructor Unique creates a new Unique instance. 430 * 431 * @param name of type String 432 * @param pipes of type Pipe[] 433 * @param uniqueFields of type Fields 434 * @param include of type Include 435 */ 436 @ConstructorProperties({"name", "pipes", "uniqueFields", "include"}) 437 public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include ) 438 { 439 this( name, pipes, uniqueFields, include, 0 ); 440 } 441 442 /** 443 * Constructor Unique creates a new Unique instance. 444 * 445 * @param name of type String 446 * @param pipes of type Pipe[] 447 * @param uniqueFields of type Fields 448 * @param capacity of type int 449 */ 450 @ConstructorProperties({"name", "pipes", "uniqueFields", "capacity"}) 451 public Unique( String name, Pipe[] pipes, Fields uniqueFields, int capacity ) 452 { 453 this( name, pipes, uniqueFields, null, capacity ); 454 } 455 456 /** 457 * Constructor Unique creates a new Unique instance. 458 * 459 * @param name of type String 460 * @param pipes of type Pipe[] 461 * @param uniqueFields of type Fields 462 * @param capacity of type int 463 */ 464 @ConstructorProperties({"name", "pipes", "uniqueFields", "include", "capacity"}) 465 public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include, int capacity ) 466 { 467 super( pipes ); 468 469 if( uniqueFields == null ) 470 throw new IllegalArgumentException( "uniqueFields may not be null" ); 471 472 Pipe[] filters = new Pipe[ pipes.length ]; 473 474 TupleHasher tupleHasher = null; 475 Comparator[] comparators = uniqueFields.getComparators(); 476 477 if( !TupleHasher.isNull( comparators ) ) 478 tupleHasher = new TupleHasher( null, comparators ); 479 480 FilterPartialDuplicates partialDuplicates = new FilterPartialDuplicates( include, capacity, tupleHasher ); 481 482 for( int i = 0; i < filters.length; i++ ) 483 filters[ i ] = new Each( pipes[ i ], uniqueFields, partialDuplicates ); 484 485 Pipe pipe = new GroupBy( name, filters, uniqueFields ); 486 pipe = new Every( pipe, Fields.ALL, new FirstNBuffer(), Fields.RESULTS ); 487 488 setTails( pipe ); 489 } 490 }