001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 import java.util.LinkedHashMap; 025 import java.util.Map; 026 027 import cascading.flow.FlowProcess; 028 import cascading.operation.BaseOperation; 029 import cascading.operation.Filter; 030 import cascading.operation.FilterCall; 031 import cascading.operation.OperationCall; 032 import cascading.operation.buffer.FirstNBuffer; 033 import cascading.pipe.Each; 034 import cascading.pipe.Every; 035 import cascading.pipe.GroupBy; 036 import cascading.pipe.Pipe; 037 import cascading.pipe.SubAssembly; 038 import cascading.tuple.Fields; 039 import cascading.tuple.Tuple; 040 import cascading.tuple.Tuples; 041 042 /** 043 * Class Unique {@link SubAssembly} is used to filter all duplicates out of a tuple stream. 044 * <p/> 045 * Typically finding unique value in a tuple stream relies on a {@link GroupBy} and a {@link FirstNBuffer} 046 * {@link cascading.operation.Buffer} operation. 047 * <p/> 048 * If the {@code include} value is set to {@link Include#NO_NULLS}, any tuple consisting of only {@code null} 049 * values will be removed from the stream. 050 * <p/> 051 * This SubAssembly uses the {@link FilterPartialDuplicates} {@link cascading.operation.Filter} 052 * to remove as many observed duplicates before the GroupBy operator to reduce IO over the network. 053 * <p/> 054 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 055 * in a much simpler mechanism. 056 * <p/> 057 * The {@code threshold} value tells the underlying FilterPartialDuplicates how many values to cache for duplicate 058 * comparison before dropping values from the LRU cache. 059 */ 060 public class Unique extends SubAssembly 061 { 062 public enum Include 063 { 064 ALL, 065 NO_NULLS 066 } 067 068 /** 069 * Class FilterPartialDuplicates is a {@link cascading.operation.Filter} that is used to remove observed duplicates from the tuple stream. 070 * <p/> 071 * Use this class typically in tandem with a {@link cascading.operation.aggregator.First} 072 * {@link cascading.operation.Aggregator} in order to improve de-duping performance by removing as many values 073 * as possible before the intermediate {@link cascading.pipe.GroupBy} operator. 074 * <p/> 075 * The {@code threshold} value is used to maintain a LRU of a constant size. If more than threshold unique values 076 * are seen, the oldest cached values will be removed from the cache. 077 * 078 * @see Unique 079 */ 080 public static class FilterPartialDuplicates extends BaseOperation<LinkedHashMap<Tuple, Object>> implements Filter<LinkedHashMap<Tuple, Object>> 081 { 082 private int threshold = 10000; 083 private Include include = Include.ALL; 084 085 /** Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. */ 086 public FilterPartialDuplicates() 087 { 088 } 089 090 /** 091 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 092 * 093 * @param threshold of type int 094 */ 095 @ConstructorProperties({"threshold"}) 096 public FilterPartialDuplicates( int threshold ) 097 { 098 this.threshold = threshold; 099 } 100 101 /** 102 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 103 * 104 * @param threshold of type int 105 */ 106 @ConstructorProperties({"include", "threshold"}) 107 public FilterPartialDuplicates( Include include, int threshold ) 108 { 109 this.threshold = threshold; 110 this.include = include == null ? this.include : include; 111 } 112 113 @Override 114 public void prepare( FlowProcess flowProcess, OperationCall<LinkedHashMap<Tuple, Object>> operationCall ) 115 { 116 operationCall.setContext( new LinkedHashMap<Tuple, Object>( threshold, 0.75f, true ) 117 { 118 @Override 119 protected boolean removeEldestEntry( Map.Entry eldest ) 120 { 121 return size() > threshold; 122 } 123 } ); 124 } 125 126 @Override 127 public boolean isRemove( FlowProcess flowProcess, FilterCall<LinkedHashMap<Tuple, Object>> filterCall ) 128 { 129 // we assume its more painful to create lots of tuple copies vs comparisons 130 Tuple args = filterCall.getArguments().getTuple(); 131 132 switch( include ) 133 { 134 case ALL: 135 break; 136 137 case NO_NULLS: 138 if( Tuples.frequency( args, null ) == args.size() ) 139 return true; 140 141 break; 142 } 143 144 if( filterCall.getContext().containsKey( args ) ) 145 return true; 146 147 filterCall.getContext().put( filterCall.getArguments().getTupleCopy(), null ); 148 149 return false; 150 } 151 152 @Override 153 public void cleanup( FlowProcess flowProcess, OperationCall<LinkedHashMap<Tuple, Object>> operationCall ) 154 { 155 operationCall.setContext( null ); 156 } 157 158 @Override 159 public boolean equals( Object object ) 160 { 161 if( this == object ) 162 return true; 163 if( !( object instanceof FilterPartialDuplicates ) ) 164 return false; 165 if( !super.equals( object ) ) 166 return false; 167 168 FilterPartialDuplicates that = (FilterPartialDuplicates) object; 169 170 if( threshold != that.threshold ) 171 return false; 172 173 return true; 174 } 175 176 @Override 177 public int hashCode() 178 { 179 int result = super.hashCode(); 180 result = 31 * result + threshold; 181 return result; 182 } 183 } 184 185 /** 186 * Constructor Unique creates a new Unique instance. 187 * 188 * @param pipe of type Pipe 189 * @param uniqueFields of type Fields 190 */ 191 @ConstructorProperties({"pipe", "uniqueFields"}) 192 public Unique( Pipe pipe, Fields uniqueFields ) 193 { 194 this( null, pipe, uniqueFields ); 195 } 196 197 /** 198 * Constructor Unique creates a new Unique instance. 199 * 200 * @param pipe of type Pipe 201 * @param uniqueFields of type Fields 202 * @param include of type Include 203 */ 204 @ConstructorProperties({"pipe", "uniqueFields", "include"}) 205 public Unique( Pipe pipe, Fields uniqueFields, Include include ) 206 { 207 this( null, pipe, uniqueFields, include ); 208 } 209 210 /** 211 * Constructor Unique creates a new Unique instance. 212 * 213 * @param pipe of type Pipe 214 * @param uniqueFields of type Fields 215 * @param threshold of type int 216 */ 217 @ConstructorProperties({"pipe", "uniqueFields", "threshold"}) 218 public Unique( Pipe pipe, Fields uniqueFields, int threshold ) 219 { 220 this( null, pipe, uniqueFields, threshold ); 221 } 222 223 /** 224 * Constructor Unique creates a new Unique instance. 225 * 226 * @param pipe of type Pipe 227 * @param uniqueFields of type Fields 228 * @param include of type Include 229 * @param threshold of type int 230 */ 231 @ConstructorProperties({"pipe", "uniqueFields", "include", "threshold"}) 232 public Unique( Pipe pipe, Fields uniqueFields, Include include, int threshold ) 233 { 234 this( null, pipe, uniqueFields, include, threshold ); 235 } 236 237 /** 238 * Constructor Unique creates a new Unique instance. 239 * 240 * @param name of type String 241 * @param pipe of type Pipe 242 * @param uniqueFields of type Fields 243 */ 244 @ConstructorProperties({"name", "pipe", "uniqueFields"}) 245 public Unique( String name, Pipe pipe, Fields uniqueFields ) 246 { 247 this( name, pipe, uniqueFields, 10000 ); 248 } 249 250 /** 251 * Constructor Unique creates a new Unique instance. 252 * 253 * @param name of type String 254 * @param pipe of type Pipe 255 * @param uniqueFields of type Fields 256 * @param include of type Include 257 */ 258 @ConstructorProperties({"name", "pipe", "uniqueFields", "include"}) 259 public Unique( String name, Pipe pipe, Fields uniqueFields, Include include ) 260 { 261 this( name, pipe, uniqueFields, include, 10000 ); 262 } 263 264 /** 265 * Constructor Unique creates a new Unique instance. 266 * 267 * @param name of type String 268 * @param pipe of type Pipe 269 * @param uniqueFields of type Fields 270 * @param threshold of type int 271 */ 272 @ConstructorProperties({"name", "pipe", "uniqueFields", "threshold"}) 273 public Unique( String name, Pipe pipe, Fields uniqueFields, int threshold ) 274 { 275 this( name, Pipe.pipes( pipe ), uniqueFields, threshold ); 276 } 277 278 /** 279 * Constructor Unique creates a new Unique instance. 280 * 281 * @param name of type String 282 * @param pipe of type Pipe 283 * @param uniqueFields of type Fields 284 * @param include of type Include 285 * @param threshold of type int 286 */ 287 @ConstructorProperties({"name", "pipe", "uniqueFields", "include", "threshold"}) 288 public Unique( String name, Pipe pipe, Fields uniqueFields, Include include, int threshold ) 289 { 290 this( name, Pipe.pipes( pipe ), uniqueFields, include, threshold ); 291 } 292 293 /** 294 * Constructor Unique creates a new Unique instance. 295 * 296 * @param pipes of type Pipe[] 297 * @param uniqueFields of type Fields 298 */ 299 @ConstructorProperties({"pipes", "uniqueFields"}) 300 public Unique( Pipe[] pipes, Fields uniqueFields ) 301 { 302 this( null, pipes, uniqueFields, 10000 ); 303 } 304 305 /** 306 * Constructor Unique creates a new Unique instance. 307 * 308 * @param pipes of type Pipe[] 309 * @param uniqueFields of type Fields 310 * @param include of type Include 311 */ 312 @ConstructorProperties({"pipes", "uniqueFields", "include"}) 313 public Unique( Pipe[] pipes, Fields uniqueFields, Include include ) 314 { 315 this( null, pipes, uniqueFields, include, 10000 ); 316 } 317 318 /** 319 * Constructor Unique creates a new Unique instance. 320 * 321 * @param pipes of type Pipe[] 322 * @param uniqueFields of type Fields 323 * @param threshold of type int 324 */ 325 @ConstructorProperties({"pipes", "uniqueFields", "threshold"}) 326 public Unique( Pipe[] pipes, Fields uniqueFields, int threshold ) 327 { 328 this( null, pipes, uniqueFields, threshold ); 329 } 330 331 /** 332 * Constructor Unique creates a new Unique instance. 333 * 334 * @param pipes of type Pipe[] 335 * @param uniqueFields of type Fields 336 * @param include of type Include 337 * @param threshold of type int 338 */ 339 @ConstructorProperties({"pipes", "uniqueFields", "threshold"}) 340 public Unique( Pipe[] pipes, Fields uniqueFields, Include include, int threshold ) 341 { 342 this( null, pipes, uniqueFields, include, threshold ); 343 } 344 345 /** 346 * Constructor Unique creates a new Unique instance. 347 * 348 * @param name of type String 349 * @param pipes of type Pipe[] 350 * @param uniqueFields of type Fields 351 */ 352 @ConstructorProperties({"name", "pipes", "uniqueFields"}) 353 public Unique( String name, Pipe[] pipes, Fields uniqueFields ) 354 { 355 this( name, pipes, uniqueFields, 10000 ); 356 } 357 358 /** 359 * Constructor Unique creates a new Unique instance. 360 * 361 * @param name of type String 362 * @param pipes of type Pipe[] 363 * @param uniqueFields of type Fields 364 * @param include of type Include 365 */ 366 @ConstructorProperties({"name", "pipes", "uniqueFields", "include"}) 367 public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include ) 368 { 369 this( name, pipes, uniqueFields, include, 10000 ); 370 } 371 372 /** 373 * Constructor Unique creates a new Unique instance. 374 * 375 * @param name of type String 376 * @param pipes of type Pipe[] 377 * @param uniqueFields of type Fields 378 * @param threshold of type int 379 */ 380 @ConstructorProperties({"name", "pipes", "uniqueFields", "threshold"}) 381 public Unique( String name, Pipe[] pipes, Fields uniqueFields, int threshold ) 382 { 383 this( name, pipes, uniqueFields, null, threshold ); 384 } 385 386 /** 387 * Constructor Unique creates a new Unique instance. 388 * 389 * @param name of type String 390 * @param pipes of type Pipe[] 391 * @param uniqueFields of type Fields 392 * @param threshold of type int 393 */ 394 @ConstructorProperties({"name", "pipes", "uniqueFields", "include", "threshold"}) 395 public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include, int threshold ) 396 { 397 super( pipes ); 398 399 Pipe[] filters = new Pipe[ pipes.length ]; 400 FilterPartialDuplicates partialDuplicates = new FilterPartialDuplicates( include, threshold ); 401 402 for( int i = 0; i < filters.length; i++ ) 403 filters[ i ] = new Each( pipes[ i ], uniqueFields, partialDuplicates ); 404 405 Pipe pipe = new GroupBy( name, filters, uniqueFields ); 406 pipe = new Every( pipe, Fields.ALL, new FirstNBuffer(), Fields.RESULTS ); 407 408 setTails( pipe ); 409 } 410 }