001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe;
022    
023    import java.beans.ConstructorProperties;
024    
025    import cascading.pipe.joiner.Joiner;
026    import cascading.tuple.Fields;
027    
028    /**
029     * The HashJoin pipe allows for two or more tuple streams to join into a single stream via a {@link Joiner} when
030     * all but one tuple stream is considered small enough to fit into memory.
031     * <p/>
032     * When planned onto MapReduce, this is effectively a non-blocking "asymmetrical join" or "replicated join",
033     * where the left-most side will not block (accumulate into memory) in order to complete the join, but the right-most
034     * sides will. See below...
035     * <p/>
036     * No aggregations can be performed with a HashJoin pipe as there is no guarantee all values will be associated with
037     * a given grouping key. In fact, an Aggregator would see the same grouping many times with a partial set of values.
038     * <p/>
039     * For every incoming {@link Pipe} instance, a {@link Fields} instance must be specified that denotes the field names
040     * or positions that should be joined with the other given Pipe instances. If the incoming Pipe instances declare
041     * one or more fields with the same name, the declaredFields must be given to name the outgoing Tuple stream fields
042     * to overcome field name collisions.
043     * <p/>
044     * By default HashJoin performs an inner join via the {@link cascading.pipe.joiner.InnerJoin}
045     * {@link cascading.pipe.joiner.Joiner} class.
046     * <p/>
047     * Self joins can be achieved by using a constructor that takes a single Pipe and a numSelfJoins value. A value of
048     * 1 for numSelfJoins will join the Pipe with itself once. Note that a self join will block until all data is accumulated
049     * thus the stream must be reasonably small.
050     * <p/>
051     * Note "outer" joins on the left most side will not behave as expected. All observed keys on the right most sides
052     * will be emitted with {@code null} for the left most stream, thus when running distributed, duplicate values will
053     * emerge from every Map task split on the MapReduce platform.
054     * <p/>
055     * HashJoin does not scale well to large data sizes and thus requires streams with more data on the left hand side to
056     * join with more sparse data on the right hand side. That is, always attempt to effect M x N joins where M is large
057     * and N is small, instead of where M is small and N is large. Right hand side streams will be accumulated, and
058     * spilled to disk if the collection reaches a specific threshold when using Hadoop.
059     * <p/>
060     * If spills are happening, consider increasing the spill thresholds, see {@link cascading.tuple.collect.SpillableTupleMap}.
061     * <p/>
062     * <p/>
063     * If one of the right hand side streams starts larger than memory but is filtered (likely by a
064     * {@link cascading.operation.Filter} implementation) down to the point it fits into memory, it may be useful to use
065     * a {@link Checkpoint} Pipe to persist the stream and force a new FlowStep (MapReduce job) to read the data from
066     * disk, instead of applying the filter redundantly. This will minimize the amount of data "replicated" across the
067     * network.
068     * <p/>
069     * See the {@link cascading.tuple.collect.TupleCollectionFactory} and {@link cascading.tuple.collect.TupleMapFactory} for a means
070     * to use alternative spillable types.
071     *
072     * @see cascading.pipe.joiner.InnerJoin
073     * @see cascading.pipe.joiner.OuterJoin
074     * @see cascading.pipe.joiner.LeftJoin
075     * @see cascading.pipe.joiner.RightJoin
076     * @see cascading.pipe.joiner.MixedJoin
077     * @see cascading.tuple.Fields
078     * @see cascading.tuple.collect.SpillableTupleMap
079     */
080    public class HashJoin extends Splice
081      {
082      /**
083       * Constructor HashJoin creates a new named HashJoin instance joining two pipes; null is passed for declared fields and {@link Joiner}.
084       *
085       * @param joinName      name for this join, passed up to the Splice constructor
086       * @param lhs           left hand side stream, the side that does not accumulate into memory
087       * @param lhsJoinFields field names or positions in lhs to join on
088       * @param rhs           right hand side stream, the side that is accumulated and so should be the smaller one
089       * @param rhsJoinFields field names or positions in rhs to join on
090       */
091      @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields"})
092      public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields )
093        {
094        super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null );
095        }
096    
097      /**
098       * Constructor HashJoin creates a new named HashJoin instance joining two pipes with the given {@link Joiner}.
099       *
100       * @param joinName      name for this join, passed up to the Splice constructor
101       * @param lhs           left hand side stream, the side that does not accumulate into memory
102       * @param lhsJoinFields field names or positions in lhs to join on
103       * @param rhs           right hand side stream, the side that is accumulated and so should be the smaller one
104       * @param rhsJoinFields field names or positions in rhs to join on
105       * @param joiner        the Joiner used to join the streams
106       */
107      @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "joiner"})
108      public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Joiner joiner )
109        {
110        super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null, joiner );
111        }
112    
113      /**
114       * Constructor HashJoin creates a new named HashJoin instance joining two pipes with declared result fields and no explicit {@link Joiner}.
115       *
116       * @param joinName       name for this join, passed up to the Splice constructor
117       * @param lhs            left hand side stream, the side that does not accumulate into memory
118       * @param lhsJoinFields  field names or positions in lhs to join on
119       * @param rhs            right hand side stream, the side that is accumulated and so should be the smaller one
120       * @param rhsJoinFields  field names or positions in rhs to join on
121       * @param declaredFields names for the outgoing Tuple stream fields, needed when incoming field names collide
122       */
123      @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields"})
124      public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields )
125        {
126        super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null );
127        }
128    
129      /**
130       * Constructor HashJoin creates a new named HashJoin instance joining two pipes with declared result fields and the given {@link Joiner}.
131       *
132       * @param joinName       name for this join, passed up to the Splice constructor
133       * @param lhs            left hand side stream, the side that does not accumulate into memory
134       * @param lhsJoinFields  field names or positions in lhs to join on
135       * @param rhs            right hand side stream, the side that is accumulated and so should be the smaller one
136       * @param rhsJoinFields  field names or positions in rhs to join on
137       * @param declaredFields names for the outgoing Tuple stream fields, needed when incoming field names collide
138       * @param joiner         the Joiner used to join the streams
139       */
140      @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields", "joiner"})
141      public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields, Joiner joiner )
142        {
143        super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null, joiner );
144        }
145    
146      /**
147       * Constructor HashJoin creates a new named HashJoin instance that self joins a single pipe with declared result fields and the given {@link Joiner}.
148       *
149       * @param joinName       name for this join, passed up to the Splice constructor
150       * @param pipe           the stream to join with itself
151       * @param joinFields     field names or positions in pipe to join on
152       * @param numSelfJoins   number of times to join the pipe with itself, a value of 1 joins it with itself once
153       * @param declaredFields names for the outgoing Tuple stream fields, needed when incoming field names collide
154       * @param joiner         the Joiner used to join the streams
155       */
156      @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "declaredFields", "joiner"})
157      public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields, Joiner joiner )
158        {
159        super( joinName, pipe, joinFields, numSelfJoins, declaredFields, joiner );
160        }
161    
162      /**
163       * Constructor HashJoin creates a new named HashJoin instance that self joins a single pipe with declared result fields and no explicit {@link Joiner}.
164       *
165       * @param joinName       name for this join, passed up to the Splice constructor
166       * @param pipe           the stream to join with itself
167       * @param joinFields     field names or positions in pipe to join on
168       * @param numSelfJoins   number of times to join the pipe with itself, a value of 1 joins it with itself once
169       * @param declaredFields names for the outgoing Tuple stream fields, needed when incoming field names collide
170       */
171      @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "declaredFields"})
172      public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields )
173        {
174        super( joinName, pipe, joinFields, numSelfJoins, declaredFields, null, null );
175        }
176    
177      /**
178       * Constructor HashJoin creates a new named HashJoin instance that self joins a single pipe with the given {@link Joiner}; null is passed for declared fields.
179       *
180       * @param joinName     name for this join, passed up to the Splice constructor
181       * @param pipe         the stream to join with itself
182       * @param joinFields   field names or positions in pipe to join on
183       * @param numSelfJoins number of times to join the pipe with itself, a value of 1 joins it with itself once
184       * @param joiner       the Joiner used to join the streams
185       */
186      @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "joiner"})
187      public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Joiner joiner )
188        {
189        super( joinName, pipe, joinFields, numSelfJoins, null, joiner );
190        }
191    
192      /**
193       * Constructor HashJoin creates a new named HashJoin instance joining an array of pipes with declared result fields and the given {@link Joiner}.
194       *
195       * @param joinName       name for this join, passed up to the Splice constructor
196       * @param pipes          the streams to join; presumably ordered left-most first, as the lhs/rhs constructors pass them
197       * @param joinFields     field names or positions to join on, one Fields instance for every incoming pipe
198       * @param declaredFields names for the outgoing Tuple stream fields, needed when incoming field names collide
199       * @param joiner         the Joiner used to join the streams
200       */
201      @ConstructorProperties({"joinName", "pipes", "joinFields", "declaredFields", "joiner"})
202      public HashJoin( String joinName, Pipe[] pipes, Fields[] joinFields, Fields declaredFields, Joiner joiner )
203        {
204        super( joinName, pipes, joinFields, declaredFields, null, joiner );
205        }
206    
207      /**
208       * Constructor HashJoin creates a new HashJoin instance joining two pipes; null is passed for the name, declared fields and {@link Joiner}.
209       *
210       * @param lhs           left hand side stream, the side that does not accumulate into memory
211       * @param lhsJoinFields field names or positions in lhs to join on
212       * @param rhs           right hand side stream, the side that is accumulated and so should be the smaller one
213       * @param rhsJoinFields field names or positions in rhs to join on
214       */
215      @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields"})
216      public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields )
217        {
218        super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null );
219        }
220    
221      /**
222       * Constructor HashJoin creates a new HashJoin instance joining two pipes with the given {@link Joiner}; null is passed for the name.
223       *
224       * @param lhs           left hand side stream, the side that does not accumulate into memory
225       * @param lhsJoinFields field names or positions in lhs to join on
226       * @param rhs           right hand side stream, the side that is accumulated and so should be the smaller one
227       * @param rhsJoinFields field names or positions in rhs to join on
228       * @param joiner        the Joiner used to join the streams
229       */
230      @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "joiner"})
231      public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Joiner joiner )
232        {
233        super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null, joiner );
234        }
235    
236      /**
237       * Constructor HashJoin creates a new HashJoin instance joining two pipes with declared result fields; null is passed for the name and {@link Joiner}.
238       *
239       * @param lhs            left hand side stream, the side that does not accumulate into memory
240       * @param lhsJoinFields  field names or positions in lhs to join on
241       * @param rhs            right hand side stream, the side that is accumulated and so should be the smaller one
242       * @param rhsJoinFields  field names or positions in rhs to join on
243       * @param declaredFields names for the outgoing Tuple stream fields, needed when incoming field names collide
244       */
245      @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields"})
246      public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields )
247        {
248        super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null );
249        }
250    
251      /**
252       * Constructor HashJoin creates a new HashJoin instance joining two pipes with declared result fields and the given {@link Joiner}; null is passed for the name.
253       *
254       * @param lhs            left hand side stream, the side that does not accumulate into memory
255       * @param lhsJoinFields  field names or positions in lhs to join on
256       * @param rhs            right hand side stream, the side that is accumulated and so should be the smaller one
257       * @param rhsJoinFields  field names or positions in rhs to join on
258       * @param declaredFields names for the outgoing Tuple stream fields, needed when incoming field names collide
259       * @param joiner         the Joiner used to join the streams
260       */
261      @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields", "joiner"})
262      public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields, Joiner joiner )
263        {
264        super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null, joiner );
265        }
266    
267      /**
268       * Constructor HashJoin creates a new HashJoin instance that self joins a single pipe with declared result fields and the given {@link Joiner}; null is passed for the name.
269       *
270       * @param pipe           the stream to join with itself
271       * @param joinFields     field names or positions in pipe to join on
272       * @param numSelfJoins   number of times to join the pipe with itself, a value of 1 joins it with itself once
273       * @param declaredFields names for the outgoing Tuple stream fields, needed when incoming field names collide
274       * @param joiner         the Joiner used to join the streams
275       */
276      @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "declaredFields", "joiner"})
277      public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields, Joiner joiner )
278        {
279        super( null, pipe, joinFields, numSelfJoins, declaredFields, joiner );
280        }
281    
282      /**
283       * Constructor HashJoin creates a new HashJoin instance that self joins a single pipe with declared result fields and no explicit {@link Joiner}; null is passed for the name.
284       *
285       * @param pipe           the stream to join with itself
286       * @param joinFields     field names or positions in pipe to join on
287       * @param numSelfJoins   number of times to join the pipe with itself, a value of 1 joins it with itself once
288       * @param declaredFields names for the outgoing Tuple stream fields, needed when incoming field names collide
289       */
290      @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "declaredFields"})
291      public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields )
292        {
293        super( null, pipe, joinFields, numSelfJoins, declaredFields );
294        }
295    
296      /**
297       * Constructor HashJoin creates a new HashJoin instance that self joins a single pipe with the given {@link Joiner}; null is passed for the name and declared fields.
298       *
299       * @param pipe         the stream to join with itself
300       * @param joinFields   field names or positions in pipe to join on
301       * @param numSelfJoins number of times to join the pipe with itself, a value of 1 joins it with itself once
302       * @param joiner       the Joiner used to join the streams
303       */
304      @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "joiner"})
305      public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Joiner joiner )
306        {
307        super( null, pipe, joinFields, numSelfJoins, null, joiner );
308        }
309    
310      /**
311       * Constructor HashJoin creates a new HashJoin instance joining an array of pipes with declared result fields and the given {@link Joiner}; null is passed for the name.
312       *
313       * @param pipes          the streams to join; presumably ordered left-most first, as the lhs/rhs constructors pass them
314       * @param joinFields     field names or positions to join on, one Fields instance for every incoming pipe
315       * @param declaredFields names for the outgoing Tuple stream fields, needed when incoming field names collide
316       * @param joiner         the Joiner used to join the streams
317       */
318      @ConstructorProperties({"pipes", "joinFields", "declaredFields", "joiner"})
319      public HashJoin( Pipe[] pipes, Fields[] joinFields, Fields declaredFields, Joiner joiner )
320        {
321        super( null, pipes, joinFields, declaredFields, null, joiner );
322        }
323      }