001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.util.Collection;
024    import java.util.Collections;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.Set;
028    
029    import cascading.flow.FlowProcess;
030    import cascading.pipe.Splice;
031    import cascading.tuple.Tuple;
032    import cascading.tuple.TupleEntry;
033    import cascading.tuple.Tuples;
034    
035    /**
036     *
037     */
038    public class MemoryCoGroupGate extends MemorySpliceGate
039      {
040      public MemoryCoGroupGate( FlowProcess flowProcess, Splice splice )
041        {
042        super( flowProcess, splice );
043        }
044    
045      @Override
046      protected boolean isBlockingStreamed()
047        {
048        return true;
049        }
050    
051      @Override
052      public void start( Duct previous )
053        {
054        }
055    
056      @Override
057      public void receive( Duct previous, TupleEntry incomingEntry )
058        {
059        int pos = posMap.get( previous );
060    
061        Tuple valuesTuple = incomingEntry.getTupleCopy();
062        Tuple groupTuple = keyBuilder[ pos ].makeResult( valuesTuple, null ); // view on valuesTuple
063    
064        groupTuple = getDelegatedTuple( groupTuple ); // wrap so hasher/comparator is honored
065    
066        keys.add( groupTuple );
067        keyValues[ pos ].get( groupTuple ).add( valuesTuple );
068        }
069    
070      @Override
071      public void complete( Duct previous )
072        {
073        if( count.decrementAndGet() != 0 )
074          return;
075    
076        next.start( this );
077    
078        Collection<Tuple>[] collections = new Collection[ orderedPrevious.length ];
079        Iterator<Tuple> keyIterator = keys.iterator();
080    
081        Set<Tuple> seenNulls = new HashSet<Tuple>();
082    
083        while( keyIterator.hasNext() )
084          {
085          Tuple keysTuple = keyIterator.next();
086    
087          keyIterator.remove();
088    
089          // provides sql like semantics
090          if( nullsAreNotEqual && Tuples.frequency( keysTuple, null ) != 0 )
091            {
092            if( seenNulls.contains( keysTuple ) )
093              continue;
094    
095            seenNulls.add( keysTuple );
096    
097            for( int i = 0; i < keyValues.length; i++ )
098              {
099              Collection<Tuple> values = keyValues[ i ].remove( keysTuple );
100    
101              if( values == null )
102                continue;
103    
104              for( int j = 0; j < keyValues.length; j++ )
105                collections[ j ] = Collections.EMPTY_LIST;
106    
107              collections[ i ] = values;
108    
109              push( collections, keysTuple );
110              }
111            }
112          else
113            {
114            // drain the keys and keyValues collections to preserve memory
115            for( int i = 0; i < keyValues.length; i++ )
116              {
117              collections[ i ] = keyValues[ i ].remove( keysTuple );
118    
119              if( collections[ i ] == null )
120                collections[ i ] = Collections.EMPTY_LIST;
121              }
122    
123            push( collections, keysTuple );
124            }
125          }
126    
127        keys = createKeySet();
128        keyValues = createKeyValuesArray();
129    
130        count.set( numIncomingPaths );
131    
132        next.complete( this );
133        }
134    
135      private void push( Collection<Tuple>[] collections, Tuple keysTuple )
136        {
137        closure.reset( collections );
138    
139        keyEntry.setTuple( closure.getGroupTuple( keysTuple ) );
140    
141        // create Closure type here
142        tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );
143    
144        next.receive( this, grouping );
145        }
146      }