001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.stream.element; 022 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.Map; 026 027import cascading.flow.FlowProcess; 028import cascading.flow.FlowProps; 029import cascading.flow.planner.Scope; 030import cascading.flow.stream.duct.Duct; 031import cascading.flow.stream.duct.Grouping; 032import cascading.flow.stream.duct.Window; 033import cascading.flow.stream.graph.IORole; 034import cascading.flow.stream.graph.StreamGraph; 035import cascading.flow.stream.util.SparseTupleComparator; 036import cascading.pipe.Splice; 037import cascading.tuple.Fields; 038import cascading.tuple.Tuple; 039import cascading.tuple.TupleEntry; 040import cascading.tuple.TupleEntryChainIterator; 041import cascading.tuple.TupleEntryIterator; 042import cascading.tuple.Tuples; 043import cascading.tuple.util.TupleBuilder; 044import cascading.tuple.util.TupleHasher; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import static cascading.tuple.util.TupleViews.*; 049 050/** 051 * 052 */ 053public abstract class GroupingSpliceGate extends SpliceGate<TupleEntry, Grouping<TupleEntry, TupleEntryIterator>> implements Window 054 { 055 private static final Logger LOG = LoggerFactory.getLogger( GroupingSpliceGate.class ); 056 057 protected Map<Duct, Integer> ordinalMap; 058 059 protected Fields[] keyFields; 060 protected Fields[] sortFields; 061 protected Fields[] valuesFields; 062 063 protected Comparator<Tuple>[] groupComparators; 064 protected Comparator<Tuple>[] valueComparators; 065 protected TupleHasher groupHasher; 066 protected boolean nullsAreNotEqual; 067 068 protected TupleBuilder[] keyBuilder; 069 protected TupleBuilder[] valuesBuilder; 070 protected TupleBuilder[] sortBuilder; 071 072 protected Grouping<TupleEntry, TupleEntryIterator> grouping; 073 protected TupleEntry keyEntry; 074 protected TupleEntryChainIterator tupleEntryIterator; 075 076 protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice ) 077 { 078 super( flowProcess, splice ); 079 } 080 081 protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice, IORole role ) 082 { 083 super( flowProcess, splice, role ); 084 } 085 086 @Override 087 public void bind( StreamGraph streamGraph ) 088 { 089 super.bind( streamGraph ); 090 091 setOrdinalMap( streamGraph ); 092 } 093 094 protected synchronized void setOrdinalMap( StreamGraph streamGraph ) 095 { 096 ordinalMap = streamGraph.getOrdinalMap( this ); 097 } 098 099 protected TupleBuilder createNarrowBuilder( final Fields incomingFields, final Fields narrowFields ) 100 { 101 if( narrowFields.isNone() ) 102 return new TupleBuilder() 103 { 104 @Override 105 public Tuple makeResult( Tuple input, Tuple output ) 106 { 107 return Tuple.NULL; 108 } 109 }; 110 111 if( incomingFields.isUnknown() ) 112 return new TupleBuilder() 113 { 114 @Override 115 public Tuple makeResult( Tuple input, Tuple output ) 116 { 117 return input.get( incomingFields, narrowFields ); 118 } 119 }; 120 121 if( narrowFields.isAll() ) // dubious this is ever reached 122 return new TupleBuilder() 123 { 124 @Override 125 public Tuple makeResult( Tuple input, Tuple output ) 126 { 127 return input; 128 } 129 }; 130 131 return createDefaultNarrowBuilder( incomingFields, narrowFields ); 132 } 133 134 protected TupleBuilder createDefaultNarrowBuilder( final Fields incomingFields, final Fields narrowFields ) 135 { 136 return new TupleBuilder() 137 { 138 Tuple result = createNarrow( incomingFields.getPos( narrowFields ) ); 139 140 @Override 141 public Tuple makeResult( Tuple input, Tuple output ) 142 { 143 return reset( result, input ); 144 } 145 }; 146 } 147 148 protected TupleBuilder createNulledBuilder( final Fields incomingFields, final Fields keyField ) 149 { 150 if( incomingFields.isUnknown() ) 151 return new TupleBuilder() 152 { 153 @Override 154 public Tuple makeResult( Tuple input, Tuple output ) 155 { 156 return Tuples.nulledCopy( incomingFields, input, keyField ); 157 } 158 }; 159 160 if( keyField.isNone() ) 161 return new TupleBuilder() 162 { 163 @Override 164 public Tuple makeResult( Tuple input, Tuple output ) 165 { 166 return input; 167 } 168 }; 169 170 if( keyField.isAll() ) 171 return new TupleBuilder() 172 { 173 Tuple nullTuple = Tuple.size( incomingFields.size() ); 174 175 @Override 176 public Tuple makeResult( Tuple input, Tuple output ) 177 { 178 return nullTuple; 179 } 180 }; 181 182 return new TupleBuilder() 183 { 184 Tuple nullTuple = Tuple.size( keyField.size() ); 185 Tuple result = createOverride( incomingFields, keyField ); 186 187 @Override 188 public Tuple makeResult( Tuple baseTuple, Tuple output ) 189 { 190 return reset( result, baseTuple, nullTuple ); 191 } 192 }; 193 } 194 195 @Override 196 public void initialize() 197 { 198 super.initialize(); 199 200 int size = getNumDeclaredIncomingBranches(); // is the maximum ordinal value 201 202 // this is a merge, all fields have the same declaration 203 // filling out full array has implications on joiner/closure which should be resolved independently 204 if( role == IORole.source && splice.isGroupBy() ) 205 size = 1; 206 207 keyFields = new Fields[ size ]; 208 valuesFields = new Fields[ size ]; 209 210 keyBuilder = new TupleBuilder[ size ]; 211 valuesBuilder = new TupleBuilder[ size ]; 212 213 if( splice.isSorted() ) 214 { 215 sortFields = new Fields[ size ]; 216 sortBuilder = new TupleBuilder[ size ]; 217 } 218 219 Scope outgoingScope = outgoingScopes.get( 0 ); 220 221 int numScopes = Math.min( size, incomingScopes.size() ); 222 for( int i = 0; i < numScopes; i++ ) 223 { 224 Scope incomingScope = incomingScopes.get( i ); 225 226 // for GroupBy, incoming may have same name, but guaranteed to have same key/value/sort fields for merge 227 // arrays may be size 1, then ordinal should always be zero. 228 int ordinal = size == 1 ? 0 : incomingScope.getOrdinal(); 229 230 keyFields[ ordinal ] = outgoingScope.getKeySelectors().get( incomingScope.getName() ); 231 valuesFields[ ordinal ] = incomingScope.getIncomingSpliceFields(); 232 233 keyBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] ); 234 valuesBuilder[ ordinal ] = createNulledBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] ); 235 236 if( sortFields != null ) 237 { 238 sortFields[ ordinal ] = outgoingScope.getSortingSelectors().get( incomingScope.getName() ); 239 sortBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), sortFields[ ordinal ] ); 240 } 241 242 if( LOG.isDebugEnabled() ) 243 { 244 LOG.debug( "incomingScope: {}, in pos: {}", incomingScope.getName(), ordinal ); 245 LOG.debug( "keyFields: {}", printSafe( keyFields[ ordinal ] ) ); 246 LOG.debug( "valueFields: {}", printSafe( valuesFields[ ordinal ] ) ); 247 248 if( sortFields != null ) 249 LOG.debug( "sortFields: {}", printSafe( sortFields[ ordinal ] ) ); 250 } 251 } 252 253 if( role == IORole.sink ) 254 return; 255 256 keyEntry = new TupleEntry( outgoingScope.getOutGroupingFields(), true ); 257 tupleEntryIterator = new TupleEntryChainIterator( outgoingScope.getOutValuesFields() ); 258 259 grouping = new Grouping<>(); 260 grouping.key = keyEntry; 261 grouping.joinIterator = tupleEntryIterator; 262 } 263 264 protected void initComparators() 265 { 266 Comparator defaultComparator = (Comparator) flowProcess.newInstance( (String) flowProcess.getProperty( FlowProps.DEFAULT_ELEMENT_COMPARATOR ) ); 267 268 Fields[] compareFields = new Fields[ getNumDeclaredIncomingBranches() ]; 269 groupComparators = new Comparator[ getNumDeclaredIncomingBranches() ]; 270 271 if( splice.isSorted() ) 272 valueComparators = new Comparator[ getNumDeclaredIncomingBranches() ]; 273 274 int size = splice.isGroupBy() ? 1 : getNumDeclaredIncomingBranches(); 275 276 for( int i = 0; i < size; i++ ) 277 { 278 Scope incomingScope = incomingScopes.get( i ); 279 280 int pos = splice.isGroupBy() ? 0 : splice.getPipePos().get( incomingScope.getName() ); 281 282 // we want the comparators 283 Fields groupFields = splice.getKeySelectors().get( incomingScope.getName() ); 284 285 compareFields[ pos ] = groupFields; // used for finding hashers 286 287 if( groupFields.size() == 0 ) 288 groupComparators[ pos ] = groupFields; 289 else 290 groupComparators[ pos ] = new SparseTupleComparator( Fields.asDeclaration( groupFields ), defaultComparator ); 291 292 groupComparators[ pos ] = splice.isSortReversed() ? Collections.reverseOrder( groupComparators[ pos ] ) : groupComparators[ pos ]; 293 294 if( sortFields != null ) 295 { 296 // we want the comparators, so don't use sortFields array 297 Fields sortFields = splice.getSortingSelectors().get( incomingScope.getName() ); 298 valueComparators[ pos ] = new SparseTupleComparator( valuesFields[ pos ], sortFields, defaultComparator ); 299 300 if( splice.isSortReversed() ) 301 valueComparators[ pos ] = Collections.reverseOrder( valueComparators[ pos ] ); 302 } 303 } 304 305 nullsAreNotEqual = !areNullsEqual(); 306 307 if( nullsAreNotEqual ) 308 LOG.debug( "treating null values in Tuples at not equal during grouping" ); 309 310 Comparator[] hashers = TupleHasher.merge( compareFields ); 311 groupHasher = defaultComparator != null || !TupleHasher.isNull( hashers ) ? new TupleHasher( defaultComparator, hashers ) : null; 312 } 313 314 protected Comparator getKeyComparator() 315 { 316 if( groupComparators.length > 0 && groupComparators[ 0 ] != null ) 317 return groupComparators[ 0 ]; 318 319 return new Comparator<Comparable>() 320 { 321 @Override 322 public int compare( Comparable lhs, Comparable rhs ) 323 { 324 return lhs.compareTo( rhs ); 325 } 326 }; 327 } 328 329 @Override 330 public void cleanup() 331 { 332 super.cleanup(); 333 334 // close if top of stack 335 if( next == null ) 336 flowProcess.closeTrapCollectors(); 337 } 338 339 private boolean areNullsEqual() 340 { 341 try 342 { 343 Tuple tupleWithNull = Tuple.size( 1 ); 344 345 return groupComparators[ 0 ].compare( tupleWithNull, tupleWithNull ) == 0; 346 } 347 catch( Exception exception ) 348 { 349 return true; // assume we have an npe or something and they don't expect to see nulls 350 } 351 } 352 353 protected int getNumDeclaredIncomingBranches() 354 { 355 return splice.getPrevious().length; 356 } 357 358 /** 359 * This allows the tuple to honor the hasher and comparators, if any 360 * 361 * @param object the tuple to wrap 362 * @return a DelegatedTuple instance 363 */ 364 protected final Tuple getDelegatedTuple( Tuple object ) 365 { 366 if( groupHasher == null ) 367 return object; 368 369 return new DelegatedTuple( object ); 370 } 371 372 private String printSafe( Fields fields ) 373 { 374 if( fields != null ) 375 return fields.printVerbose(); 376 377 return ""; 378 } 379 380 @Override 381 public final boolean equals( Object object ) 382 { 383 if( this == object ) 384 return true; 385 if( !( object instanceof GroupingSpliceGate ) ) 386 return false; 387 388 GroupingSpliceGate groupingSpliceGate = (GroupingSpliceGate) object; 389 390 if( splice != null ? splice != groupingSpliceGate.splice : groupingSpliceGate.splice != null ) 391 return false; 392 393 return true; 394 } 395 396 @Override 397 public final int hashCode() 398 { 399 return splice != null ? System.identityHashCode( splice ) : 0; 400 } 401 402 @Override 403 public String toString() 404 { 405 final StringBuilder sb = new StringBuilder( "SpliceGate{" ); 406 sb.append( "splice=" ).append( splice ); 407 sb.append( ", role=" ).append( role ); 408 sb.append( '}' ); 409 return sb.toString(); 410 } 411 412 protected class DelegatedTuple extends Tuple 413 { 414 public DelegatedTuple( Tuple wrapped ) 415 { 416 // pass it in to prevent one being allocated 417 super( Tuple.elements( wrapped ) ); 418 } 419 420 @Override 421 public boolean equals( Object object ) 422 { 423 return compareTo( object ) == 0; 424 } 425 426 @Override 427 public int compareTo( Object other ) 428 { 429 return groupComparators[ 0 ].compare( this, (Tuple) other ); 430 } 431 432 @Override 433 public int hashCode() 434 { 435 return groupHasher.hashCode( this ); 436 } 437 } 438 }