001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading;
023
024import java.io.Serializable;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.Comparator;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033
034import cascading.flow.Flow;
035import cascading.flow.FlowDef;
036import cascading.flow.FlowStep;
037import cascading.flow.planner.graph.ElementGraph;
038import cascading.operation.Aggregator;
039import cascading.operation.Function;
040import cascading.operation.Identity;
041import cascading.operation.aggregator.Count;
042import cascading.operation.aggregator.First;
043import cascading.operation.expression.ExpressionFunction;
044import cascading.operation.regex.RegexFilter;
045import cascading.operation.regex.RegexSplitter;
046import cascading.pipe.Checkpoint;
047import cascading.pipe.CoGroup;
048import cascading.pipe.Each;
049import cascading.pipe.Every;
050import cascading.pipe.GroupBy;
051import cascading.pipe.HashJoin;
052import cascading.pipe.Merge;
053import cascading.pipe.Pipe;
054import cascading.pipe.assembly.Rename;
055import cascading.pipe.joiner.InnerJoin;
056import cascading.pipe.joiner.Joiner;
057import cascading.pipe.joiner.LeftJoin;
058import cascading.pipe.joiner.MixedJoin;
059import cascading.pipe.joiner.OuterJoin;
060import cascading.pipe.joiner.RightJoin;
061import cascading.tap.SinkMode;
062import cascading.tap.Tap;
063import cascading.tuple.Fields;
064import cascading.tuple.Hasher;
065import cascading.tuple.Tuple;
066import org.junit.Test;
067
068import static data.InputData.*;
069
070public class JoinFieldedPipesPlatformTest extends PlatformTestCase
071  {
  /**
   * Configures the platform test case for the join tests in this class. Per the trailing comment on the
   * super call, cluster testing is left enabled.
   * <p/>
   * NOTE(review): the meaning of the {@code 4} and {@code 1} arguments is defined by PlatformTestCase,
   * which is not visible here; confirm there before relying on them.
   */
  public JoinFieldedPipesPlatformTest()
    {
    super( true, 4, 1 ); // leave cluster testing enabled
    }
076
077  @Test
078  public void testCross() throws Exception
079    {
080    getPlatform().copyFromLocal( inputFileLhs );
081    getPlatform().copyFromLocal( inputFileRhs );
082
083    Map sources = new HashMap();
084
085    sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) );
086    sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) );
087
088    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE );
089
090    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
091    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
092
093    Pipe cross = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
094
095    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross );
096
097    flow.complete();
098
099    validateLength( flow, 37, null );
100
101    List<Tuple> values = getSinkAsList( flow );
102
103    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
104    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
105    }
106
107  @Test
108  public void testJoin() throws Exception
109    {
110    getPlatform().copyFromLocal( inputFileLower );
111    getPlatform().copyFromLocal( inputFileUpper );
112
113    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
114    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
115
116    Map sources = new HashMap();
117
118    sources.put( "lower", sourceLower );
119    sources.put( "upper", sourceUpper );
120
121    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );
122
123    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
124
125    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
126    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
127
128    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
129
130    Map<Object, Object> properties = getProperties();
131
132    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
133
134    flow.complete();
135
136    validateLength( flow, 5 );
137
138    List<Tuple> values = getSinkAsList( flow );
139
140    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
141    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
142    }
143
144  @Test
145  public void testJoinSamePipeName() throws Exception
146    {
147    getPlatform().copyFromLocal( inputFileLower );
148    getPlatform().copyFromLocal( inputFileUpper );
149
150    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
151    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
152
153    Map sources = new HashMap();
154
155    sources.put( "lower", sourceLower );
156    sources.put( "upper", sourceUpper );
157
158    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE );
159
160    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
161
162    Pipe pipeLower = new Pipe( "lower" );
163    Pipe pipeUpper = new Pipe( "upper" );
164
165    // these pipes will hide the source name, and could cause one to be lost
166    pipeLower = new Pipe( "same", pipeLower );
167    pipeUpper = new Pipe( "same", pipeUpper );
168
169    pipeLower = new Each( pipeLower, new Fields( "line" ), splitter );
170    pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter );
171
172//    pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
173//    pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
174
175    pipeLower = new Pipe( "left", pipeLower );
176    pipeUpper = new Pipe( "right", pipeUpper );
177
178//    pipeLower = new Each( pipeLower, new Debug( true ) );
179//    pipeUpper = new Each( pipeUpper, new Debug( true ) );
180
181    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
182
183//    splice = new Each( splice, new Debug( true ) );
184    splice = new Pipe( "splice", splice );
185    splice = new Pipe( "tail", splice );
186
187    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
188
189    flow.complete();
190
191    validateLength( flow, 5 );
192
193    List<Tuple> values = getSinkAsList( flow );
194
195    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
196    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
197    }
198
199  @Test
200  public void testJoinWithUnknowns() throws Exception
201    {
202    getPlatform().copyFromLocal( inputFileLower );
203    getPlatform().copyFromLocal( inputFileUpper );
204
205    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
206    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
207
208    Map sources = new HashMap();
209
210    sources.put( "lower", sourceLower );
211    sources.put( "upper", sourceUpper );
212
213    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE );
214
215    Function splitter = new RegexSplitter( Fields.UNKNOWN, " " );
216
217    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
218    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
219
220    Pipe splice = new HashJoin( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) );
221
222    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
223
224    flow.complete();
225
226    validateLength( flow, 5 );
227
228    List<Tuple> values = getSinkAsList( flow );
229
230    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
231    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
232    }
233
234  /**
235   * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with
236   * a new stream using an outerjoin.
237   *
238   * @throws Exception
239   */
240  @Test
241  public void testJoinFilteredBranch() throws Exception
242    {
243    getPlatform().copyFromLocal( inputFileLower );
244    getPlatform().copyFromLocal( inputFileUpper );
245
246    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
247    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
248
249    Map sources = new HashMap();
250
251    sources.put( "lower", sourceLower );
252    sources.put( "upper", sourceUpper );
253
254    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinfilteredbranch" ), SinkMode.REPLACE );
255
256    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
257
258    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
259    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
260    pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all
261    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
262
263    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() );
264
265    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
266
267    flow.complete();
268
269    validateLength( flow, 5 );
270
271    List<Tuple> values = getSinkAsList( flow );
272
273    assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) );
274    assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) );
275    }
276
277  @Test
278  public void testJoinSelf() throws Exception
279    {
280    getPlatform().copyFromLocal( inputFileLhs );
281
282    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
283    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
284
285    Map sources = new HashMap();
286
287    sources.put( "lhs", sourceLhs );
288    sources.put( "rhs", sourceRhs );
289
290    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinself" ), SinkMode.REPLACE );
291
292    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
293
294    Pipe pipeLower = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
295    Pipe pipeUpper = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
296
297    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
298
299    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
300
301    flow.complete();
302
303    validateLength( flow, 37 );
304
305    List<Tuple> values = getSinkAsList( flow );
306
307    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
308    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
309    }
310
311  @Test
312  public void testSameSourceJoin() throws Exception
313    {
314    getPlatform().copyFromLocal( inputFileLhs );
315
316    Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
317
318    Map sources = new HashMap();
319
320    sources.put( "lhs", source );
321    sources.put( "rhs", source );
322
323    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath(), SinkMode.REPLACE );
324
325    Pipe pipeLower = new Pipe( "lhs" );
326    Pipe pipeUpper = new Pipe( "rhs" );
327
328    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
329
330    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
331
332    flow.complete();
333
334    validateLength( flow, 37 );
335
336    List<Tuple> values = getSinkAsList( flow );
337
338    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
339    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
340    }
341
342  /**
343   * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join
344   *
345   * @throws Exception when
346   */
347  @Test
348  public void testJoinAfterEvery() throws Exception
349    {
350    getPlatform().copyFromLocal( inputFileLower );
351    getPlatform().copyFromLocal( inputFileUpper );
352
353    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
354    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
355
356    Map sources = new HashMap();
357
358    sources.put( "lower", sourceLower );
359    sources.put( "upper", sourceUpper );
360
361    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE );
362
363    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
364
365    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
366    pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
367    pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL );
368
369    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
370    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
371    pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL );
372
373    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
374
375    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
376
377    flow.complete();
378
379    validateLength( flow, 5, null );
380
381    List<Tuple> values = getSinkAsList( flow );
382
383    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
384    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
385    }
386
387  @Test
388  public void testJoinInnerSingleField() throws Exception
389    {
390    getPlatform().copyFromLocal( inputFileLowerOffset );
391    getPlatform().copyFromLocal( inputFileUpper );
392
393    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
394    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
395
396    Map sources = new HashMap();
397
398    sources.put( "lower", sourceLower );
399    sources.put( "upper", sourceUpper );
400
401    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joininnersingle" ), SinkMode.REPLACE );
402
403    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) );
404    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) );
405
406    Pipe join = new HashJoin( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
407
408    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
409
410    flow.complete();
411
412    validateLength( flow, 3, null );
413
414    Set<Tuple> results = new HashSet<Tuple>();
415
416    results.add( new Tuple( "1\t1" ) );
417    results.add( new Tuple( "5\t5" ) );
418
419    List<Tuple> actual = getSinkAsList( flow );
420
421    results.removeAll( actual );
422
423    assertEquals( 0, results.size() );
424    }
425
426  /**
427   * 1 a1
428   * 1 a2
429   * 1 a3
430   * 2 b1
431   * 3 c1
432   * 4 d1
433   * 4 d2
434   * 4 d3
435   * 5 e1
436   * 5 e2
437   * 5 e3
438   * 7 g1
439   * 7 g2
440   * 7 g3
441   * 7 g4
442   * 7 g5
443   * null h1
444   * <p/>
445   * 1 A1
446   * 1 A2
447   * 1 A3
448   * 2 B1
449   * 2 B2
450   * 2 B3
451   * 4 D1
452   * 6 F1
453   * 6 F2
454   * null H1
455   * <p/>
456   * 1  a1      1       A1
457   * 1  a1      1       A2
458   * 1  a1      1       A3
459   * 1  a2      1       A1
460   * 1  a2      1       A2
461   * 1  a2      1       A3
462   * 1  a3      1       A1
463   * 1  a3      1       A2
464   * 1  a3      1       A3
465   * 2  b1      2       B1
466   * 2  b1      2       B2
467   * 2  b1      2       B3
468   * 4  d1      4       D1
469   * 4  d2      4       D1
470   * 4  d3      4       D1
471   * null h1  null  H1
472   *
473   * @throws Exception
474   */
475  @Test
476  public void testJoinInner() throws Exception
477    {
478    HashSet<Tuple> results = new HashSet<Tuple>();
479
480    results.add( new Tuple( "1", "a1", "1", "A1" ) );
481    results.add( new Tuple( "1", "a1", "1", "A2" ) );
482    results.add( new Tuple( "1", "a1", "1", "A3" ) );
483    results.add( new Tuple( "1", "a2", "1", "A1" ) );
484    results.add( new Tuple( "1", "a2", "1", "A2" ) );
485    results.add( new Tuple( "1", "a2", "1", "A3" ) );
486    results.add( new Tuple( "1", "a3", "1", "A1" ) );
487    results.add( new Tuple( "1", "a3", "1", "A2" ) );
488    results.add( new Tuple( "1", "a3", "1", "A3" ) );
489    results.add( new Tuple( "2", "b1", "2", "B1" ) );
490    results.add( new Tuple( "2", "b1", "2", "B2" ) );
491    results.add( new Tuple( "2", "b1", "2", "B3" ) );
492    results.add( new Tuple( "4", "d1", "4", "D1" ) );
493    results.add( new Tuple( "4", "d2", "4", "D1" ) );
494    results.add( new Tuple( "4", "d3", "4", "D1" ) );
495    results.add( new Tuple( null, "h1", null, "H1" ) );
496
497    handleJoins( "joininner", new InnerJoin(), results );
498    }
499
500  /**
501   * /**
502   * 1 a1
503   * 1 a2
504   * 1 a3
505   * 2 b1
506   * 3 c1
507   * 4 d1
508   * 4 d2
509   * 4 d3
510   * 5 e1
511   * 5 e2
512   * 5 e3
513   * 7 g1
514   * 7 g2
515   * 7 g3
516   * 7 g4
517   * 7 g5
518   * null h1
519   * <p/>
520   * 1 A1
521   * 1 A2
522   * 1 A3
523   * 2 B1
524   * 2 B2
525   * 2 B3
526   * 4 D1
527   * 6 F1
528   * 6 F2
529   * null H1
530   * <p/>
531   * 1  a1      1       A1
532   * 1  a1      1       A2
533   * 1  a1      1       A3
534   * 1  a2      1       A1
535   * 1  a2      1       A2
536   * 1  a2      1       A3
537   * 1  a3      1       A1
538   * 1  a3      1       A2
539   * 1  a3      1       A3
540   * 2  b1      2       B1
541   * 2  b1      2       B2
542   * 2  b1      2       B3
543   * 3  c1      null    null
544   * 4  d1      4       D1
545   * 4  d2      4       D1
546   * 4  d3      4       D1
547   * 5  e1      null    null
548   * 5  e2      null    null
549   * 5  e3      null    null
550   * null       null    6       F1
551   * null       null    6       F2
552   * 7  g1      null    null
553   * 7  g2      null    null
554   * 7  g3      null    null
555   * 7  g4      null    null
556   * 7  g5      null    null
557   * null h1  null  H1
558   *
559   * @throws Exception
560   */
561  @Test
562  public void testJoinOuter() throws Exception
563    {
564    // skip if hadoop cluster mode, outer joins don't behave the same
565    if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
566      return;
567
568    Set<Tuple> results = new HashSet<Tuple>();
569
570    results.add( new Tuple( "1", "a1", "1", "A1" ) );
571    results.add( new Tuple( "1", "a1", "1", "A2" ) );
572    results.add( new Tuple( "1", "a1", "1", "A3" ) );
573    results.add( new Tuple( "1", "a2", "1", "A1" ) );
574    results.add( new Tuple( "1", "a2", "1", "A2" ) );
575    results.add( new Tuple( "1", "a2", "1", "A3" ) );
576    results.add( new Tuple( "1", "a3", "1", "A1" ) );
577    results.add( new Tuple( "1", "a3", "1", "A2" ) );
578    results.add( new Tuple( "1", "a3", "1", "A3" ) );
579    results.add( new Tuple( "2", "b1", "2", "B1" ) );
580    results.add( new Tuple( "2", "b1", "2", "B2" ) );
581    results.add( new Tuple( "2", "b1", "2", "B3" ) );
582    results.add( new Tuple( "3", "c1", null, null ) );
583    results.add( new Tuple( "4", "d1", "4", "D1" ) );
584    results.add( new Tuple( "4", "d2", "4", "D1" ) );
585    results.add( new Tuple( "4", "d3", "4", "D1" ) );
586    results.add( new Tuple( "5", "e1", null, null ) );
587    results.add( new Tuple( "5", "e2", null, null ) );
588    results.add( new Tuple( "5", "e3", null, null ) );
589    results.add( new Tuple( null, null, "6", "F1" ) );
590    results.add( new Tuple( null, null, "6", "F2" ) );
591    results.add( new Tuple( "7", "g1", null, null ) );
592    results.add( new Tuple( "7", "g2", null, null ) );
593    results.add( new Tuple( "7", "g3", null, null ) );
594    results.add( new Tuple( "7", "g4", null, null ) );
595    results.add( new Tuple( "7", "g5", null, null ) );
596    results.add( new Tuple( null, "h1", null, "H1" ) );
597
598    handleJoins( "joinouter", new OuterJoin(), results );
599    }
600
601  /**
602   * 1 a1
603   * 1 a2
604   * 1 a3
605   * 2 b1
606   * 3 c1
607   * 4 d1
608   * 4 d2
609   * 4 d3
610   * 5 e1
611   * 5 e2
612   * 5 e3
613   * 7 g1
614   * 7 g2
615   * 7 g3
616   * 7 g4
617   * 7 g5
618   * null h1
619   * <p/>
620   * 1 A1
621   * 1 A2
622   * 1 A3
623   * 2 B1
624   * 2 B2
625   * 2 B3
626   * 4 D1
627   * 6 F1
628   * 6 F2
629   * null H1
630   * <p/>
631   * 1  a1      1       A1
632   * 1  a1      1       A2
633   * 1  a1      1       A3
634   * 1  a2      1       A1
635   * 1  a2      1       A2
636   * 1  a2      1       A3
637   * 1  a3      1       A1
638   * 1  a3      1       A2
639   * 1  a3      1       A3
640   * 2  b1      2       B1
641   * 2  b1      2       B2
642   * 2  b1      2       B3
643   * 3  c1      null    null
644   * 4  d1      4       D1
645   * 4  d2      4       D1
646   * 4  d3      4       D1
647   * 5  e1      null    null
648   * 5  e2      null    null
649   * 5  e3      null    null
650   * 7  g1      null    null
651   * 7  g2      null    null
652   * 7  g3      null    null
653   * 7  g4      null    null
654   * 7  g5      null    null
655   * null h1    null    H1
656   *
657   * @throws Exception
658   */
659  @Test
660  public void testJoinInnerOuter() throws Exception
661    {
662    Set<Tuple> results = new HashSet<Tuple>();
663
664    results.add( new Tuple( "1", "a1", "1", "A1" ) );
665    results.add( new Tuple( "1", "a1", "1", "A2" ) );
666    results.add( new Tuple( "1", "a1", "1", "A3" ) );
667    results.add( new Tuple( "1", "a2", "1", "A1" ) );
668    results.add( new Tuple( "1", "a2", "1", "A2" ) );
669    results.add( new Tuple( "1", "a2", "1", "A3" ) );
670    results.add( new Tuple( "1", "a3", "1", "A1" ) );
671    results.add( new Tuple( "1", "a3", "1", "A2" ) );
672    results.add( new Tuple( "1", "a3", "1", "A3" ) );
673    results.add( new Tuple( "2", "b1", "2", "B1" ) );
674    results.add( new Tuple( "2", "b1", "2", "B2" ) );
675    results.add( new Tuple( "2", "b1", "2", "B3" ) );
676    results.add( new Tuple( "3", "c1", null, null ) );
677    results.add( new Tuple( "4", "d1", "4", "D1" ) );
678    results.add( new Tuple( "4", "d2", "4", "D1" ) );
679    results.add( new Tuple( "4", "d3", "4", "D1" ) );
680    results.add( new Tuple( "5", "e1", null, null ) );
681    results.add( new Tuple( "5", "e2", null, null ) );
682    results.add( new Tuple( "5", "e3", null, null ) );
683    results.add( new Tuple( "7", "g1", null, null ) );
684    results.add( new Tuple( "7", "g2", null, null ) );
685    results.add( new Tuple( "7", "g3", null, null ) );
686    results.add( new Tuple( "7", "g4", null, null ) );
687    results.add( new Tuple( "7", "g5", null, null ) );
688    results.add( new Tuple( null, "h1", null, "H1" ) );
689
690    handleJoins( "joininnerouter", new LeftJoin(), results );
691    }
692
693  /**
694   * 1 a1
695   * 1 a2
696   * 1 a3
697   * 2 b1
698   * 3 c1
699   * 4 d1
700   * 4 d2
701   * 4 d3
702   * 5 e1
703   * 5 e2
704   * 5 e3
705   * 7 g1
706   * 7 g2
707   * 7 g3
708   * 7 g4
709   * 7 g5
710   * null h1
711   * <p/>
712   * 1 A1
713   * 1 A2
714   * 1 A3
715   * 2 B1
716   * 2 B2
717   * 2 B3
718   * 4 D1
719   * 6 F1
720   * 6 F2
721   * null H1
722   * <p/>
723   * 1  a1      1       A1
724   * 1  a1      1       A2
725   * 1  a1      1       A3
726   * 1  a2      1       A1
727   * 1  a2      1       A2
728   * 1  a2      1       A3
729   * 1  a3      1       A1
730   * 1  a3      1       A2
731   * 1  a3      1       A3
732   * 2  b1      2       B1
733   * 2  b1      2       B2
734   * 2  b1      2       B3
735   * 4  d1      4       D1
736   * 4  d2      4       D1
737   * 4  d3      4       D1
738   * null       null    6       F1
739   * null       null    6       F2
740   * null h1    null    H1
741   *
742   * @throws Exception
743   */
744  @Test
745  public void testJoinOuterInner() throws Exception
746    {
747    // skip if hadoop cluster mode, outer joins don't behave the same
748    if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
749      return;
750
751    Set<Tuple> results = new HashSet<Tuple>();
752
753    results.add( new Tuple( "1", "a1", "1", "A1" ) );
754    results.add( new Tuple( "1", "a1", "1", "A2" ) );
755    results.add( new Tuple( "1", "a1", "1", "A3" ) );
756    results.add( new Tuple( "1", "a2", "1", "A1" ) );
757    results.add( new Tuple( "1", "a2", "1", "A2" ) );
758    results.add( new Tuple( "1", "a2", "1", "A3" ) );
759    results.add( new Tuple( "1", "a3", "1", "A1" ) );
760    results.add( new Tuple( "1", "a3", "1", "A2" ) );
761    results.add( new Tuple( "1", "a3", "1", "A3" ) );
762    results.add( new Tuple( "2", "b1", "2", "B1" ) );
763    results.add( new Tuple( "2", "b1", "2", "B2" ) );
764    results.add( new Tuple( "2", "b1", "2", "B3" ) );
765    results.add( new Tuple( "4", "d1", "4", "D1" ) );
766    results.add( new Tuple( "4", "d2", "4", "D1" ) );
767    results.add( new Tuple( "4", "d3", "4", "D1" ) );
768    results.add( new Tuple( null, null, "6", "F1" ) );
769    results.add( new Tuple( null, null, "6", "F2" ) );
770    results.add( new Tuple( null, "h1", null, "H1" ) );
771
772    handleJoins( "joinouterinner", new RightJoin(), results );
773    }
774
775  private void handleJoins( String path, Joiner joiner, Set<Tuple> results ) throws Exception
776    {
777    getPlatform().copyFromLocal( inputFileLhsSparse );
778    getPlatform().copyFromLocal( inputFileRhsSparse );
779
780    Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class );
781    Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse );
782    Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse );
783
784    Map sources = new HashMap();
785
786    sources.put( "lower", sourceLower );
787    sources.put( "upper", sourceUpper );
788
789    Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE );
790
791    Pipe pipeLower = new Pipe( "lower" );
792    Pipe pipeUpper = new Pipe( "upper" );
793
794    Fields declaredFields = new Fields( "num", "char", "num2", "char2" );
795    Fields groupingFields = new Fields( "num" );
796
797    Pipe splice = new HashJoin( pipeLower, groupingFields, pipeUpper, groupingFields, declaredFields, joiner );
798
799    splice = new Each( splice, Fields.ALL, new Identity(), Fields.RESULTS );
800
801    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
802
803    flow.complete();
804
805    validateLength( flow, results.size() );
806
807    List<Tuple> actual = getSinkAsList( flow );
808
809    results.removeAll( actual );
810
811    assertEquals( 0, results.size() );
812    }
813
814  /**
815   * 1 a
816   * 5 b
817   * 6 c
818   * 5 b
819   * 5 e
820   * <p/>
821   * 1 A
822   * 2 B
823   * 3 C
824   * 4 D
825   * 5 E
826   * <p/>
827   * 1 a
828   * 2 b
829   * 3 c
830   * 4 d
831   * 5 e
832   * <p/>
833   * 1  a       1       A  1  a
834   * -  -   2   B  2  b
835   * -  -   3   C  3  c
836   * -  -   4   D  4  d
837   * 5  b       5   E  5  e
838   * 5  e       5   E  5  e
839   *
840   * @throws Exception
841   */
842  @Test
843  public void testJoinMixed() throws Exception
844    {
845    // skip if hadoop cluster mode, outer joins don't behave the same
846    if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
847      return;
848
849    getPlatform().copyFromLocal( inputFileLowerOffset );
850    getPlatform().copyFromLocal( inputFileLower );
851    getPlatform().copyFromLocal( inputFileUpper );
852
853    Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
854    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
855    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
856
857    Map sources = new HashMap();
858
859    sources.put( "loweroffset", sourceLowerOffset );
860    sources.put( "lower", sourceLower );
861    sources.put( "upper", sourceUpper );
862
863    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinmixed" ), SinkMode.REPLACE );
864
865    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
866
867    Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter );
868    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
869    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
870
871    Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower );
872    Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) );
873
874    MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} );
875    Pipe splice = new HashJoin( pipes, fields, Fields.size( 6 ), join );
876
877    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
878
879    flow.complete();
880
881    validateLength( flow, 6 );
882
883    Set<Tuple> results = new HashSet<Tuple>();
884
885    results.add( new Tuple( "1\ta\t1\tA\t1\ta" ) );
886    results.add( new Tuple( "null\tnull\t2\tB\t2\tb" ) );
887    results.add( new Tuple( "null\tnull\t3\tC\t3\tc" ) );
888    results.add( new Tuple( "null\tnull\t4\tD\t4\td" ) );
889    results.add( new Tuple( "5\tb\t5\tE\t5\te" ) );
890    results.add( new Tuple( "5\te\t5\tE\t5\te" ) );
891
892    List<Tuple> actual = getSinkAsList( flow );
893
894    results.removeAll( actual );
895
896    assertEquals( 0, results.size() );
897    }
898
899  @Test
900  public void testJoinDiffFields() throws Exception
901    {
902    getPlatform().copyFromLocal( inputFileLower );
903    getPlatform().copyFromLocal( inputFileUpper );
904
905    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
906    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
907
908    Map sources = new HashMap();
909
910    sources.put( "lower", sourceLower );
911    sources.put( "upper", sourceUpper );
912
913    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE );
914
915    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
916    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
917
918    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
919    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
920
921    Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
922
923    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
924
925    flow.complete();
926
927    validateLength( flow, 5 );
928
929    List<Tuple> actual = getSinkAsList( flow );
930
931    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
932    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
933    }
934
935  @Test
936  public void testJoinGroupBy() throws Exception
937    {
938    getPlatform().copyFromLocal( inputFileLower );
939    getPlatform().copyFromLocal( inputFileUpper );
940
941    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
942    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
943
944    Map sources = new HashMap();
945
946    sources.put( "lower", sourceLower );
947    sources.put( "upper", sourceUpper );
948
949    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupby" ), SinkMode.REPLACE );
950
951    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
952    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
953
954    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
955    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
956
957    Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
958
959    Pipe groupby = new GroupBy( pipe, new Fields( "numA" ) );
960
961    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby );
962
963    flow.complete();
964
965    validateLength( flow, 5, null );
966
967    List<Tuple> actual = getSinkAsList( flow );
968
969    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
970    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
971    }
972
973  @Test
974  public void testJoinSamePipe() throws Exception
975    {
976    getPlatform().copyFromLocal( inputFileLower );
977
978    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
979
980    Map sources = new HashMap();
981
982    sources.put( "lower", source );
983
984    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE );
985
986    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
987
988    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
989
990    Pipe pipe = new HashJoin( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) );
991
992    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
993
994    flow.complete();
995
996    validateLength( flow, 5, null );
997
998    List<Tuple> actual = getSinkAsList( flow );
999
1000    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1001    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1002    }
1003
1004  @Test
1005  public void testJoinSamePipe2() throws Exception
1006    {
1007    getPlatform().copyFromLocal( inputFileLower );
1008
1009    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1010
1011    Map sources = new HashMap();
1012
1013    sources.put( "lower", source );
1014
1015    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE );
1016
1017    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1018
1019    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1020
1021    Pipe join = new HashJoin( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1022
1023    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
1024
1025    flow.complete();
1026
1027    validateLength( flow, 5, null );
1028
1029    List<Tuple> actual = getSinkAsList( flow );
1030
1031    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1032    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1033    }
1034
1035  @Test
1036  public void testJoinSamePipe3() throws Exception
1037    {
1038    getPlatform().copyFromLocal( inputFileLower );
1039
1040    Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
1041
1042    Map sources = new HashMap();
1043
1044    sources.put( "lower", source );
1045
1046    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE );
1047
1048    Pipe pipe = new Pipe( "lower" );
1049
1050    Pipe lhs = new Pipe( "lhs", pipe );
1051    Pipe rhs = new Pipe( "rhs", pipe );
1052
1053    Pipe join = new HashJoin( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1054
1055    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
1056
1057    flow.complete();
1058
1059    validateLength( flow, 5, null );
1060
1061    List<Tuple> actual = getSinkAsList( flow );
1062
1063    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1064    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1065    }
1066
1067  /**
1068   * Same source as rightmost
1069   * <p/>
1070   * should be a single job as the same file accumulates into the joins
1071   *
1072   * @throws Exception
1073   */
1074  @Test
1075  public void testJoinAroundJoinRightMost() throws Exception
1076    {
1077    getPlatform().copyFromLocal( inputFileLower );
1078    getPlatform().copyFromLocal( inputFileUpper );
1079
1080    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1081    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1082
1083    Map sources = new HashMap();
1084
1085    sources.put( "lower", sourceLower );
1086    sources.put( "upper1", sourceUpper );
1087    sources.put( "upper2", sourceUpper );
1088
1089    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinrightmost" ), SinkMode.REPLACE );
1090
1091    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1092
1093    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1094    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1095    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1096
1097    Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1098
1099    splice1 = new Each( splice1, new Identity() );
1100
1101    Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1102
1103    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1104
1105//    flow.writeDOT( "joinaroundrightmost.dot" );
1106
1107    if( getPlatform().isMapReduce() )
1108      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1109
1110    flow.complete();
1111
1112    validateLength( flow, 5, null );
1113
1114    List<Tuple> actual = getSinkAsList( flow );
1115
1116    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1117    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1118    }
1119
1120  /**
1121   * Same source as leftmost
1122   *
1123   * @throws Exception
1124   */
1125  @Test
1126  public void testJoinAroundJoinLeftMost() throws Exception
1127    {
1128    getPlatform().copyFromLocal( inputFileLower );
1129    getPlatform().copyFromLocal( inputFileUpper );
1130
1131    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1132    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1133
1134    Map sources = new HashMap();
1135
1136    sources.put( "lower", sourceLower );
1137    sources.put( "upper1", sourceUpper );
1138    sources.put( "upper2", sourceUpper );
1139
1140    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinleftmost" ), SinkMode.REPLACE );
1141
1142    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1143
1144    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1145    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1146    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1147
1148    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1149
1150    splice1 = new Each( splice1, new Identity() );
1151
1152    Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1153
1154    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1155
1156//    flow.writeDOT( "joinaroundleftmost.dot" );
1157
1158    if( getPlatform().isMapReduce() )
1159      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1160
1161    flow.complete();
1162
1163    validateLength( flow, 5, null );
1164
1165    List<Tuple> actual = getSinkAsList( flow );
1166
1167    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA\t1\ta" ) ) );
1168    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB\t2\tb" ) ) );
1169    }
1170
1171  /**
1172   * Upper as leftmost and rightmost forcing two jobs
1173   *
1174   * @throws Exception
1175   */
1176  @Test
1177  public void testJoinAroundJoinRightMostSwapped() throws Exception
1178    {
1179    getPlatform().copyFromLocal( inputFileLower );
1180    getPlatform().copyFromLocal( inputFileUpper );
1181
1182    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1183    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1184
1185    Map sources = new HashMap();
1186
1187    sources.put( "lower", sourceLower );
1188    sources.put( "upper1", sourceUpper );
1189    sources.put( "upper2", sourceUpper );
1190
1191    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinswapped" ), SinkMode.REPLACE );
1192
1193    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1194
1195    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1196    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1197    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1198
1199    Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1200
1201    splice1 = new Each( splice1, new Identity() );
1202
1203    // upper2 becomes leftmost, forcing a tap between the joins
1204    Pipe splice2 = new HashJoin( pipeUpper2, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1205
1206    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1207
1208    if( getPlatform().isMapReduce() )
1209      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1210
1211    flow.complete();
1212
1213    validateLength( flow, 5, null );
1214
1215    List<Tuple> actual = getSinkAsList( flow );
1216
1217    assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\tA" ) ) );
1218    assertTrue( actual.contains( new Tuple( "2\tB\t2\tb\t2\tB" ) ) );
1219    }
1220
1221  @Test
1222  public void testJoinGroupByJoin() throws Exception
1223    {
1224    getPlatform().copyFromLocal( inputFileLower );
1225    getPlatform().copyFromLocal( inputFileUpper );
1226    getPlatform().copyFromLocal( inputFileJoined );
1227
1228    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1229    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1230    Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined );
1231
1232    Map sources = new HashMap();
1233
1234    sources.put( "lower", sourceLower );
1235    sources.put( "upper", sourceUpper );
1236    sources.put( "joined", sourceJoined );
1237
1238    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupbyjoin" ), SinkMode.REPLACE );
1239
1240    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1241    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1242    Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" );
1243
1244    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1245    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1246    Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined );
1247
1248    Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1249
1250    pipe = new GroupBy( pipe, new Fields( "numA" ) );
1251
1252    pipe = new HashJoin( pipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) );
1253
1254    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
1255
1256    if( getPlatform().isMapReduce() )
1257      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1258
1259    flow.complete();
1260
1261    validateLength( flow, 5, null );
1262
1263    List<Tuple> actual = getSinkAsList( flow );
1264
1265    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\tA" ) ) );
1266    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tb\tB" ) ) );
1267    }
1268
1269  /**
1270   * here the same file is fed into the same HashJoin.
1271   * <p/>
1272   * This is three jobs.
1273   * <p/>
1274   * a temp tap is inserted before the accumulated branch for two reasons on the common HashJoin
1275   * <p/>
1276   * it is assumed the accumulated side is filtered down, so pushing to disk will preserve io
1277   * if accumulated side was streamed instead via a fork, only part of the file will accumulate into the HashJoin
1278   * <p/>
1279   * /-T-\ <-- accumulated
1280   * T      HJ
1281   * \---/ <-- streamed
1282   *
1283   * @throws Exception
1284   */
1285  @Test
1286  public void testJoinSameSourceIntoJoin() throws Exception
1287    {
1288    getPlatform().copyFromLocal( inputFileLower );
1289    getPlatform().copyFromLocal( inputFileUpper );
1290
1291    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1292    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1293
1294    Map sources = new HashMap();
1295
1296    sources.put( "lower", sourceLower );
1297    sources.put( "upper1", sourceUpper );
1298    sources.put( "upper2", sourceUpper );
1299
1300    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoin" ), SinkMode.REPLACE );
1301
1302    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1303
1304    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1305    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1306    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1307
1308    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1309
1310    splice1 = new Each( splice1, new Identity() );
1311
1312    Pipe splice2 = new HashJoin( pipeLower, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1313
1314    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1315
1316//    flow.writeDOT( "joinsamesourceintojoin.dot" );
1317
1318    if( getPlatform().isMapReduce() )
1319      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
1320
1321    flow.complete();
1322
1323    validateLength( flow, 5, null );
1324
1325    List<Tuple> actual = getSinkAsList( flow );
1326
1327    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1328    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1329    }
1330
1331  @Test
1332  public void testJoinSameSourceIntoJoinSimple() throws Exception
1333    {
1334    getPlatform().copyFromLocal( inputFileUpper );
1335
1336    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1337
1338    Map sources = new HashMap();
1339
1340    sources.put( "upper1", sourceUpper );
1341    sources.put( "upper2", sourceUpper );
1342
1343    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoinsimple" ), SinkMode.REPLACE );
1344
1345    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1346
1347    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1348    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1349
1350    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1351
1352    splice1 = new Each( splice1, new Identity() );
1353
1354    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 );
1355
1356//    flow.writeDOT( "joinsamesourceintojoin.dot" );
1357
1358    if( getPlatform().isMapReduce() )
1359      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1360
1361    flow.complete();
1362
1363    validateLength( flow, 5, null );
1364
1365    List<Tuple> actual = getSinkAsList( flow );
1366
1367    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) );
1368    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) );
1369    }
1370
1371  /**
1372   * Loosely tests for a deadlock when BlockingHashJoinAnnotator rule doesn't excluce the GroupBy from the blocking
1373   * annotation.
1374   * <p/>
1375   * the deadlock is random on the order of the paths traversed from the Source Tap + fork.
1376   *
1377   * @throws Exception
1378   */
1379  @Test
1380  public void testJoinSameSourceOverGroupByIntoJoinSimple() throws Exception
1381    {
1382    getPlatform().copyFromLocal( inputFileLower );
1383    getPlatform().copyFromLocal( inputFileUpper );
1384
1385    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1386
1387    Map sources = new HashMap();
1388
1389    sources.put( "upper1", sourceUpper );
1390    sources.put( "upper2", sourceUpper );
1391
1392    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceovergroupbyintojoinsimple" ), SinkMode.REPLACE );
1393
1394    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1395
1396    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1397    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1398
1399    pipeUpper1 = new GroupBy( pipeUpper1, new Fields( "num" ) );
1400    pipeUpper2 = new GroupBy( pipeUpper2, new Fields( "num" ) );
1401
1402    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1403
1404    splice1 = new Each( splice1, new Identity() );
1405
1406    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 );
1407
1408    if( getPlatform().isMapReduce() )
1409      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
1410
1411    flow.complete();
1412
1413    validateLength( flow, 5, null );
1414
1415    List<Tuple> actual = getSinkAsList( flow );
1416
1417    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) );
1418    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) );
1419    }
1420
1421  /**
1422   * Tests that two independent streamed sources with loadable tributaries properly plan into a GroupBy
1423   * without loading unused sources
1424   *
1425   * @throws Exception
1426   */
1427  @Test
1428  public void testJoinsIntoGroupBy() throws Exception
1429    {
1430    getPlatform().copyFromLocal( inputFileLower );
1431    getPlatform().copyFromLocal( inputFileUpper );
1432
1433    getPlatform().copyFromLocal( inputFileLhs );
1434    getPlatform().copyFromLocal( inputFileRhs );
1435
1436    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1437    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1438
1439    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1440    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1441
1442    Map sources = new HashMap();
1443
1444    sources.put( "lower", sourceLower );
1445    sources.put( "upper", sourceUpper );
1446    sources.put( "lhs", sourceLhs );
1447    sources.put( "rhs", sourceRhs );
1448
1449    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintogroupby" ), SinkMode.REPLACE );
1450
1451    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1452
1453    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1454    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1455
1456    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1457    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1458
1459    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1460
1461    upperLower = new Each( upperLower, new Identity() );
1462
1463    Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1464
1465    lhsRhs = new Each( lhsRhs, new Identity() );
1466
1467    Pipe grouped = new GroupBy( "merging", Pipe.pipes( upperLower, lhsRhs ), new Fields( "num1" ) );
1468
1469    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1470
1471    if( getPlatform().isMapReduce() )
1472      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1473
1474    flow.complete();
1475
1476    validateLength( flow, 42, null );
1477
1478    List<Tuple> actual = getSinkAsList( flow );
1479
1480    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1481    assertTrue( actual.contains( new Tuple( "5\te\t5\tE" ) ) );
1482    }
1483
1484  @Test
1485  public void testJoinSamePipeAroundGroupBy() throws Exception
1486    {
1487    getPlatform().copyFromLocal( inputFileLower );
1488
1489    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1490    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipearoundgroupby" ), SinkMode.REPLACE );
1491
1492    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1493
1494    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1495
1496    Pipe lhsPipe = new Each( new Pipe( "lhs", pipeLower ), new Identity() );
1497
1498    Pipe rhsPipe = new Each( new Pipe( "rhs", pipeLower ), new Identity() );
1499
1500    rhsPipe = new GroupBy( rhsPipe, new Fields( "num" ) );
1501
1502    rhsPipe = new Each( rhsPipe, new Identity() );
1503
1504    Pipe pipe = new HashJoin( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1505
1506    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
1507
1508    flow.complete();
1509
1510    validateLength( flow, 5, null );
1511
1512    List<Tuple> actual = getSinkAsList( flow );
1513
1514    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1515    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1516    }
1517
1518  /**
1519   * This test results in two MR jobs because one join feeds into the accumulated side of the second. A mapper
1520   * can only stream on branch at a time forcing a temp file between the mappers. see next test for swapped join
1521   *
1522   * @throws Exception
1523   */
1524  @Test
1525  public void testJoinsIntoCoGroupLhs() throws Exception
1526    {
1527    getPlatform().copyFromLocal( inputFileLower );
1528    getPlatform().copyFromLocal( inputFileUpper );
1529
1530    getPlatform().copyFromLocal( inputFileLhs );
1531    getPlatform().copyFromLocal( inputFileRhs );
1532
1533    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1534    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1535
1536    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1537    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1538
1539    Map sources = new HashMap();
1540
1541    sources.put( "lower", sourceLower );
1542    sources.put( "upper", sourceUpper );
1543    sources.put( "lhs", sourceLhs );
1544    sources.put( "rhs", sourceRhs );
1545
1546    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhs" ), SinkMode.REPLACE );
1547
1548    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1549
1550    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1551    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1552
1553    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1554    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1555
1556    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1557
1558    upperLower = new Each( upperLower, new Identity() );
1559
1560    Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1561
1562    lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1563
1564    Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) );
1565
1566    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1567
1568    if( getPlatform().isMapReduce() )
1569      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1570
1571    flow.complete();
1572
1573    validateLength( flow, 37, null );
1574
1575    List<Tuple> actual = getSinkAsList( flow );
1576
1577    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta\t1\tA\t1\tA" ) ) );
1578    assertTrue( actual.contains( new Tuple( "5\ta\t5\te\t5\tE\t5\tA" ) ) );
1579    }
1580
1581  /**
1582   * This test results in one MR jobs because one join feeds into the streamed side of the second.
1583   *
1584   * @throws Exception
1585   */
1586  @Test
1587  public void testJoinsIntoCoGroupLhsSwappedJoin() throws Exception
1588    {
1589    getPlatform().copyFromLocal( inputFileLower );
1590    getPlatform().copyFromLocal( inputFileUpper );
1591
1592    getPlatform().copyFromLocal( inputFileLhs );
1593    getPlatform().copyFromLocal( inputFileRhs );
1594
1595    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1596    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1597
1598    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1599    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1600
1601    Map sources = new HashMap();
1602
1603    sources.put( "lower", sourceLower );
1604    sources.put( "upper", sourceUpper );
1605    sources.put( "lhs", sourceLhs );
1606    sources.put( "rhs", sourceRhs );
1607
1608    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhsswappedjoin" ), SinkMode.REPLACE );
1609
1610    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1611
1612    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1613    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1614
1615    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1616    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1617
1618    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1619
1620    upperLower = new Each( upperLower, new Identity() );
1621
1622    Pipe lhsUpperLower = new HashJoin( upperLower, new Fields( "numUpperLower" ), pipeLhs, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower", "numLhs", "charLhs" ) );
1623
1624    lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1625
1626    Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) );
1627
1628    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1629
1630    if( getPlatform().isMapReduce() )
1631      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1632
1633    flow.complete();
1634
1635    validateLength( flow, 37, null );
1636
1637    List<Tuple> actual = getSinkAsList( flow );
1638
1639    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) );
1640    assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) );
1641    }
1642
1643  @Test
1644  public void testJoinsIntoCoGroupRhs() throws Exception
1645    {
1646    getPlatform().copyFromLocal( inputFileLower );
1647    getPlatform().copyFromLocal( inputFileUpper );
1648
1649    getPlatform().copyFromLocal( inputFileLhs );
1650    getPlatform().copyFromLocal( inputFileRhs );
1651
1652    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1653    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1654
1655    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1656    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1657
1658    Map sources = new HashMap();
1659
1660    sources.put( "lower", sourceLower );
1661    sources.put( "upper", sourceUpper );
1662    sources.put( "lhs", sourceLhs );
1663    sources.put( "rhs", sourceRhs );
1664
1665    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouprhs" ), SinkMode.REPLACE );
1666
1667    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1668
1669    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1670    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1671
1672    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1673    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1674
1675    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1676
1677    upperLower = new Each( upperLower, new Identity() );
1678
1679    Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1680
1681    lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1682
1683    Pipe grouped = new CoGroup( "cogrouping", pipeRhs, new Fields( "num" ), lhsUpperLower, new Fields( "numLhs" ) );
1684
1685    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1686
1687    if( getPlatform().isMapReduce() )
1688      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1689
1690    flow.complete();
1691
1692    validateLength( flow, 37, null );
1693
1694    List<Tuple> actual = getSinkAsList( flow );
1695
1696    assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\ta\t1\tA" ) ) );
1697    assertTrue( actual.contains( new Tuple( "5\tE\t5\te\t5\te\t5\tE" ) ) );
1698    }
1699
1700  @Test
1701  public void testJoinsIntoCoGroup() throws Exception
1702    {
1703    getPlatform().copyFromLocal( inputFileLower );
1704    getPlatform().copyFromLocal( inputFileUpper );
1705
1706    getPlatform().copyFromLocal( inputFileLhs );
1707    getPlatform().copyFromLocal( inputFileRhs );
1708
1709    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1710    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1711
1712    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1713    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1714
1715    Map sources = new HashMap();
1716
1717    sources.put( "lower", sourceLower );
1718    sources.put( "upper", sourceUpper );
1719    sources.put( "lhs", sourceLhs );
1720    sources.put( "rhs", sourceRhs );
1721
1722    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogroup" ), SinkMode.REPLACE );
1723
1724    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1725
1726    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1727    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1728
1729    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1730    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1731
1732    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower1", "charUpperLower1", "numUpperLower2", "charUpperLower2" ) );
1733
1734    upperLower = new Each( upperLower, new Identity() );
1735
1736    Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "numLhsRhs1", "charLhsRhs1", "numLhsRhs2", "charLhsRhs2" ) );
1737
1738    lhsRhs = new Each( lhsRhs, new Identity() );
1739
1740    Pipe grouped = new CoGroup( "cogrouping", upperLower, new Fields( "numUpperLower1" ), lhsRhs, new Fields( "numLhsRhs1" ) );
1741
1742    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1743
1744    if( getPlatform().isMapReduce() )
1745      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1746
1747    flow.complete();
1748
1749    validateLength( flow, 37, null );
1750
1751    List<Tuple> actual = getSinkAsList( flow );
1752
1753    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) );
1754    assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) );
1755    }
1756
1757  public static class AllComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable
1758    {
1759
1760    @Override
1761    public int compare( Comparable lhs, Comparable rhs )
1762      {
1763      return lhs.toString().compareTo( rhs.toString() );
1764      }
1765
1766    @Override
1767    public int hashCode( Comparable value )
1768      {
1769      if( value == null )
1770        return 0;
1771
1772      return value.toString().hashCode();
1773      }
1774    }
1775
1776  /**
1777   * Tests Hasher being honored even if default comparator is null.
1778   *
1779   * @throws Exception
1780   */
1781  @Test
1782  public void testJoinWithHasher() throws Exception
1783    {
1784    getPlatform().copyFromLocal( inputFileLower );
1785    getPlatform().copyFromLocal( inputFileUpper );
1786
1787    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1788    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1789
1790    Map sources = new HashMap();
1791
1792    sources.put( "lower", sourceLower );
1793    sources.put( "upper", sourceUpper );
1794
1795    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinhasher" ), SinkMode.REPLACE );
1796
1797    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1798
1799    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1800
1801    pipeLower = new Each( pipeLower, new Fields( "num" ), new ExpressionFunction( Fields.ARGS, "Integer.parseInt( num )", String.class ), Fields.REPLACE );
1802
1803    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1804
1805    Fields num = new Fields( "num" );
1806    num.setComparator( "num", new AllComparator() );
1807
1808    Pipe splice = new HashJoin( pipeLower, num, pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
1809
1810    Map<Object, Object> properties = getProperties();
1811
1812    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1813
1814    flow.complete();
1815
1816    validateLength( flow, 5 );
1817
1818    List<Tuple> values = getSinkAsList( flow );
1819
1820    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1821    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1822    }
1823
1824  @Test
1825  public void testJoinNone() throws Exception
1826    {
1827    getPlatform().copyFromLocal( inputFileLower );
1828    getPlatform().copyFromLocal( inputFileUpper );
1829
1830    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1831    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1832
1833    Map sources = new HashMap();
1834
1835    sources.put( "lower", sourceLower );
1836    sources.put( "upper", sourceUpper );
1837
1838    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE );
1839
1840    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1841
1842    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1843    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1844
1845    Pipe splice = new HashJoin( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) );
1846
1847    Map<Object, Object> properties = getProperties();
1848
1849    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1850
1851    flow.complete();
1852
1853    validateLength( flow, 25 );
1854
1855    List<Tuple> values = getSinkAsList( flow );
1856
1857    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1858    assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) );
1859    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1860    }
1861
1862  @Test
1863  public void testGroupBySplitJoins() throws Exception
1864    {
1865    getPlatform().copyFromLocal( inputFileLower );
1866    getPlatform().copyFromLocal( inputFileUpper );
1867    getPlatform().copyFromLocal( inputFileJoined );
1868
1869    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1870    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1871    Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined );
1872
1873    Map sources = new HashMap();
1874
1875    sources.put( "lower", sourceLower );
1876    sources.put( "upper", sourceUpper );
1877    sources.put( "joined", sourceJoined );
1878
1879    Tap lhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE );
1880    Tap rhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE );
1881
1882    Map sinks = new HashMap();
1883
1884    sinks.put( "lhs", lhsSink );
1885    sinks.put( "rhs", rhsSink );
1886
1887    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1888    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1889    Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" );
1890
1891    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1892    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1893    Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined );
1894
1895    Pipe pipe = new GroupBy( pipeLower, new Fields( "numA" ) );
1896
1897    pipe = new Every( pipe, Fields.ALL, new TestIdentityBuffer( new Fields( "numA" ), 5, false ), Fields.RESULTS );
1898
1899    Pipe lhsPipe = new Each( pipe, new Identity() );
1900    lhsPipe = new HashJoin( "lhs", lhsPipe, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1901
1902    Pipe rhsPipe = new Each( pipe, new Identity() );
1903    rhsPipe = new HashJoin( "rhs", rhsPipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) );
1904
1905    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, lhsPipe, rhsPipe );
1906
1907    if( getPlatform().isMapReduce() )
1908      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
1909
1910    flow.complete();
1911
1912    validateLength( flow.openSink( "lhs" ), 5, null );
1913    validateLength( flow.openSink( "rhs" ), 5, null );
1914
1915    List<Tuple> lhsActual = asList( flow, lhsSink );
1916
1917    assertTrue( lhsActual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1918    assertTrue( lhsActual.contains( new Tuple( "2\tb\t2\tB" ) ) );
1919
1920    List<Tuple> rhsActual = asList( flow, rhsSink );
1921
1922    assertTrue( rhsActual.contains( new Tuple( "1\ta\t1\ta\tA" ) ) );
1923    assertTrue( rhsActual.contains( new Tuple( "2\tb\t2\tb\tB" ) ) );
1924    }
1925
1926  /**
1927   * When run against a cluster a Merge before a GroupBy can hide the streamed/accumulated nature of a branch.
1928   * <p>
1929   * The planner nw
1930   * <p>
1931   * <p/>
1932   * commented code is for troubleshooting.
1933   *
1934   * @throws Exception
1935   */
1936  @Test
1937  public void testJoinMergeGroupBy() throws Exception
1938    {
1939    getPlatform().copyFromLocal( inputFileNums10 );
1940    getPlatform().copyFromLocal( inputFileNums20 );
1941
1942    Tap lhsTap = getPlatform().getTextFile( new Fields( "id" ), inputFileNums10 );
1943    Tap rhsTap = getPlatform().getTextFile( new Fields( "id2" ), inputFileNums20 );
1944
1945    Pipe lhs = new Pipe( "lhs" );
1946    Pipe rhs = new Pipe( "rhs" );
1947
1948//    Pipe joined = new CoGroup( messages, new Fields( "id" ), people, new Fields( "id2" ) );
1949    Pipe joined = new HashJoin( lhs, new Fields( "id" ), rhs, new Fields( "id2" ) );
1950
1951    Pipe pruned = new Each( joined, new Fields( "id2" ), new Identity(), Fields.RESULTS );
1952//    pruned = new Checkpoint( pruned );
1953    Pipe merged = new Merge( pruned, rhs );
1954    Pipe grouped = new GroupBy( merged, new Fields( "id2" ) );
1955//    Pipe grouped = new GroupBy( Pipe.pipes(  pruned, people  ), new Fields( "id2" ) );
1956    Aggregator count = new Count( new Fields( "count" ) );
1957    Pipe counted = new Every( grouped, count );
1958
1959    String testJoinMerge = "testJoinMergeGroupBy/" + ( ( joined instanceof CoGroup ) ? "cogroup" : "hashjoin" );
1960    Tap sink = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", null, getOutputPath( testJoinMerge ), SinkMode.REPLACE );
1961
1962    FlowDef flowDef = FlowDef.flowDef()
1963      .setName( "join-merge" )
1964      .addSource( rhs, rhsTap )
1965      .addSource( lhs, lhsTap )
1966      .addTailSink( counted, sink );
1967
1968    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
1969
1970//    flow.writeDOT( "joinmerge.dot" );
1971//    flow.writeStepsDOT( "joinmerge-steps.dot" );
1972
1973    flow.complete();
1974
1975    validateLength( flow, 20 );
1976
1977    List<Tuple> values = getSinkAsList( flow );
1978    List<Tuple> expected = new ArrayList<Tuple>();
1979
1980    expected.add( new Tuple( "1", "2" ) );
1981    expected.add( new Tuple( "10", "2" ) );
1982    expected.add( new Tuple( "11", "1" ) );
1983    expected.add( new Tuple( "12", "1" ) );
1984    expected.add( new Tuple( "13", "1" ) );
1985    expected.add( new Tuple( "14", "1" ) );
1986    expected.add( new Tuple( "15", "1" ) );
1987    expected.add( new Tuple( "16", "1" ) );
1988    expected.add( new Tuple( "17", "1" ) );
1989    expected.add( new Tuple( "18", "1" ) );
1990    expected.add( new Tuple( "19", "1" ) );
1991    expected.add( new Tuple( "2", "2" ) );
1992    expected.add( new Tuple( "20", "1" ) );
1993    expected.add( new Tuple( "3", "2" ) );
1994    expected.add( new Tuple( "4", "2" ) );
1995    expected.add( new Tuple( "5", "2" ) );
1996    expected.add( new Tuple( "6", "2" ) );
1997    expected.add( new Tuple( "7", "2" ) );
1998    expected.add( new Tuple( "8", "2" ) );
1999    expected.add( new Tuple( "9", "2" ) );
2000
2001    Collections.sort( values );
2002    Collections.sort( expected );
2003
2004    assertEquals( expected, values );
2005    }
2006
2007  /**
2008   * Under tez, this can result in the HashJoin being duplicated across nodes for each split after the HashJoin
2009   * BoundaryBalanceJoinSplitTransformer inserts a Boundary at the split, preventing duplication of the path
2010   *
2011   * @throws Exception
2012   */
2013  @Test
2014  public void testJoinSplit() throws Exception
2015    {
2016    getPlatform().copyFromLocal( inputFileLhs );
2017    getPlatform().copyFromLocal( inputFileRhs );
2018
2019    FlowDef flowDef = FlowDef.flowDef()
2020      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
2021      .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) )
2022      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
2023      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );
2024
2025    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
2026    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
2027
2028    Pipe join = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
2029
2030    Pipe pipeLhs = new Each( new Pipe( "lhsSink", join ), new Identity() );
2031    Pipe pipeRhs = new Each( new Pipe( "rhsSink", join ), new Identity() );
2032
2033    flowDef
2034      .addTail( pipeLhs )
2035      .addTail( pipeRhs );
2036
2037    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
2038
2039    flow.complete();
2040
2041    validateLength( flow, 37, null );
2042
2043    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );
2044
2045    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
2046    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
2047
2048    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );
2049
2050    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
2051    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
2052    }
2053
2054  /**
2055   * catches a situation where BottomUpJoinedBoundariesNodePartitioner may capture an invalid HashJoin sub-graph
2056   * if the in-bound Boundary is split upon.
2057   */
2058  @Test
2059  public void testSameSourceJoinSplitIntoJoin() throws Exception
2060    {
2061    getPlatform().copyFromLocal( inputFileLhs );
2062    getPlatform().copyFromLocal( inputFileRhs );
2063
2064    FlowDef flowDef = FlowDef.flowDef()
2065      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
2066      .addSource( "rhs", getPlatform().getTextFile( inputFileLhs ) )
2067      .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) )
2068      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
2069      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );
2070
2071    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
2072    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
2073
2074    Pipe joinFirst = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
2075
2076    Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() );
2077
2078    Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", "charRHSSecond" ), " " ) );
2079
2080    joinSecond = new HashJoin( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) );
2081
2082    Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() );
2083
2084    flowDef
2085      .addTail( pipeLhs )
2086      .addTail( pipeRhs );
2087
2088    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
2089
2090    flow.complete();
2091
2092    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );
2093
2094    assertEquals( 37, values.size() );
2095    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
2096    assertTrue( values.contains( new Tuple( "1\ta\t1\tb" ) ) );
2097
2098    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );
2099
2100    assertEquals( 109, values.size() );
2101    assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\tA" ) ) );
2102    assertTrue( values.contains( new Tuple( "1\ta\t1\tb\t1\tB" ) ) );
2103    }
2104
2105  /**
2106   * checks that a split after a HashJoin does not result in the HashJoin execution being duplicated across
2107   * multiple nodes, one for each branch in the split.
2108   */
2109  @Test
2110  public void testJoinSplitBeforeJoin() throws Exception
2111    {
2112    getPlatform().copyFromLocal( inputFileLhs );
2113    getPlatform().copyFromLocal( inputFileRhs );
2114
2115    FlowDef flowDef = FlowDef.flowDef()
2116      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
2117      .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) )
2118      .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) )
2119      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
2120      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );
2121
2122    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
2123    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
2124
2125    pipeUpper = new Checkpoint( pipeUpper );
2126
2127    HashJoin hashJoin = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
2128
2129    Pipe joinFirst = hashJoin;
2130
2131    joinFirst = new Each( joinFirst, new Identity() );
2132
2133    Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() );
2134
2135    pipeLhs = new GroupBy( pipeLhs, new Fields( "numLHS" ) );
2136
2137    joinFirst = new Each( new Pipe( "lhsSplit", joinFirst ), new Identity() );
2138
2139    Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", "charRHSSecond" ), " " ) );
2140
2141    joinSecond = new CoGroup( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) );
2142
2143    Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() );
2144
2145    flowDef
2146      .addTail( pipeLhs )
2147      .addTail( pipeRhs );
2148
2149    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
2150
2151    if( getPlatform().isDAG() )
2152      {
2153      FlowStep flowStep = (FlowStep) flow.getFlowSteps().get( 0 );
2154      List<ElementGraph> elementGraphs = flowStep.getFlowNodeGraph().getElementGraphs( hashJoin );
2155
2156      assertEquals( 1, elementGraphs.size() );
2157      }
2158
2159    flow.complete();
2160
2161    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );
2162
2163    assertEquals( 37, values.size() );
2164    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
2165    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
2166
2167    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );
2168
2169    assertEquals( 109, values.size() );
2170    assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
2171    assertTrue( values.contains( new Tuple( "1\ta\t1\tB\t1\tB" ) ) );
2172    }
2173
2174  @Test
2175  public void testGroupBySplitGroupByJoin() throws Exception
2176    {
2177    getPlatform().copyFromLocal( inputFileLower );
2178
2179    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2180
2181    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE );
2182
2183    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
2184
2185    Pipe pipeFirst = new Pipe( "first" );
2186    pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter );
2187    pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) );
2188    pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL );
2189
2190    Pipe pipeSecond = new Pipe( "second", pipeFirst );
2191    pipeSecond = new Each( pipeSecond, new Identity() );
2192    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
2193    pipeSecond = new Every( pipeSecond, new Fields( "firstFirst" ), new First( new Fields( "secondFirst" ) ), Fields.ALL );
2194    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
2195    pipeSecond = new Every( pipeSecond, new Fields( "secondFirst" ), new First( new Fields( "thirdFirst" ) ), Fields.ALL );
2196
2197    Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) );
2198
2199    Flow flow = getPlatform().getFlowConnector().connect( source, sink, splice );
2200
2201    flow.complete();
2202
2203    validateLength( flow, 5, null );
2204
2205    List<Tuple> values = getSinkAsList( flow );
2206
2207    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
2208    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
2209    assertTrue( values.contains( new Tuple( "3\tc\t3\tc" ) ) );
2210    assertTrue( values.contains( new Tuple( "4\td\t4\td" ) ) );
2211    assertTrue( values.contains( new Tuple( "5\te\t5\te" ) ) );
2212    }
2213
2214  @Test
2215  public void testGroupBySplitSplitGroupByJoin() throws Exception
2216    {
2217    getPlatform().copyFromLocal( inputFileLower );
2218
2219    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2220
2221    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE );
2222
2223    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
2224
2225    Pipe pipeFirst = new Pipe( "first" );
2226    pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter );
2227    pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) );
2228    pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL );
2229
2230    Pipe pipeSecond = new Pipe( "second", pipeFirst );
2231    pipeSecond = new Each( pipeSecond, new Identity() );
2232    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
2233    pipeSecond = new Every( pipeSecond, new Fields( "firstFirst" ), new First( new Fields( "secondFirst" ) ), Fields.ALL );
2234
2235    Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) );
2236//    Pipe splice = new HashJoin( pipeSecond, new Fields( "num" ), pipeFirst, new Fields( "num" ), Fields.size( 4 ) );
2237
2238    splice = new HashJoin( splice, new Fields( 0 ), pipeSecond, new Fields( "num" ), Fields.size( 6 ) );
2239
2240    Flow flow = getPlatform().getFlowConnector().connect( source, sink, splice );
2241
2242    flow.complete();
2243
2244    validateLength( flow, 5, null );
2245
2246    List<Tuple> values = getSinkAsList( flow );
2247
2248    assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\ta" ) ) );
2249    assertTrue( values.contains( new Tuple( "2\tb\t2\tb\t2\tb" ) ) );
2250    assertTrue( values.contains( new Tuple( "3\tc\t3\tc\t3\tc" ) ) );
2251    assertTrue( values.contains( new Tuple( "4\td\t4\td\t4\td" ) ) );
2252    assertTrue( values.contains( new Tuple( "5\te\t5\te\t5\te" ) ) );
2253    }
2254
2255  @Test
2256  public void testGroupBySplitAroundSplitGroupByJoin() throws Exception
2257    {
2258    getPlatform().copyFromLocal( inputFileLower );
2259
2260    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2261
2262    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE );
2263    Tap sink2 = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink2" ), SinkMode.REPLACE );
2264
2265    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
2266
2267    Pipe pipeInit = new Pipe( "init" );
2268    Pipe pipeFirst = new Pipe( "first", pipeInit );
2269    pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter );
2270    pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) );
2271    pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL );
2272
2273    Pipe sink2Pipe = new Pipe( "sink2", pipeFirst );
2274
2275    Pipe pipeSecond = new Pipe( "second", pipeInit );
2276    pipeSecond = new Each( pipeSecond, new Fields( "line" ), splitter );
2277    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
2278    pipeSecond = new Every( pipeSecond, new Fields( "char" ), new First( new Fields( "secondFirst" ) ), Fields.ALL );
2279
2280//    Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) );
2281    Pipe splice = new HashJoin( pipeSecond, new Fields( "num" ), pipeFirst, new Fields( "num" ), Fields.size( 4 ) );
2282
2283    Pipe pipeThird = new Pipe( "third", pipeSecond );
2284    pipeThird = new Each( pipeThird, new Identity() );
2285    pipeThird = new GroupBy( pipeThird, new Fields( "num" ) );
2286    pipeThird = new Every( pipeThird, new Fields( "secondFirst" ), new First( new Fields( "thirdFirst" ) ), Fields.ALL );
2287
2288    splice = new HashJoin( splice, new Fields( 0 ), pipeThird, new Fields( "num" ), Fields.size( 6 ) );
2289
2290    FlowDef flowDef = FlowDef.flowDef()
2291      .setName( splice.getName() )
2292      .addSource( "init", source )
2293      .addTailSink( splice, sink )
2294      .addTailSink( sink2Pipe, sink2 );
2295
2296    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
2297
2298    flow.complete();
2299
2300    validateLength( flow, 5, null );
2301
2302    List<Tuple> values = getSinkAsList( flow );
2303
2304    assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\ta" ) ) );
2305    assertTrue( values.contains( new Tuple( "2\tb\t2\tb\t2\tb" ) ) );
2306    assertTrue( values.contains( new Tuple( "3\tc\t3\tc\t3\tc" ) ) );
2307    assertTrue( values.contains( new Tuple( "4\td\t4\td\t4\td" ) ) );
2308    assertTrue( values.contains( new Tuple( "5\te\t5\te\t5\te" ) ) );
2309    }
2310
2311  /**
2312   * This test checks for a deadlock when the same input is forked, adapted on one edge, then hashjoined back together.
2313   *
2314   * @throws Exception
2315   */
2316  @Test
2317  public void testForkThenJoin() throws Exception
2318    {
2319    getPlatform().copyFromLocal( inputFileLower );
2320    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2321
2322    Map sources = new HashMap();
2323
2324    sources.put( "lower", sourceLower );
2325
2326    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );
2327
2328    Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " );
2329
2330    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
2331    Pipe pipeUpper = new Each( new Pipe( "upper", pipeLower ), new Fields( "text" ),
2332      new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ),
2333      Fields.REPLACE );
2334
2335    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
2336
2337    Map<Object, Object> properties = getProperties();
2338
2339    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
2340
2341    flow.complete();
2342
2343    validateLength( flow, 5 );
2344
2345    List<Tuple> values = getSinkAsList( flow );
2346
2347    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
2348    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
2349    }
2350
2351  /**
2352   * This test checks for a deadlock when the same input is forked, adapted on one edge, then hashjoined back together.
2353   *
2354   * @throws Exception
2355   */
2356  @Test
2357  public void testForkCoGroupThenHashJoin() throws Exception
2358    {
2359    getPlatform().copyFromLocal( inputFileLower );
2360    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2361    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
2362
2363    Map sources = new HashMap();
2364
2365    sources.put( "sourceLower", sourceLower );
2366    sources.put( "sourceUpper", sourceUpper );
2367
2368    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );
2369
2370    Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " );
2371
2372    Pipe leftPipeLower = new Each( new Pipe( "sourceLower" ), new Fields( "line" ), splitter );
2373    Pipe rightPipeUpper = new Each( new Pipe( "sourceUpper" ), new Fields( "line" ), splitter );
2374
2375    Pipe leftPipeUpper = new Each( new Pipe( "leftUpper", leftPipeLower ), new Fields( "text" ),
2376      new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ),
2377      Fields.REPLACE );
2378    Pipe rightPipeLower = new Each( new Pipe( "rightLower", rightPipeUpper ), new Fields( "text" ),
2379      new ExpressionFunction( Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class ),
2380      Fields.REPLACE );
2381
2382    leftPipeUpper = new GroupBy( leftPipeUpper, new Fields( "num" ) );
2383    rightPipeLower = new GroupBy( rightPipeLower, new Fields( "num" ) );
2384
2385    Pipe middleSplice = new CoGroup( "middleCoGroup", leftPipeUpper, new Fields( "num" ), rightPipeLower, new Fields( "num" ), new Fields( "numM1", "charM1", "numM2", "charM2" ) );
2386
2387    Pipe leftSplice = new HashJoin( leftPipeLower, new Fields( "num" ), middleSplice, new Fields( "numM1" ) );
2388
2389    Map<Object, Object> properties = getProperties();
2390
2391    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, leftSplice );
2392
2393    flow.complete();
2394
2395    validateLength( flow, 5 );
2396
2397    List<Tuple> values = getSinkAsList( flow );
2398    // that the flow completes at all is already success.
2399    assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\ta" ) ) );
2400    assertTrue( values.contains( new Tuple( "2\tb\t2\tB\t2\tb" ) ) );
2401    }
2402
2403  /**
2404   * This test checks for a deadlock when the same input is forked, adapted on one edge, cogroup with something,
2405   * then hashjoined back together.
2406   *
2407   * @throws Exception
2408   */
2409  @Test
2410  public void testForkCoGroupThenHashJoinCoGroupAgain() throws Exception
2411    {
2412    getPlatform().copyFromLocal( inputFileLower );
2413    getPlatform().copyFromLocal( inputFileUpper );
2414
2415    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2416    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
2417
2418    Map sources = new HashMap();
2419
2420    sources.put( "sourceLower", sourceLower );
2421    sources.put( "sourceUpper", sourceUpper );
2422
2423    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );
2424
2425    Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " );
2426
2427    Pipe leftPipeLower = new Each( new Pipe( "sourceLower" ), new Fields( "line" ), splitter );
2428    Pipe rightPipeUpper = new Each( new Pipe( "sourceUpper" ), new Fields( "line" ), splitter );
2429
2430    Pipe leftPipeUpper = new Each( new Pipe( "leftUpper", leftPipeLower ), new Fields( "text" ),
2431      new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ),
2432      Fields.REPLACE );
2433    Pipe rightPipeLower = new Each( new Pipe( "rightLower", rightPipeUpper ), new Fields( "text" ),
2434      new ExpressionFunction( Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class ),
2435      Fields.REPLACE );
2436
2437    leftPipeUpper = new GroupBy( leftPipeUpper, new Fields( "num" ) );
2438    rightPipeLower = new GroupBy( rightPipeLower, new Fields( "num" ) );
2439
2440    Pipe middleSplice = new CoGroup( "middleCoGroup", leftPipeUpper, new Fields( "num" ), rightPipeLower, new Fields( "num" ), new Fields( "numM1", "charM1", "numM2", "charM2" ) );
2441
2442    Pipe leftSplice = new HashJoin( leftPipeLower, new Fields( "num" ), middleSplice, new Fields( "numM1" ) );
2443    Pipe rightSplice = new HashJoin( rightPipeUpper, new Fields( "num" ), middleSplice, new Fields( "numM2" ) );
2444
2445    leftSplice = new Rename( leftSplice, new Fields( "num", "text", "numM1", "charM1", "numM2", "charM2" ), new Fields( "numL1", "charL1", "numM1L", "charM1L", "numM2L", "charM2L" ) );
2446    rightSplice = new Rename( rightSplice, new Fields( "num", "text", "numM1", "charM1", "numM2", "charM2" ), new Fields( "numR1", "charR1", "numM1R", "charM1R", "numM2R", "charM2R" ) );
2447
2448    leftSplice = new GroupBy( leftSplice, new Fields( "numM1L" ) );
2449    rightSplice = new GroupBy( rightSplice, new Fields( "numM2R" ) );
2450
2451    Pipe splice = new CoGroup( "cogrouping", leftSplice, new Fields( "numM1L" ), rightSplice, new Fields( "numM2R" ) );
2452
2453    Map<Object, Object> properties = getProperties();
2454
2455    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
2456
2457    flow.complete();
2458
2459    validateLength( flow, 5 );
2460
2461    List<Tuple> values = getSinkAsList( flow );
2462
2463    // getting this far is a success already (past old deadlocks)
2464    assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA\t1\tA\t1\ta" ) ) );
2465    assertTrue( values.contains( new Tuple( "2\tb\t2\tB\t2\tb\t2\tB\t2\tB\t2\tb" ) ) );
2466    }
2467  }