001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.util.ArrayList;
024    import java.util.Arrays;
025    import java.util.Collection;
026    import java.util.Collections;
027    import java.util.concurrent.CountDownLatch;
028    
029    import cascading.flow.FlowProcess;
030    import cascading.pipe.HashJoin;
031    import cascading.tuple.Tuple;
032    import cascading.tuple.TupleEntry;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     *
038     */
039    public class MemoryHashJoinGate extends MemorySpliceGate
040      {
041      private static final Logger LOG = LoggerFactory.getLogger( MemoryHashJoinGate.class );
042    
043      protected CountDownLatch latch;
044    
045      private Collection<Tuple>[] collections;
046      private ArrayList<Tuple> streamedCollection;
047    
048      public MemoryHashJoinGate( FlowProcess flowProcess, HashJoin join )
049        {
050        super( flowProcess, join );
051        }
052    
053      @Override
054      public void bind( StreamGraph streamGraph )
055        {
056        super.bind( streamGraph );
057    
058        count.set( numIncomingPaths ); // the number of paths incoming
059        latch = new CountDownLatch( numIncomingPaths - 1 );
060        }
061    
062      @Override
063      public void prepare()
064        {
065        super.prepare();
066    
067        streamedCollection = new ArrayList<Tuple>( Arrays.asList( new Tuple() ) ); // placeholder in collection
068        collections = new Collection[ orderedPrevious.length ];
069        collections[ 0 ] = streamedCollection;
070    
071        if( nullsAreNotEqual )
072          LOG.warn( "HashJoin does not fully support key comparators where null values are not treated equal" );
073        }
074    
075      @Override
076      public void receive( Duct previous, TupleEntry incomingEntry )
077        {
078        int pos = posMap.get( previous );
079    
080        Tuple incomingTuple = pos != 0 ? incomingEntry.getTupleCopy() : incomingEntry.getTuple();
081        Tuple keyTuple = keyBuilder[ pos ].makeResult( incomingTuple, null ); // view in incomingTuple
082    
083        keyTuple = getDelegatedTuple( keyTuple );
084    
085        if( pos != 0 )
086          {
087          keys.add( keyTuple );
088          keyValues[ pos ].get( keyTuple ).add( incomingTuple ); // always a copy
089          return;
090          }
091    
092        waitOnLatch();
093    
094        keys.remove( keyTuple );
095    
096        streamedCollection.set( 0, incomingTuple ); // no need to copy, temp setting
097    
098        performJoinWith( keyTuple );
099        }
100    
101      private void performJoinWith( Tuple keyTuple )
102        {
103        // never replace the first array, pos == 0
104        for( int i = 1; i < keyValues.length; i++ )
105          {
106          // if key does not exist, #get will create an empty array list,
107          // and store the key, which is not a copy
108          if( keyValues[ i ].containsKey( keyTuple ) )
109            collections[ i ] = keyValues[ i ].get( keyTuple );
110          else
111            collections[ i ] = Collections.EMPTY_LIST;
112          }
113    
114        closure.reset( collections );
115    
116        keyEntry.setTuple( keyTuple );
117        tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );
118    
119        next.receive( this, grouping );
120        }
121    
122      @Override
123      public void complete( Duct previous )
124        {
125        countDownLatch();
126    
127        if( count.decrementAndGet() != 0 )
128          return;
129    
130        collections[ 0 ] = Collections.EMPTY_LIST;
131    
132        for( Tuple keyTuple : keys )
133          performJoinWith( keyTuple );
134    
135        keys = createKeySet();
136        keyValues = createKeyValuesArray();
137    
138        super.complete( previous );
139        }
140    
141      protected void waitOnLatch()
142        {
143        try
144          {
145          latch.await();
146          }
147        catch( InterruptedException exception )
148          {
149          throw new RuntimeException( "interrupted", exception );
150          }
151        }
152    
153      protected void countDownLatch()
154        {
155        latch.countDown();
156        }
157    
158      @Override
159      protected boolean isBlockingStreamed()
160        {
161        return false;
162        }
163      }