001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe;
022
023import java.beans.ConstructorProperties;
024
025import cascading.pipe.joiner.Joiner;
026import cascading.tuple.Fields;
027
028/**
029 * The HashJoin pipe allows for two or more tuple streams to join into a single stream via a {@link Joiner} when
030 * all but one tuple stream is considered small enough to fit into memory.
031 * <p/>
032 * When planned onto MapReduce, this is effectively a non-blocking "asymmetrical join" or "replicated join",
033 * where the left-most side will not block (accumulate into memory) in order to complete the join, but the right-most
034 * sides will. See below...
035 * <p/>
 * No aggregations can be performed with a HashJoin pipe as there is no guarantee all values will be associated with
037 * a given grouping key. In fact, an Aggregator would see the same grouping many times with a partial set of values.
038 * <p/>
039 * For every incoming {@link Pipe} instance, a {@link Fields} instance must be specified that denotes the field names
040 * or positions that should be joined with the other given Pipe instances. If the incoming Pipe instances declare
 * one or more fields with the same name, the declaredFields must be given to name the outgoing Tuple stream fields
042 * to overcome field name collisions.
043 * <p/>
044 * By default HashJoin performs an inner join via the {@link cascading.pipe.joiner.InnerJoin}
045 * {@link cascading.pipe.joiner.Joiner} class.
046 * <p/>
047 * Self joins can be achieved by using a constructor that takes a single Pipe and a numSelfJoins value. A value of
048 * 1 for numSelfJoins will join the Pipe with itself once. Note that a self join will block until all data is accumulated
049 * thus the stream must be reasonably small.
050 * <p/>
051 * Note "outer" joins on the left most side will not behave as expected. All observed keys on the right most sides
052 * will be emitted with {@code null} for the left most stream, thus when running distributed, duplicate values will
053 * emerge from every Map task split on the MapReduce platform.
054 * <p/>
055 * HashJoin does not scale well to large data sizes and thus requires streams with more data on the left hand side to
056 * join with more sparse data on the right hand side. That is, always attempt to effect M x N joins where M is large
057 * and N is small, instead of where M is small and N is large. Right hand side streams will be accumulated, and
058 * spilled to disk if the collection reaches a specific threshold when using Hadoop.
059 * <p/>
060 * If spills are happening, consider increasing the spill thresholds, see {@link cascading.tuple.collect.SpillableTupleMap}.
 * <p/>
063 * If one of the right hand side streams starts larger than memory but is filtered (likely by a
064 * {@link cascading.operation.Filter} implementation) down to the point it fits into memory, it may be useful to use
065 * a {@link Checkpoint} Pipe to persist the stream and force a new FlowStep (MapReduce job) to read the data from
066 * disk, instead of applying the filter redundantly. This will minimize the amount of data "replicated" across the
067 * network.
068 * <p/>
069 * See the {@link cascading.tuple.collect.TupleCollectionFactory} and {@link cascading.tuple.collect.TupleMapFactory} for a means
070 * to use alternative spillable types.
071 *
072 * @see cascading.pipe.joiner.InnerJoin
073 * @see cascading.pipe.joiner.OuterJoin
074 * @see cascading.pipe.joiner.LeftJoin
075 * @see cascading.pipe.joiner.RightJoin
076 * @see cascading.pipe.joiner.MixedJoin
077 * @see cascading.tuple.Fields
078 * @see cascading.tuple.collect.SpillableTupleMap
079 */
080public class HashJoin extends Splice
081  {
082  /**
083   * Constructor HashJoin creates a new HashJoin instance.
084   *
085   * @param joinName
086   * @param lhs
087   * @param lhsJoinFields
088   * @param rhs
089   * @param rhsJoinFields
090   */
091  @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields"})
092  public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields )
093    {
094    super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null );
095    }
096
097  /**
098   * Constructor HashJoin creates a new HashJoin instance.
099   *
100   * @param joinName
101   * @param lhs
102   * @param lhsJoinFields
103   * @param rhs
104   * @param rhsJoinFields
105   * @param joiner
106   */
107  @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "joiner"})
108  public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Joiner joiner )
109    {
110    super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null, joiner );
111    }
112
113  /**
114   * Constructor HashJoin creates a new HashJoin instance.
115   *
116   * @param joinName
117   * @param lhs
118   * @param lhsJoinFields
119   * @param rhs
120   * @param rhsJoinFields
121   * @param declaredFields
122   */
123  @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields"})
124  public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields )
125    {
126    super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null );
127    }
128
129  /**
130   * Constructor HashJoin creates a new HashJoin instance.
131   *
132   * @param joinName
133   * @param lhs
134   * @param lhsJoinFields
135   * @param rhs
136   * @param rhsJoinFields
137   * @param declaredFields
138   * @param joiner
139   */
140  @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields", "joiner"})
141  public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields, Joiner joiner )
142    {
143    super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null, joiner );
144    }
145
146  /**
147   * Constructor HashJoin creates a new HashJoin instance.
148   *
149   * @param joinName
150   * @param pipe
151   * @param joinFields
152   * @param numSelfJoins
153   * @param declaredFields
154   * @param joiner
155   */
156  @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "declaredFields", "joiner"})
157  public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields, Joiner joiner )
158    {
159    super( joinName, pipe, joinFields, numSelfJoins, declaredFields, joiner );
160    }
161
162  /**
163   * Constructor HashJoin creates a new HashJoin instance.
164   *
165   * @param joinName
166   * @param pipe
167   * @param joinFields
168   * @param numSelfJoins
169   * @param declaredFields
170   */
171  @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "declaredFields"})
172  public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields )
173    {
174    super( joinName, pipe, joinFields, numSelfJoins, declaredFields, null, null );
175    }
176
177  /**
178   * Constructor HashJoin creates a new HashJoin instance.
179   *
180   * @param joinName
181   * @param pipe
182   * @param joinFields
183   * @param numSelfJoins
184   * @param joiner
185   */
186  @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "joiner"})
187  public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Joiner joiner )
188    {
189    super( joinName, pipe, joinFields, numSelfJoins, null, joiner );
190    }
191
192  /**
193   * Constructor HashJoin creates a new HashJoin instance.
194   *
195   * @param joinName
196   * @param pipes
197   * @param joinFields
198   * @param declaredFields
199   * @param joiner
200   */
201  @ConstructorProperties({"joinName", "pipes", "joinFields", "declaredFields", "joiner"})
202  public HashJoin( String joinName, Pipe[] pipes, Fields[] joinFields, Fields declaredFields, Joiner joiner )
203    {
204    super( joinName, pipes, joinFields, declaredFields, null, joiner );
205    }
206
207  /**
208   * Constructor HashJoin creates a new HashJoin instance.
209   *
210   * @param lhs
211   * @param lhsJoinFields
212   * @param rhs
213   * @param rhsJoinFields
214   */
215  @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields"})
216  public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields )
217    {
218    super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null );
219    }
220
221  /**
222   * Constructor HashJoin creates a new HashJoin instance.
223   *
224   * @param lhs
225   * @param lhsJoinFields
226   * @param rhs
227   * @param rhsJoinFields
228   * @param joiner
229   */
230  @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "joiner"})
231  public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Joiner joiner )
232    {
233    super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null, joiner );
234    }
235
236  /**
237   * Constructor HashJoin creates a new HashJoin instance.
238   *
239   * @param lhs
240   * @param lhsJoinFields
241   * @param rhs
242   * @param rhsJoinFields
243   * @param declaredFields
244   */
245  @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields"})
246  public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields )
247    {
248    super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null );
249    }
250
251  /**
252   * Constructor HashJoin creates a new HashJoin instance.
253   *
254   * @param lhs
255   * @param lhsJoinFields
256   * @param rhs
257   * @param rhsJoinFields
258   * @param declaredFields
259   * @param joiner
260   */
261  @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields", "joiner"})
262  public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields, Joiner joiner )
263    {
264    super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null, joiner );
265    }
266
267  /**
268   * Constructor HashJoin creates a new HashJoin instance.
269   *
270   * @param pipe
271   * @param joinFields
272   * @param numSelfJoins
273   * @param declaredFields
274   * @param joiner
275   */
276  @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "declaredFields", "joiner"})
277  public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields, Joiner joiner )
278    {
279    super( null, pipe, joinFields, numSelfJoins, declaredFields, joiner );
280    }
281
282  /**
283   * Constructor HashJoin creates a new HashJoin instance.
284   *
285   * @param pipe
286   * @param joinFields
287   * @param numSelfJoins
288   * @param declaredFields
289   */
290  @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "declaredFields"})
291  public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields )
292    {
293    super( null, pipe, joinFields, numSelfJoins, declaredFields );
294    }
295
296  /**
297   * Constructor HashJoin creates a new HashJoin instance.
298   *
299   * @param pipe
300   * @param joinFields
301   * @param numSelfJoins
302   * @param joiner
303   */
304  @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "joiner"})
305  public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Joiner joiner )
306    {
307    super( null, pipe, joinFields, numSelfJoins, null, joiner );
308    }
309
310  /**
311   * Constructor HashJoin creates a new Join instance.
312   *
313   * @param pipes
314   * @param joinFields
315   * @param declaredFields
316   * @param joiner
317   */
318  @ConstructorProperties({"pipes", "joinFields", "declaredFields", "joiner"})
319  public HashJoin( Pipe[] pipes, Fields[] joinFields, Fields declaredFields, Joiner joiner )
320    {
321    super( null, pipes, joinFields, declaredFields, null, joiner );
322    }
323  }