001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.local.stream.element; 023 024import java.util.Collections; 025import java.util.Iterator; 026import java.util.List; 027 028import cascading.flow.FlowProcess; 029import cascading.flow.stream.duct.Duct; 030import cascading.flow.stream.element.MemorySpliceGate; 031import cascading.pipe.Splice; 032import cascading.tuple.Tuple; 033import cascading.tuple.TupleEntry; 034import com.google.common.collect.ArrayListMultimap; 035import com.google.common.collect.ListMultimap; 036import com.google.common.collect.Multimaps; 037 038/** 039 * 040 */ 041public class LocalGroupByGate extends MemorySpliceGate 042 { 043 private ListMultimap<Tuple, Tuple> valueMap; 044 045 public LocalGroupByGate( FlowProcess flowProcess, Splice splice ) 046 { 047 super( flowProcess, splice ); 048 } 049 050 @Override 051 protected boolean isBlockingStreamed() 052 { 053 return true; 054 } 055 056 private ListMultimap<Tuple, Tuple> initNewValueMap() 057 { 058 return Multimaps.synchronizedListMultimap( ArrayListMultimap.<Tuple, Tuple>create() ); 059 } 060 061 @Override 062 public void prepare() 063 { 064 super.prepare(); 065 066 valueMap = initNewValueMap(); 067 } 068 069 @Override 070 public void start( Duct previous ) 071 { 072 // chained below in #complete() 073 } 074 075 @Override 076 public void receive( Duct previous, int ordinal, TupleEntry incomingEntry ) 077 { 078 Tuple valuesTuple = incomingEntry.getTupleCopy(); 079 Tuple groupTuple = keyBuilder[ 0 ].makeResult( valuesTuple, null ); // view on valuesTuple 080 081 groupTuple = getDelegatedTuple( groupTuple ); // wrap so hasher/comparator is honored 082 083 keys.add( groupTuple ); 084 valueMap.put( groupTuple, valuesTuple ); 085 } 086 087 @Override 088 public void complete( Duct previous ) 089 { 090 if( count.decrementAndGet() != 0 ) 091 return; 092 093 next.start( this ); 094 095 // drain the keys and keyValues collections to preserve memory 096 Iterator<Tuple> iterator = keys.iterator(); 097 098 // no need to synchronize here as we are guaranteed all writer threads are completed 099 while( iterator.hasNext() ) 100 { 101 Tuple groupTuple = iterator.next(); 102 103 iterator.remove(); 104 105 keyEntry.setTuple( groupTuple ); 106 107 List<Tuple> tuples = valueMap.get( groupTuple ); // can't removeAll, returns unmodifiable collection 108 109 if( valueComparators != null ) 110 Collections.sort( tuples, valueComparators[ 0 ] ); 111 112 tupleEntryIterator.reset( tuples.iterator() ); 113 114 next.receive( this, 0, grouping ); 115 116 tuples.clear(); 117 } 118 119 keys = createKeySet(); 120 valueMap = initNewValueMap(); 121 count.set( numIncomingEventingPaths ); 122 123 next.complete( this ); 124 } 125 }