001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading;
022    
023    import java.io.IOException;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Set;
029    
030    import cascading.flow.Flow;
031    import cascading.flow.FlowConnectorProps;
032    import cascading.flow.FlowProps;
033    import cascading.operation.Function;
034    import cascading.operation.Identity;
035    import cascading.operation.Insert;
036    import cascading.operation.aggregator.Count;
037    import cascading.operation.aggregator.First;
038    import cascading.operation.regex.RegexFilter;
039    import cascading.operation.regex.RegexSplitter;
040    import cascading.pipe.CoGroup;
041    import cascading.pipe.Each;
042    import cascading.pipe.Every;
043    import cascading.pipe.GroupBy;
044    import cascading.pipe.Pipe;
045    import cascading.pipe.assembly.Discard;
046    import cascading.pipe.joiner.InnerJoin;
047    import cascading.pipe.joiner.Joiner;
048    import cascading.pipe.joiner.LeftJoin;
049    import cascading.pipe.joiner.MixedJoin;
050    import cascading.pipe.joiner.OuterJoin;
051    import cascading.pipe.joiner.RightJoin;
052    import cascading.tap.SinkMode;
053    import cascading.tap.Tap;
054    import cascading.tuple.Fields;
055    import cascading.tuple.Tuple;
056    import cascading.util.NullNotEquivalentComparator;
057    import org.junit.Test;
058    
059    import static data.InputData.*;
060    
061    
062    public class CoGroupFieldedPipesPlatformTest extends PlatformTestCase
063      {
/**
 * Creates the test case by delegating to the parent {@code PlatformTestCase} constructor.
 */
public CoGroupFieldedPipesPlatformTest()
  {
  // cluster testing is intentionally left enabled for this suite
  // NOTE(review): the meaning of the trailing 4 and 1 is defined by the parent constructor, which is
  // not visible here -- confirm against PlatformTestCase
  super( true, 4, 1 );
  }
068    
069      @Test
070      public void testCross() throws Exception
071        {
072        getPlatform().copyFromLocal( inputFileLhs );
073        getPlatform().copyFromLocal( inputFileRhs );
074    
075        Map sources = new HashMap();
076    
077        sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) );
078        sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) );
079    
080        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE );
081    
082        Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
083        Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
084    
085        Pipe cross = new CoGroup( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
086    
087        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross );
088    
089        flow.complete();
090    
091        validateLength( flow, 37, null );
092    
093        List<Tuple> values = getSinkAsList( flow );
094    
095        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
096        assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
097        }
098    
099      @Test
100      public void testCoGroup() throws Exception
101        {
102        getPlatform().copyFromLocal( inputFileLower );
103        getPlatform().copyFromLocal( inputFileUpper );
104    
105        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
106        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
107    
108        Map sources = new HashMap();
109    
110        sources.put( "lower", sourceLower );
111        sources.put( "upper", sourceUpper );
112    
113        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroup" ), SinkMode.REPLACE );
114    
115        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
116    
117        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
118        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
119    
120        Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new InnerJoin( Fields.size( 4 ) ) );
121    
122        Map<Object, Object> properties = getProperties();
123    
124        // make sure hasher is getting called, but does nothing special
125        FlowProps.setDefaultTupleElementComparator( properties, getPlatform().getStringComparator( false ).getClass().getCanonicalName() );
126    
127        Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
128    
129        flow.complete();
130    
131        validateLength( flow, 5 );
132    
133        List<Tuple> values = getSinkAsList( flow );
134    
135        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
136        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
137        }
138    
139      @Test
140      public void testCoGroupSamePipeName() throws Exception
141        {
142        getPlatform().copyFromLocal( inputFileLower );
143        getPlatform().copyFromLocal( inputFileUpper );
144    
145        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
146        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
147    
148        Map sources = new HashMap();
149    
150        sources.put( "lower", sourceLower );
151        sources.put( "upper", sourceUpper );
152    
153        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE );
154    
155        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
156    
157        Pipe pipeLower = new Pipe( "lower" );
158        Pipe pipeUpper = new Pipe( "upper" );
159    
160        // these pipes will hide the source name, and could cause one to be lost
161        pipeLower = new Pipe( "same", pipeLower );
162        pipeUpper = new Pipe( "same", pipeUpper );
163    
164        pipeLower = new Each( pipeLower, new Fields( "line" ), splitter );
165        pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter );
166    
167    //    pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
168    //    pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
169    
170        pipeLower = new Pipe( "left", pipeLower );
171        pipeUpper = new Pipe( "right", pipeUpper );
172    
173    //    pipeLower = new Each( pipeLower, new Debug( true ) );
174    //    pipeUpper = new Each( pipeUpper, new Debug( true ) );
175    
176        Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
177    
178    //    splice = new Each( splice, new Debug( true ) );
179        splice = new Pipe( "splice", splice );
180        splice = new Pipe( "tail", splice );
181    
182        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
183    
184        flow.complete();
185    
186        validateLength( flow, 5 );
187    
188        List<Tuple> values = getSinkAsList( flow );
189    
190        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
191        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
192        }
193    
194      @Test
195      public void testCoGroupWithUnknowns() throws Exception
196        {
197        getPlatform().copyFromLocal( inputFileLower );
198        getPlatform().copyFromLocal( inputFileUpper );
199    
200        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
201        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
202    
203        Map sources = new HashMap();
204    
205        sources.put( "lower", sourceLower );
206        sources.put( "upper", sourceUpper );
207    
208        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE );
209    
210        Function splitter = new RegexSplitter( Fields.UNKNOWN, " " );
211    
212        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
213        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
214    
215        Pipe splice = new CoGroup( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) );
216    
217        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
218    
219        flow.complete();
220    
221        validateLength( flow, 5 );
222    
223        List<Tuple> values = getSinkAsList( flow );
224    
225        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
226        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
227        }
228    
229      /**
230       * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with
231       * a new stream using an outerjoin.
232       *
233       * @throws Exception
234       */
235      @Test
236      public void testCoGroupFilteredBranch() throws Exception
237        {
238        getPlatform().copyFromLocal( inputFileLower );
239        getPlatform().copyFromLocal( inputFileUpper );
240    
241        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
242        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
243    
244        Map sources = new HashMap();
245    
246        sources.put( "lower", sourceLower );
247        sources.put( "upper", sourceUpper );
248    
249        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupfilteredbranch" ), SinkMode.REPLACE );
250    
251        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
252    
253        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
254        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
255        pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all
256        pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
257    
258        Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() );
259    
260        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
261    
262        flow.complete();
263    
264        validateLength( flow, 5 );
265    
266        List<Tuple> values = getSinkAsList( flow );
267    
268        assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) );
269        assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) );
270        }
271    
272      @Test
273      public void testCoGroupSelf() throws Exception
274        {
275        getPlatform().copyFromLocal( inputFileLower );
276    
277        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
278        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
279    
280        Map sources = new HashMap();
281    
282        sources.put( "lower", sourceLower );
283        sources.put( "upper", sourceUpper );
284    
285        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupself" ), SinkMode.REPLACE );
286    
287        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
288    
289        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
290        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
291    
292        Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
293    
294        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
295    
296        flow.complete();
297    
298        validateLength( flow, 5 );
299    
300        List<Tuple> values = getSinkAsList( flow );
301    
302        assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
303        assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
304        }
305    
306      /**
307       * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join
308       *
309       * @throws Exception when
310       */
311      @Test
312      public void testCoGroupAfterEvery() throws Exception
313        {
314        getPlatform().copyFromLocal( inputFileLower );
315        getPlatform().copyFromLocal( inputFileUpper );
316    
317        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
318        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
319    
320        Map sources = new HashMap();
321    
322        sources.put( "lower", sourceLower );
323        sources.put( "upper", sourceUpper );
324    
325        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE );
326    
327        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
328    
329        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
330        pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
331        pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL );
332    
333        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
334        pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
335        pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL );
336    
337        Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
338    
339        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
340    
341        flow.complete();
342    
343        validateLength( flow, 5, null );
344    
345        List<Tuple> values = getSinkAsList( flow );
346    
347        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
348        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
349        }
350    
351      /**
352       * Tests that CoGroup properly resolves fields when following an Every
353       *
354       * @throws Exception
355       */
356      @Test
357      public void testCoGroupAfterEveryNoDeclared() throws Exception
358        {
359        getPlatform().copyFromLocal( inputFileLower );
360        getPlatform().copyFromLocal( inputFileUpper );
361    
362        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
363        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
364    
365        Map sources = new HashMap();
366    
367        sources.put( "lower", sourceLower );
368        sources.put( "upper", sourceUpper );
369    
370        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "aftereverynodeclared" ), SinkMode.REPLACE );
371    
372        Function splitter1 = new RegexSplitter( new Fields( "num1", "char1" ), " " );
373    
374        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter1 );
375        pipeLower = new Each( pipeLower, new Insert( new Fields( "one", "two", "three", "four" ), "one", "two", "three", "four" ), Fields.ALL );
376        pipeLower = new GroupBy( pipeLower, new Fields( "num1" ) );
377        pipeLower = new Every( pipeLower, new Fields( "char1" ), new First(), Fields.ALL );
378    
379        Function splitter2 = new RegexSplitter( new Fields( "num2", "char2" ), " " );
380    
381        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter2 );
382        pipeUpper = new GroupBy( pipeUpper, new Fields( "num2" ) );
383        pipeUpper = new Every( pipeUpper, new Fields( "char2" ), new First(), Fields.ALL );
384    
385        Pipe splice = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
386    
387        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
388    
389        flow.complete();
390    
391        validateLength( flow, 5, null );
392    
393        List<Tuple> values = getSinkAsList( flow );
394    
395        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
396        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
397        }
398    
399      @Test
400      public void testCoGroupInnerSingleField() throws Exception
401        {
402        getPlatform().copyFromLocal( inputFileLowerOffset );
403        getPlatform().copyFromLocal( inputFileUpper );
404    
405        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
406        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
407    
408        Map sources = new HashMap();
409    
410        sources.put( "lower", sourceLower );
411        sources.put( "upper", sourceUpper );
412    
413        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupinnersingle" ), SinkMode.REPLACE );
414    
415        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) );
416        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) );
417    
418        Pipe join = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
419    
420        join = new Every( join, new Count() );
421    
422    //    join = new Each( join, new Debug( true ) );
423    
424        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
425    
426        flow.complete();
427    
428        validateLength( flow, 2, null );
429    
430        Set<Tuple> results = new HashSet<Tuple>();
431    
432        results.add( new Tuple( "1\t1\t1" ) );
433        results.add( new Tuple( "5\t5\t2" ) );
434    
435        List<Tuple> actual = getSinkAsList( flow );
436    
437        results.removeAll( actual );
438    
439        assertEquals( 0, results.size() );
440        }
441    
442      /**
443       * 1 a1
444       * 1 a2
445       * 1 a3
446       * 2 b1
447       * 3 c1
448       * 4 d1
449       * 4 d2
450       * 4 d3
451       * 5 e1
452       * 5 e2
453       * 5 e3
454       * 7 g1
455       * 7 g2
456       * 7 g3
457       * 7 g4
458       * 7 g5
459       * null h1
460       * <p/>
461       * 1 A1
462       * 1 A2
463       * 1 A3
464       * 2 B1
465       * 2 B2
466       * 2 B3
467       * 4 D1
468       * 6 F1
469       * 6 F2
470       * null H1
471       * <p/>
472       * 1  a1      1       A1
473       * 1  a1      1       A2
474       * 1  a1      1       A3
475       * 1  a2      1       A1
476       * 1  a2      1       A2
477       * 1  a2      1       A3
478       * 1  a3      1       A1
479       * 1  a3      1       A2
480       * 1  a3      1       A3
481       * 2  b1      2       B1
482       * 2  b1      2       B2
483       * 2  b1      2       B3
484       * 4  d1      4       D1
485       * 4  d2      4       D1
486       * 4  d3      4       D1
487       * null h1  null  H1
488       *
489       * @throws Exception
490       */
491      @Test
492      public void testCoGroupInner() throws Exception
493        {
494        HashSet<Tuple> results = new HashSet<Tuple>();
495    
496        results.add( new Tuple( "1", "a1", "1", "A1" ) );
497        results.add( new Tuple( "1", "a1", "1", "A2" ) );
498        results.add( new Tuple( "1", "a1", "1", "A3" ) );
499        results.add( new Tuple( "1", "a2", "1", "A1" ) );
500        results.add( new Tuple( "1", "a2", "1", "A2" ) );
501        results.add( new Tuple( "1", "a2", "1", "A3" ) );
502        results.add( new Tuple( "1", "a3", "1", "A1" ) );
503        results.add( new Tuple( "1", "a3", "1", "A2" ) );
504        results.add( new Tuple( "1", "a3", "1", "A3" ) );
505        results.add( new Tuple( "2", "b1", "2", "B1" ) );
506        results.add( new Tuple( "2", "b1", "2", "B2" ) );
507        results.add( new Tuple( "2", "b1", "2", "B3" ) );
508        results.add( new Tuple( "4", "d1", "4", "D1" ) );
509        results.add( new Tuple( "4", "d2", "4", "D1" ) );
510        results.add( new Tuple( "4", "d3", "4", "D1" ) );
511        results.add( new Tuple( null, "h1", null, "H1" ) );
512    
513        handleJoins( "cogroupinner", new InnerJoin(), results, 8, false, null );
514        handleJoins( "cogroupinner-resultgroup", new InnerJoin(), results, 8, true, null );
515        }
516    
517      /**
518       * 1 a1
519       * 1 a2
520       * 1 a3
521       * 2 b1
522       * 3 c1
523       * 4 d1
524       * 4 d2
525       * 4 d3
526       * 5 e1
527       * 5 e2
528       * 5 e3
529       * 7 g1
530       * 7 g2
531       * 7 g3
532       * 7 g4
533       * 7 g5
534       * null h1
535       * <p/>
536       * 1 A1
537       * 1 A2
538       * 1 A3
539       * 2 B1
540       * 2 B2
541       * 2 B3
542       * 4 D1
543       * 6 F1
544       * 6 F2
545       * null H1
546       * <p/>
547       * 1  a1      1       A1
548       * 1  a1      1       A2
549       * 1  a1      1       A3
550       * 1  a2      1       A1
551       * 1  a2      1       A2
552       * 1  a2      1       A3
553       * 1  a3      1       A1
554       * 1  a3      1       A2
555       * 1  a3      1       A3
556       * 2  b1      2       B1
557       * 2  b1      2       B2
558       * 2  b1      2       B3
559       * 4  d1      4       D1
560       * 4  d2      4       D1
561       * 4  d3      4       D1
562       *
563       * @throws Exception
564       */
565      @Test
566      public void testCoGroupInnerNull() throws Exception
567        {
568        HashSet<Tuple> results = new HashSet<Tuple>();
569    
570        results.add( new Tuple( "1", "a1", "1", "A1" ) );
571        results.add( new Tuple( "1", "a1", "1", "A2" ) );
572        results.add( new Tuple( "1", "a1", "1", "A3" ) );
573        results.add( new Tuple( "1", "a2", "1", "A1" ) );
574        results.add( new Tuple( "1", "a2", "1", "A2" ) );
575        results.add( new Tuple( "1", "a2", "1", "A3" ) );
576        results.add( new Tuple( "1", "a3", "1", "A1" ) );
577        results.add( new Tuple( "1", "a3", "1", "A2" ) );
578        results.add( new Tuple( "1", "a3", "1", "A3" ) );
579        results.add( new Tuple( "2", "b1", "2", "B1" ) );
580        results.add( new Tuple( "2", "b1", "2", "B2" ) );
581        results.add( new Tuple( "2", "b1", "2", "B3" ) );
582        results.add( new Tuple( "4", "d1", "4", "D1" ) );
583        results.add( new Tuple( "4", "d2", "4", "D1" ) );
584        results.add( new Tuple( "4", "d3", "4", "D1" ) );
585    
586        handleJoins( "cogroupinnernull", new InnerJoin(), results, 9, false, new NullNotEquivalentComparator() );
587        handleJoins( "cogroupinnernull-resultgroup", new InnerJoin(), results, 9, true, new NullNotEquivalentComparator() );
588        }
589    
590      /**
591       * 1 a1
592       * 1 a2
593       * 1 a3
594       * 2 b1
595       * 3 c1
596       * 4 d1
597       * 4 d2
598       * 4 d3
599       * 5 e1
600       * 5 e2
601       * 5 e3
602       * 7 g1
603       * 7 g2
604       * 7 g3
605       * 7 g4
606       * 7 g5
607       * null h1
608       * <p/>
609       * 1 A1
610       * 1 A2
611       * 1 A3
612       * 2 B1
613       * 2 B2
614       * 2 B3
615       * 4 D1
616       * 6 F1
617       * 6 F2
618       * null H1
619       * <p/>
620       * 1  a1      1       A1
621       * 1  a1      1       A2
622       * 1  a1      1       A3
623       * 1  a2      1       A1
624       * 1  a2      1       A2
625       * 1  a2      1       A3
626       * 1  a3      1       A1
627       * 1  a3      1       A2
628       * 1  a3      1       A3
629       * 2  b1      2       B1
630       * 2  b1      2       B2
631       * 2  b1      2       B3
632       * 3  c1      null    null
633       * 4  d1      4       D1
634       * 4  d2      4       D1
635       * 4  d3      4       D1
636       * 5  e1      null    null
637       * 5  e2      null    null
638       * 5  e3      null    null
639       * null       null    6       F1
640       * null       null    6       F2
641       * 7  g1      null    null
642       * 7  g2      null    null
643       * 7  g3      null    null
644       * 7  g4      null    null
645       * 7  g5      null    null
646       * null h1  null  H1
647       *
648       * @throws Exception
649       */
650      @Test
651      public void testCoGroupOuter() throws Exception
652        {
653        Set<Tuple> results = new HashSet<Tuple>();
654    
655        results.add( new Tuple( "1", "a1", "1", "A1" ) );
656        results.add( new Tuple( "1", "a1", "1", "A2" ) );
657        results.add( new Tuple( "1", "a1", "1", "A3" ) );
658        results.add( new Tuple( "1", "a2", "1", "A1" ) );
659        results.add( new Tuple( "1", "a2", "1", "A2" ) );
660        results.add( new Tuple( "1", "a2", "1", "A3" ) );
661        results.add( new Tuple( "1", "a3", "1", "A1" ) );
662        results.add( new Tuple( "1", "a3", "1", "A2" ) );
663        results.add( new Tuple( "1", "a3", "1", "A3" ) );
664        results.add( new Tuple( "2", "b1", "2", "B1" ) );
665        results.add( new Tuple( "2", "b1", "2", "B2" ) );
666        results.add( new Tuple( "2", "b1", "2", "B3" ) );
667        results.add( new Tuple( "3", "c1", null, null ) );
668        results.add( new Tuple( "4", "d1", "4", "D1" ) );
669        results.add( new Tuple( "4", "d2", "4", "D1" ) );
670        results.add( new Tuple( "4", "d3", "4", "D1" ) );
671        results.add( new Tuple( "5", "e1", null, null ) );
672        results.add( new Tuple( "5", "e2", null, null ) );
673        results.add( new Tuple( "5", "e3", null, null ) );
674        results.add( new Tuple( null, null, "6", "F1" ) );
675        results.add( new Tuple( null, null, "6", "F2" ) );
676        results.add( new Tuple( "7", "g1", null, null ) );
677        results.add( new Tuple( "7", "g2", null, null ) );
678        results.add( new Tuple( "7", "g3", null, null ) );
679        results.add( new Tuple( "7", "g4", null, null ) );
680        results.add( new Tuple( "7", "g5", null, null ) );
681        results.add( new Tuple( null, "h1", null, "H1" ) );
682    
683        handleJoins( "cogroupouter", new OuterJoin(), results, 8, false, null );
684        handleJoins( "cogroupouter-resultgroup", new OuterJoin(), results, 8, true, null );
685        }
686    
687      /**
688       * 1 a1
689       * 1 a2
690       * 1 a3
691       * 2 b1
692       * 3 c1
693       * 4 d1
694       * 4 d2
695       * 4 d3
696       * 5 e1
697       * 5 e2
698       * 5 e3
699       * 7 g1
700       * 7 g2
701       * 7 g3
702       * 7 g4
703       * 7 g5
704       * null h1
705       * <p/>
706       * 1 A1
707       * 1 A2
708       * 1 A3
709       * 2 B1
710       * 2 B2
711       * 2 B3
712       * 4 D1
713       * 6 F1
714       * 6 F2
715       * null H1
716       * <p/>
717       * 1  a1      1       A1
718       * 1  a1      1       A2
719       * 1  a1      1       A3
720       * 1  a2      1       A1
721       * 1  a2      1       A2
722       * 1  a2      1       A3
723       * 1  a3      1       A1
724       * 1  a3      1       A2
725       * 1  a3      1       A3
726       * 2  b1      2       B1
727       * 2  b1      2       B2
728       * 2  b1      2       B3
729       * 3  c1      null    null
730       * 4  d1      4       D1
731       * 4  d2      4       D1
732       * 4  d3      4       D1
733       * 5  e1      null    null
734       * 5  e2      null    null
735       * 5  e3      null    null
736       * null       null    6       F1
737       * null       null    6       F2
738       * 7  g1      null    null
739       * 7  g2      null    null
740       * 7  g3      null    null
741       * 7  g4      null    null
742       * 7  g5      null    null
743       * null h1  null  null
744       * null null  null  H1
745       *
746       * @throws Exception
747       */
748      @Test
749      public void testCoGroupOuterNull() throws Exception
750        {
751        Set<Tuple> results = new HashSet<Tuple>();
752    
753        results.add( new Tuple( "1", "a1", "1", "A1" ) );
754        results.add( new Tuple( "1", "a1", "1", "A2" ) );
755        results.add( new Tuple( "1", "a1", "1", "A3" ) );
756        results.add( new Tuple( "1", "a2", "1", "A1" ) );
757        results.add( new Tuple( "1", "a2", "1", "A2" ) );
758        results.add( new Tuple( "1", "a2", "1", "A3" ) );
759        results.add( new Tuple( "1", "a3", "1", "A1" ) );
760        results.add( new Tuple( "1", "a3", "1", "A2" ) );
761        results.add( new Tuple( "1", "a3", "1", "A3" ) );
762        results.add( new Tuple( "2", "b1", "2", "B1" ) );
763        results.add( new Tuple( "2", "b1", "2", "B2" ) );
764        results.add( new Tuple( "2", "b1", "2", "B3" ) );
765        results.add( new Tuple( "3", "c1", null, null ) );
766        results.add( new Tuple( "4", "d1", "4", "D1" ) );
767        results.add( new Tuple( "4", "d2", "4", "D1" ) );
768        results.add( new Tuple( "4", "d3", "4", "D1" ) );
769        results.add( new Tuple( "5", "e1", null, null ) );
770        results.add( new Tuple( "5", "e2", null, null ) );
771        results.add( new Tuple( "5", "e3", null, null ) );
772        results.add( new Tuple( null, null, "6", "F1" ) );
773        results.add( new Tuple( null, null, "6", "F2" ) );
774        results.add( new Tuple( "7", "g1", null, null ) );
775        results.add( new Tuple( "7", "g2", null, null ) );
776        results.add( new Tuple( "7", "g3", null, null ) );
777        results.add( new Tuple( "7", "g4", null, null ) );
778        results.add( new Tuple( "7", "g5", null, null ) );
779        results.add( new Tuple( null, "h1", null, null ) );
780        results.add( new Tuple( null, null, null, "H1" ) );
781    
782        handleJoins( "cogroupouternull", new OuterJoin(), results, 9, false, new NullNotEquivalentComparator() );
783        handleJoins( "cogroupouternull-resultgroup", new OuterJoin(), results, 9, true, new NullNotEquivalentComparator() );
784        }
785    
786      /**
787       * 1 a1
788       * 1 a2
789       * 1 a3
790       * 2 b1
791       * 3 c1
792       * 4 d1
793       * 4 d2
794       * 4 d3
795       * 5 e1
796       * 5 e2
797       * 5 e3
798       * 7 g1
799       * 7 g2
800       * 7 g3
801       * 7 g4
802       * 7 g5
803       * null h1
804       * <p/>
805       * 1 A1
806       * 1 A2
807       * 1 A3
808       * 2 B1
809       * 2 B2
810       * 2 B3
811       * 4 D1
812       * 6 F1
813       * 6 F2
814       * null H1
815       * <p/>
816       * 1  a1      1       A1
817       * 1  a1      1       A2
818       * 1  a1      1       A3
819       * 1  a2      1       A1
820       * 1  a2      1       A2
821       * 1  a2      1       A3
822       * 1  a3      1       A1
823       * 1  a3      1       A2
824       * 1  a3      1       A3
825       * 2  b1      2       B1
826       * 2  b1      2       B2
827       * 2  b1      2       B3
828       * 3  c1      null    null
829       * 4  d1      4       D1
830       * 4  d2      4       D1
831       * 4  d3      4       D1
832       * 5  e1      null    null
833       * 5  e2      null    null
834       * 5  e3      null    null
835       * 7  g1      null    null
836       * 7  g2      null    null
837       * 7  g3      null    null
838       * 7  g4      null    null
839       * 7  g5      null    null
840       * null h1    null    H1
841       *
842       * @throws Exception
843       */
844      @Test
845      public void testCoGroupInnerOuter() throws Exception
846        {
847        Set<Tuple> results = new HashSet<Tuple>();
848    
849        results.add( new Tuple( "1", "a1", "1", "A1" ) );
850        results.add( new Tuple( "1", "a1", "1", "A2" ) );
851        results.add( new Tuple( "1", "a1", "1", "A3" ) );
852        results.add( new Tuple( "1", "a2", "1", "A1" ) );
853        results.add( new Tuple( "1", "a2", "1", "A2" ) );
854        results.add( new Tuple( "1", "a2", "1", "A3" ) );
855        results.add( new Tuple( "1", "a3", "1", "A1" ) );
856        results.add( new Tuple( "1", "a3", "1", "A2" ) );
857        results.add( new Tuple( "1", "a3", "1", "A3" ) );
858        results.add( new Tuple( "2", "b1", "2", "B1" ) );
859        results.add( new Tuple( "2", "b1", "2", "B2" ) );
860        results.add( new Tuple( "2", "b1", "2", "B3" ) );
861        results.add( new Tuple( "3", "c1", null, null ) );
862        results.add( new Tuple( "4", "d1", "4", "D1" ) );
863        results.add( new Tuple( "4", "d2", "4", "D1" ) );
864        results.add( new Tuple( "4", "d3", "4", "D1" ) );
865        results.add( new Tuple( "5", "e1", null, null ) );
866        results.add( new Tuple( "5", "e2", null, null ) );
867        results.add( new Tuple( "5", "e3", null, null ) );
868        results.add( new Tuple( "7", "g1", null, null ) );
869        results.add( new Tuple( "7", "g2", null, null ) );
870        results.add( new Tuple( "7", "g3", null, null ) );
871        results.add( new Tuple( "7", "g4", null, null ) );
872        results.add( new Tuple( "7", "g5", null, null ) );
873        results.add( new Tuple( null, "h1", null, "H1" ) );
874    
875        handleJoins( "cogroupinnerouter", new LeftJoin(), results, 8, false, null );
876        handleJoins( "cogroupinnerouter-resultgroup", new LeftJoin(), results, 8, true, null );
877        }
878    
879      /**
880       * 1 a1
881       * 1 a2
882       * 1 a3
883       * 2 b1
884       * 3 c1
885       * 4 d1
886       * 4 d2
887       * 4 d3
888       * 5 e1
889       * 5 e2
890       * 5 e3
891       * 7 g1
892       * 7 g2
893       * 7 g3
894       * 7 g4
895       * 7 g5
896       * null h1
897       * <p/>
898       * 1 A1
899       * 1 A2
900       * 1 A3
901       * 2 B1
902       * 2 B2
903       * 2 B3
904       * 4 D1
905       * 6 F1
906       * 6 F2
907       * null H1
908       * <p/>
909       * 1  a1      1       A1
910       * 1  a1      1       A2
911       * 1  a1      1       A3
912       * 1  a2      1       A1
913       * 1  a2      1       A2
914       * 1  a2      1       A3
915       * 1  a3      1       A1
916       * 1  a3      1       A2
917       * 1  a3      1       A3
918       * 2  b1      2       B1
919       * 2  b1      2       B2
920       * 2  b1      2       B3
921       * 3  c1      null    null
922       * 4  d1      4       D1
923       * 4  d2      4       D1
924       * 4  d3      4       D1
925       * 5  e1      null    null
926       * 5  e2      null    null
927       * 5  e3      null    null
928       * 7  g1      null    null
929       * 7  g2      null    null
930       * 7  g3      null    null
931       * 7  g4      null    null
932       * 7  g5      null    null
933       * null h1    null    null
934       *
935       * @throws Exception
936       */
937      @Test
938      public void testCoGroupInnerOuterNull() throws Exception
939        {
940        Set<Tuple> results = new HashSet<Tuple>();
941    
942        results.add( new Tuple( "1", "a1", "1", "A1" ) );
943        results.add( new Tuple( "1", "a1", "1", "A2" ) );
944        results.add( new Tuple( "1", "a1", "1", "A3" ) );
945        results.add( new Tuple( "1", "a2", "1", "A1" ) );
946        results.add( new Tuple( "1", "a2", "1", "A2" ) );
947        results.add( new Tuple( "1", "a2", "1", "A3" ) );
948        results.add( new Tuple( "1", "a3", "1", "A1" ) );
949        results.add( new Tuple( "1", "a3", "1", "A2" ) );
950        results.add( new Tuple( "1", "a3", "1", "A3" ) );
951        results.add( new Tuple( "2", "b1", "2", "B1" ) );
952        results.add( new Tuple( "2", "b1", "2", "B2" ) );
953        results.add( new Tuple( "2", "b1", "2", "B3" ) );
954        results.add( new Tuple( "3", "c1", null, null ) );
955        results.add( new Tuple( "4", "d1", "4", "D1" ) );
956        results.add( new Tuple( "4", "d2", "4", "D1" ) );
957        results.add( new Tuple( "4", "d3", "4", "D1" ) );
958        results.add( new Tuple( "5", "e1", null, null ) );
959        results.add( new Tuple( "5", "e2", null, null ) );
960        results.add( new Tuple( "5", "e3", null, null ) );
961        results.add( new Tuple( "7", "g1", null, null ) );
962        results.add( new Tuple( "7", "g2", null, null ) );
963        results.add( new Tuple( "7", "g3", null, null ) );
964        results.add( new Tuple( "7", "g4", null, null ) );
965        results.add( new Tuple( "7", "g5", null, null ) );
966        results.add( new Tuple( null, "h1", null, null ) );
967    
968        handleJoins( "cogroupinnerouternull", new LeftJoin(), results, 9, false, new NullNotEquivalentComparator() );
969        handleJoins( "cogroupinnerouternull-resultgroup", new LeftJoin(), results, 9, true, new NullNotEquivalentComparator() );
970        }
971    
972      /**
973       * 1 a1
974       * 1 a2
975       * 1 a3
976       * 2 b1
977       * 3 c1
978       * 4 d1
979       * 4 d2
980       * 4 d3
981       * 5 e1
982       * 5 e2
983       * 5 e3
984       * 7 g1
985       * 7 g2
986       * 7 g3
987       * 7 g4
988       * 7 g5
989       * null h1
990       * <p/>
991       * 1 A1
992       * 1 A2
993       * 1 A3
994       * 2 B1
995       * 2 B2
996       * 2 B3
997       * 4 D1
998       * 6 F1
999       * 6 F2
1000       * null H1
1001       * <p/>
1002       * 1  a1      1       A1
1003       * 1  a1      1       A2
1004       * 1  a1      1       A3
1005       * 1  a2      1       A1
1006       * 1  a2      1       A2
1007       * 1  a2      1       A3
1008       * 1  a3      1       A1
1009       * 1  a3      1       A2
1010       * 1  a3      1       A3
1011       * 2  b1      2       B1
1012       * 2  b1      2       B2
1013       * 2  b1      2       B3
1014       * 4  d1      4       D1
1015       * 4  d2      4       D1
1016       * 4  d3      4       D1
1017       * null       null    6       F1
1018       * null       null    6       F2
1019       * null h1    null    H1
1020       *
1021       * @throws Exception
1022       */
1023      @Test
1024      public void testCoGroupOuterInner() throws Exception
1025        {
1026        Set<Tuple> results = new HashSet<Tuple>();
1027    
1028        results.add( new Tuple( "1", "a1", "1", "A1" ) );
1029        results.add( new Tuple( "1", "a1", "1", "A2" ) );
1030        results.add( new Tuple( "1", "a1", "1", "A3" ) );
1031        results.add( new Tuple( "1", "a2", "1", "A1" ) );
1032        results.add( new Tuple( "1", "a2", "1", "A2" ) );
1033        results.add( new Tuple( "1", "a2", "1", "A3" ) );
1034        results.add( new Tuple( "1", "a3", "1", "A1" ) );
1035        results.add( new Tuple( "1", "a3", "1", "A2" ) );
1036        results.add( new Tuple( "1", "a3", "1", "A3" ) );
1037        results.add( new Tuple( "2", "b1", "2", "B1" ) );
1038        results.add( new Tuple( "2", "b1", "2", "B2" ) );
1039        results.add( new Tuple( "2", "b1", "2", "B3" ) );
1040        results.add( new Tuple( "4", "d1", "4", "D1" ) );
1041        results.add( new Tuple( "4", "d2", "4", "D1" ) );
1042        results.add( new Tuple( "4", "d3", "4", "D1" ) );
1043        results.add( new Tuple( null, null, "6", "F1" ) );
1044        results.add( new Tuple( null, null, "6", "F2" ) );
1045        results.add( new Tuple( null, "h1", null, "H1" ) );
1046    
1047        handleJoins( "cogroupouterinner", new RightJoin(), results, 8, false, null );
1048        handleJoins( "cogroupouterinner-resultgroup", new RightJoin(), results, 8, true, null );
1049        }
1050    
1051      /**
1052       * 1 a1
1053       * 1 a2
1054       * 1 a3
1055       * 2 b1
1056       * 3 c1
1057       * 4 d1
1058       * 4 d2
1059       * 4 d3
1060       * 5 e1
1061       * 5 e2
1062       * 5 e3
1063       * 7 g1
1064       * 7 g2
1065       * 7 g3
1066       * 7 g4
1067       * 7 g5
1068       * null h1
1069       * <p/>
1070       * 1 A1
1071       * 1 A2
1072       * 1 A3
1073       * 2 B1
1074       * 2 B2
1075       * 2 B3
1076       * 4 D1
1077       * 6 F1
1078       * 6 F2
1079       * null H1
1080       * <p/>
1081       * 1  a1      1       A1
1082       * 1  a1      1       A2
1083       * 1  a1      1       A3
1084       * 1  a2      1       A1
1085       * 1  a2      1       A2
1086       * 1  a2      1       A3
1087       * 1  a3      1       A1
1088       * 1  a3      1       A2
1089       * 1  a3      1       A3
1090       * 2  b1      2       B1
1091       * 2  b1      2       B2
1092       * 2  b1      2       B3
1093       * 4  d1      4       D1
1094       * 4  d2      4       D1
1095       * 4  d3      4       D1
1096       * null       null    6       F1
1097       * null       null    6       F2
1098       * null null  null    H1
1099       *
1100       * @throws Exception
1101       */
1102      @Test
1103      public void testCoGroupOuterInnerNull() throws Exception
1104        {
1105        Set<Tuple> results = new HashSet<Tuple>();
1106    
1107        results.add( new Tuple( "1", "a1", "1", "A1" ) );
1108        results.add( new Tuple( "1", "a1", "1", "A2" ) );
1109        results.add( new Tuple( "1", "a1", "1", "A3" ) );
1110        results.add( new Tuple( "1", "a2", "1", "A1" ) );
1111        results.add( new Tuple( "1", "a2", "1", "A2" ) );
1112        results.add( new Tuple( "1", "a2", "1", "A3" ) );
1113        results.add( new Tuple( "1", "a3", "1", "A1" ) );
1114        results.add( new Tuple( "1", "a3", "1", "A2" ) );
1115        results.add( new Tuple( "1", "a3", "1", "A3" ) );
1116        results.add( new Tuple( "2", "b1", "2", "B1" ) );
1117        results.add( new Tuple( "2", "b1", "2", "B2" ) );
1118        results.add( new Tuple( "2", "b1", "2", "B3" ) );
1119        results.add( new Tuple( "4", "d1", "4", "D1" ) );
1120        results.add( new Tuple( "4", "d2", "4", "D1" ) );
1121        results.add( new Tuple( "4", "d3", "4", "D1" ) );
1122        results.add( new Tuple( null, null, "6", "F1" ) );
1123        results.add( new Tuple( null, null, "6", "F2" ) );
1124        results.add( new Tuple( null, null, null, "H1" ) );
1125    
1126        handleJoins( "cogroupouterinnernull", new RightJoin(), results, 9, false, new NullNotEquivalentComparator() );
1127        handleJoins( "cogroupouterinnernull-resultgroup", new RightJoin(), results, 9, true, new NullNotEquivalentComparator() );
1128        }
1129    
1130      private void handleJoins( String path, Joiner joiner, Set<Tuple> results, int numGroups, boolean useResultGroupFields, NullNotEquivalentComparator comparator ) throws Exception
1131        {
1132        results = new HashSet<Tuple>( results );
1133    
1134        getPlatform().copyFromLocal( inputFileLhsSparse );
1135        getPlatform().copyFromLocal( inputFileRhsSparse );
1136    
1137        Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class );
1138        Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse );
1139        Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse );
1140    
1141        Map sources = new HashMap();
1142    
1143        sources.put( "lower", sourceLower );
1144        sources.put( "upper", sourceUpper );
1145    
1146        Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE );
1147    
1148        Pipe pipeLower = new Pipe( "lower" );
1149        Pipe pipeUpper = new Pipe( "upper" );
1150    
1151        Fields declaredFields = new Fields( "num", "char", "num2", "char2" );
1152    
1153        Fields groupFields = new Fields( "num" );
1154    
1155        if( comparator != null )
1156          groupFields.setComparator( 0, comparator );
1157    
1158        Pipe splice;
1159        if( useResultGroupFields )
1160          splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, new Fields( "num", "num2" ), joiner );
1161        else
1162          splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, joiner );
1163    
1164        splice = new Every( splice, Fields.ALL, new TestIdentityBuffer( new Fields( "num", "num2" ), numGroups, true ), Fields.RESULTS );
1165    
1166        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
1167    
1168        flow.complete();
1169    
1170        validateLength( flow, results.size() );
1171    
1172        List<Tuple> actual = getSinkAsList( flow );
1173    
1174        results.removeAll( actual );
1175    
1176        assertEquals( 0, results.size() );
1177        }
1178    
1179      /**
1180       * 1 a
1181       * 5 b
1182       * 6 c
1183       * 5 b
1184       * 5 e
1185       * <p/>
1186       * 1 A
1187       * 2 B
1188       * 3 C
1189       * 4 D
1190       * 5 E
1191       * <p/>
1192       * 1 a
1193       * 2 b
1194       * 3 c
1195       * 4 d
1196       * 5 e
1197       * <p/>
1198       * 1  a       1       A  1  a
1199       * -  -   2   B  2  b
1200       * -  -   3   C  3  c
1201       * -  -   4   D  4  d
1202       * 5  b       5   E  5  e
1203       * 5  e       5   E  5  e
1204       *
1205       * @throws Exception
1206       */
1207      @Test
1208      public void testCoGroupMixed() throws Exception
1209        {
1210        getPlatform().copyFromLocal( inputFileLowerOffset );
1211        getPlatform().copyFromLocal( inputFileLower );
1212        getPlatform().copyFromLocal( inputFileUpper );
1213    
1214        Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
1215        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1216        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1217    
1218        Map sources = new HashMap();
1219    
1220        sources.put( "loweroffset", sourceLowerOffset );
1221        sources.put( "lower", sourceLower );
1222        sources.put( "upper", sourceUpper );
1223    
1224        Tap sink = getPlatform().getDelimitedFile( Fields.size( 6, String.class ), "\t", getOutputPath( "cogroupmixed" ), SinkMode.REPLACE );
1225    
1226        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1227    
1228        Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter );
1229        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1230        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1231    
1232        Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower );
1233        Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) );
1234    
1235        MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} );
1236        Pipe splice = new CoGroup( pipes, fields, Fields.size( 6 ), join );
1237    
1238        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
1239    
1240        flow.complete();
1241    
1242        validateLength( flow, 6 );
1243    
1244        Set<Tuple> results = new HashSet<Tuple>();
1245    
1246        results.add( new Tuple( "1", "a", "1", "A", "1", "a" ) );
1247        results.add( new Tuple( null, null, "2", "B", "2", "b" ) );
1248        results.add( new Tuple( null, null, "3", "C", "3", "c" ) );
1249        results.add( new Tuple( null, null, "4", "D", "4", "d" ) );
1250        results.add( new Tuple( "5", "b", "5", "E", "5", "e" ) );
1251        results.add( new Tuple( "5", "e", "5", "E", "5", "e" ) );
1252    
1253        List<Tuple> actual = getSinkAsList( flow );
1254    
1255        results.removeAll( actual );
1256    
1257        assertEquals( 0, results.size() );
1258        }
1259    
1260      @Test
1261      public void testCoGroupDiffFields() throws Exception
1262        {
1263        getPlatform().copyFromLocal( inputFileLower );
1264        getPlatform().copyFromLocal( inputFileUpper );
1265    
1266        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1267        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1268    
1269        Map sources = new HashMap();
1270    
1271        sources.put( "lower", sourceLower );
1272        sources.put( "upper", sourceUpper );
1273    
1274        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE );
1275    
1276        Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1277        Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1278    
1279        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1280        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1281    
1282        Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1283    
1284        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1285    
1286        flow.complete();
1287    
1288        validateLength( flow, 5 );
1289    
1290        List<Tuple> actual = getSinkAsList( flow );
1291    
1292        assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1293        assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
1294        }
1295    
1296      @Test
1297      public void testCoGroupGroupBy() throws Exception
1298        {
1299        getPlatform().copyFromLocal( inputFileLower );
1300        getPlatform().copyFromLocal( inputFileUpper );
1301    
1302        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1303        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1304    
1305        Map sources = new HashMap();
1306    
1307        sources.put( "lower", sourceLower );
1308        sources.put( "upper", sourceUpper );
1309    
1310        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupgroupby" ), SinkMode.REPLACE );
1311    
1312        Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1313        Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1314    
1315        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1316        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1317    
1318        Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1319    
1320        Pipe groupby = new GroupBy( cogroup, new Fields( "numA" ) );
1321    
1322        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby );
1323    
1324        flow.complete();
1325    
1326        validateLength( flow, 5, null );
1327    
1328        List<Tuple> actual = getSinkAsList( flow );
1329    
1330        assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1331        assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
1332        }
1333    
1334      @Test
1335      public void testCoGroupSamePipe() throws Exception
1336        {
1337        getPlatform().copyFromLocal( inputFileLower );
1338    
1339        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1340    
1341        Map sources = new HashMap();
1342    
1343        sources.put( "lower", source );
1344    
1345        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE );
1346    
1347        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1348    
1349        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1350    
1351        Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) );
1352    
1353        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1354    
1355        flow.complete();
1356    
1357        validateLength( flow, 5, null );
1358    
1359        List<Tuple> actual = getSinkAsList( flow );
1360    
1361        assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1362        assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1363        }
1364    
1365      @Test
1366      public void testCoGroupSamePipe2() throws Exception
1367        {
1368        getPlatform().copyFromLocal( inputFileLower );
1369    
1370        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1371    
1372        Map sources = new HashMap();
1373    
1374        sources.put( "lower", source );
1375    
1376        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE );
1377    
1378        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1379    
1380        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1381    
1382        Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1383    
1384        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1385    
1386        flow.complete();
1387    
1388        validateLength( flow, 5, null );
1389    
1390        List<Tuple> actual = getSinkAsList( flow );
1391    
1392        assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1393        assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1394        }
1395    
1396      @Test
1397      public void testCoGroupSamePipe3() throws Exception
1398        {
1399        getPlatform().copyFromLocal( inputFileLower );
1400    
1401        Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
1402    
1403        Map sources = new HashMap();
1404    
1405        sources.put( "lower", source );
1406    
1407        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE );
1408    
1409        Pipe pipe = new Pipe( "lower" );
1410    
1411        Pipe lhs = new Pipe( "lhs", pipe );
1412        Pipe rhs = new Pipe( "rhs", pipe );
1413    
1414        Pipe cogroup = new CoGroup( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1415    
1416        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1417    
1418        flow.complete();
1419    
1420        validateLength( flow, 5, null );
1421    
1422        List<Tuple> actual = getSinkAsList( flow );
1423    
1424        assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1425        assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1426        }
1427    
1428      @Test
1429      public void testCoGroupAroundCoGroup() throws Exception
1430        {
1431        getPlatform().copyFromLocal( inputFileLower );
1432        getPlatform().copyFromLocal( inputFileUpper );
1433    
1434        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1435        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1436    
1437        Map sources = new HashMap();
1438    
1439        sources.put( "lower", sourceLower );
1440        sources.put( "upper1", sourceUpper );
1441        sources.put( "upper2", sourceUpper );
1442    
1443        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupacogroup" ), SinkMode.REPLACE );
1444    
1445        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1446    
1447        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1448        Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1449        Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1450    
1451        Pipe splice1 = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1452    
1453        splice1 = new Each( splice1, new Identity() );
1454    
1455        Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1456    
1457        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1458    
1459        flow.complete();
1460    
1461        validateLength( flow, 5, null );
1462    
1463        List<Tuple> actual = getSinkAsList( flow );
1464    
1465        assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1466        assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1467        }
1468    
1469      @Test
1470      public void testCoGroupAroundCoGroupWithout() throws Exception
1471        {
1472        runCoGroupAroundCoGroup( null, "cogroupacogroupopt1" );
1473        }
1474    
1475      @Test
1476      public void testCoGroupAroundCoGroupWith() throws Exception
1477        {
1478        // hack to get classname
1479        runCoGroupAroundCoGroup( getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ).getScheme().getClass(), "cogroupacogroupopt2" );
1480        }
1481    
1482      private void runCoGroupAroundCoGroup( Class schemeClass, String stringPath ) throws IOException
1483        {
1484        getPlatform().copyFromLocal( inputFileNums20 );
1485        getPlatform().copyFromLocal( inputFileNums10 );
1486    
1487        Tap source10 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 );
1488        Tap source20 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums20 );
1489    
1490        Map sources = new HashMap();
1491    
1492        sources.put( "source20", source20 );
1493        sources.put( "source101", source10 );
1494        sources.put( "source102", source10 );
1495    
1496        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( stringPath ), SinkMode.REPLACE );
1497    
1498        Pipe pipeNum20 = new Pipe( "source20" );
1499        Pipe pipeNum101 = new Pipe( "source101" );
1500        Pipe pipeNum102 = new Pipe( "source102" );
1501    
1502        Pipe splice1 = new CoGroup( pipeNum20, new Fields( "num" ), pipeNum101, new Fields( "num" ), new Fields( "num1", "num2" ) );
1503    
1504        Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeNum102, new Fields( "num" ), new Fields( "num1", "num2", "num3" ) );
1505    
1506        splice2 = new Each( splice2, new Identity() );
1507    
1508        Map<Object, Object> properties = getPlatform().getProperties();
1509    
1510        if( getPlatform().isMapReduce() )
1511          FlowConnectorProps.setIntermediateSchemeClass( properties, schemeClass );
1512    
1513        Flow flow = getPlatform().getFlowConnector( properties ).connect( "cogroupopt", sources, sink, splice2 );
1514    
1515        if( getPlatform().isMapReduce() )
1516          assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1517    
1518        flow.complete();
1519    
1520        validateLength( flow, 10 );
1521    
1522        List<Tuple> actual = getSinkAsList( flow );
1523    
1524        assertTrue( actual.contains( new Tuple( "1\t1\t1" ) ) );
1525        assertTrue( actual.contains( new Tuple( "10\t10\t10" ) ) );
1526        }
1527    
1528      @Test
1529      public void testCoGroupDiffFieldsSameFile() throws Exception
1530        {
1531        getPlatform().copyFromLocal( inputFileLower );
1532        getPlatform().copyFromLocal( inputFileUpper );
1533    
1534        Tap sourceOffsetLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1535        Tap sourceLower = getPlatform().getTextFile( new Fields( "line" ), inputFileLower );
1536    
1537        Map sources = new HashMap();
1538    
1539        sources.put( "offsetLower", sourceOffsetLower );
1540        sources.put( "lower", sourceLower );
1541    
1542        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samefiledifffields" ), SinkMode.REPLACE );
1543    
1544        Function splitterLower = new RegexSplitter( new Fields( "numA", "left" ), " " );
1545        Function splitterUpper = new RegexSplitter( new Fields( "numB", "right" ), " " );
1546    
1547        Pipe offsetLower = new Pipe( "offsetLower" );
1548        offsetLower = new Discard( offsetLower, new Fields( "offset" ) );
1549        offsetLower = new Each( offsetLower, new Fields( "line" ), splitterLower );
1550    
1551        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterUpper );
1552    
1553        Pipe cogroup = new CoGroup( offsetLower, new Fields( "numA" ), pipeLower, new Fields( "numB" ) );
1554    
1555        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1556    
1557        flow.complete();
1558    
1559        validateLength( flow, 5 );
1560    
1561        List<Tuple> actual = getSinkAsList( flow );
1562    
1563        assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1564        assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1565        }
1566    
1567      @Test
1568      public void testJoinNone() throws Exception
1569        {
1570        getPlatform().copyFromLocal( inputFileLower );
1571        getPlatform().copyFromLocal( inputFileUpper );
1572    
1573        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1574        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1575    
1576        Map sources = new HashMap();
1577    
1578        sources.put( "lower", sourceLower );
1579        sources.put( "upper", sourceUpper );
1580    
1581        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE );
1582    
1583        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1584    
1585        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1586        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1587    
1588        Pipe splice = new CoGroup( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) );
1589    
1590        Map<Object, Object> properties = getProperties();
1591    
1592        Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1593    
1594        flow.complete();
1595    
1596        validateLength( flow, 25 );
1597    
1598        List<Tuple> values = getSinkAsList( flow );
1599    
1600        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1601        assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) );
1602        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1603        }
1604      }