001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.Set;
028
029import cascading.flow.FlowProcess;
030import cascading.flow.stream.duct.Duct;
031import cascading.pipe.Splice;
032import cascading.tuple.Tuple;
033import cascading.tuple.TupleEntry;
034import cascading.tuple.Tuples;
035
036/**
037 *
038 */
039public class MemoryCoGroupGate extends MemorySpliceGate
040  {
041  public MemoryCoGroupGate( FlowProcess flowProcess, Splice splice )
042    {
043    super( flowProcess, splice );
044    }
045
046  @Override
047  protected boolean isBlockingStreamed()
048    {
049    return true;
050    }
051
052  @Override
053  public void start( Duct previous )
054    {
055    }
056
057  @Override
058  public void receive( Duct previous, TupleEntry incomingEntry )
059    {
060    int pos = ordinalMap.get( previous );
061
062    Tuple valuesTuple = incomingEntry.getTupleCopy();
063    Tuple groupTuple = keyBuilder[ pos ].makeResult( valuesTuple, null ); // view on valuesTuple
064
065    groupTuple = getDelegatedTuple( groupTuple ); // wrap so hasher/comparator is honored
066
067    keys.add( groupTuple );
068    keyValues[ pos ].get( groupTuple ).add( valuesTuple );
069    }
070
071  @Override
072  public void complete( Duct previous )
073    {
074    if( count.decrementAndGet() != 0 )
075      return;
076
077    next.start( this );
078
079    Collection<Tuple>[] collections = new Collection[ keyValues.length ];
080    Iterator<Tuple> keyIterator = keys.iterator();
081
082    Set<Tuple> seenNulls = new HashSet<Tuple>();
083
084    while( keyIterator.hasNext() )
085      {
086      Tuple keysTuple = keyIterator.next();
087
088      keyIterator.remove();
089
090      // provides sql like semantics
091      if( nullsAreNotEqual && Tuples.frequency( keysTuple, null ) != 0 )
092        {
093        if( seenNulls.contains( keysTuple ) )
094          continue;
095
096        seenNulls.add( keysTuple );
097
098        for( int i = 0; i < keyValues.length; i++ )
099          {
100          Collection<Tuple> values = keyValues[ i ].remove( keysTuple );
101
102          if( values == null )
103            continue;
104
105          for( int j = 0; j < keyValues.length; j++ )
106            collections[ j ] = Collections.emptyList();
107
108          collections[ i ] = values;
109
110          push( collections, keysTuple );
111          }
112        }
113      else
114        {
115        // drain the keys and keyValues collections to preserve memory
116        for( int i = 0; i < keyValues.length; i++ )
117          {
118          collections[ i ] = keyValues[ i ].remove( keysTuple );
119
120          if( collections[ i ] == null )
121            collections[ i ] = Collections.emptyList();
122          }
123
124        push( collections, keysTuple );
125        }
126      }
127
128    keys = createKeySet();
129    keyValues = createKeyValuesArray();
130
131    count.set( numIncomingEventingPaths );
132
133    next.complete( this );
134    }
135
136  private void push( Collection<Tuple>[] collections, Tuple keysTuple )
137    {
138    closure.reset( collections );
139
140    keyEntry.setTuple( closure.getGroupTuple( keysTuple ) );
141
142    // create Closure type here
143    tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );
144
145    next.receive( this, grouping );
146    }
147  }