/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream.element;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;

import cascading.flow.FlowProcess;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.graph.StreamGraph;
import cascading.pipe.HashJoin;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Gate that performs a {@link HashJoin} entirely in memory. Tuples arriving on the
 * non-streamed branches (ordinal != 0) are copied and accumulated per join key, while
 * tuples on the streamed branch (ordinal 0) block until all other incoming branches have
 * completed, then are joined against the accumulated values and passed downstream.
 */
public class MemoryHashJoinGate extends MemorySpliceGate
  {
  private static final Logger LOG = LoggerFactory.getLogger( MemoryHashJoinGate.class );

  protected CountDownLatch latch;

  private Collection<Tuple>[] collections;
  private ArrayList<Tuple> streamedCollection;

  public MemoryHashJoinGate( FlowProcess flowProcess, HashJoin join )
    {
    super( flowProcess, join );
    }

  @Override
  public void bind( StreamGraph streamGraph )
    {
    super.bind( streamGraph );

    count.set( numIncomingEventingPaths ); // the number of paths incoming
    latch = new CountDownLatch( numIncomingEventingPaths - 1 );
    }

  @Override
  public void prepare()
    {
    super.prepare();

    streamedCollection = new ArrayList<Tuple>( Arrays.asList( new Tuple() ) ); // placeholder in collection
    collections = new Collection[ getNumDeclaredIncomingBranches() ];
    collections[ 0 ] = streamedCollection;

    if( nullsAreNotEqual )
      LOG.warn( "HashJoin does not fully support key comparators where null values are not treated equal" );
    }

  @Override
  public void receive( Duct previous, TupleEntry incomingEntry )
    {
    int pos = ordinalMap.get( previous );

    Tuple incomingTuple = pos != 0 ? incomingEntry.getTupleCopy() : incomingEntry.getTuple();
    Tuple keyTuple = keyBuilder[ pos ].makeResult( incomingTuple, null ); // view in incomingTuple

    keyTuple = getDelegatedTuple( keyTuple );

    if( pos != 0 )
      {
      keys.add( keyTuple );
      keyValues[ pos ].get( keyTuple ).add( incomingTuple ); // always a copy
      return;
      }

    waitOnLatch();

    keys.remove( keyTuple );

    streamedCollection.set( 0, incomingTuple ); // no need to copy, temp setting

    performJoinWith( keyTuple );
    }

  private void performJoinWith( Tuple keyTuple )
    {
    // never replace the first array, pos == 0
    for( int i = 1; i < keyValues.length; i++ )
      {
      // if key does not exist, #get will create an empty array list,
      // and store the key, which is not a copy
      if( keyValues[ i ].containsKey( keyTuple ) )
        collections[ i ] = keyValues[ i ].get( keyTuple );
      else
        collections[ i ] = Collections.EMPTY_LIST;
      }

    closure.reset( collections );

    keyEntry.setTuple( keyTuple );
    tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );

    next.receive( this, grouping );
    }

  @Override
  public void complete( Duct previous )
    {
    countDownLatch();

    if( count.decrementAndGet() != 0 )
      return;

    collections[ 0 ] = Collections.EMPTY_LIST;

    for( Tuple keyTuple : keys )
      performJoinWith( keyTuple );

    keys = createKeySet();
    keyValues = createKeyValuesArray();

    super.complete( previous );
    }

  protected void waitOnLatch()
    {
    try
      {
      latch.await();
      }
    catch( InterruptedException exception )
      {
      throw new RuntimeException( "interrupted", exception );
      }
    }

  protected void countDownLatch()
    {
    latch.countDown();
    }

  @Override
  protected boolean isBlockingStreamed()
    {
    return false;
    }
  }