001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading;
022
023import java.io.IOException;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import cascading.cascade.Cascades;
031import cascading.flow.Flow;
032import cascading.flow.FlowConnectorProps;
033import cascading.flow.FlowDef;
034import cascading.flow.FlowProps;
035import cascading.operation.Function;
036import cascading.operation.Identity;
037import cascading.operation.Insert;
038import cascading.operation.aggregator.Count;
039import cascading.operation.aggregator.First;
040import cascading.operation.regex.RegexFilter;
041import cascading.operation.regex.RegexSplitGenerator;
042import cascading.operation.regex.RegexSplitter;
043import cascading.pipe.CoGroup;
044import cascading.pipe.Each;
045import cascading.pipe.Every;
046import cascading.pipe.GroupBy;
047import cascading.pipe.Pipe;
048import cascading.pipe.assembly.Discard;
049import cascading.pipe.assembly.Rename;
050import cascading.pipe.assembly.Retain;
051import cascading.pipe.joiner.InnerJoin;
052import cascading.pipe.joiner.Joiner;
053import cascading.pipe.joiner.LeftJoin;
054import cascading.pipe.joiner.MixedJoin;
055import cascading.pipe.joiner.OuterJoin;
056import cascading.pipe.joiner.RightJoin;
057import cascading.tap.SinkMode;
058import cascading.tap.Tap;
059import cascading.tuple.Fields;
060import cascading.tuple.Tuple;
061import cascading.util.NullNotEquivalentComparator;
062import org.junit.Test;
063
064import static data.InputData.*;
065
066public class CoGroupFieldedPipesPlatformTest extends PlatformTestCase
067  {
068  public CoGroupFieldedPipesPlatformTest()
069    {
070    super( true, 4, 1 ); // leave cluster testing enabled
071    }
072
073  @Test
074  public void testCross() throws Exception
075    {
076    getPlatform().copyFromLocal( inputFileLhs );
077    getPlatform().copyFromLocal( inputFileRhs );
078
079    Map sources = new HashMap();
080
081    sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) );
082    sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) );
083
084    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE );
085
086    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
087    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
088
089    Pipe cross = new CoGroup( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
090
091    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross );
092
093    flow.complete();
094
095    validateLength( flow, 37, null );
096
097    List<Tuple> values = getSinkAsList( flow );
098
099    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
100    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
101    }
102
103  @Test
104  public void testCoGroup() throws Exception
105    {
106    getPlatform().copyFromLocal( inputFileLower );
107    getPlatform().copyFromLocal( inputFileUpper );
108
109    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
110    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
111
112    Map sources = new HashMap();
113
114    sources.put( "lower", sourceLower );
115    sources.put( "upper", sourceUpper );
116
117    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroup" ), SinkMode.REPLACE );
118
119    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
120
121    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
122    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
123
124    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new InnerJoin( Fields.size( 4 ) ) );
125
126    Map<Object, Object> properties = getProperties();
127
128    // make sure hasher is getting called, but does nothing special
129    FlowProps.setDefaultTupleElementComparator( properties, getPlatform().getStringComparator( false ).getClass().getCanonicalName() );
130
131    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
132
133    flow.complete();
134
135    validateLength( flow, 5 );
136
137    List<Tuple> values = getSinkAsList( flow );
138
139    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
140    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
141    }
142
143  @Test
144  public void testCoGroupSamePipeName() throws Exception
145    {
146    getPlatform().copyFromLocal( inputFileLower );
147    getPlatform().copyFromLocal( inputFileUpper );
148
149    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
150    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
151
152    Map sources = new HashMap();
153
154    sources.put( "lower", sourceLower );
155    sources.put( "upper", sourceUpper );
156
157    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE );
158
159    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
160
161    Pipe pipeLower = new Pipe( "lower" );
162    Pipe pipeUpper = new Pipe( "upper" );
163
164    // these pipes will hide the source name, and could cause one to be lost
165    pipeLower = new Pipe( "same", pipeLower );
166    pipeUpper = new Pipe( "same", pipeUpper );
167
168    pipeLower = new Each( pipeLower, new Fields( "line" ), splitter );
169    pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter );
170
171//    pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
172//    pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
173
174    pipeLower = new Pipe( "left", pipeLower );
175    pipeUpper = new Pipe( "right", pipeUpper );
176
177//    pipeLower = new Each( pipeLower, new Debug( true ) );
178//    pipeUpper = new Each( pipeUpper, new Debug( true ) );
179
180    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
181
182//    splice = new Each( splice, new Debug( true ) );
183    splice = new Pipe( "splice", splice );
184    splice = new Pipe( "tail", splice );
185
186    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
187
188    flow.complete();
189
190    validateLength( flow, 5 );
191
192    List<Tuple> values = getSinkAsList( flow );
193
194    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
195    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
196    }
197
198  @Test
199  public void testCoGroupWithUnknowns() throws Exception
200    {
201    getPlatform().copyFromLocal( inputFileLower );
202    getPlatform().copyFromLocal( inputFileUpper );
203
204    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
205    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
206
207    Map sources = new HashMap();
208
209    sources.put( "lower", sourceLower );
210    sources.put( "upper", sourceUpper );
211
212    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE );
213
214    Function splitter = new RegexSplitter( Fields.UNKNOWN, " " );
215
216    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
217    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
218
219    Pipe splice = new CoGroup( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) );
220
221    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
222
223    flow.complete();
224
225    validateLength( flow, 5 );
226
227    List<Tuple> values = getSinkAsList( flow );
228
229    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
230    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
231    }
232
233  /**
234   * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with
235   * a new stream using an outerjoin.
236   *
237   * @throws Exception
238   */
239  @Test
240  public void testCoGroupFilteredBranch() throws Exception
241    {
242    getPlatform().copyFromLocal( inputFileLower );
243    getPlatform().copyFromLocal( inputFileUpper );
244
245    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
246    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
247
248    Map sources = new HashMap();
249
250    sources.put( "lower", sourceLower );
251    sources.put( "upper", sourceUpper );
252
253    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupfilteredbranch" ), SinkMode.REPLACE );
254
255    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
256
257    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
258    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
259    pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all
260    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
261
262    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() );
263
264    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
265
266    flow.complete();
267
268    validateLength( flow, 5 );
269
270    List<Tuple> values = getSinkAsList( flow );
271
272    assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) );
273    assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) );
274    }
275
276  @Test
277  public void testCoGroupSelf() throws Exception
278    {
279    getPlatform().copyFromLocal( inputFileLower );
280
281    // intentionally creating multiple instances that are equivalent
282    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
283    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
284
285    Map sources = new HashMap();
286
287    sources.put( "lower", sourceLower );
288    sources.put( "upper", sourceUpper );
289
290    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupself" ), SinkMode.REPLACE );
291
292    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
293
294    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
295    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
296
297    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
298
299    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
300
301    flow.complete();
302
303    validateLength( flow, 5 );
304
305    List<Tuple> values = getSinkAsList( flow );
306
307    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
308    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
309    }
310
311  @Test
312  public void testSplitCoGroupSelf() throws Exception
313    {
314    getPlatform().copyFromLocal( inputFileLower );
315
316    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
317
318    Map sources = new HashMap();
319
320    sources.put( "lowerLhs", source );
321    sources.put( "upperLhs", source );
322    sources.put( "lowerRhs", source );
323    sources.put( "upperRhs", source );
324
325    Tap sinkLhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/lhs" ), SinkMode.REPLACE );
326    Tap sinkRhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/rhs" ), SinkMode.REPLACE );
327
328    Map sinks = new HashMap();
329
330    sinks.put( "lhs", sinkLhs );
331    sinks.put( "rhs", sinkRhs );
332
333    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
334
335    Pipe pipeLowerLhs = new Each( new Pipe( "lowerLhs" ), new Fields( "line" ), splitter );
336    Pipe pipeUpperLhs = new Each( new Pipe( "upperLhs" ), new Fields( "line" ), splitter );
337
338    Pipe spliceLhs = new CoGroup( "lhs", pipeLowerLhs, new Fields( "num" ), pipeUpperLhs, new Fields( "num" ), Fields.size( 4 ) );
339
340    Pipe pipeLowerRhs = new Each( new Pipe( "lowerRhs" ), new Fields( "line" ), splitter );
341    Pipe pipeUpperRhs = new Each( new Pipe( "upperRhs" ), new Fields( "line" ), splitter );
342
343    Pipe spliceRhs = new CoGroup( "rhs", pipeLowerRhs, new Fields( "num" ), pipeUpperRhs, new Fields( "num" ), Fields.size( 4 ) );
344
345    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, spliceLhs, spliceRhs );
346
347    flow.complete();
348
349    List<Tuple> values = asList( flow, sinkLhs );
350
351    assertEquals( 5, values.size() );
352    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
353    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
354
355    values = asList( flow, sinkRhs );
356
357    assertEquals( 5, values.size() );
358    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
359    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
360    }
361
362  /**
363   * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join
364   *
365   * @throws Exception when
366   */
367  @Test
368  public void testCoGroupAfterEvery() throws Exception
369    {
370    getPlatform().copyFromLocal( inputFileLower );
371    getPlatform().copyFromLocal( inputFileUpper );
372
373    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
374    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
375
376    Map sources = new HashMap();
377
378    sources.put( "lower", sourceLower );
379    sources.put( "upper", sourceUpper );
380
381    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE );
382
383    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
384
385    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
386    pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
387    pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL );
388
389    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
390    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
391    pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL );
392
393    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
394
395    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
396
397    flow.complete();
398
399    validateLength( flow, 5, null );
400
401    List<Tuple> values = getSinkAsList( flow );
402
403    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
404    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
405    }
406
407  /**
408   * Tests that CoGroup properly resolves fields when following an Every
409   *
410   * @throws Exception
411   */
412  @Test
413  public void testCoGroupAfterEveryNoDeclared() throws Exception
414    {
415    getPlatform().copyFromLocal( inputFileLower );
416    getPlatform().copyFromLocal( inputFileUpper );
417
418    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
419    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
420
421    Map sources = new HashMap();
422
423    sources.put( "lower", sourceLower );
424    sources.put( "upper", sourceUpper );
425
426    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "aftereverynodeclared" ), SinkMode.REPLACE );
427
428    Function splitter1 = new RegexSplitter( new Fields( "num1", "char1" ), " " );
429
430    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter1 );
431    pipeLower = new Each( pipeLower, new Insert( new Fields( "one", "two", "three", "four" ), "one", "two", "three", "four" ), Fields.ALL );
432    pipeLower = new GroupBy( pipeLower, new Fields( "num1" ) );
433    pipeLower = new Every( pipeLower, new Fields( "char1" ), new First(), Fields.ALL );
434
435    Function splitter2 = new RegexSplitter( new Fields( "num2", "char2" ), " " );
436
437    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter2 );
438    pipeUpper = new GroupBy( pipeUpper, new Fields( "num2" ) );
439    pipeUpper = new Every( pipeUpper, new Fields( "char2" ), new First(), Fields.ALL );
440
441    Pipe splice = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
442
443    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
444
445    flow.complete();
446
447    validateLength( flow, 5, null );
448
449    List<Tuple> values = getSinkAsList( flow );
450
451    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
452    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
453    }
454
455  @Test
456  public void testCoGroupInnerSingleField() throws Exception
457    {
458    getPlatform().copyFromLocal( inputFileLowerOffset );
459    getPlatform().copyFromLocal( inputFileUpper );
460
461    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
462    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
463
464    Map sources = new HashMap();
465
466    sources.put( "lower", sourceLower );
467    sources.put( "upper", sourceUpper );
468
469    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupinnersingle" ), SinkMode.REPLACE );
470
471    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) );
472    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) );
473
474    Pipe join = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
475
476    join = new Every( join, new Count() );
477
478//    join = new Each( join, new Debug( true ) );
479
480    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
481
482    flow.complete();
483
484    validateLength( flow, 2, null );
485
486    Set<Tuple> results = new HashSet<Tuple>();
487
488    results.add( new Tuple( "1\t1\t1" ) );
489    results.add( new Tuple( "5\t5\t2" ) );
490
491    List<Tuple> actual = getSinkAsList( flow );
492
493    results.removeAll( actual );
494
495    assertEquals( 0, results.size() );
496    }
497
498  /**
499   * 1 a1
500   * 1 a2
501   * 1 a3
502   * 2 b1
503   * 3 c1
504   * 4 d1
505   * 4 d2
506   * 4 d3
507   * 5 e1
508   * 5 e2
509   * 5 e3
510   * 7 g1
511   * 7 g2
512   * 7 g3
513   * 7 g4
514   * 7 g5
515   * null h1
516   * <p/>
517   * 1 A1
518   * 1 A2
519   * 1 A3
520   * 2 B1
521   * 2 B2
522   * 2 B3
523   * 4 D1
524   * 6 F1
525   * 6 F2
526   * null H1
527   * <p/>
528   * 1  a1      1       A1
529   * 1  a1      1       A2
530   * 1  a1      1       A3
531   * 1  a2      1       A1
532   * 1  a2      1       A2
533   * 1  a2      1       A3
534   * 1  a3      1       A1
535   * 1  a3      1       A2
536   * 1  a3      1       A3
537   * 2  b1      2       B1
538   * 2  b1      2       B2
539   * 2  b1      2       B3
540   * 4  d1      4       D1
541   * 4  d2      4       D1
542   * 4  d3      4       D1
543   * null h1  null  H1
544   *
545   * @throws Exception
546   */
547  @Test
548  public void testCoGroupInner() throws Exception
549    {
550    HashSet<Tuple> results = new HashSet<Tuple>();
551
552    results.add( new Tuple( "1", "a1", "1", "A1" ) );
553    results.add( new Tuple( "1", "a1", "1", "A2" ) );
554    results.add( new Tuple( "1", "a1", "1", "A3" ) );
555    results.add( new Tuple( "1", "a2", "1", "A1" ) );
556    results.add( new Tuple( "1", "a2", "1", "A2" ) );
557    results.add( new Tuple( "1", "a2", "1", "A3" ) );
558    results.add( new Tuple( "1", "a3", "1", "A1" ) );
559    results.add( new Tuple( "1", "a3", "1", "A2" ) );
560    results.add( new Tuple( "1", "a3", "1", "A3" ) );
561    results.add( new Tuple( "2", "b1", "2", "B1" ) );
562    results.add( new Tuple( "2", "b1", "2", "B2" ) );
563    results.add( new Tuple( "2", "b1", "2", "B3" ) );
564    results.add( new Tuple( "4", "d1", "4", "D1" ) );
565    results.add( new Tuple( "4", "d2", "4", "D1" ) );
566    results.add( new Tuple( "4", "d3", "4", "D1" ) );
567    results.add( new Tuple( null, "h1", null, "H1" ) );
568
569    handleJoins( "cogroupinner", new InnerJoin(), results, 8, false, null );
570    handleJoins( "cogroupinner-resultgroup", new InnerJoin(), results, 8, true, null );
571    }
572
573  /**
574   * 1 a1
575   * 1 a2
576   * 1 a3
577   * 2 b1
578   * 3 c1
579   * 4 d1
580   * 4 d2
581   * 4 d3
582   * 5 e1
583   * 5 e2
584   * 5 e3
585   * 7 g1
586   * 7 g2
587   * 7 g3
588   * 7 g4
589   * 7 g5
590   * null h1
591   * <p/>
592   * 1 A1
593   * 1 A2
594   * 1 A3
595   * 2 B1
596   * 2 B2
597   * 2 B3
598   * 4 D1
599   * 6 F1
600   * 6 F2
601   * null H1
602   * <p/>
603   * 1  a1      1       A1
604   * 1  a1      1       A2
605   * 1  a1      1       A3
606   * 1  a2      1       A1
607   * 1  a2      1       A2
608   * 1  a2      1       A3
609   * 1  a3      1       A1
610   * 1  a3      1       A2
611   * 1  a3      1       A3
612   * 2  b1      2       B1
613   * 2  b1      2       B2
614   * 2  b1      2       B3
615   * 4  d1      4       D1
616   * 4  d2      4       D1
617   * 4  d3      4       D1
618   *
619   * @throws Exception
620   */
621  @Test
622  public void testCoGroupInnerNull() throws Exception
623    {
624    HashSet<Tuple> results = new HashSet<Tuple>();
625
626    results.add( new Tuple( "1", "a1", "1", "A1" ) );
627    results.add( new Tuple( "1", "a1", "1", "A2" ) );
628    results.add( new Tuple( "1", "a1", "1", "A3" ) );
629    results.add( new Tuple( "1", "a2", "1", "A1" ) );
630    results.add( new Tuple( "1", "a2", "1", "A2" ) );
631    results.add( new Tuple( "1", "a2", "1", "A3" ) );
632    results.add( new Tuple( "1", "a3", "1", "A1" ) );
633    results.add( new Tuple( "1", "a3", "1", "A2" ) );
634    results.add( new Tuple( "1", "a3", "1", "A3" ) );
635    results.add( new Tuple( "2", "b1", "2", "B1" ) );
636    results.add( new Tuple( "2", "b1", "2", "B2" ) );
637    results.add( new Tuple( "2", "b1", "2", "B3" ) );
638    results.add( new Tuple( "4", "d1", "4", "D1" ) );
639    results.add( new Tuple( "4", "d2", "4", "D1" ) );
640    results.add( new Tuple( "4", "d3", "4", "D1" ) );
641
642    handleJoins( "cogroupinnernull", new InnerJoin(), results, 9, false, new NullNotEquivalentComparator() );
643    handleJoins( "cogroupinnernull-resultgroup", new InnerJoin(), results, 9, true, new NullNotEquivalentComparator() );
644    }
645
646  /**
647   * 1 a1
648   * 1 a2
649   * 1 a3
650   * 2 b1
651   * 3 c1
652   * 4 d1
653   * 4 d2
654   * 4 d3
655   * 5 e1
656   * 5 e2
657   * 5 e3
658   * 7 g1
659   * 7 g2
660   * 7 g3
661   * 7 g4
662   * 7 g5
663   * null h1
664   * <p/>
665   * 1 A1
666   * 1 A2
667   * 1 A3
668   * 2 B1
669   * 2 B2
670   * 2 B3
671   * 4 D1
672   * 6 F1
673   * 6 F2
674   * null H1
675   * <p/>
676   * 1  a1      1       A1
677   * 1  a1      1       A2
678   * 1  a1      1       A3
679   * 1  a2      1       A1
680   * 1  a2      1       A2
681   * 1  a2      1       A3
682   * 1  a3      1       A1
683   * 1  a3      1       A2
684   * 1  a3      1       A3
685   * 2  b1      2       B1
686   * 2  b1      2       B2
687   * 2  b1      2       B3
688   * 3  c1      null    null
689   * 4  d1      4       D1
690   * 4  d2      4       D1
691   * 4  d3      4       D1
692   * 5  e1      null    null
693   * 5  e2      null    null
694   * 5  e3      null    null
695   * null       null    6       F1
696   * null       null    6       F2
697   * 7  g1      null    null
698   * 7  g2      null    null
699   * 7  g3      null    null
700   * 7  g4      null    null
701   * 7  g5      null    null
702   * null h1  null  H1
703   *
704   * @throws Exception
705   */
706  @Test
707  public void testCoGroupOuter() throws Exception
708    {
709    Set<Tuple> results = new HashSet<Tuple>();
710
711    results.add( new Tuple( "1", "a1", "1", "A1" ) );
712    results.add( new Tuple( "1", "a1", "1", "A2" ) );
713    results.add( new Tuple( "1", "a1", "1", "A3" ) );
714    results.add( new Tuple( "1", "a2", "1", "A1" ) );
715    results.add( new Tuple( "1", "a2", "1", "A2" ) );
716    results.add( new Tuple( "1", "a2", "1", "A3" ) );
717    results.add( new Tuple( "1", "a3", "1", "A1" ) );
718    results.add( new Tuple( "1", "a3", "1", "A2" ) );
719    results.add( new Tuple( "1", "a3", "1", "A3" ) );
720    results.add( new Tuple( "2", "b1", "2", "B1" ) );
721    results.add( new Tuple( "2", "b1", "2", "B2" ) );
722    results.add( new Tuple( "2", "b1", "2", "B3" ) );
723    results.add( new Tuple( "3", "c1", null, null ) );
724    results.add( new Tuple( "4", "d1", "4", "D1" ) );
725    results.add( new Tuple( "4", "d2", "4", "D1" ) );
726    results.add( new Tuple( "4", "d3", "4", "D1" ) );
727    results.add( new Tuple( "5", "e1", null, null ) );
728    results.add( new Tuple( "5", "e2", null, null ) );
729    results.add( new Tuple( "5", "e3", null, null ) );
730    results.add( new Tuple( null, null, "6", "F1" ) );
731    results.add( new Tuple( null, null, "6", "F2" ) );
732    results.add( new Tuple( "7", "g1", null, null ) );
733    results.add( new Tuple( "7", "g2", null, null ) );
734    results.add( new Tuple( "7", "g3", null, null ) );
735    results.add( new Tuple( "7", "g4", null, null ) );
736    results.add( new Tuple( "7", "g5", null, null ) );
737    results.add( new Tuple( null, "h1", null, "H1" ) );
738
739    handleJoins( "cogroupouter", new OuterJoin(), results, 8, false, null );
740    handleJoins( "cogroupouter-resultgroup", new OuterJoin(), results, 8, true, null );
741    }
742
743  /**
744   * 1 a1
745   * 1 a2
746   * 1 a3
747   * 2 b1
748   * 3 c1
749   * 4 d1
750   * 4 d2
751   * 4 d3
752   * 5 e1
753   * 5 e2
754   * 5 e3
755   * 7 g1
756   * 7 g2
757   * 7 g3
758   * 7 g4
759   * 7 g5
760   * null h1
761   * <p/>
762   * 1 A1
763   * 1 A2
764   * 1 A3
765   * 2 B1
766   * 2 B2
767   * 2 B3
768   * 4 D1
769   * 6 F1
770   * 6 F2
771   * null H1
772   * <p/>
773   * 1  a1      1       A1
774   * 1  a1      1       A2
775   * 1  a1      1       A3
776   * 1  a2      1       A1
777   * 1  a2      1       A2
778   * 1  a2      1       A3
779   * 1  a3      1       A1
780   * 1  a3      1       A2
781   * 1  a3      1       A3
782   * 2  b1      2       B1
783   * 2  b1      2       B2
784   * 2  b1      2       B3
785   * 3  c1      null    null
786   * 4  d1      4       D1
787   * 4  d2      4       D1
788   * 4  d3      4       D1
789   * 5  e1      null    null
790   * 5  e2      null    null
791   * 5  e3      null    null
792   * null       null    6       F1
793   * null       null    6       F2
794   * 7  g1      null    null
795   * 7  g2      null    null
796   * 7  g3      null    null
797   * 7  g4      null    null
798   * 7  g5      null    null
799   * null h1  null  null
800   * null null  null  H1
801   *
802   * @throws Exception
803   */
804  @Test
805  public void testCoGroupOuterNull() throws Exception
806    {
807    Set<Tuple> results = new HashSet<Tuple>();
808
809    results.add( new Tuple( "1", "a1", "1", "A1" ) );
810    results.add( new Tuple( "1", "a1", "1", "A2" ) );
811    results.add( new Tuple( "1", "a1", "1", "A3" ) );
812    results.add( new Tuple( "1", "a2", "1", "A1" ) );
813    results.add( new Tuple( "1", "a2", "1", "A2" ) );
814    results.add( new Tuple( "1", "a2", "1", "A3" ) );
815    results.add( new Tuple( "1", "a3", "1", "A1" ) );
816    results.add( new Tuple( "1", "a3", "1", "A2" ) );
817    results.add( new Tuple( "1", "a3", "1", "A3" ) );
818    results.add( new Tuple( "2", "b1", "2", "B1" ) );
819    results.add( new Tuple( "2", "b1", "2", "B2" ) );
820    results.add( new Tuple( "2", "b1", "2", "B3" ) );
821    results.add( new Tuple( "3", "c1", null, null ) );
822    results.add( new Tuple( "4", "d1", "4", "D1" ) );
823    results.add( new Tuple( "4", "d2", "4", "D1" ) );
824    results.add( new Tuple( "4", "d3", "4", "D1" ) );
825    results.add( new Tuple( "5", "e1", null, null ) );
826    results.add( new Tuple( "5", "e2", null, null ) );
827    results.add( new Tuple( "5", "e3", null, null ) );
828    results.add( new Tuple( null, null, "6", "F1" ) );
829    results.add( new Tuple( null, null, "6", "F2" ) );
830    results.add( new Tuple( "7", "g1", null, null ) );
831    results.add( new Tuple( "7", "g2", null, null ) );
832    results.add( new Tuple( "7", "g3", null, null ) );
833    results.add( new Tuple( "7", "g4", null, null ) );
834    results.add( new Tuple( "7", "g5", null, null ) );
835    results.add( new Tuple( null, "h1", null, null ) );
836    results.add( new Tuple( null, null, null, "H1" ) );
837
838    handleJoins( "cogroupouternull", new OuterJoin(), results, 9, false, new NullNotEquivalentComparator() );
839    handleJoins( "cogroupouternull-resultgroup", new OuterJoin(), results, 9, true, new NullNotEquivalentComparator() );
840    }
841
842  /**
843   * 1 a1
844   * 1 a2
845   * 1 a3
846   * 2 b1
847   * 3 c1
848   * 4 d1
849   * 4 d2
850   * 4 d3
851   * 5 e1
852   * 5 e2
853   * 5 e3
854   * 7 g1
855   * 7 g2
856   * 7 g3
857   * 7 g4
858   * 7 g5
859   * null h1
860   * <p/>
861   * 1 A1
862   * 1 A2
863   * 1 A3
864   * 2 B1
865   * 2 B2
866   * 2 B3
867   * 4 D1
868   * 6 F1
869   * 6 F2
870   * null H1
871   * <p/>
872   * 1  a1      1       A1
873   * 1  a1      1       A2
874   * 1  a1      1       A3
875   * 1  a2      1       A1
876   * 1  a2      1       A2
877   * 1  a2      1       A3
878   * 1  a3      1       A1
879   * 1  a3      1       A2
880   * 1  a3      1       A3
881   * 2  b1      2       B1
882   * 2  b1      2       B2
883   * 2  b1      2       B3
884   * 3  c1      null    null
885   * 4  d1      4       D1
886   * 4  d2      4       D1
887   * 4  d3      4       D1
888   * 5  e1      null    null
889   * 5  e2      null    null
890   * 5  e3      null    null
891   * 7  g1      null    null
892   * 7  g2      null    null
893   * 7  g3      null    null
894   * 7  g4      null    null
895   * 7  g5      null    null
896   * null h1    null    H1
897   *
898   * @throws Exception
899   */
900  @Test
901  public void testCoGroupInnerOuter() throws Exception
902    {
903    Set<Tuple> results = new HashSet<Tuple>();
904
905    results.add( new Tuple( "1", "a1", "1", "A1" ) );
906    results.add( new Tuple( "1", "a1", "1", "A2" ) );
907    results.add( new Tuple( "1", "a1", "1", "A3" ) );
908    results.add( new Tuple( "1", "a2", "1", "A1" ) );
909    results.add( new Tuple( "1", "a2", "1", "A2" ) );
910    results.add( new Tuple( "1", "a2", "1", "A3" ) );
911    results.add( new Tuple( "1", "a3", "1", "A1" ) );
912    results.add( new Tuple( "1", "a3", "1", "A2" ) );
913    results.add( new Tuple( "1", "a3", "1", "A3" ) );
914    results.add( new Tuple( "2", "b1", "2", "B1" ) );
915    results.add( new Tuple( "2", "b1", "2", "B2" ) );
916    results.add( new Tuple( "2", "b1", "2", "B3" ) );
917    results.add( new Tuple( "3", "c1", null, null ) );
918    results.add( new Tuple( "4", "d1", "4", "D1" ) );
919    results.add( new Tuple( "4", "d2", "4", "D1" ) );
920    results.add( new Tuple( "4", "d3", "4", "D1" ) );
921    results.add( new Tuple( "5", "e1", null, null ) );
922    results.add( new Tuple( "5", "e2", null, null ) );
923    results.add( new Tuple( "5", "e3", null, null ) );
924    results.add( new Tuple( "7", "g1", null, null ) );
925    results.add( new Tuple( "7", "g2", null, null ) );
926    results.add( new Tuple( "7", "g3", null, null ) );
927    results.add( new Tuple( "7", "g4", null, null ) );
928    results.add( new Tuple( "7", "g5", null, null ) );
929    results.add( new Tuple( null, "h1", null, "H1" ) );
930
931    handleJoins( "cogroupinnerouter", new LeftJoin(), results, 8, false, null );
932    handleJoins( "cogroupinnerouter-resultgroup", new LeftJoin(), results, 8, true, null );
933    }
934
935  /**
936   * 1 a1
937   * 1 a2
938   * 1 a3
939   * 2 b1
940   * 3 c1
941   * 4 d1
942   * 4 d2
943   * 4 d3
944   * 5 e1
945   * 5 e2
946   * 5 e3
947   * 7 g1
948   * 7 g2
949   * 7 g3
950   * 7 g4
951   * 7 g5
952   * null h1
953   * <p/>
954   * 1 A1
955   * 1 A2
956   * 1 A3
957   * 2 B1
958   * 2 B2
959   * 2 B3
960   * 4 D1
961   * 6 F1
962   * 6 F2
963   * null H1
964   * <p/>
965   * 1  a1      1       A1
966   * 1  a1      1       A2
967   * 1  a1      1       A3
968   * 1  a2      1       A1
969   * 1  a2      1       A2
970   * 1  a2      1       A3
971   * 1  a3      1       A1
972   * 1  a3      1       A2
973   * 1  a3      1       A3
974   * 2  b1      2       B1
975   * 2  b1      2       B2
976   * 2  b1      2       B3
977   * 3  c1      null    null
978   * 4  d1      4       D1
979   * 4  d2      4       D1
980   * 4  d3      4       D1
981   * 5  e1      null    null
982   * 5  e2      null    null
983   * 5  e3      null    null
984   * 7  g1      null    null
985   * 7  g2      null    null
986   * 7  g3      null    null
987   * 7  g4      null    null
988   * 7  g5      null    null
989   * null h1    null    null
990   *
991   * @throws Exception
992   */
993  @Test
994  public void testCoGroupInnerOuterNull() throws Exception
995    {
996    Set<Tuple> results = new HashSet<Tuple>();
997
998    results.add( new Tuple( "1", "a1", "1", "A1" ) );
999    results.add( new Tuple( "1", "a1", "1", "A2" ) );
1000    results.add( new Tuple( "1", "a1", "1", "A3" ) );
1001    results.add( new Tuple( "1", "a2", "1", "A1" ) );
1002    results.add( new Tuple( "1", "a2", "1", "A2" ) );
1003    results.add( new Tuple( "1", "a2", "1", "A3" ) );
1004    results.add( new Tuple( "1", "a3", "1", "A1" ) );
1005    results.add( new Tuple( "1", "a3", "1", "A2" ) );
1006    results.add( new Tuple( "1", "a3", "1", "A3" ) );
1007    results.add( new Tuple( "2", "b1", "2", "B1" ) );
1008    results.add( new Tuple( "2", "b1", "2", "B2" ) );
1009    results.add( new Tuple( "2", "b1", "2", "B3" ) );
1010    results.add( new Tuple( "3", "c1", null, null ) );
1011    results.add( new Tuple( "4", "d1", "4", "D1" ) );
1012    results.add( new Tuple( "4", "d2", "4", "D1" ) );
1013    results.add( new Tuple( "4", "d3", "4", "D1" ) );
1014    results.add( new Tuple( "5", "e1", null, null ) );
1015    results.add( new Tuple( "5", "e2", null, null ) );
1016    results.add( new Tuple( "5", "e3", null, null ) );
1017    results.add( new Tuple( "7", "g1", null, null ) );
1018    results.add( new Tuple( "7", "g2", null, null ) );
1019    results.add( new Tuple( "7", "g3", null, null ) );
1020    results.add( new Tuple( "7", "g4", null, null ) );
1021    results.add( new Tuple( "7", "g5", null, null ) );
1022    results.add( new Tuple( null, "h1", null, null ) );
1023
1024    handleJoins( "cogroupinnerouternull", new LeftJoin(), results, 9, false, new NullNotEquivalentComparator() );
1025    handleJoins( "cogroupinnerouternull-resultgroup", new LeftJoin(), results, 9, true, new NullNotEquivalentComparator() );
1026    }
1027
1028  /**
1029   * 1 a1
1030   * 1 a2
1031   * 1 a3
1032   * 2 b1
1033   * 3 c1
1034   * 4 d1
1035   * 4 d2
1036   * 4 d3
1037   * 5 e1
1038   * 5 e2
1039   * 5 e3
1040   * 7 g1
1041   * 7 g2
1042   * 7 g3
1043   * 7 g4
1044   * 7 g5
1045   * null h1
1046   * <p/>
1047   * 1 A1
1048   * 1 A2
1049   * 1 A3
1050   * 2 B1
1051   * 2 B2
1052   * 2 B3
1053   * 4 D1
1054   * 6 F1
1055   * 6 F2
1056   * null H1
1057   * <p/>
1058   * 1  a1      1       A1
1059   * 1  a1      1       A2
1060   * 1  a1      1       A3
1061   * 1  a2      1       A1
1062   * 1  a2      1       A2
1063   * 1  a2      1       A3
1064   * 1  a3      1       A1
1065   * 1  a3      1       A2
1066   * 1  a3      1       A3
1067   * 2  b1      2       B1
1068   * 2  b1      2       B2
1069   * 2  b1      2       B3
1070   * 4  d1      4       D1
1071   * 4  d2      4       D1
1072   * 4  d3      4       D1
1073   * null       null    6       F1
1074   * null       null    6       F2
1075   * null h1    null    H1
1076   *
1077   * @throws Exception
1078   */
1079  @Test
1080  public void testCoGroupOuterInner() throws Exception
1081    {
1082    Set<Tuple> results = new HashSet<Tuple>();
1083
1084    results.add( new Tuple( "1", "a1", "1", "A1" ) );
1085    results.add( new Tuple( "1", "a1", "1", "A2" ) );
1086    results.add( new Tuple( "1", "a1", "1", "A3" ) );
1087    results.add( new Tuple( "1", "a2", "1", "A1" ) );
1088    results.add( new Tuple( "1", "a2", "1", "A2" ) );
1089    results.add( new Tuple( "1", "a2", "1", "A3" ) );
1090    results.add( new Tuple( "1", "a3", "1", "A1" ) );
1091    results.add( new Tuple( "1", "a3", "1", "A2" ) );
1092    results.add( new Tuple( "1", "a3", "1", "A3" ) );
1093    results.add( new Tuple( "2", "b1", "2", "B1" ) );
1094    results.add( new Tuple( "2", "b1", "2", "B2" ) );
1095    results.add( new Tuple( "2", "b1", "2", "B3" ) );
1096    results.add( new Tuple( "4", "d1", "4", "D1" ) );
1097    results.add( new Tuple( "4", "d2", "4", "D1" ) );
1098    results.add( new Tuple( "4", "d3", "4", "D1" ) );
1099    results.add( new Tuple( null, null, "6", "F1" ) );
1100    results.add( new Tuple( null, null, "6", "F2" ) );
1101    results.add( new Tuple( null, "h1", null, "H1" ) );
1102
1103    handleJoins( "cogroupouterinner", new RightJoin(), results, 8, false, null );
1104    handleJoins( "cogroupouterinner-resultgroup", new RightJoin(), results, 8, true, null );
1105    }
1106
1107  /**
1108   * 1 a1
1109   * 1 a2
1110   * 1 a3
1111   * 2 b1
1112   * 3 c1
1113   * 4 d1
1114   * 4 d2
1115   * 4 d3
1116   * 5 e1
1117   * 5 e2
1118   * 5 e3
1119   * 7 g1
1120   * 7 g2
1121   * 7 g3
1122   * 7 g4
1123   * 7 g5
1124   * null h1
1125   * <p/>
1126   * 1 A1
1127   * 1 A2
1128   * 1 A3
1129   * 2 B1
1130   * 2 B2
1131   * 2 B3
1132   * 4 D1
1133   * 6 F1
1134   * 6 F2
1135   * null H1
1136   * <p/>
1137   * 1  a1      1       A1
1138   * 1  a1      1       A2
1139   * 1  a1      1       A3
1140   * 1  a2      1       A1
1141   * 1  a2      1       A2
1142   * 1  a2      1       A3
1143   * 1  a3      1       A1
1144   * 1  a3      1       A2
1145   * 1  a3      1       A3
1146   * 2  b1      2       B1
1147   * 2  b1      2       B2
1148   * 2  b1      2       B3
1149   * 4  d1      4       D1
1150   * 4  d2      4       D1
1151   * 4  d3      4       D1
1152   * null       null    6       F1
1153   * null       null    6       F2
1154   * null null  null    H1
1155   *
1156   * @throws Exception
1157   */
1158  @Test
1159  public void testCoGroupOuterInnerNull() throws Exception
1160    {
1161    Set<Tuple> results = new HashSet<Tuple>();
1162
1163    results.add( new Tuple( "1", "a1", "1", "A1" ) );
1164    results.add( new Tuple( "1", "a1", "1", "A2" ) );
1165    results.add( new Tuple( "1", "a1", "1", "A3" ) );
1166    results.add( new Tuple( "1", "a2", "1", "A1" ) );
1167    results.add( new Tuple( "1", "a2", "1", "A2" ) );
1168    results.add( new Tuple( "1", "a2", "1", "A3" ) );
1169    results.add( new Tuple( "1", "a3", "1", "A1" ) );
1170    results.add( new Tuple( "1", "a3", "1", "A2" ) );
1171    results.add( new Tuple( "1", "a3", "1", "A3" ) );
1172    results.add( new Tuple( "2", "b1", "2", "B1" ) );
1173    results.add( new Tuple( "2", "b1", "2", "B2" ) );
1174    results.add( new Tuple( "2", "b1", "2", "B3" ) );
1175    results.add( new Tuple( "4", "d1", "4", "D1" ) );
1176    results.add( new Tuple( "4", "d2", "4", "D1" ) );
1177    results.add( new Tuple( "4", "d3", "4", "D1" ) );
1178    results.add( new Tuple( null, null, "6", "F1" ) );
1179    results.add( new Tuple( null, null, "6", "F2" ) );
1180    results.add( new Tuple( null, null, null, "H1" ) );
1181
1182    handleJoins( "cogroupouterinnernull", new RightJoin(), results, 9, false, new NullNotEquivalentComparator() );
1183    handleJoins( "cogroupouterinnernull-resultgroup", new RightJoin(), results, 9, true, new NullNotEquivalentComparator() );
1184    }
1185
1186  private void handleJoins( String path, Joiner joiner, Set<Tuple> results, int numGroups, boolean useResultGroupFields, NullNotEquivalentComparator comparator ) throws Exception
1187    {
1188    results = new HashSet<Tuple>( results );
1189
1190    getPlatform().copyFromLocal( inputFileLhsSparse );
1191    getPlatform().copyFromLocal( inputFileRhsSparse );
1192
1193    Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class );
1194    Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse );
1195    Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse );
1196
1197    Map sources = new HashMap();
1198
1199    sources.put( "lower", sourceLower );
1200    sources.put( "upper", sourceUpper );
1201
1202    Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE );
1203
1204    Pipe pipeLower = new Pipe( "lower" );
1205    Pipe pipeUpper = new Pipe( "upper" );
1206
1207    Fields declaredFields = new Fields( "num", "char", "num2", "char2" );
1208
1209    Fields groupFields = new Fields( "num" );
1210
1211    if( comparator != null )
1212      groupFields.setComparator( 0, comparator );
1213
1214    Pipe splice;
1215    if( useResultGroupFields )
1216      splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, new Fields( "num", "num2" ), joiner );
1217    else
1218      splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, joiner );
1219
1220    splice = new Every( splice, Fields.ALL, new TestIdentityBuffer( new Fields( "num", "num2" ), numGroups, true ), Fields.RESULTS );
1221
1222    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
1223
1224    flow.complete();
1225
1226    validateLength( flow, results.size() );
1227
1228    List<Tuple> actual = getSinkAsList( flow );
1229
1230    results.removeAll( actual );
1231
1232    assertEquals( 0, results.size() );
1233    }
1234
1235  /**
1236   * 1 a
1237   * 5 b
1238   * 6 c
1239   * 5 b
1240   * 5 e
1241   * <p/>
1242   * 1 A
1243   * 2 B
1244   * 3 C
1245   * 4 D
1246   * 5 E
1247   * <p/>
1248   * 1 a
1249   * 2 b
1250   * 3 c
1251   * 4 d
1252   * 5 e
1253   * <p/>
1254   * 1  a       1       A  1  a
1255   * -  -   2   B  2  b
1256   * -  -   3   C  3  c
1257   * -  -   4   D  4  d
1258   * 5  b       5   E  5  e
1259   * 5  e       5   E  5  e
1260   *
1261   * @throws Exception
1262   */
1263  @Test
1264  public void testCoGroupMixed() throws Exception
1265    {
1266    getPlatform().copyFromLocal( inputFileLowerOffset );
1267    getPlatform().copyFromLocal( inputFileLower );
1268    getPlatform().copyFromLocal( inputFileUpper );
1269
1270    Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
1271    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1272    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1273
1274    Map sources = new HashMap();
1275
1276    sources.put( "loweroffset", sourceLowerOffset );
1277    sources.put( "lower", sourceLower );
1278    sources.put( "upper", sourceUpper );
1279
1280    Tap sink = getPlatform().getDelimitedFile( Fields.size( 6, String.class ), "\t", getOutputPath( "cogroupmixed" ), SinkMode.REPLACE );
1281
1282    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1283
1284    Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter );
1285    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1286    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1287
1288    Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower );
1289    Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) );
1290
1291    MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} );
1292    Pipe splice = new CoGroup( pipes, fields, Fields.size( 6 ), join );
1293
1294    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
1295
1296    flow.complete();
1297
1298    validateLength( flow, 6 );
1299
1300    Set<Tuple> results = new HashSet<Tuple>();
1301
1302    results.add( new Tuple( "1", "a", "1", "A", "1", "a" ) );
1303    results.add( new Tuple( null, null, "2", "B", "2", "b" ) );
1304    results.add( new Tuple( null, null, "3", "C", "3", "c" ) );
1305    results.add( new Tuple( null, null, "4", "D", "4", "d" ) );
1306    results.add( new Tuple( "5", "b", "5", "E", "5", "e" ) );
1307    results.add( new Tuple( "5", "e", "5", "E", "5", "e" ) );
1308
1309    List<Tuple> actual = getSinkAsList( flow );
1310
1311    results.removeAll( actual );
1312
1313    assertEquals( 0, results.size() );
1314    }
1315
1316  @Test
1317  public void testCoGroupDiffFields() throws Exception
1318    {
1319    getPlatform().copyFromLocal( inputFileLower );
1320    getPlatform().copyFromLocal( inputFileUpper );
1321
1322    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1323    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1324
1325    Map sources = new HashMap();
1326
1327    sources.put( "lower", sourceLower );
1328    sources.put( "upper", sourceUpper );
1329
1330    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE );
1331
1332    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1333    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1334
1335    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1336    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1337
1338    Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1339
1340    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1341
1342    flow.complete();
1343
1344    validateLength( flow, 5 );
1345
1346    List<Tuple> actual = getSinkAsList( flow );
1347
1348    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1349    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
1350    }
1351
1352  @Test
1353  public void testCoGroupGroupBy() throws Exception
1354    {
1355    getPlatform().copyFromLocal( inputFileLower );
1356    getPlatform().copyFromLocal( inputFileUpper );
1357
1358    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1359    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1360
1361    Map sources = new HashMap();
1362
1363    sources.put( "lower", sourceLower );
1364    sources.put( "upper", sourceUpper );
1365
1366    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupgroupby" ), SinkMode.REPLACE );
1367
1368    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1369    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1370
1371    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1372    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1373
1374    Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1375
1376    Pipe groupby = new GroupBy( cogroup, new Fields( "numA" ) );
1377
1378    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby );
1379
1380    flow.complete();
1381
1382    validateLength( flow, 5, null );
1383
1384    List<Tuple> actual = getSinkAsList( flow );
1385
1386    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1387    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
1388    }
1389
1390  @Test
1391  public void testCoGroupSamePipe() throws Exception
1392    {
1393    getPlatform().copyFromLocal( inputFileLower );
1394
1395    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1396
1397    Map sources = new HashMap();
1398
1399    sources.put( "lower", source );
1400
1401    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE );
1402
1403    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1404
1405    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1406
1407    Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) );
1408
1409    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1410
1411    flow.complete();
1412
1413    validateLength( flow, 5, null );
1414
1415    List<Tuple> actual = getSinkAsList( flow );
1416
1417    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1418    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1419    }
1420
1421  @Test
1422  public void testCoGroupSamePipe2() throws Exception
1423    {
1424    getPlatform().copyFromLocal( inputFileLower );
1425
1426    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1427
1428    Map sources = new HashMap();
1429
1430    sources.put( "lower", source );
1431
1432    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE );
1433
1434    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1435
1436    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1437
1438    Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1439
1440    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1441
1442    flow.complete();
1443
1444    validateLength( flow, 5, null );
1445
1446    List<Tuple> actual = getSinkAsList( flow );
1447
1448    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1449    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1450    }
1451
1452  @Test
1453  public void testCoGroupSamePipe3() throws Exception
1454    {
1455    getPlatform().copyFromLocal( inputFileLower );
1456
1457    Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
1458
1459    Map sources = new HashMap();
1460
1461    sources.put( "lower", source );
1462
1463    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE );
1464
1465    Pipe pipe = new Pipe( "lower" );
1466
1467    Pipe lhs = new Pipe( "lhs", pipe );
1468    Pipe rhs = new Pipe( "rhs", pipe );
1469
1470    Pipe cogroup = new CoGroup( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1471
1472    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1473
1474    flow.complete();
1475
1476    validateLength( flow, 5, null );
1477
1478    List<Tuple> actual = getSinkAsList( flow );
1479
1480    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1481    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1482    }
1483
1484  @Test
1485  public void testCoGroupAroundCoGroup() throws Exception
1486    {
1487    getPlatform().copyFromLocal( inputFileLower );
1488    getPlatform().copyFromLocal( inputFileUpper );
1489
1490    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1491    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1492
1493    Map sources = new HashMap();
1494
1495    sources.put( "lower", sourceLower );
1496    sources.put( "upper1", sourceUpper );
1497    sources.put( "upper2", sourceUpper );
1498
1499    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupacogroup" ), SinkMode.REPLACE );
1500
1501    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1502
1503    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1504    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1505    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1506
1507    Pipe splice1 = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1508
1509    splice1 = new Each( splice1, new Identity() );
1510
1511    Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1512
1513    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1514
1515    flow.complete();
1516
1517    validateLength( flow, 5, null );
1518
1519    List<Tuple> actual = getSinkAsList( flow );
1520
1521    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1522    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1523    }
1524
1525  @Test
1526  public void testCoGroupAroundCoGroupWithout() throws Exception
1527    {
1528    runCoGroupAroundCoGroup( null, "cogroupacogroupopt1" );
1529    }
1530
1531  @Test
1532  public void testCoGroupAroundCoGroupWith() throws Exception
1533    {
1534    // hack to get classname
1535    runCoGroupAroundCoGroup( getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ).getScheme().getClass(), "cogroupacogroupopt2" );
1536    }
1537
1538  private void runCoGroupAroundCoGroup( Class schemeClass, String stringPath ) throws IOException
1539    {
1540    getPlatform().copyFromLocal( inputFileNums20 );
1541    getPlatform().copyFromLocal( inputFileNums10 );
1542
1543    Tap source10 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 );
1544    Tap source20 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums20 );
1545
1546    Map sources = new HashMap();
1547
1548    sources.put( "source20", source20 );
1549    sources.put( "source101", source10 );
1550    sources.put( "source102", source10 );
1551
1552    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( stringPath ), SinkMode.REPLACE );
1553
1554    Pipe pipeNum20 = new Pipe( "source20" );
1555    Pipe pipeNum101 = new Pipe( "source101" );
1556    Pipe pipeNum102 = new Pipe( "source102" );
1557
1558    Pipe splice1 = new CoGroup( pipeNum20, new Fields( "num" ), pipeNum101, new Fields( "num" ), new Fields( "num1", "num2" ) );
1559
1560    Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeNum102, new Fields( "num" ), new Fields( "num1", "num2", "num3" ) );
1561
1562    splice2 = new Each( splice2, new Identity() );
1563
1564    Map<Object, Object> properties = getPlatform().getProperties();
1565
1566    if( getPlatform().isMapReduce() )
1567      FlowConnectorProps.setIntermediateSchemeClass( properties, schemeClass );
1568
1569    Flow flow = getPlatform().getFlowConnector( properties ).connect( "cogroupopt", sources, sink, splice2 );
1570
1571    if( getPlatform().isMapReduce() )
1572      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1573
1574    flow.complete();
1575
1576    validateLength( flow, 10 );
1577
1578    List<Tuple> actual = getSinkAsList( flow );
1579
1580    assertTrue( actual.contains( new Tuple( "1\t1\t1" ) ) );
1581    assertTrue( actual.contains( new Tuple( "10\t10\t10" ) ) );
1582    }
1583
1584  @Test
1585  public void testCoGroupDiffFieldsSameFile() throws Exception
1586    {
1587    getPlatform().copyFromLocal( inputFileLower );
1588    getPlatform().copyFromLocal( inputFileUpper );
1589
1590    Tap sourceOffsetLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1591    Tap sourceLower = getPlatform().getTextFile( new Fields( "line" ), inputFileLower );
1592
1593    Map sources = new HashMap();
1594
1595    sources.put( "offsetLower", sourceOffsetLower );
1596    sources.put( "lower", sourceLower );
1597
1598    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samefiledifffields" ), SinkMode.REPLACE );
1599
1600    Function splitterLower = new RegexSplitter( new Fields( "numA", "left" ), " " );
1601    Function splitterUpper = new RegexSplitter( new Fields( "numB", "right" ), " " );
1602
1603    Pipe offsetLower = new Pipe( "offsetLower" );
1604    offsetLower = new Discard( offsetLower, new Fields( "offset" ) );
1605    offsetLower = new Each( offsetLower, new Fields( "line" ), splitterLower );
1606
1607    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterUpper );
1608
1609    Pipe cogroup = new CoGroup( offsetLower, new Fields( "numA" ), pipeLower, new Fields( "numB" ) );
1610
1611    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1612
1613    flow.complete();
1614
1615    validateLength( flow, 5 );
1616
1617    List<Tuple> actual = getSinkAsList( flow );
1618
1619    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1620    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1621    }
1622
1623  @Test
1624  public void testJoinNone() throws Exception
1625    {
1626    getPlatform().copyFromLocal( inputFileLower );
1627    getPlatform().copyFromLocal( inputFileUpper );
1628
1629    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1630    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1631
1632    Map sources = new HashMap();
1633
1634    sources.put( "lower", sourceLower );
1635    sources.put( "upper", sourceUpper );
1636
1637    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE );
1638
1639    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1640
1641    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1642    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1643
1644    Pipe splice = new CoGroup( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) );
1645
1646    Map<Object, Object> properties = getProperties();
1647
1648    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1649
1650    flow.complete();
1651
1652    validateLength( flow, 25 );
1653
1654    List<Tuple> values = getSinkAsList( flow );
1655
1656    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1657    assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) );
1658    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1659    }
1660
1661  @Test
1662  public void testMultiJoin() throws Exception
1663    {
1664    getPlatform().copyFromLocal( inputFileCrossX2 );
1665
1666    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 );
1667    Tap innerSink = getPlatform().getTextFile( getOutputPath( "inner" ), SinkMode.REPLACE );
1668    Tap outerSink = getPlatform().getTextFile( getOutputPath( "outer" ), SinkMode.REPLACE );
1669    Tap leftSink = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE );
1670    Tap rightSink = getPlatform().getTextFile( getOutputPath( "right" ), SinkMode.REPLACE );
1671
1672    Pipe uniques = new Pipe( "unique" );
1673
1674    uniques = new Each( uniques, new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1675
1676    uniques = new GroupBy( uniques, new Fields( "word" ) );
1677
1678    uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE );
1679
1680//    uniques = new Each( uniques, new Debug( true ) );
1681
1682    Pipe fielded = new Pipe( "fielded" );
1683
1684    fielded = new Each( fielded, new Fields( "line" ), new RegexSplitter( "\\s" ) );
1685
1686//    fielded = new Each( fielded, new Debug( true ) );
1687
1688    Pipe inner = new CoGroup( "inner", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new InnerJoin() );
1689    Pipe outer = new CoGroup( "outer", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new OuterJoin() );
1690    Pipe left = new CoGroup( "left", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new LeftJoin() );
1691    Pipe right = new CoGroup( "right", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new RightJoin() );
1692
1693    Pipe[] heads = Pipe.pipes( uniques, fielded );
1694    Map<String, Tap> sources = Cascades.tapsMap( heads, Tap.taps( source, source ) );
1695
1696    Pipe[] tails = Pipe.pipes( inner, outer, left, right );
1697    Map<String, Tap> sinks = Cascades.tapsMap( tails, Tap.taps( innerSink, outerSink, leftSink, rightSink ) );
1698
1699    Flow flow = getPlatform().getFlowConnector().connect( "multi-joins", sources, sinks, tails );
1700
1701    flow.complete();
1702
1703    validateLength( flow.openTapForRead( innerSink ), 74 );
1704    validateLength( flow.openTapForRead( outerSink ), 84 );
1705    validateLength( flow.openTapForRead( leftSink ), 74 );
1706    validateLength( flow.openTapForRead( rightSink ), 84 );
1707    }
1708
1709  /**
1710   * This test checks that a valid node is created when adding back remainders during unique
1711   * path sub-graph partitioning.
1712   */
1713  @Test
1714  public void testMultiJoinWithSplits() throws Exception
1715    {
1716    getPlatform().copyFromLocal( inputFileCrossX2 );
1717
1718    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 );
1719    Tap innerSinkLhs = getPlatform().getTextFile( getOutputPath( "innerLhs" ), SinkMode.REPLACE );
1720    Tap uniqueSinkLhs = getPlatform().getTextFile( getOutputPath( "uniquesLhs" ), SinkMode.REPLACE );
1721    Tap innerSinkRhs = getPlatform().getTextFile( getOutputPath( "innerRhs" ), SinkMode.REPLACE );
1722    Tap uniqueSinkRhs = getPlatform().getTextFile( getOutputPath( "uniquesRhs" ), SinkMode.REPLACE );
1723
1724    Pipe incoming = new Pipe( "incoming" );
1725    Pipe uniquesLhs;
1726    Pipe innerLhs;
1727    Pipe uniquesRhs;
1728    Pipe innerRhs;
1729
1730    {
1731    Pipe generatorLhs = new Each( new Pipe( "genLhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1732
1733    generatorLhs = new Each( generatorLhs, new Identity() );
1734
1735    Pipe generatorRhs = new Each( new Pipe( "genRhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1736
1737    Pipe uniques = new Each( new Pipe( "uniquesLhs", generatorLhs ), new Identity() );
1738
1739    uniques = new GroupBy( uniques, new Fields( "word" ) );
1740
1741    uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE );
1742
1743    Pipe lhs = new Pipe( "lhs", generatorLhs );
1744
1745    lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) );
1746
1747    Pipe rhs = new Pipe( "rhs", generatorRhs );
1748
1749    rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) );
1750
1751    Pipe inner = new CoGroup( "innerLhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() );
1752
1753    uniquesLhs = uniques;
1754    innerLhs = inner;
1755    }
1756
1757    {
1758    Pipe generatorLhs = new Each( new Pipe( "genLhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1759
1760    generatorLhs = new Each( generatorLhs, new Identity() );
1761
1762    Pipe generatorRhs = new Each( new Pipe( "genRhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1763
1764    Pipe uniques = new Each( new Pipe( "uniquesRhs", generatorLhs ), new Identity() );
1765
1766    uniques = new GroupBy( uniques, new Fields( "word" ) );
1767
1768    uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE );
1769
1770    Pipe lhs = new Pipe( "lhs", generatorLhs );
1771
1772    lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) );
1773
1774    Pipe rhs = new Pipe( "rhs", generatorRhs );
1775
1776    rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) );
1777
1778    Pipe inner = new CoGroup( "innerRhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() );
1779
1780    uniquesRhs = uniques;
1781    innerRhs = inner;
1782    }
1783
1784    FlowDef flowDef = FlowDef.flowDef()
1785      .setName( "multi-joins" )
1786      .addSource( "incoming", source )
1787      .addTailSink( innerLhs, innerSinkLhs )
1788      .addTailSink( uniquesLhs, uniqueSinkLhs )
1789      .addTailSink( innerRhs, innerSinkRhs )
1790      .addTailSink( uniquesRhs, uniqueSinkRhs );
1791
1792    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
1793
1794    flow.complete();
1795
1796    validateLength( flow.openTapForRead( innerSinkLhs ), 3900 );
1797    validateLength( flow.openTapForRead( uniqueSinkLhs ), 15 );
1798    validateLength( flow.openTapForRead( innerSinkRhs ), 3900 );
1799    validateLength( flow.openTapForRead( uniqueSinkRhs ), 15 );
1800    }
1801
1802  @Test
1803  public void testSameSourceGroupSplitCoGroup() throws Exception
1804    {
1805    getPlatform().copyFromLocal( inputFileLower );
1806
1807    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileLower );
1808
1809    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath(), SinkMode.REPLACE );
1810
1811    Function splitter = new RegexSplitter( new Fields( "num", "char" ).applyTypes( String.class, String.class ), " " );
1812
1813    Pipe sourcePipe = new Each( new Pipe( "source" ), new Fields( "line" ), splitter );
1814    sourcePipe = new GroupBy( sourcePipe, new Fields( "num" ) );
1815    sourcePipe = new Every( sourcePipe, new Fields( "char" ), new First( new Fields( "first", String.class ) ), Fields.ALL );
1816
1817    Pipe lhsPipe = new Retain( new Pipe( "lhs", sourcePipe ), Fields.ALL );
1818    Pipe rhsPipe = new Retain( new Pipe( "rhs", sourcePipe ), Fields.ALL );
1819
1820    Pipe splice = new CoGroup( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), Fields.size( 4 ) );
1821
1822    Map<Object, Object> properties = getPlatform().getProperties();
1823
1824    properties.put( "cascading.serialization.types.required", "true" );
1825
1826    Flow flow = getPlatform().getFlowConnector( properties ).connect( source, sink, splice );
1827
1828    flow.complete();
1829
1830    validateLength( flow, 5, null );
1831
1832    List<Tuple> values = getSinkAsList( flow );
1833
1834    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
1835    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
1836    }
1837  }