001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.concurrent.CountDownLatch;
028
029import cascading.flow.FlowProcess;
030import cascading.flow.stream.duct.Duct;
031import cascading.flow.stream.graph.StreamGraph;
032import cascading.pipe.HashJoin;
033import cascading.tuple.Tuple;
034import cascading.tuple.TupleEntry;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 *
040 */
041public class MemoryHashJoinGate extends MemorySpliceGate
042  {
043  private static final Logger LOG = LoggerFactory.getLogger( MemoryHashJoinGate.class );
044
045  protected CountDownLatch latch;
046
047  private Collection<Tuple>[] collections;
048  private ArrayList<Tuple> streamedCollection;
049
050  public MemoryHashJoinGate( FlowProcess flowProcess, HashJoin join )
051    {
052    super( flowProcess, join );
053    }
054
055  @Override
056  public void bind( StreamGraph streamGraph )
057    {
058    super.bind( streamGraph );
059
060    count.set( numIncomingEventingPaths ); // the number of paths incoming
061    latch = new CountDownLatch( numIncomingEventingPaths - 1 );
062    }
063
064  @Override
065  public void prepare()
066    {
067    super.prepare();
068
069    streamedCollection = new ArrayList<Tuple>( Arrays.asList( new Tuple() ) ); // placeholder in collection
070    collections = new Collection[ getNumDeclaredIncomingBranches() ];
071    collections[ 0 ] = streamedCollection;
072
073    if( nullsAreNotEqual )
074      LOG.warn( "HashJoin does not fully support key comparators where null values are not treated equal" );
075    }
076
077  @Override
078  public void receive( Duct previous, TupleEntry incomingEntry )
079    {
080    int pos = ordinalMap.get( previous );
081
082    Tuple incomingTuple = pos != 0 ? incomingEntry.getTupleCopy() : incomingEntry.getTuple();
083    Tuple keyTuple = keyBuilder[ pos ].makeResult( incomingTuple, null ); // view in incomingTuple
084
085    keyTuple = getDelegatedTuple( keyTuple );
086
087    if( pos != 0 )
088      {
089      keys.add( keyTuple );
090      keyValues[ pos ].get( keyTuple ).add( incomingTuple ); // always a copy
091      return;
092      }
093
094    waitOnLatch();
095
096    keys.remove( keyTuple );
097
098    streamedCollection.set( 0, incomingTuple ); // no need to copy, temp setting
099
100    performJoinWith( keyTuple );
101    }
102
103  private void performJoinWith( Tuple keyTuple )
104    {
105    // never replace the first array, pos == 0
106    for( int i = 1; i < keyValues.length; i++ )
107      {
108      // if key does not exist, #get will create an empty array list,
109      // and store the key, which is not a copy
110      if( keyValues[ i ].containsKey( keyTuple ) )
111        collections[ i ] = keyValues[ i ].get( keyTuple );
112      else
113        collections[ i ] = Collections.EMPTY_LIST;
114      }
115
116    closure.reset( collections );
117
118    keyEntry.setTuple( keyTuple );
119    tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );
120
121    next.receive( this, grouping );
122    }
123
124  @Override
125  public void complete( Duct previous )
126    {
127    countDownLatch();
128
129    if( count.decrementAndGet() != 0 )
130      return;
131
132    collections[ 0 ] = Collections.EMPTY_LIST;
133
134    for( Tuple keyTuple : keys )
135      performJoinWith( keyTuple );
136
137    keys = createKeySet();
138    keyValues = createKeyValuesArray();
139
140    super.complete( previous );
141    }
142
143  protected void waitOnLatch()
144    {
145    try
146      {
147      latch.await();
148      }
149    catch( InterruptedException exception )
150      {
151      throw new RuntimeException( "interrupted", exception );
152      }
153    }
154
155  protected void countDownLatch()
156    {
157    latch.countDown();
158    }
159
160  @Override
161  protected boolean isBlockingStreamed()
162    {
163    return false;
164    }
165  }