001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 import java.util.Comparator; 025 import java.util.LinkedHashMap; 026 import java.util.Map; 027 028 import cascading.flow.FlowProcess; 029 import cascading.operation.BaseOperation; 030 import cascading.operation.Filter; 031 import cascading.operation.FilterCall; 032 import cascading.operation.OperationCall; 033 import cascading.operation.buffer.FirstNBuffer; 034 import cascading.pipe.Each; 035 import cascading.pipe.Every; 036 import cascading.pipe.GroupBy; 037 import cascading.pipe.Pipe; 038 import cascading.pipe.SubAssembly; 039 import cascading.tuple.Fields; 040 import cascading.tuple.Tuple; 041 import cascading.tuple.Tuples; 042 import cascading.tuple.util.TupleHasher; 043 044 /** 045 * Class Unique {@link SubAssembly} is used to filter all duplicates out of a tuple stream. 046 * <p/> 047 * Typically finding unique value in a tuple stream relies on a {@link GroupBy} and a {@link FirstNBuffer} 048 * {@link cascading.operation.Buffer} operation. 049 * <p/> 050 * If the {@code include} value is set to {@link Include#NO_NULLS}, any tuple consisting of only {@code null} 051 * values will be removed from the stream. 052 * <p/> 053 * This SubAssembly uses the {@link FilterPartialDuplicates} {@link cascading.operation.Filter} 054 * to remove as many observed duplicates before the GroupBy operator to reduce IO over the network. 055 * <p/> 056 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 057 * in a much simpler mechanism. 058 * <p/> 059 * The {@code threshold} value tells the underlying FilterPartialDuplicates how many values to cache for duplicate 060 * comparison before dropping values from the LRU cache. 061 */ 062 public class Unique extends SubAssembly 063 { 064 public enum Include 065 { 066 ALL, 067 NO_NULLS 068 } 069 070 public enum Cache 071 { 072 Num_Keys_Flushed, 073 Num_Keys_Hit, 074 Num_Keys_Missed 075 } 076 077 /** 078 * Class FilterPartialDuplicates is a {@link cascading.operation.Filter} that is used to remove observed duplicates from the tuple stream. 079 * <p/> 080 * Use this class typically in tandem with a {@link cascading.operation.aggregator.First} 081 * {@link cascading.operation.Aggregator} in order to improve de-duping performance by removing as many values 082 * as possible before the intermediate {@link cascading.pipe.GroupBy} operator. 083 * <p/> 084 * The {@code threshold} value is used to maintain a LRU of a constant size. If more than threshold unique values 085 * are seen, the oldest cached values will be removed from the cache. 086 * 087 * @see Unique 088 */ 089 public static class FilterPartialDuplicates extends BaseOperation<LinkedHashMap<Tuple, Object>> implements Filter<LinkedHashMap<Tuple, Object>> 090 { 091 private int threshold = 10000; 092 private Include include = Include.ALL; 093 private TupleHasher tupleHasher; 094 095 /** Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. */ 096 public FilterPartialDuplicates() 097 { 098 } 099 100 /** 101 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 102 * 103 * @param threshold of type int 104 */ 105 @ConstructorProperties( {"threshold"} ) 106 public FilterPartialDuplicates( int threshold ) 107 { 108 this.threshold = threshold; 109 } 110 111 /** 112 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 113 * 114 * @param include of type Include 115 * @param threshold of type int 116 */ 117 @ConstructorProperties( {"include", "threshold"} ) 118 public FilterPartialDuplicates( Include include, int threshold ) 119 { 120 this( include, threshold, null ); 121 } 122 123 /** 124 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 125 * 126 * @param threshold of type int 127 * @param include of type Include 128 * @param tupleHasher of type TupleHasher 129 */ 130 @ConstructorProperties( {"include", "threshold", "tupleHasher"} ) 131 public FilterPartialDuplicates( Include include, int threshold, TupleHasher tupleHasher ) 132 { 133 this.threshold = threshold; 134 this.include = include == null ? this.include : include; 135 this.tupleHasher = tupleHasher; 136 } 137 138 @Override 139 public void prepare( final FlowProcess flowProcess, OperationCall<LinkedHashMap<Tuple, Object>> operationCall ) 140 { 141 operationCall.setContext( new LinkedHashMap<Tuple, Object>( threshold, 0.75f, true ) 142 { 143 @Override 144 protected boolean removeEldestEntry( Map.Entry eldest ) 145 { 146 boolean doFlush = size() > threshold; 147 148 if( doFlush ) 149 flowProcess.increment( Cache.Num_Keys_Flushed, 1 ); 150 151 return doFlush; 152 } 153 } ); 154 } 155 156 @Override 157 public boolean isRemove( FlowProcess flowProcess, FilterCall<LinkedHashMap<Tuple, Object>> filterCall ) 158 { 159 // we assume its more painful to create lots of tuple copies vs comparisons 160 Tuple args = TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTuple() ); 161 162 switch( include ) 163 { 164 case ALL: 165 break; 166 167 case NO_NULLS: 168 if( Tuples.frequency( args, null ) == args.size() ) 169 return true; 170 171 break; 172 } 173 174 if( filterCall.getContext().containsKey( args ) ) 175 { 176 flowProcess.increment( Cache.Num_Keys_Hit, 1 ); 177 return true; 178 } 179 180 // only do the copy here 181 filterCall.getContext().put( TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTupleCopy() ), null ); 182 183 flowProcess.increment( Cache.Num_Keys_Missed, 1 ); 184 185 return false; 186 } 187 188 @Override 189 public void cleanup( FlowProcess flowProcess, OperationCall<LinkedHashMap<Tuple, Object>> operationCall ) 190 { 191 operationCall.setContext( null ); 192 } 193 194 @Override 195 public boolean equals( Object object ) 196 { 197 if( this == object ) 198 return true; 199 if( !( object instanceof FilterPartialDuplicates ) ) 200 return false; 201 if( !super.equals( object ) ) 202 return false; 203 204 FilterPartialDuplicates that = (FilterPartialDuplicates) object; 205 206 if( threshold != that.threshold ) 207 return false; 208 209 return true; 210 } 211 212 @Override 213 public int hashCode() 214 { 215 int result = super.hashCode(); 216 result = 31 * result + threshold; 217 return result; 218 } 219 } 220 221 /** 222 * Constructor Unique creates a new Unique instance. 223 * 224 * @param pipe of type Pipe 225 * @param uniqueFields of type Fields 226 */ 227 @ConstructorProperties( {"pipe", "uniqueFields"} ) 228 public Unique( Pipe pipe, Fields uniqueFields ) 229 { 230 this( null, pipe, uniqueFields ); 231 } 232 233 /** 234 * Constructor Unique creates a new Unique instance. 235 * 236 * @param pipe of type Pipe 237 * @param uniqueFields of type Fields 238 * @param include of type Include 239 */ 240 @ConstructorProperties( {"pipe", "uniqueFields", "include"} ) 241 public Unique( Pipe pipe, Fields uniqueFields, Include include ) 242 { 243 this( null, pipe, uniqueFields, include ); 244 } 245 246 /** 247 * Constructor Unique creates a new Unique instance. 248 * 249 * @param pipe of type Pipe 250 * @param uniqueFields of type Fields 251 * @param threshold of type int 252 */ 253 @ConstructorProperties( {"pipe", "uniqueFields", "threshold"} ) 254 public Unique( Pipe pipe, Fields uniqueFields, int threshold ) 255 { 256 this( null, pipe, uniqueFields, threshold ); 257 } 258 259 /** 260 * Constructor Unique creates a new Unique instance. 261 * 262 * @param pipe of type Pipe 263 * @param uniqueFields of type Fields 264 * @param include of type Include 265 * @param threshold of type int 266 */ 267 @ConstructorProperties( {"pipe", "uniqueFields", "include", "threshold"} ) 268 public Unique( Pipe pipe, Fields uniqueFields, Include include, int threshold ) 269 { 270 this( null, pipe, uniqueFields, include, threshold ); 271 } 272 273 /** 274 * Constructor Unique creates a new Unique instance. 275 * 276 * @param name of type String 277 * @param pipe of type Pipe 278 * @param uniqueFields of type Fields 279 */ 280 @ConstructorProperties( {"name", "pipe", "uniqueFields"} ) 281 public Unique( String name, Pipe pipe, Fields uniqueFields ) 282 { 283 this( name, pipe, uniqueFields, 10000 ); 284 } 285 286 /** 287 * Constructor Unique creates a new Unique instance. 288 * 289 * @param name of type String 290 * @param pipe of type Pipe 291 * @param uniqueFields of type Fields 292 * @param include of type Include 293 */ 294 @ConstructorProperties( {"name", "pipe", "uniqueFields", "include"} ) 295 public Unique( String name, Pipe pipe, Fields uniqueFields, Include include ) 296 { 297 this( name, pipe, uniqueFields, include, 10000 ); 298 } 299 300 /** 301 * Constructor Unique creates a new Unique instance. 302 * 303 * @param name of type String 304 * @param pipe of type Pipe 305 * @param uniqueFields of type Fields 306 * @param threshold of type int 307 */ 308 @ConstructorProperties( {"name", "pipe", "uniqueFields", "threshold"} ) 309 public Unique( String name, Pipe pipe, Fields uniqueFields, int threshold ) 310 { 311 this( name, Pipe.pipes( pipe ), uniqueFields, threshold ); 312 } 313 314 /** 315 * Constructor Unique creates a new Unique instance. 316 * 317 * @param name of type String 318 * @param pipe of type Pipe 319 * @param uniqueFields of type Fields 320 * @param include of type Include 321 * @param threshold of type int 322 */ 323 @ConstructorProperties( {"name", "pipe", "uniqueFields", "include", "threshold"} ) 324 public Unique( String name, Pipe pipe, Fields uniqueFields, Include include, int threshold ) 325 { 326 this( name, Pipe.pipes( pipe ), uniqueFields, include, threshold ); 327 } 328 329 /** 330 * Constructor Unique creates a new Unique instance. 331 * 332 * @param pipes of type Pipe[] 333 * @param uniqueFields of type Fields 334 */ 335 @ConstructorProperties( {"pipes", "uniqueFields"} ) 336 public Unique( Pipe[] pipes, Fields uniqueFields ) 337 { 338 this( null, pipes, uniqueFields, 10000 ); 339 } 340 341 /** 342 * Constructor Unique creates a new Unique instance. 343 * 344 * @param pipes of type Pipe[] 345 * @param uniqueFields of type Fields 346 * @param include of type Include 347 */ 348 @ConstructorProperties( {"pipes", "uniqueFields", "include"} ) 349 public Unique( Pipe[] pipes, Fields uniqueFields, Include include ) 350 { 351 this( null, pipes, uniqueFields, include, 10000 ); 352 } 353 354 /** 355 * Constructor Unique creates a new Unique instance. 356 * 357 * @param pipes of type Pipe[] 358 * @param uniqueFields of type Fields 359 * @param threshold of type int 360 */ 361 @ConstructorProperties( {"pipes", "uniqueFields", "threshold"} ) 362 public Unique( Pipe[] pipes, Fields uniqueFields, int threshold ) 363 { 364 this( null, pipes, uniqueFields, threshold ); 365 } 366 367 /** 368 * Constructor Unique creates a new Unique instance. 369 * 370 * @param pipes of type Pipe[] 371 * @param uniqueFields of type Fields 372 * @param include of type Include 373 * @param threshold of type int 374 */ 375 @ConstructorProperties( {"pipes", "uniqueFields", "include", "threshold"} ) 376 public Unique( Pipe[] pipes, Fields uniqueFields, Include include, int threshold ) 377 { 378 this( null, pipes, uniqueFields, include, threshold ); 379 } 380 381 /** 382 * Constructor Unique creates a new Unique instance. 383 * 384 * @param name of type String 385 * @param pipes of type Pipe[] 386 * @param uniqueFields of type Fields 387 */ 388 @ConstructorProperties( {"name", "pipes", "uniqueFields"} ) 389 public Unique( String name, Pipe[] pipes, Fields uniqueFields ) 390 { 391 this( name, pipes, uniqueFields, 10000 ); 392 } 393 394 /** 395 * Constructor Unique creates a new Unique instance. 396 * 397 * @param name of type String 398 * @param pipes of type Pipe[] 399 * @param uniqueFields of type Fields 400 * @param include of type Include 401 */ 402 @ConstructorProperties( {"name", "pipes", "uniqueFields", "include"} ) 403 public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include ) 404 { 405 this( name, pipes, uniqueFields, include, 10000 ); 406 } 407 408 /** 409 * Constructor Unique creates a new Unique instance. 410 * 411 * @param name of type String 412 * @param pipes of type Pipe[] 413 * @param uniqueFields of type Fields 414 * @param threshold of type int 415 */ 416 @ConstructorProperties( {"name", "pipes", "uniqueFields", "threshold"} ) 417 public Unique( String name, Pipe[] pipes, Fields uniqueFields, int threshold ) 418 { 419 this( name, pipes, uniqueFields, null, threshold ); 420 } 421 422 /** 423 * Constructor Unique creates a new Unique instance. 424 * 425 * @param name of type String 426 * @param pipes of type Pipe[] 427 * @param uniqueFields of type Fields 428 * @param threshold of type int 429 */ 430 @ConstructorProperties( {"name", "pipes", "uniqueFields", "include", "threshold"} ) 431 public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include, int threshold ) 432 { 433 super( pipes ); 434 435 if( uniqueFields == null ) 436 throw new IllegalArgumentException( "uniqueFields may not be null" ); 437 438 Pipe[] filters = new Pipe[ pipes.length ]; 439 440 TupleHasher tupleHasher = null; 441 Comparator[] comparators = uniqueFields.getComparators(); 442 443 if( !TupleHasher.isNull( comparators ) ) 444 tupleHasher = new TupleHasher( null, comparators ); 445 446 FilterPartialDuplicates partialDuplicates = new FilterPartialDuplicates( include, threshold, tupleHasher ); 447 448 for( int i = 0; i < filters.length; i++ ) 449 filters[ i ] = new Each( pipes[ i ], uniqueFields, partialDuplicates ); 450 451 Pipe pipe = new GroupBy( name, filters, uniqueFields ); 452 pipe = new Every( pipe, Fields.ALL, new FirstNBuffer(), Fields.RESULTS ); 453 454 setTails( pipe ); 455 } 456 }