001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe.assembly; 022 023import java.beans.ConstructorProperties; 024import java.util.Comparator; 025import java.util.Map; 026 027import cascading.CascadingException; 028import cascading.flow.FlowProcess; 029import cascading.operation.BaseOperation; 030import cascading.operation.Filter; 031import cascading.operation.FilterCall; 032import cascading.operation.OperationCall; 033import cascading.operation.buffer.FirstNBuffer; 034import cascading.pipe.Each; 035import cascading.pipe.Every; 036import cascading.pipe.GroupBy; 037import cascading.pipe.Pipe; 038import cascading.pipe.SubAssembly; 039import cascading.provider.FactoryLoader; 040import cascading.tuple.Fields; 041import cascading.tuple.Tuple; 042import cascading.tuple.Tuples; 043import cascading.tuple.util.TupleHasher; 044import cascading.util.cache.BaseCacheFactory; 045import cascading.util.cache.CacheEvictionCallback; 046import cascading.util.cache.CascadingCache; 047 048/** 049 * Class Unique {@link SubAssembly} is used to filter all duplicates out of a tuple stream. 050 * <p/> 051 * Typically finding unique value in a tuple stream relies on a {@link GroupBy} and a {@link FirstNBuffer} 052 * {@link cascading.operation.Buffer} operation. 053 * <p/> 054 * If the {@code include} value is set to {@link Include#NO_NULLS}, any tuple consisting of only {@code null} 055 * values will be removed from the stream. 056 * <p/> 057 * This SubAssembly uses the {@link FilterPartialDuplicates} {@link cascading.operation.Filter} 058 * to remove as many observed duplicates before the GroupBy operator to reduce IO over the network. 059 * <p/> 060 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 061 * in a much simpler mechanism. 062 * <p/> 063 * Unique uses a {@link cascading.util.cache.CascadingCache} or LRU to do the filtering. To tune the cache, set the 064 * {@code capacity} value to a high enough value to utilize available memory. Or set a default value via the 065 * {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_CAPACITY} property. The current default is {@code 10, 000} unique keys. 066 * <p/> 067 * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed 068 * by setting {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_FACTORY} property to the name of a sub-class of 069 * {@link cascading.util.cache.BaseCacheFactory}. 070 * <p/> 071 * The {@code capacity} value tells the underlying FilterPartialDuplicates how many values to cache for duplicate 072 * comparison before dropping values from the LRU cache. 073 * 074 * @see cascading.util.cache.LRUHashMapCacheFactory 075 * @see cascading.util.cache.DirectMappedCacheFactory 076 * @see cascading.util.cache.LRUHashMapCache 077 * @see cascading.util.cache.DirectMappedCache 078 */ 079public class Unique extends SubAssembly 080 { 081 082 public enum Include 083 { 084 ALL, 085 NO_NULLS 086 } 087 088 public enum Cache 089 { 090 Num_Keys_Flushed, 091 Num_Keys_Hit, 092 Num_Keys_Missed 093 } 094 095 /** 096 * Class FilterPartialDuplicates is a {@link cascading.operation.Filter} that is used to remove observed duplicates from the tuple stream. 097 * <p/> 098 * Use this class typically in tandem with a {@link cascading.operation.aggregator.First} 099 * {@link cascading.operation.Aggregator} in order to improve de-duping performance by removing as many values 100 * as possible before the intermediate {@link cascading.pipe.GroupBy} operator. 101 * <p/> 102 * The {@code capacity} value is used to maintain a LRU of a constant size. If more than capacity unique values 103 * are seen, the oldest cached values will be removed from the cache. 104 * 105 * @see Unique 106 */ 107 public static class FilterPartialDuplicates extends BaseOperation<CascadingCache<Tuple, Object>> implements Filter<CascadingCache<Tuple, Object>> 108 { 109 /** special null value for the caches, since a cache might not permit 'null' as a value */ 110 private final static Object NULL_VALUE = new Object(); 111 112 private int capacity = 0; 113 private Include include = Include.ALL; 114 private TupleHasher tupleHasher; 115 116 /** Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. */ 117 public FilterPartialDuplicates() 118 { 119 } 120 121 /** 122 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 123 * 124 * @param capacity of type int 125 */ 126 @ConstructorProperties({"capacity"}) 127 public FilterPartialDuplicates( int capacity ) 128 { 129 this.capacity = capacity; 130 } 131 132 /** 133 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 134 * 135 * @param include of type Include 136 * @param capacity of type int 137 */ 138 @ConstructorProperties({"include", "capacity"}) 139 public FilterPartialDuplicates( Include include, int capacity ) 140 { 141 this( include, capacity, null ); 142 } 143 144 /** 145 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 146 * 147 * @param capacity of type int 148 * @param include of type Include 149 * @param tupleHasher of type TupleHasher 150 */ 151 @ConstructorProperties({"include", "capacity", "tupleHasher"}) 152 public FilterPartialDuplicates( Include include, int capacity, TupleHasher tupleHasher ) 153 { 154 this.capacity = capacity; 155 this.include = include == null ? this.include : include; 156 this.tupleHasher = tupleHasher; 157 } 158 159 @Override 160 public void prepare( final FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall ) 161 { 162 CacheEvictionCallback callback = new CacheEvictionCallback() 163 { 164 @Override 165 public void evict( Map.Entry entry ) 166 { 167 flowProcess.increment( Cache.Num_Keys_Flushed, 1 ); 168 } 169 }; 170 FactoryLoader loader = FactoryLoader.getInstance(); 171 BaseCacheFactory cacheFactory = loader.loadFactoryFrom( flowProcess, UniqueProps.UNIQUE_CACHE_FACTORY, UniqueProps.DEFAULT_CACHE_FACTORY_CLASS ); 172 173 if( cacheFactory == null ) 174 throw new CascadingException( "unable to load cache factory, please check your '" + UniqueProps.UNIQUE_CACHE_FACTORY + "' setting." ); 175 176 CascadingCache cache = cacheFactory.create( flowProcess ); 177 cache.setCacheEvictionCallback( callback ); 178 Integer cacheCapacity = capacity; 179 180 if( capacity == 0 ) 181 { 182 cacheCapacity = flowProcess.getIntegerProperty( UniqueProps.UNIQUE_CACHE_CAPACITY ); 183 184 if( cacheCapacity == null ) 185 cacheCapacity = UniqueProps.UNIQUE_DEFAULT_CAPACITY; 186 } 187 188 cache.setCapacity( cacheCapacity.intValue() ); 189 cache.initialize(); 190 191 operationCall.setContext( cache ); 192 } 193 194 @Override 195 public boolean isRemove( FlowProcess flowProcess, FilterCall<CascadingCache<Tuple, Object>> filterCall ) 196 { 197 // we assume its more painful to create lots of tuple copies vs comparisons 198 Tuple args = TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTuple() ); 199 200 switch( include ) 201 { 202 case ALL: 203 break; 204 205 case NO_NULLS: 206 if( Tuples.frequency( args, null ) == args.size() ) 207 return true; 208 209 break; 210 } 211 212 if( filterCall.getContext().containsKey( args ) ) 213 { 214 flowProcess.increment( Cache.Num_Keys_Hit, 1 ); 215 return true; 216 } 217 218 // only do the copy here 219 filterCall.getContext().put( TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTupleCopy() ), NULL_VALUE ); 220 221 flowProcess.increment( Cache.Num_Keys_Missed, 1 ); 222 223 return false; 224 } 225 226 @Override 227 public void cleanup( FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall ) 228 { 229 operationCall.setContext( null ); 230 } 231 232 @Override 233 public boolean equals( Object object ) 234 { 235 if( this == object ) 236 return true; 237 if( !( object instanceof FilterPartialDuplicates ) ) 238 return false; 239 if( !super.equals( object ) ) 240 return false; 241 242 FilterPartialDuplicates that = (FilterPartialDuplicates) object; 243 244 if( capacity != that.capacity ) 245 return false; 246 247 return true; 248 } 249 250 @Override 251 public int hashCode() 252 { 253 int result = super.hashCode(); 254 result = 31 * result + capacity; 255 return result; 256 } 257 } 258 259 /** 260 * Constructor Unique creates a new Unique instance. 261 * 262 * @param pipe of type Pipe 263 * @param uniqueFields of type Fields 264 */ 265 @ConstructorProperties({"pipe", "uniqueFields"}) 266 public Unique( Pipe pipe, Fields uniqueFields ) 267 { 268 this( null, pipe, uniqueFields ); 269 } 270 271 /** 272 * Constructor Unique creates a new Unique instance. 273 * 274 * @param pipe of type Pipe 275 * @param uniqueFields of type Fields 276 * @param include of type Include 277 */ 278 @ConstructorProperties({"pipe", "uniqueFields", "include"}) 279 public Unique( Pipe pipe, Fields uniqueFields, Include include ) 280 { 281 this( null, pipe, uniqueFields, include ); 282 } 283 284 /** 285 * Constructor Unique creates a new Unique instance. 286 * 287 * @param pipe of type Pipe 288 * @param uniqueFields of type Fields 289 * @param capacity of type int 290 */ 291 @ConstructorProperties({"pipe", "uniqueFields", "capacity"}) 292 public Unique( Pipe pipe, Fields uniqueFields, int capacity ) 293 { 294 this( null, pipe, uniqueFields, capacity ); 295 } 296 297 /** 298 * Constructor Unique creates a new Unique instance. 299 * 300 * @param pipe of type Pipe 301 * @param uniqueFields of type Fields 302 * @param include of type Include 303 * @param capacity of type int 304 */ 305 @ConstructorProperties({"pipe", "uniqueFields", "include", "capacity"}) 306 public Unique( Pipe pipe, Fields uniqueFields, Include include, int capacity ) 307 { 308 this( null, pipe, uniqueFields, include, capacity ); 309 } 310 311 /** 312 * Constructor Unique creates a new Unique instance. 313 * 314 * @param name of type String 315 * @param pipe of type Pipe 316 * @param uniqueFields of type Fields 317 */ 318 @ConstructorProperties({"name", "pipe", "uniqueFields"}) 319 public Unique( String name, Pipe pipe, Fields uniqueFields ) 320 { 321 this( name, pipe, uniqueFields, null ); 322 } 323 324 /** 325 * Constructor Unique creates a new Unique instance. 326 * 327 * @param name of type String 328 * @param pipe of type Pipe 329 * @param uniqueFields of type Fields 330 * @param include of type Include 331 */ 332 @ConstructorProperties({"name", "pipe", "uniqueFields", "include"}) 333 public Unique( String name, Pipe pipe, Fields uniqueFields, Include include ) 334 { 335 this( name, pipe, uniqueFields, include, 0 ); 336 } 337 338 /** 339 * Constructor Unique creates a new Unique instance. 340 * 341 * @param name of type String 342 * @param pipe of type Pipe 343 * @param uniqueFields of type Fields 344 * @param capacity of type int 345 */ 346 @ConstructorProperties({"name", "pipe", "uniqueFields", "capacity"}) 347 public Unique( String name, Pipe pipe, Fields uniqueFields, int capacity ) 348 { 349 this( name, Pipe.pipes( pipe ), uniqueFields, capacity ); 350 } 351 352 /** 353 * Constructor Unique creates a new Unique instance. 354 * 355 * @param name of type String 356 * @param pipe of type Pipe 357 * @param uniqueFields of type Fields 358 * @param include of type Include 359 * @param capacity of type int 360 */ 361 @ConstructorProperties({"name", "pipe", "uniqueFields", "include", "capacity"}) 362 public Unique( String name, Pipe pipe, Fields uniqueFields, Include include, int capacity ) 363 { 364 this( name, Pipe.pipes( pipe ), uniqueFields, include, capacity ); 365 } 366 367 /** 368 * Constructor Unique creates a new Unique instance. 369 * 370 * @param pipes of type Pipe[] 371 * @param uniqueFields of type Fields 372 */ 373 @ConstructorProperties({"pipes", "uniqueFields"}) 374 public Unique( Pipe[] pipes, Fields uniqueFields ) 375 { 376 this( null, pipes, uniqueFields ); 377 } 378 379 /** 380 * Constructor Unique creates a new Unique instance. 381 * 382 * @param pipes of type Pipe[] 383 * @param uniqueFields of type Fields 384 * @param include of type Include 385 */ 386 @ConstructorProperties({"pipes", "uniqueFields", "include"}) 387 public Unique( Pipe[] pipes, Fields uniqueFields, Include include ) 388 { 389 this( null, pipes, uniqueFields, include ); 390 } 391 392 /** 393 * Constructor Unique creates a new Unique instance. 394 * 395 * @param pipes of type Pipe[] 396 * @param uniqueFields of type Fields 397 * @param capacity of type int 398 */ 399 @ConstructorProperties({"pipes", "uniqueFields", "capacity"}) 400 public Unique( Pipe[] pipes, Fields uniqueFields, int capacity ) 401 { 402 this( null, pipes, uniqueFields, capacity ); 403 } 404 405 /** 406 * Constructor Unique creates a new Unique instance. 407 * 408 * @param pipes of type Pipe[] 409 * @param uniqueFields of type Fields 410 * @param include of type Include 411 * @param capacity of type int 412 */ 413 @ConstructorProperties({"pipes", "uniqueFields", "include", "capacity"}) 414 public Unique( Pipe[] pipes, Fields uniqueFields, Include include, int capacity ) 415 { 416 this( null, pipes, uniqueFields, include, capacity ); 417 } 418 419 /** 420 * Constructor Unique creates a new Unique instance. 421 * 422 * @param name of type String 423 * @param pipes of type Pipe[] 424 * @param uniqueFields of type Fields 425 */ 426 @ConstructorProperties({"name", "pipes", "uniqueFields"}) 427 public Unique( String name, Pipe[] pipes, Fields uniqueFields ) 428 { 429 this( name, pipes, uniqueFields, null ); 430 } 431 432 /** 433 * Constructor Unique creates a new Unique instance. 434 * 435 * @param name of type String 436 * @param pipes of type Pipe[] 437 * @param uniqueFields of type Fields 438 * @param include of type Include 439 */ 440 @ConstructorProperties({"name", "pipes", "uniqueFields", "include"}) 441 public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include ) 442 { 443 this( name, pipes, uniqueFields, include, 0 ); 444 } 445 446 /** 447 * Constructor Unique creates a new Unique instance. 448 * 449 * @param name of type String 450 * @param pipes of type Pipe[] 451 * @param uniqueFields of type Fields 452 * @param capacity of type int 453 */ 454 @ConstructorProperties({"name", "pipes", "uniqueFields", "capacity"}) 455 public Unique( String name, Pipe[] pipes, Fields uniqueFields, int capacity ) 456 { 457 this( name, pipes, uniqueFields, null, capacity ); 458 } 459 460 /** 461 * Constructor Unique creates a new Unique instance. 462 * 463 * @param name of type String 464 * @param pipes of type Pipe[] 465 * @param uniqueFields of type Fields 466 * @param capacity of type int 467 */ 468 @ConstructorProperties({"name", "pipes", "uniqueFields", "include", "capacity"}) 469 public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include, int capacity ) 470 { 471 super( pipes ); 472 473 if( uniqueFields == null ) 474 throw new IllegalArgumentException( "uniqueFields may not be null" ); 475 476 Pipe[] filters = new Pipe[ pipes.length ]; 477 478 TupleHasher tupleHasher = null; 479 Comparator[] comparators = uniqueFields.getComparators(); 480 481 if( !TupleHasher.isNull( comparators ) ) 482 tupleHasher = new TupleHasher( null, comparators ); 483 484 FilterPartialDuplicates partialDuplicates = new FilterPartialDuplicates( include, capacity, tupleHasher ); 485 486 for( int i = 0; i < filters.length; i++ ) 487 filters[ i ] = new Each( pipes[ i ], uniqueFields, partialDuplicates ); 488 489 Pipe pipe = new GroupBy( name, filters, uniqueFields ); 490 pipe = new Every( pipe, Fields.ALL, new FirstNBuffer(), Fields.RESULTS ); 491 492 setTails( pipe ); 493 } 494 }