001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.local.stream.element;
023
024import java.util.Collections;
025import java.util.Iterator;
026import java.util.List;
027
028import cascading.flow.FlowProcess;
029import cascading.flow.stream.duct.Duct;
030import cascading.flow.stream.element.MemorySpliceGate;
031import cascading.pipe.Splice;
032import cascading.tuple.Tuple;
033import cascading.tuple.TupleEntry;
034import com.google.common.collect.ArrayListMultimap;
035import com.google.common.collect.ListMultimap;
036import com.google.common.collect.Multimaps;
037
038/**
039 *
040 */
041public class LocalGroupByGate extends MemorySpliceGate
042  {
043  private ListMultimap<Tuple, Tuple> valueMap;
044
045  public LocalGroupByGate( FlowProcess flowProcess, Splice splice )
046    {
047    super( flowProcess, splice );
048    }
049
050  @Override
051  protected boolean isBlockingStreamed()
052    {
053    return true;
054    }
055
056  private ListMultimap<Tuple, Tuple> initNewValueMap()
057    {
058    return Multimaps.synchronizedListMultimap( ArrayListMultimap.<Tuple, Tuple>create() );
059    }
060
061  @Override
062  public void prepare()
063    {
064    super.prepare();
065
066    valueMap = initNewValueMap();
067    }
068
069  @Override
070  public void start( Duct previous )
071    {
072    // chained below in #complete()
073    }
074
075  @Override
076  public void receive( Duct previous, int ordinal, TupleEntry incomingEntry )
077    {
078    Tuple valuesTuple = incomingEntry.getTupleCopy();
079    Tuple groupTuple = keyBuilder[ 0 ].makeResult( valuesTuple, null ); // view on valuesTuple
080
081    groupTuple = getDelegatedTuple( groupTuple ); // wrap so hasher/comparator is honored
082
083    keys.add( groupTuple );
084    valueMap.put( groupTuple, valuesTuple );
085    }
086
087  @Override
088  public void complete( Duct previous )
089    {
090    if( count.decrementAndGet() != 0 )
091      return;
092
093    next.start( this );
094
095    // drain the keys and keyValues collections to preserve memory
096    Iterator<Tuple> iterator = keys.iterator();
097
098    // no need to synchronize here as we are guaranteed all writer threads are completed
099    while( iterator.hasNext() )
100      {
101      Tuple groupTuple = iterator.next();
102
103      iterator.remove();
104
105      keyEntry.setTuple( groupTuple );
106
107      List<Tuple> tuples = valueMap.get( groupTuple ); // can't removeAll, returns unmodifiable collection
108
109      if( valueComparators != null )
110        Collections.sort( tuples, valueComparators[ 0 ] );
111
112      tupleEntryIterator.reset( tuples.iterator() );
113
114      next.receive( this, 0, grouping );
115
116      tuples.clear();
117      }
118
119    keys = createKeySet();
120    valueMap = initNewValueMap();
121    count.set( numIncomingEventingPaths );
122
123    next.complete( this );
124    }
125  }