001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading;
022
023import java.io.Serializable;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.Comparator;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032
033import cascading.flow.Flow;
034import cascading.flow.FlowDef;
035import cascading.flow.FlowStep;
036import cascading.flow.planner.graph.ElementGraph;
037import cascading.operation.Aggregator;
038import cascading.operation.Function;
039import cascading.operation.Identity;
040import cascading.operation.aggregator.Count;
041import cascading.operation.aggregator.First;
042import cascading.operation.expression.ExpressionFunction;
043import cascading.operation.regex.RegexFilter;
044import cascading.operation.regex.RegexSplitter;
045import cascading.pipe.Checkpoint;
046import cascading.pipe.CoGroup;
047import cascading.pipe.Each;
048import cascading.pipe.Every;
049import cascading.pipe.GroupBy;
050import cascading.pipe.HashJoin;
051import cascading.pipe.Merge;
052import cascading.pipe.Pipe;
053import cascading.pipe.joiner.InnerJoin;
054import cascading.pipe.joiner.Joiner;
055import cascading.pipe.joiner.LeftJoin;
056import cascading.pipe.joiner.MixedJoin;
057import cascading.pipe.joiner.OuterJoin;
058import cascading.pipe.joiner.RightJoin;
059import cascading.tap.SinkMode;
060import cascading.tap.Tap;
061import cascading.tuple.Fields;
062import cascading.tuple.Hasher;
063import cascading.tuple.Tuple;
064import org.junit.Test;
065
066import static data.InputData.*;
067
068public class JoinFieldedPipesPlatformTest extends PlatformTestCase
069  {
070  public JoinFieldedPipesPlatformTest()
071    {
072    super( true, 4, 1 ); // leave cluster testing enabled
073    }
074
075  @Test
076  public void testCross() throws Exception
077    {
078    getPlatform().copyFromLocal( inputFileLhs );
079    getPlatform().copyFromLocal( inputFileRhs );
080
081    Map sources = new HashMap();
082
083    sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) );
084    sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) );
085
086    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE );
087
088    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
089    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
090
091    Pipe cross = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
092
093    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross );
094
095    flow.complete();
096
097    validateLength( flow, 37, null );
098
099    List<Tuple> values = getSinkAsList( flow );
100
101    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
102    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
103    }
104
105  @Test
106  public void testJoin() throws Exception
107    {
108    getPlatform().copyFromLocal( inputFileLower );
109    getPlatform().copyFromLocal( inputFileUpper );
110
111    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
112    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
113
114    Map sources = new HashMap();
115
116    sources.put( "lower", sourceLower );
117    sources.put( "upper", sourceUpper );
118
119    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );
120
121    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
122
123    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
124    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
125
126    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
127
128    Map<Object, Object> properties = getProperties();
129
130    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
131
132    flow.complete();
133
134    validateLength( flow, 5 );
135
136    List<Tuple> values = getSinkAsList( flow );
137
138    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
139    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
140    }
141
142  @Test
143  public void testJoinSamePipeName() throws Exception
144    {
145    getPlatform().copyFromLocal( inputFileLower );
146    getPlatform().copyFromLocal( inputFileUpper );
147
148    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
149    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
150
151    Map sources = new HashMap();
152
153    sources.put( "lower", sourceLower );
154    sources.put( "upper", sourceUpper );
155
156    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE );
157
158    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
159
160    Pipe pipeLower = new Pipe( "lower" );
161    Pipe pipeUpper = new Pipe( "upper" );
162
163    // these pipes will hide the source name, and could cause one to be lost
164    pipeLower = new Pipe( "same", pipeLower );
165    pipeUpper = new Pipe( "same", pipeUpper );
166
167    pipeLower = new Each( pipeLower, new Fields( "line" ), splitter );
168    pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter );
169
170//    pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
171//    pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
172
173    pipeLower = new Pipe( "left", pipeLower );
174    pipeUpper = new Pipe( "right", pipeUpper );
175
176//    pipeLower = new Each( pipeLower, new Debug( true ) );
177//    pipeUpper = new Each( pipeUpper, new Debug( true ) );
178
179    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
180
181//    splice = new Each( splice, new Debug( true ) );
182    splice = new Pipe( "splice", splice );
183    splice = new Pipe( "tail", splice );
184
185    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
186
187    flow.complete();
188
189    validateLength( flow, 5 );
190
191    List<Tuple> values = getSinkAsList( flow );
192
193    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
194    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
195    }
196
197  @Test
198  public void testJoinWithUnknowns() throws Exception
199    {
200    getPlatform().copyFromLocal( inputFileLower );
201    getPlatform().copyFromLocal( inputFileUpper );
202
203    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
204    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
205
206    Map sources = new HashMap();
207
208    sources.put( "lower", sourceLower );
209    sources.put( "upper", sourceUpper );
210
211    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE );
212
213    Function splitter = new RegexSplitter( Fields.UNKNOWN, " " );
214
215    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
216    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
217
218    Pipe splice = new HashJoin( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) );
219
220    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
221
222    flow.complete();
223
224    validateLength( flow, 5 );
225
226    List<Tuple> values = getSinkAsList( flow );
227
228    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
229    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
230    }
231
232  /**
233   * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with
234   * a new stream using an outerjoin.
235   *
236   * @throws Exception
237   */
238  @Test
239  public void testJoinFilteredBranch() throws Exception
240    {
241    getPlatform().copyFromLocal( inputFileLower );
242    getPlatform().copyFromLocal( inputFileUpper );
243
244    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
245    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
246
247    Map sources = new HashMap();
248
249    sources.put( "lower", sourceLower );
250    sources.put( "upper", sourceUpper );
251
252    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinfilteredbranch" ), SinkMode.REPLACE );
253
254    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
255
256    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
257    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
258    pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all
259    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
260
261    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() );
262
263    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
264
265    flow.complete();
266
267    validateLength( flow, 5 );
268
269    List<Tuple> values = getSinkAsList( flow );
270
271    assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) );
272    assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) );
273    }
274
275  @Test
276  public void testJoinSelf() throws Exception
277    {
278    getPlatform().copyFromLocal( inputFileLower );
279
280    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
281    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
282
283    Map sources = new HashMap();
284
285    sources.put( "lower", sourceLower );
286    sources.put( "upper", sourceUpper );
287
288    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinself" ), SinkMode.REPLACE );
289
290    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
291
292    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
293    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
294
295    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
296
297    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
298
299    flow.complete();
300
301    validateLength( flow, 5 );
302
303    List<Tuple> values = getSinkAsList( flow );
304
305    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
306    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
307    }
308
309  /**
310   * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join
311   *
312   * @throws Exception when
313   */
314  @Test
315  public void testJoinAfterEvery() throws Exception
316    {
317    getPlatform().copyFromLocal( inputFileLower );
318    getPlatform().copyFromLocal( inputFileUpper );
319
320    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
321    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
322
323    Map sources = new HashMap();
324
325    sources.put( "lower", sourceLower );
326    sources.put( "upper", sourceUpper );
327
328    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE );
329
330    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
331
332    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
333    pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
334    pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL );
335
336    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
337    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
338    pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL );
339
340    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
341
342    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
343
344    flow.complete();
345
346    validateLength( flow, 5, null );
347
348    List<Tuple> values = getSinkAsList( flow );
349
350    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
351    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
352    }
353
354  @Test
355  public void testJoinInnerSingleField() throws Exception
356    {
357    getPlatform().copyFromLocal( inputFileLowerOffset );
358    getPlatform().copyFromLocal( inputFileUpper );
359
360    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
361    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
362
363    Map sources = new HashMap();
364
365    sources.put( "lower", sourceLower );
366    sources.put( "upper", sourceUpper );
367
368    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joininnersingle" ), SinkMode.REPLACE );
369
370    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) );
371    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) );
372
373    Pipe join = new HashJoin( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
374
375    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
376
377    flow.complete();
378
379    validateLength( flow, 3, null );
380
381    Set<Tuple> results = new HashSet<Tuple>();
382
383    results.add( new Tuple( "1\t1" ) );
384    results.add( new Tuple( "5\t5" ) );
385
386    List<Tuple> actual = getSinkAsList( flow );
387
388    results.removeAll( actual );
389
390    assertEquals( 0, results.size() );
391    }
392
393  /**
394   * 1 a1
395   * 1 a2
396   * 1 a3
397   * 2 b1
398   * 3 c1
399   * 4 d1
400   * 4 d2
401   * 4 d3
402   * 5 e1
403   * 5 e2
404   * 5 e3
405   * 7 g1
406   * 7 g2
407   * 7 g3
408   * 7 g4
409   * 7 g5
410   * null h1
411   * <p/>
412   * 1 A1
413   * 1 A2
414   * 1 A3
415   * 2 B1
416   * 2 B2
417   * 2 B3
418   * 4 D1
419   * 6 F1
420   * 6 F2
421   * null H1
422   * <p/>
423   * 1  a1      1       A1
424   * 1  a1      1       A2
425   * 1  a1      1       A3
426   * 1  a2      1       A1
427   * 1  a2      1       A2
428   * 1  a2      1       A3
429   * 1  a3      1       A1
430   * 1  a3      1       A2
431   * 1  a3      1       A3
432   * 2  b1      2       B1
433   * 2  b1      2       B2
434   * 2  b1      2       B3
435   * 4  d1      4       D1
436   * 4  d2      4       D1
437   * 4  d3      4       D1
438   * null h1  null  H1
439   *
440   * @throws Exception
441   */
442  @Test
443  public void testJoinInner() throws Exception
444    {
445    HashSet<Tuple> results = new HashSet<Tuple>();
446
447    results.add( new Tuple( "1", "a1", "1", "A1" ) );
448    results.add( new Tuple( "1", "a1", "1", "A2" ) );
449    results.add( new Tuple( "1", "a1", "1", "A3" ) );
450    results.add( new Tuple( "1", "a2", "1", "A1" ) );
451    results.add( new Tuple( "1", "a2", "1", "A2" ) );
452    results.add( new Tuple( "1", "a2", "1", "A3" ) );
453    results.add( new Tuple( "1", "a3", "1", "A1" ) );
454    results.add( new Tuple( "1", "a3", "1", "A2" ) );
455    results.add( new Tuple( "1", "a3", "1", "A3" ) );
456    results.add( new Tuple( "2", "b1", "2", "B1" ) );
457    results.add( new Tuple( "2", "b1", "2", "B2" ) );
458    results.add( new Tuple( "2", "b1", "2", "B3" ) );
459    results.add( new Tuple( "4", "d1", "4", "D1" ) );
460    results.add( new Tuple( "4", "d2", "4", "D1" ) );
461    results.add( new Tuple( "4", "d3", "4", "D1" ) );
462    results.add( new Tuple( null, "h1", null, "H1" ) );
463
464    handleJoins( "joininner", new InnerJoin(), results );
465    }
466
467  /**
468   * /**
469   * 1 a1
470   * 1 a2
471   * 1 a3
472   * 2 b1
473   * 3 c1
474   * 4 d1
475   * 4 d2
476   * 4 d3
477   * 5 e1
478   * 5 e2
479   * 5 e3
480   * 7 g1
481   * 7 g2
482   * 7 g3
483   * 7 g4
484   * 7 g5
485   * null h1
486   * <p/>
487   * 1 A1
488   * 1 A2
489   * 1 A3
490   * 2 B1
491   * 2 B2
492   * 2 B3
493   * 4 D1
494   * 6 F1
495   * 6 F2
496   * null H1
497   * <p/>
498   * 1  a1      1       A1
499   * 1  a1      1       A2
500   * 1  a1      1       A3
501   * 1  a2      1       A1
502   * 1  a2      1       A2
503   * 1  a2      1       A3
504   * 1  a3      1       A1
505   * 1  a3      1       A2
506   * 1  a3      1       A3
507   * 2  b1      2       B1
508   * 2  b1      2       B2
509   * 2  b1      2       B3
510   * 3  c1      null    null
511   * 4  d1      4       D1
512   * 4  d2      4       D1
513   * 4  d3      4       D1
514   * 5  e1      null    null
515   * 5  e2      null    null
516   * 5  e3      null    null
517   * null       null    6       F1
518   * null       null    6       F2
519   * 7  g1      null    null
520   * 7  g2      null    null
521   * 7  g3      null    null
522   * 7  g4      null    null
523   * 7  g5      null    null
524   * null h1  null  H1
525   *
526   * @throws Exception
527   */
528  @Test
529  public void testJoinOuter() throws Exception
530    {
531    // skip if hadoop cluster mode, outer joins don't behave the same
532    if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
533      return;
534
535    Set<Tuple> results = new HashSet<Tuple>();
536
537    results.add( new Tuple( "1", "a1", "1", "A1" ) );
538    results.add( new Tuple( "1", "a1", "1", "A2" ) );
539    results.add( new Tuple( "1", "a1", "1", "A3" ) );
540    results.add( new Tuple( "1", "a2", "1", "A1" ) );
541    results.add( new Tuple( "1", "a2", "1", "A2" ) );
542    results.add( new Tuple( "1", "a2", "1", "A3" ) );
543    results.add( new Tuple( "1", "a3", "1", "A1" ) );
544    results.add( new Tuple( "1", "a3", "1", "A2" ) );
545    results.add( new Tuple( "1", "a3", "1", "A3" ) );
546    results.add( new Tuple( "2", "b1", "2", "B1" ) );
547    results.add( new Tuple( "2", "b1", "2", "B2" ) );
548    results.add( new Tuple( "2", "b1", "2", "B3" ) );
549    results.add( new Tuple( "3", "c1", null, null ) );
550    results.add( new Tuple( "4", "d1", "4", "D1" ) );
551    results.add( new Tuple( "4", "d2", "4", "D1" ) );
552    results.add( new Tuple( "4", "d3", "4", "D1" ) );
553    results.add( new Tuple( "5", "e1", null, null ) );
554    results.add( new Tuple( "5", "e2", null, null ) );
555    results.add( new Tuple( "5", "e3", null, null ) );
556    results.add( new Tuple( null, null, "6", "F1" ) );
557    results.add( new Tuple( null, null, "6", "F2" ) );
558    results.add( new Tuple( "7", "g1", null, null ) );
559    results.add( new Tuple( "7", "g2", null, null ) );
560    results.add( new Tuple( "7", "g3", null, null ) );
561    results.add( new Tuple( "7", "g4", null, null ) );
562    results.add( new Tuple( "7", "g5", null, null ) );
563    results.add( new Tuple( null, "h1", null, "H1" ) );
564
565    handleJoins( "joinouter", new OuterJoin(), results );
566    }
567
568  /**
569   * 1 a1
570   * 1 a2
571   * 1 a3
572   * 2 b1
573   * 3 c1
574   * 4 d1
575   * 4 d2
576   * 4 d3
577   * 5 e1
578   * 5 e2
579   * 5 e3
580   * 7 g1
581   * 7 g2
582   * 7 g3
583   * 7 g4
584   * 7 g5
585   * null h1
586   * <p/>
587   * 1 A1
588   * 1 A2
589   * 1 A3
590   * 2 B1
591   * 2 B2
592   * 2 B3
593   * 4 D1
594   * 6 F1
595   * 6 F2
596   * null H1
597   * <p/>
598   * 1  a1      1       A1
599   * 1  a1      1       A2
600   * 1  a1      1       A3
601   * 1  a2      1       A1
602   * 1  a2      1       A2
603   * 1  a2      1       A3
604   * 1  a3      1       A1
605   * 1  a3      1       A2
606   * 1  a3      1       A3
607   * 2  b1      2       B1
608   * 2  b1      2       B2
609   * 2  b1      2       B3
610   * 3  c1      null    null
611   * 4  d1      4       D1
612   * 4  d2      4       D1
613   * 4  d3      4       D1
614   * 5  e1      null    null
615   * 5  e2      null    null
616   * 5  e3      null    null
617   * 7  g1      null    null
618   * 7  g2      null    null
619   * 7  g3      null    null
620   * 7  g4      null    null
621   * 7  g5      null    null
622   * null h1    null    H1
623   *
624   * @throws Exception
625   */
626  @Test
627  public void testJoinInnerOuter() throws Exception
628    {
629    Set<Tuple> results = new HashSet<Tuple>();
630
631    results.add( new Tuple( "1", "a1", "1", "A1" ) );
632    results.add( new Tuple( "1", "a1", "1", "A2" ) );
633    results.add( new Tuple( "1", "a1", "1", "A3" ) );
634    results.add( new Tuple( "1", "a2", "1", "A1" ) );
635    results.add( new Tuple( "1", "a2", "1", "A2" ) );
636    results.add( new Tuple( "1", "a2", "1", "A3" ) );
637    results.add( new Tuple( "1", "a3", "1", "A1" ) );
638    results.add( new Tuple( "1", "a3", "1", "A2" ) );
639    results.add( new Tuple( "1", "a3", "1", "A3" ) );
640    results.add( new Tuple( "2", "b1", "2", "B1" ) );
641    results.add( new Tuple( "2", "b1", "2", "B2" ) );
642    results.add( new Tuple( "2", "b1", "2", "B3" ) );
643    results.add( new Tuple( "3", "c1", null, null ) );
644    results.add( new Tuple( "4", "d1", "4", "D1" ) );
645    results.add( new Tuple( "4", "d2", "4", "D1" ) );
646    results.add( new Tuple( "4", "d3", "4", "D1" ) );
647    results.add( new Tuple( "5", "e1", null, null ) );
648    results.add( new Tuple( "5", "e2", null, null ) );
649    results.add( new Tuple( "5", "e3", null, null ) );
650    results.add( new Tuple( "7", "g1", null, null ) );
651    results.add( new Tuple( "7", "g2", null, null ) );
652    results.add( new Tuple( "7", "g3", null, null ) );
653    results.add( new Tuple( "7", "g4", null, null ) );
654    results.add( new Tuple( "7", "g5", null, null ) );
655    results.add( new Tuple( null, "h1", null, "H1" ) );
656
657    handleJoins( "joininnerouter", new LeftJoin(), results );
658    }
659
660  /**
661   * 1 a1
662   * 1 a2
663   * 1 a3
664   * 2 b1
665   * 3 c1
666   * 4 d1
667   * 4 d2
668   * 4 d3
669   * 5 e1
670   * 5 e2
671   * 5 e3
672   * 7 g1
673   * 7 g2
674   * 7 g3
675   * 7 g4
676   * 7 g5
677   * null h1
678   * <p/>
679   * 1 A1
680   * 1 A2
681   * 1 A3
682   * 2 B1
683   * 2 B2
684   * 2 B3
685   * 4 D1
686   * 6 F1
687   * 6 F2
688   * null H1
689   * <p/>
690   * 1  a1      1       A1
691   * 1  a1      1       A2
692   * 1  a1      1       A3
693   * 1  a2      1       A1
694   * 1  a2      1       A2
695   * 1  a2      1       A3
696   * 1  a3      1       A1
697   * 1  a3      1       A2
698   * 1  a3      1       A3
699   * 2  b1      2       B1
700   * 2  b1      2       B2
701   * 2  b1      2       B3
702   * 4  d1      4       D1
703   * 4  d2      4       D1
704   * 4  d3      4       D1
705   * null       null    6       F1
706   * null       null    6       F2
707   * null h1    null    H1
708   *
709   * @throws Exception
710   */
711  @Test
712  public void testJoinOuterInner() throws Exception
713    {
714    // skip if hadoop cluster mode, outer joins don't behave the same
715    if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
716      return;
717
718    Set<Tuple> results = new HashSet<Tuple>();
719
720    results.add( new Tuple( "1", "a1", "1", "A1" ) );
721    results.add( new Tuple( "1", "a1", "1", "A2" ) );
722    results.add( new Tuple( "1", "a1", "1", "A3" ) );
723    results.add( new Tuple( "1", "a2", "1", "A1" ) );
724    results.add( new Tuple( "1", "a2", "1", "A2" ) );
725    results.add( new Tuple( "1", "a2", "1", "A3" ) );
726    results.add( new Tuple( "1", "a3", "1", "A1" ) );
727    results.add( new Tuple( "1", "a3", "1", "A2" ) );
728    results.add( new Tuple( "1", "a3", "1", "A3" ) );
729    results.add( new Tuple( "2", "b1", "2", "B1" ) );
730    results.add( new Tuple( "2", "b1", "2", "B2" ) );
731    results.add( new Tuple( "2", "b1", "2", "B3" ) );
732    results.add( new Tuple( "4", "d1", "4", "D1" ) );
733    results.add( new Tuple( "4", "d2", "4", "D1" ) );
734    results.add( new Tuple( "4", "d3", "4", "D1" ) );
735    results.add( new Tuple( null, null, "6", "F1" ) );
736    results.add( new Tuple( null, null, "6", "F2" ) );
737    results.add( new Tuple( null, "h1", null, "H1" ) );
738
739    handleJoins( "joinouterinner", new RightJoin(), results );
740    }
741
742  private void handleJoins( String path, Joiner joiner, Set<Tuple> results ) throws Exception
743    {
744    getPlatform().copyFromLocal( inputFileLhsSparse );
745    getPlatform().copyFromLocal( inputFileRhsSparse );
746
747    Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class );
748    Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse );
749    Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse );
750
751    Map sources = new HashMap();
752
753    sources.put( "lower", sourceLower );
754    sources.put( "upper", sourceUpper );
755
756    Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE );
757
758    Pipe pipeLower = new Pipe( "lower" );
759    Pipe pipeUpper = new Pipe( "upper" );
760
761    Fields declaredFields = new Fields( "num", "char", "num2", "char2" );
762    Fields groupingFields = new Fields( "num" );
763
764    Pipe splice = new HashJoin( pipeLower, groupingFields, pipeUpper, groupingFields, declaredFields, joiner );
765
766    splice = new Each( splice, Fields.ALL, new Identity(), Fields.RESULTS );
767
768    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
769
770    flow.complete();
771
772    validateLength( flow, results.size() );
773
774    List<Tuple> actual = getSinkAsList( flow );
775
776    results.removeAll( actual );
777
778    assertEquals( 0, results.size() );
779    }
780
781  /**
782   * 1 a
783   * 5 b
784   * 6 c
785   * 5 b
786   * 5 e
787   * <p/>
788   * 1 A
789   * 2 B
790   * 3 C
791   * 4 D
792   * 5 E
793   * <p/>
794   * 1 a
795   * 2 b
796   * 3 c
797   * 4 d
798   * 5 e
799   * <p/>
800   * 1  a       1       A  1  a
801   * -  -   2   B  2  b
802   * -  -   3   C  3  c
803   * -  -   4   D  4  d
804   * 5  b       5   E  5  e
805   * 5  e       5   E  5  e
806   *
807   * @throws Exception
808   */
809  @Test
810  public void testJoinMixed() throws Exception
811    {
812    // skip if hadoop cluster mode, outer joins don't behave the same
813    if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
814      return;
815
816    getPlatform().copyFromLocal( inputFileLowerOffset );
817    getPlatform().copyFromLocal( inputFileLower );
818    getPlatform().copyFromLocal( inputFileUpper );
819
820    Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
821    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
822    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
823
824    Map sources = new HashMap();
825
826    sources.put( "loweroffset", sourceLowerOffset );
827    sources.put( "lower", sourceLower );
828    sources.put( "upper", sourceUpper );
829
830    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinmixed" ), SinkMode.REPLACE );
831
832    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
833
834    Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter );
835    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
836    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
837
838    Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower );
839    Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) );
840
841    MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} );
842    Pipe splice = new HashJoin( pipes, fields, Fields.size( 6 ), join );
843
844    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
845
846    flow.complete();
847
848    validateLength( flow, 6 );
849
850    Set<Tuple> results = new HashSet<Tuple>();
851
852    results.add( new Tuple( "1\ta\t1\tA\t1\ta" ) );
853    results.add( new Tuple( "null\tnull\t2\tB\t2\tb" ) );
854    results.add( new Tuple( "null\tnull\t3\tC\t3\tc" ) );
855    results.add( new Tuple( "null\tnull\t4\tD\t4\td" ) );
856    results.add( new Tuple( "5\tb\t5\tE\t5\te" ) );
857    results.add( new Tuple( "5\te\t5\tE\t5\te" ) );
858
859    List<Tuple> actual = getSinkAsList( flow );
860
861    results.removeAll( actual );
862
863    assertEquals( 0, results.size() );
864    }
865
866  @Test
867  public void testJoinDiffFields() throws Exception
868    {
869    getPlatform().copyFromLocal( inputFileLower );
870    getPlatform().copyFromLocal( inputFileUpper );
871
872    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
873    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
874
875    Map sources = new HashMap();
876
877    sources.put( "lower", sourceLower );
878    sources.put( "upper", sourceUpper );
879
880    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE );
881
882    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
883    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
884
885    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
886    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
887
888    Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
889
890    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
891
892    flow.complete();
893
894    validateLength( flow, 5 );
895
896    List<Tuple> actual = getSinkAsList( flow );
897
898    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
899    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
900    }
901
902  @Test
903  public void testJoinGroupBy() throws Exception
904    {
905    getPlatform().copyFromLocal( inputFileLower );
906    getPlatform().copyFromLocal( inputFileUpper );
907
908    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
909    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
910
911    Map sources = new HashMap();
912
913    sources.put( "lower", sourceLower );
914    sources.put( "upper", sourceUpper );
915
916    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupby" ), SinkMode.REPLACE );
917
918    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
919    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
920
921    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
922    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
923
924    Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
925
926    Pipe groupby = new GroupBy( pipe, new Fields( "numA" ) );
927
928    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby );
929
930    flow.complete();
931
932    validateLength( flow, 5, null );
933
934    List<Tuple> actual = getSinkAsList( flow );
935
936    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
937    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
938    }
939
940  @Test
941  public void testJoinSamePipe() throws Exception
942    {
943    getPlatform().copyFromLocal( inputFileLower );
944
945    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
946
947    Map sources = new HashMap();
948
949    sources.put( "lower", source );
950
951    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE );
952
953    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
954
955    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
956
957    Pipe pipe = new HashJoin( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) );
958
959    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
960
961    flow.complete();
962
963    validateLength( flow, 5, null );
964
965    List<Tuple> actual = getSinkAsList( flow );
966
967    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
968    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
969    }
970
971  @Test
972  public void testJoinSamePipe2() throws Exception
973    {
974    getPlatform().copyFromLocal( inputFileLower );
975
976    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
977
978    Map sources = new HashMap();
979
980    sources.put( "lower", source );
981
982    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE );
983
984    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
985
986    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
987
988    Pipe join = new HashJoin( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
989
990    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
991
992    flow.complete();
993
994    validateLength( flow, 5, null );
995
996    List<Tuple> actual = getSinkAsList( flow );
997
998    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
999    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1000    }
1001
1002  @Test
1003  public void testJoinSamePipe3() throws Exception
1004    {
1005    getPlatform().copyFromLocal( inputFileLower );
1006
1007    Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
1008
1009    Map sources = new HashMap();
1010
1011    sources.put( "lower", source );
1012
1013    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE );
1014
1015    Pipe pipe = new Pipe( "lower" );
1016
1017    Pipe lhs = new Pipe( "lhs", pipe );
1018    Pipe rhs = new Pipe( "rhs", pipe );
1019
1020    Pipe join = new HashJoin( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1021
1022    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
1023
1024    flow.complete();
1025
1026    validateLength( flow, 5, null );
1027
1028    List<Tuple> actual = getSinkAsList( flow );
1029
1030    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1031    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1032    }
1033
1034  /**
1035   * Same source as rightmost
1036   * <p/>
1037   * should be a single job as the same file accumulates into the joins
1038   *
1039   * @throws Exception
1040   */
1041  @Test
1042  public void testJoinAroundJoinRightMost() throws Exception
1043    {
1044    getPlatform().copyFromLocal( inputFileLower );
1045    getPlatform().copyFromLocal( inputFileUpper );
1046
1047    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1048    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1049
1050    Map sources = new HashMap();
1051
1052    sources.put( "lower", sourceLower );
1053    sources.put( "upper1", sourceUpper );
1054    sources.put( "upper2", sourceUpper );
1055
1056    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinrightmost" ), SinkMode.REPLACE );
1057
1058    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1059
1060    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1061    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1062    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1063
1064    Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1065
1066    splice1 = new Each( splice1, new Identity() );
1067
1068    Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1069
1070    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1071
1072//    flow.writeDOT( "joinaroundrightmost.dot" );
1073
1074    if( getPlatform().isMapReduce() )
1075      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1076
1077    flow.complete();
1078
1079    validateLength( flow, 5, null );
1080
1081    List<Tuple> actual = getSinkAsList( flow );
1082
1083    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1084    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1085    }
1086
1087  /**
1088   * Same source as leftmost
1089   *
1090   * @throws Exception
1091   */
1092  @Test
1093  public void testJoinAroundJoinLeftMost() throws Exception
1094    {
1095    getPlatform().copyFromLocal( inputFileLower );
1096    getPlatform().copyFromLocal( inputFileUpper );
1097
1098    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1099    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1100
1101    Map sources = new HashMap();
1102
1103    sources.put( "lower", sourceLower );
1104    sources.put( "upper1", sourceUpper );
1105    sources.put( "upper2", sourceUpper );
1106
1107    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinleftmost" ), SinkMode.REPLACE );
1108
1109    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1110
1111    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1112    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1113    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1114
1115    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1116
1117    splice1 = new Each( splice1, new Identity() );
1118
1119    Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1120
1121    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1122
1123//    flow.writeDOT( "joinaroundleftmost.dot" );
1124
1125    if( getPlatform().isMapReduce() )
1126      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1127
1128    flow.complete();
1129
1130    validateLength( flow, 5, null );
1131
1132    List<Tuple> actual = getSinkAsList( flow );
1133
1134    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA\t1\ta" ) ) );
1135    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB\t2\tb" ) ) );
1136    }
1137
1138  /**
1139   * Upper as leftmost and rightmost forcing two jobs
1140   *
1141   * @throws Exception
1142   */
1143  @Test
1144  public void testJoinAroundJoinRightMostSwapped() throws Exception
1145    {
1146    getPlatform().copyFromLocal( inputFileLower );
1147    getPlatform().copyFromLocal( inputFileUpper );
1148
1149    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1150    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1151
1152    Map sources = new HashMap();
1153
1154    sources.put( "lower", sourceLower );
1155    sources.put( "upper1", sourceUpper );
1156    sources.put( "upper2", sourceUpper );
1157
1158    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinswapped" ), SinkMode.REPLACE );
1159
1160    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1161
1162    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1163    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1164    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1165
1166    Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1167
1168    splice1 = new Each( splice1, new Identity() );
1169
1170    // upper2 becomes leftmost, forcing a tap between the joins
1171    Pipe splice2 = new HashJoin( pipeUpper2, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1172
1173    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1174
1175    if( getPlatform().isMapReduce() )
1176      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1177
1178    flow.complete();
1179
1180    validateLength( flow, 5, null );
1181
1182    List<Tuple> actual = getSinkAsList( flow );
1183
1184    assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\tA" ) ) );
1185    assertTrue( actual.contains( new Tuple( "2\tB\t2\tb\t2\tB" ) ) );
1186    }
1187
1188  @Test
1189  public void testJoinGroupByJoin() throws Exception
1190    {
1191    getPlatform().copyFromLocal( inputFileLower );
1192    getPlatform().copyFromLocal( inputFileUpper );
1193    getPlatform().copyFromLocal( inputFileJoined );
1194
1195    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1196    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1197    Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined );
1198
1199    Map sources = new HashMap();
1200
1201    sources.put( "lower", sourceLower );
1202    sources.put( "upper", sourceUpper );
1203    sources.put( "joined", sourceJoined );
1204
1205    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupbyjoin" ), SinkMode.REPLACE );
1206
1207    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1208    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1209    Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" );
1210
1211    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1212    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1213    Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined );
1214
1215    Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1216
1217    pipe = new GroupBy( pipe, new Fields( "numA" ) );
1218
1219    pipe = new HashJoin( pipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) );
1220
1221    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
1222
1223    if( getPlatform().isMapReduce() )
1224      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1225
1226    flow.complete();
1227
1228    validateLength( flow, 5, null );
1229
1230    List<Tuple> actual = getSinkAsList( flow );
1231
1232    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\tA" ) ) );
1233    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tb\tB" ) ) );
1234    }
1235
1236  /**
1237   * here the same file is fed into the same HashJoin.
1238   * <p/>
1239   * This is three jobs.
1240   * <p/>
1241   * a temp tap is inserted before the accumulated branch for two reasons on the common HashJoin
1242   * <p/>
1243   * it is assumed the accumulated side is filtered down, so pushing to disk will preserve io
1244   * if accumulated side was streamed instead via a fork, only part of the file will accumulate into the HashJoin
1245   * <p/>
1246   * /-T-\ <-- accumulated
1247   * T      HJ
1248   * \---/ <-- streamed
1249   *
1250   * @throws Exception
1251   */
1252  @Test
1253  public void testJoinSameSourceIntoJoin() throws Exception
1254    {
1255    getPlatform().copyFromLocal( inputFileLower );
1256    getPlatform().copyFromLocal( inputFileUpper );
1257
1258    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1259    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1260
1261    Map sources = new HashMap();
1262
1263    sources.put( "lower", sourceLower );
1264    sources.put( "upper1", sourceUpper );
1265    sources.put( "upper2", sourceUpper );
1266
1267    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoin" ), SinkMode.REPLACE );
1268
1269    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1270
1271    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1272    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1273    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1274
1275    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1276
1277    splice1 = new Each( splice1, new Identity() );
1278
1279    Pipe splice2 = new HashJoin( pipeLower, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1280
1281    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1282
1283//    flow.writeDOT( "joinsamesourceintojoin.dot" );
1284
1285    if( getPlatform().isMapReduce() )
1286      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
1287
1288    flow.complete();
1289
1290    validateLength( flow, 5, null );
1291
1292    List<Tuple> actual = getSinkAsList( flow );
1293
1294    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1295    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1296    }
1297
1298  @Test
1299  public void testJoinSameSourceIntoJoinSimple() throws Exception
1300    {
1301    getPlatform().copyFromLocal( inputFileUpper );
1302
1303    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1304
1305    Map sources = new HashMap();
1306
1307    sources.put( "upper1", sourceUpper );
1308    sources.put( "upper2", sourceUpper );
1309
1310    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoinsimple" ), SinkMode.REPLACE );
1311
1312    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1313
1314    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1315    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1316
1317    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1318
1319    splice1 = new Each( splice1, new Identity() );
1320
1321    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 );
1322
1323//    flow.writeDOT( "joinsamesourceintojoin.dot" );
1324
1325    if( getPlatform().isMapReduce() )
1326      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1327
1328    flow.complete();
1329
1330    validateLength( flow, 5, null );
1331
1332    List<Tuple> actual = getSinkAsList( flow );
1333
1334    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) );
1335    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) );
1336    }
1337
1338  /**
1339   * Loosely tests for a deadlock when BlockingHashJoinAnnotator rule doesn't excluce the GroupBy from the blocking
1340   * annotation.
1341   * <p/>
1342   * the deadlock is random on the order of the paths traversed from the Source Tap + fork.
1343   *
1344   * @throws Exception
1345   */
1346  @Test
1347  public void testJoinSameSourceOverGroupByIntoJoinSimple() throws Exception
1348    {
1349    getPlatform().copyFromLocal( inputFileLower );
1350    getPlatform().copyFromLocal( inputFileUpper );
1351
1352    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1353
1354    Map sources = new HashMap();
1355
1356    sources.put( "upper1", sourceUpper );
1357    sources.put( "upper2", sourceUpper );
1358
1359    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceovergroupbyintojoinsimple" ), SinkMode.REPLACE );
1360
1361    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1362
1363    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1364    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1365
1366    pipeUpper1 = new GroupBy( pipeUpper1, new Fields( "num" ) );
1367    pipeUpper2 = new GroupBy( pipeUpper2, new Fields( "num" ) );
1368
1369    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1370
1371    splice1 = new Each( splice1, new Identity() );
1372
1373    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 );
1374
1375    if( getPlatform().isMapReduce() )
1376      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
1377
1378    flow.complete();
1379
1380    validateLength( flow, 5, null );
1381
1382    List<Tuple> actual = getSinkAsList( flow );
1383
1384    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) );
1385    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) );
1386    }
1387
1388  /**
1389   * Tests that two independent streamed sources with loadable tributaries properly plan into a GroupBy
1390   * without loading unused sources
1391   *
1392   * @throws Exception
1393   */
1394  @Test
1395  public void testJoinsIntoGroupBy() throws Exception
1396    {
1397    getPlatform().copyFromLocal( inputFileLower );
1398    getPlatform().copyFromLocal( inputFileUpper );
1399
1400    getPlatform().copyFromLocal( inputFileLhs );
1401    getPlatform().copyFromLocal( inputFileRhs );
1402
1403    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1404    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1405
1406    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1407    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1408
1409    Map sources = new HashMap();
1410
1411    sources.put( "lower", sourceLower );
1412    sources.put( "upper", sourceUpper );
1413    sources.put( "lhs", sourceLhs );
1414    sources.put( "rhs", sourceRhs );
1415
1416    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintogroupby" ), SinkMode.REPLACE );
1417
1418    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1419
1420    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1421    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1422
1423    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1424    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1425
1426    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1427
1428    upperLower = new Each( upperLower, new Identity() );
1429
1430    Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1431
1432    lhsRhs = new Each( lhsRhs, new Identity() );
1433
1434    Pipe grouped = new GroupBy( "merging", Pipe.pipes( upperLower, lhsRhs ), new Fields( "num1" ) );
1435
1436    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1437
1438    if( getPlatform().isMapReduce() )
1439      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1440
1441    flow.complete();
1442
1443    validateLength( flow, 42, null );
1444
1445    List<Tuple> actual = getSinkAsList( flow );
1446
1447    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1448    assertTrue( actual.contains( new Tuple( "5\te\t5\tE" ) ) );
1449    }
1450
1451  @Test
1452  public void testJoinSamePipeAroundGroupBy() throws Exception
1453    {
1454    getPlatform().copyFromLocal( inputFileLower );
1455
1456    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1457    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipearoundgroupby" ), SinkMode.REPLACE );
1458
1459    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1460
1461    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1462
1463    Pipe lhsPipe = new Each( new Pipe( "lhs", pipeLower ), new Identity() );
1464
1465    Pipe rhsPipe = new Each( new Pipe( "rhs", pipeLower ), new Identity() );
1466
1467    rhsPipe = new GroupBy( rhsPipe, new Fields( "num" ) );
1468
1469    rhsPipe = new Each( rhsPipe, new Identity() );
1470
1471    Pipe pipe = new HashJoin( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1472
1473    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
1474
1475    flow.complete();
1476
1477    validateLength( flow, 5, null );
1478
1479    List<Tuple> actual = getSinkAsList( flow );
1480
1481    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1482    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1483    }
1484
1485  /**
1486   * This test results in two MR jobs because one join feeds into the accumulated side of the second. A mapper
1487   * can only stream on branch at a time forcing a temp file between the mappers. see next test for swapped join
1488   *
1489   * @throws Exception
1490   */
1491  @Test
1492  public void testJoinsIntoCoGroupLhs() throws Exception
1493    {
1494    getPlatform().copyFromLocal( inputFileLower );
1495    getPlatform().copyFromLocal( inputFileUpper );
1496
1497    getPlatform().copyFromLocal( inputFileLhs );
1498    getPlatform().copyFromLocal( inputFileRhs );
1499
1500    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1501    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1502
1503    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1504    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1505
1506    Map sources = new HashMap();
1507
1508    sources.put( "lower", sourceLower );
1509    sources.put( "upper", sourceUpper );
1510    sources.put( "lhs", sourceLhs );
1511    sources.put( "rhs", sourceRhs );
1512
1513    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhs" ), SinkMode.REPLACE );
1514
1515    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1516
1517    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1518    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1519
1520    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1521    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1522
1523    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1524
1525    upperLower = new Each( upperLower, new Identity() );
1526
1527    Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1528
1529    lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1530
1531    Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) );
1532
1533    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1534
1535    if( getPlatform().isMapReduce() )
1536      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1537
1538    flow.complete();
1539
1540    validateLength( flow, 37, null );
1541
1542    List<Tuple> actual = getSinkAsList( flow );
1543
1544    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta\t1\tA\t1\tA" ) ) );
1545    assertTrue( actual.contains( new Tuple( "5\ta\t5\te\t5\tE\t5\tA" ) ) );
1546    }
1547
1548  /**
1549   * This test results in one MR jobs because one join feeds into the streamed side of the second.
1550   *
1551   * @throws Exception
1552   */
1553  @Test
1554  public void testJoinsIntoCoGroupLhsSwappedJoin() throws Exception
1555    {
1556    getPlatform().copyFromLocal( inputFileLower );
1557    getPlatform().copyFromLocal( inputFileUpper );
1558
1559    getPlatform().copyFromLocal( inputFileLhs );
1560    getPlatform().copyFromLocal( inputFileRhs );
1561
1562    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1563    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1564
1565    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1566    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1567
1568    Map sources = new HashMap();
1569
1570    sources.put( "lower", sourceLower );
1571    sources.put( "upper", sourceUpper );
1572    sources.put( "lhs", sourceLhs );
1573    sources.put( "rhs", sourceRhs );
1574
1575    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhsswappedjoin" ), SinkMode.REPLACE );
1576
1577    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1578
1579    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1580    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1581
1582    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1583    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1584
1585    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1586
1587    upperLower = new Each( upperLower, new Identity() );
1588
1589    Pipe lhsUpperLower = new HashJoin( upperLower, new Fields( "numUpperLower" ), pipeLhs, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower", "numLhs", "charLhs" ) );
1590
1591    lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1592
1593    Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) );
1594
1595    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1596
1597    if( getPlatform().isMapReduce() )
1598      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1599
1600    flow.complete();
1601
1602    validateLength( flow, 37, null );
1603
1604    List<Tuple> actual = getSinkAsList( flow );
1605
1606    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) );
1607    assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) );
1608    }
1609
1610  @Test
1611  public void testJoinsIntoCoGroupRhs() throws Exception
1612    {
1613    getPlatform().copyFromLocal( inputFileLower );
1614    getPlatform().copyFromLocal( inputFileUpper );
1615
1616    getPlatform().copyFromLocal( inputFileLhs );
1617    getPlatform().copyFromLocal( inputFileRhs );
1618
1619    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1620    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1621
1622    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1623    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1624
1625    Map sources = new HashMap();
1626
1627    sources.put( "lower", sourceLower );
1628    sources.put( "upper", sourceUpper );
1629    sources.put( "lhs", sourceLhs );
1630    sources.put( "rhs", sourceRhs );
1631
1632    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouprhs" ), SinkMode.REPLACE );
1633
1634    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1635
1636    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1637    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1638
1639    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1640    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1641
1642    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1643
1644    upperLower = new Each( upperLower, new Identity() );
1645
1646    Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1647
1648    lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1649
1650    Pipe grouped = new CoGroup( "cogrouping", pipeRhs, new Fields( "num" ), lhsUpperLower, new Fields( "numLhs" ) );
1651
1652    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1653
1654    if( getPlatform().isMapReduce() )
1655      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1656
1657    flow.complete();
1658
1659    validateLength( flow, 37, null );
1660
1661    List<Tuple> actual = getSinkAsList( flow );
1662
1663    assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\ta\t1\tA" ) ) );
1664    assertTrue( actual.contains( new Tuple( "5\tE\t5\te\t5\te\t5\tE" ) ) );
1665    }
1666
1667  @Test
1668  public void testJoinsIntoCoGroup() throws Exception
1669    {
1670    getPlatform().copyFromLocal( inputFileLower );
1671    getPlatform().copyFromLocal( inputFileUpper );
1672
1673    getPlatform().copyFromLocal( inputFileLhs );
1674    getPlatform().copyFromLocal( inputFileRhs );
1675
1676    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1677    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1678
1679    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1680    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1681
1682    Map sources = new HashMap();
1683
1684    sources.put( "lower", sourceLower );
1685    sources.put( "upper", sourceUpper );
1686    sources.put( "lhs", sourceLhs );
1687    sources.put( "rhs", sourceRhs );
1688
1689    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogroup" ), SinkMode.REPLACE );
1690
1691    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1692
1693    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1694    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1695
1696    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1697    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1698
1699    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower1", "charUpperLower1", "numUpperLower2", "charUpperLower2" ) );
1700
1701    upperLower = new Each( upperLower, new Identity() );
1702
1703    Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "numLhsRhs1", "charLhsRhs1", "numLhsRhs2", "charLhsRhs2" ) );
1704
1705    lhsRhs = new Each( lhsRhs, new Identity() );
1706
1707    Pipe grouped = new CoGroup( "cogrouping", upperLower, new Fields( "numUpperLower1" ), lhsRhs, new Fields( "numLhsRhs1" ) );
1708
1709    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1710
1711    if( getPlatform().isMapReduce() )
1712      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1713
1714    flow.complete();
1715
1716    validateLength( flow, 37, null );
1717
1718    List<Tuple> actual = getSinkAsList( flow );
1719
1720    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) );
1721    assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) );
1722    }
1723
1724  public static class AllComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable
1725    {
1726
1727    @Override
1728    public int compare( Comparable lhs, Comparable rhs )
1729      {
1730      return lhs.toString().compareTo( rhs.toString() );
1731      }
1732
1733    @Override
1734    public int hashCode( Comparable value )
1735      {
1736      if( value == null )
1737        return 0;
1738
1739      return value.toString().hashCode();
1740      }
1741    }
1742
1743  /**
1744   * Tests Hasher being honored even if default comparator is null.
1745   *
1746   * @throws Exception
1747   */
1748  @Test
1749  public void testJoinWithHasher() throws Exception
1750    {
1751    getPlatform().copyFromLocal( inputFileLower );
1752    getPlatform().copyFromLocal( inputFileUpper );
1753
1754    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1755    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1756
1757    Map sources = new HashMap();
1758
1759    sources.put( "lower", sourceLower );
1760    sources.put( "upper", sourceUpper );
1761
1762    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinhasher" ), SinkMode.REPLACE );
1763
1764    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1765
1766    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1767
1768    pipeLower = new Each( pipeLower, new Fields( "num" ), new ExpressionFunction( Fields.ARGS, "Integer.parseInt( num )", String.class ), Fields.REPLACE );
1769
1770    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1771
1772    Fields num = new Fields( "num" );
1773    num.setComparator( "num", new AllComparator() );
1774
1775    Pipe splice = new HashJoin( pipeLower, num, pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
1776
1777    Map<Object, Object> properties = getProperties();
1778
1779    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1780
1781    flow.complete();
1782
1783    validateLength( flow, 5 );
1784
1785    List<Tuple> values = getSinkAsList( flow );
1786
1787    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1788    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1789    }
1790
1791  @Test
1792  public void testJoinNone() throws Exception
1793    {
1794    getPlatform().copyFromLocal( inputFileLower );
1795    getPlatform().copyFromLocal( inputFileUpper );
1796
1797    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1798    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1799
1800    Map sources = new HashMap();
1801
1802    sources.put( "lower", sourceLower );
1803    sources.put( "upper", sourceUpper );
1804
1805    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE );
1806
1807    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1808
1809    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1810    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1811
1812    Pipe splice = new HashJoin( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) );
1813
1814    Map<Object, Object> properties = getProperties();
1815
1816    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1817
1818    flow.complete();
1819
1820    validateLength( flow, 25 );
1821
1822    List<Tuple> values = getSinkAsList( flow );
1823
1824    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1825    assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) );
1826    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1827    }
1828
1829  @Test
1830  public void testGroupBySplitJoins() throws Exception
1831    {
1832    getPlatform().copyFromLocal( inputFileLower );
1833    getPlatform().copyFromLocal( inputFileUpper );
1834    getPlatform().copyFromLocal( inputFileJoined );
1835
1836    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1837    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1838    Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined );
1839
1840    Map sources = new HashMap();
1841
1842    sources.put( "lower", sourceLower );
1843    sources.put( "upper", sourceUpper );
1844    sources.put( "joined", sourceJoined );
1845
1846    Tap lhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE );
1847    Tap rhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE );
1848
1849    Map sinks = new HashMap();
1850
1851    sinks.put( "lhs", lhsSink );
1852    sinks.put( "rhs", rhsSink );
1853
1854    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1855    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1856    Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" );
1857
1858    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1859    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1860    Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined );
1861
1862    Pipe pipe = new GroupBy( pipeLower, new Fields( "numA" ) );
1863
1864    pipe = new Every( pipe, Fields.ALL, new TestIdentityBuffer( new Fields( "numA" ), 5, false ), Fields.RESULTS );
1865
1866    Pipe lhsPipe = new Each( pipe, new Identity() );
1867    lhsPipe = new HashJoin( "lhs", lhsPipe, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1868
1869    Pipe rhsPipe = new Each( pipe, new Identity() );
1870    rhsPipe = new HashJoin( "rhs", rhsPipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) );
1871
1872    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, lhsPipe, rhsPipe );
1873
1874    if( getPlatform().isMapReduce() )
1875      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
1876
1877    flow.complete();
1878
1879    validateLength( flow.openSink( "lhs" ), 5, null );
1880    validateLength( flow.openSink( "rhs" ), 5, null );
1881
1882    List<Tuple> lhsActual = asList( flow, lhsSink );
1883
1884    assertTrue( lhsActual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1885    assertTrue( lhsActual.contains( new Tuple( "2\tb\t2\tB" ) ) );
1886
1887    List<Tuple> rhsActual = asList( flow, rhsSink );
1888
1889    assertTrue( rhsActual.contains( new Tuple( "1\ta\t1\ta\tA" ) ) );
1890    assertTrue( rhsActual.contains( new Tuple( "2\tb\t2\tb\tB" ) ) );
1891    }
1892
1893  /**
1894   * currently we cannot efficiently plan for this case. better to throw an error
1895   * <p/>
1896   * When run against a cluster a Merge before a GroupBy can hide the streamed/accumulated nature of a branch.
1897   * <p/>
1898   * commented code is for troubleshooting.
1899   *
1900   * @throws Exception
1901   */
1902  @Test
1903  public void testJoinMergeGroupBy() throws Exception
1904    {
1905    getPlatform().copyFromLocal( inputFileNums10 );
1906    getPlatform().copyFromLocal( inputFileNums20 );
1907
1908    Tap lhsTap = getPlatform().getTextFile( new Fields( "id" ), inputFileNums10 );
1909    Tap rhsTap = getPlatform().getTextFile( new Fields( "id2" ), inputFileNums20 );
1910
1911    Pipe lhs = new Pipe( "lhs" );
1912    Pipe rhs = new Pipe( "rhs" );
1913
1914//    Pipe joined = new CoGroup( messages, new Fields( "id" ), people, new Fields( "id2" ) );
1915    Pipe joined = new HashJoin( lhs, new Fields( "id" ), rhs, new Fields( "id2" ) );
1916
1917    Pipe pruned = new Each( joined, new Fields( "id2" ), new Identity(), Fields.RESULTS );
1918//    pruned = new Checkpoint( pruned );
1919    Pipe merged = new Merge( pruned, rhs );
1920    Pipe grouped = new GroupBy( merged, new Fields( "id2" ) );
1921//    Pipe grouped = new GroupBy( Pipe.pipes(  pruned, people  ), new Fields( "id2" ) );
1922    Aggregator count = new Count( new Fields( "count" ) );
1923    Pipe counted = new Every( grouped, count );
1924
1925    String testJoinMerge = "testJoinMergeGroupBy/" + ( ( joined instanceof CoGroup ) ? "cogroup" : "hashjoin" );
1926    Tap sink = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", null, getOutputPath( testJoinMerge ), SinkMode.REPLACE );
1927
1928    FlowDef flowDef = FlowDef.flowDef()
1929      .setName( "join-merge" )
1930      .addSource( rhs, rhsTap )
1931      .addSource( lhs, lhsTap )
1932      .addTailSink( counted, sink );
1933
1934    boolean failOnPlanner = getPlatform().isMapReduce() || getPlatform().isDAG();
1935
1936    Flow flow = null;
1937
1938    try
1939      {
1940      flow = getPlatform().getFlowConnector().connect( flowDef );
1941
1942      if( failOnPlanner )
1943        fail( "planner should throw error on plan" );
1944      }
1945    catch( Exception exception )
1946      {
1947      if( !failOnPlanner )
1948        throw exception;
1949
1950      return;
1951      }
1952
1953//    flow.writeDOT( "joinmerge.dot" );
1954//    flow.writeStepsDOT( "joinmerge-steps.dot" );
1955
1956    flow.complete();
1957
1958    validateLength( flow, 20 );
1959
1960    List<Tuple> values = getSinkAsList( flow );
1961    List<Tuple> expected = new ArrayList<Tuple>();
1962
1963    expected.add( new Tuple( "1", "2" ) );
1964    expected.add( new Tuple( "10", "2" ) );
1965    expected.add( new Tuple( "11", "1" ) );
1966    expected.add( new Tuple( "12", "1" ) );
1967    expected.add( new Tuple( "13", "1" ) );
1968    expected.add( new Tuple( "14", "1" ) );
1969    expected.add( new Tuple( "15", "1" ) );
1970    expected.add( new Tuple( "16", "1" ) );
1971    expected.add( new Tuple( "17", "1" ) );
1972    expected.add( new Tuple( "18", "1" ) );
1973    expected.add( new Tuple( "19", "1" ) );
1974    expected.add( new Tuple( "2", "2" ) );
1975    expected.add( new Tuple( "20", "1" ) );
1976    expected.add( new Tuple( "3", "2" ) );
1977    expected.add( new Tuple( "4", "2" ) );
1978    expected.add( new Tuple( "5", "2" ) );
1979    expected.add( new Tuple( "6", "2" ) );
1980    expected.add( new Tuple( "7", "2" ) );
1981    expected.add( new Tuple( "8", "2" ) );
1982    expected.add( new Tuple( "9", "2" ) );
1983
1984    Collections.sort( values );
1985    Collections.sort( expected );
1986
1987    assertEquals( expected, values );
1988    }
1989
1990  /**
1991   * Under tez, this can result in the HashJoin being duplicated across nodes for each split after the HashJoin
1992   * BoundaryBalanceJoinSplitTransformer inserts a Boundary at the split, preventing duplication of the path
1993   *
1994   * @throws Exception
1995   */
1996  @Test
1997  public void testJoinSplit() throws Exception
1998    {
1999    getPlatform().copyFromLocal( inputFileLhs );
2000    getPlatform().copyFromLocal( inputFileRhs );
2001
2002    FlowDef flowDef = FlowDef.flowDef()
2003      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
2004      .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) )
2005      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
2006      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );
2007
2008    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
2009    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
2010
2011    Pipe join = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
2012
2013    Pipe pipeLhs = new Each( new Pipe( "lhsSink", join ), new Identity() );
2014    Pipe pipeRhs = new Each( new Pipe( "rhsSink", join ), new Identity() );
2015
2016    flowDef
2017      .addTail( pipeLhs )
2018      .addTail( pipeRhs );
2019
2020    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
2021
2022    flow.complete();
2023
2024    validateLength( flow, 37, null );
2025
2026    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );
2027
2028    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
2029    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
2030
2031    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );
2032
2033    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
2034    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
2035    }
2036
2037  /**
2038   * catches a situation where BottomUpJoinedBoundariesNodePartitioner may capture an invalid HashJoin sub-graph
2039   * if the in-bound Boundary is split upon.
2040   */
2041  @Test
2042  public void testSameSourceJoinSplitIntoJoin() throws Exception
2043    {
2044    getPlatform().copyFromLocal( inputFileLhs );
2045    getPlatform().copyFromLocal( inputFileRhs );
2046
2047    FlowDef flowDef = FlowDef.flowDef()
2048      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
2049      .addSource( "rhs", getPlatform().getTextFile( inputFileLhs ) )
2050      .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) )
2051      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
2052      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );
2053
2054    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
2055    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
2056
2057    Pipe joinFirst = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
2058
2059    Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() );
2060
2061    Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", "charRHSSecond" ), " " ) );
2062
2063    joinSecond = new HashJoin( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) );
2064
2065    Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() );
2066
2067    flowDef
2068      .addTail( pipeLhs )
2069      .addTail( pipeRhs );
2070
2071    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
2072
2073    flow.complete();
2074
2075    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );
2076
2077    assertEquals( 37, values.size() );
2078    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
2079    assertTrue( values.contains( new Tuple( "1\ta\t1\tb" ) ) );
2080
2081    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );
2082
2083    assertEquals( 109, values.size() );
2084    assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\tA" ) ) );
2085    assertTrue( values.contains( new Tuple( "1\ta\t1\tb\t1\tB" ) ) );
2086    }
2087
2088  /**
2089   * checks that a split after a HashJoin does not result in the HashJoin execution being duplicated across
2090   * multiple nodes, one for each branch in the split.
2091   */
2092  @Test
2093  public void testJoinSplitBeforeJoin() throws Exception
2094    {
2095    getPlatform().copyFromLocal( inputFileLhs );
2096    getPlatform().copyFromLocal( inputFileRhs );
2097
2098    FlowDef flowDef = FlowDef.flowDef()
2099      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
2100      .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) )
2101      .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) )
2102      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
2103      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );
2104
2105    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
2106    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
2107
2108    pipeUpper = new Checkpoint( pipeUpper );
2109
2110    HashJoin hashJoin = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
2111
2112    Pipe joinFirst = hashJoin;
2113
2114    joinFirst = new Each( joinFirst, new Identity() );
2115
2116    Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() );
2117
2118    pipeLhs = new GroupBy( pipeLhs, new Fields( "numLHS" ) );
2119
2120    joinFirst = new Each( new Pipe( "lhsSplit", joinFirst ), new Identity() );
2121
2122    Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", "charRHSSecond" ), " " ) );
2123
2124    joinSecond = new CoGroup( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) );
2125
2126    Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() );
2127
2128    flowDef
2129      .addTail( pipeLhs )
2130      .addTail( pipeRhs );
2131
2132    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
2133
2134    if( getPlatform().isDAG() )
2135      {
2136      FlowStep flowStep = (FlowStep) flow.getFlowSteps().get( 0 );
2137      List<ElementGraph> elementGraphs = flowStep.getFlowNodeGraph().getElementGraphs( hashJoin );
2138
2139      assertEquals( 1, elementGraphs.size() );
2140      }
2141
2142    flow.complete();
2143
2144    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );
2145
2146    assertEquals( 37, values.size() );
2147    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
2148    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
2149
2150    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );
2151
2152    assertEquals( 109, values.size() );
2153    assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
2154    assertTrue( values.contains( new Tuple( "1\ta\t1\tB\t1\tB" ) ) );
2155    }
2156
2157  @Test
2158  public void testGroupBySplitGroupByJoin() throws Exception
2159    {
2160    getPlatform().copyFromLocal( inputFileLower );
2161
2162    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2163
2164    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE );
2165
2166    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
2167
2168    Pipe pipeFirst = new Pipe( "first" );
2169    pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter );
2170    pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) );
2171    pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL );
2172
2173    Pipe pipeSecond = new Pipe( "second", pipeFirst );
2174    pipeSecond = new Each( pipeSecond, new Identity() );
2175    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
2176    pipeSecond = new Every( pipeSecond, new Fields( "firstFirst" ), new First( new Fields( "secondFirst" ) ), Fields.ALL );
2177    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
2178    pipeSecond = new Every( pipeSecond, new Fields( "secondFirst" ), new First( new Fields( "thirdFirst" ) ), Fields.ALL );
2179
2180    Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) );
2181
2182    Flow flow = getPlatform().getFlowConnector().connect( source, sink, splice );
2183
2184    flow.complete();
2185
2186    validateLength( flow, 5, null );
2187
2188    List<Tuple> values = getSinkAsList( flow );
2189
2190    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
2191    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
2192    assertTrue( values.contains( new Tuple( "3\tc\t3\tc" ) ) );
2193    assertTrue( values.contains( new Tuple( "4\td\t4\td" ) ) );
2194    assertTrue( values.contains( new Tuple( "5\te\t5\te" ) ) );
2195    }
2196  }