001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading;
022    
023    import java.io.Serializable;
024    import java.util.ArrayList;
025    import java.util.Collections;
026    import java.util.Comparator;
027    import java.util.HashMap;
028    import java.util.HashSet;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Set;
032    
033    import cascading.flow.Flow;
034    import cascading.flow.FlowDef;
035    import cascading.operation.Aggregator;
036    import cascading.operation.Function;
037    import cascading.operation.Identity;
038    import cascading.operation.aggregator.Count;
039    import cascading.operation.aggregator.First;
040    import cascading.operation.expression.ExpressionFunction;
041    import cascading.operation.regex.RegexFilter;
042    import cascading.operation.regex.RegexSplitter;
043    import cascading.pipe.CoGroup;
044    import cascading.pipe.Each;
045    import cascading.pipe.Every;
046    import cascading.pipe.GroupBy;
047    import cascading.pipe.HashJoin;
048    import cascading.pipe.Merge;
049    import cascading.pipe.Pipe;
050    import cascading.pipe.joiner.InnerJoin;
051    import cascading.pipe.joiner.Joiner;
052    import cascading.pipe.joiner.LeftJoin;
053    import cascading.pipe.joiner.MixedJoin;
054    import cascading.pipe.joiner.OuterJoin;
055    import cascading.pipe.joiner.RightJoin;
056    import cascading.tap.SinkMode;
057    import cascading.tap.Tap;
058    import cascading.tuple.Fields;
059    import cascading.tuple.Hasher;
060    import cascading.tuple.Tuple;
061    import org.junit.Test;
062    
063    import static data.InputData.*;
064    
065    
066    public class JoinFieldedPipesPlatformTest extends PlatformTestCase
067      {
068      public JoinFieldedPipesPlatformTest()
069        {
070        super( true, 4, 1 ); // leave cluster testing enabled
071        }
072    
073      @Test
074      public void testCross() throws Exception
075        {
076        getPlatform().copyFromLocal( inputFileLhs );
077        getPlatform().copyFromLocal( inputFileRhs );
078    
079        Map sources = new HashMap();
080    
081        sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) );
082        sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) );
083    
084        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE );
085    
086        Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
087        Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
088    
089        Pipe cross = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
090    
091        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross );
092    
093        flow.complete();
094    
095        validateLength( flow, 37, null );
096    
097        List<Tuple> values = getSinkAsList( flow );
098    
099        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
100        assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
101        }
102    
103      @Test
104      public void testJoin() throws Exception
105        {
106        getPlatform().copyFromLocal( inputFileLower );
107        getPlatform().copyFromLocal( inputFileUpper );
108    
109        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
110        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
111    
112        Map sources = new HashMap();
113    
114        sources.put( "lower", sourceLower );
115        sources.put( "upper", sourceUpper );
116    
117        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );
118    
119        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
120    
121        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
122        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
123    
124        Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
125    
126        Map<Object, Object> properties = getProperties();
127    
128        Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
129    
130        flow.complete();
131    
132        validateLength( flow, 5 );
133    
134        List<Tuple> values = getSinkAsList( flow );
135    
136        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
137        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
138        }
139    
140      @Test
141      public void testJoinSamePipeName() throws Exception
142        {
143        getPlatform().copyFromLocal( inputFileLower );
144        getPlatform().copyFromLocal( inputFileUpper );
145    
146        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
147        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
148    
149        Map sources = new HashMap();
150    
151        sources.put( "lower", sourceLower );
152        sources.put( "upper", sourceUpper );
153    
154        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE );
155    
156        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
157    
158        Pipe pipeLower = new Pipe( "lower" );
159        Pipe pipeUpper = new Pipe( "upper" );
160    
161        // these pipes will hide the source name, and could cause one to be lost
162        pipeLower = new Pipe( "same", pipeLower );
163        pipeUpper = new Pipe( "same", pipeUpper );
164    
165        pipeLower = new Each( pipeLower, new Fields( "line" ), splitter );
166        pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter );
167    
168    //    pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
169    //    pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
170    
171        pipeLower = new Pipe( "left", pipeLower );
172        pipeUpper = new Pipe( "right", pipeUpper );
173    
174    //    pipeLower = new Each( pipeLower, new Debug( true ) );
175    //    pipeUpper = new Each( pipeUpper, new Debug( true ) );
176    
177        Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
178    
179    //    splice = new Each( splice, new Debug( true ) );
180        splice = new Pipe( "splice", splice );
181        splice = new Pipe( "tail", splice );
182    
183        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
184    
185        flow.complete();
186    
187        validateLength( flow, 5 );
188    
189        List<Tuple> values = getSinkAsList( flow );
190    
191        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
192        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
193        }
194    
195      @Test
196      public void testJoinWithUnknowns() throws Exception
197        {
198        getPlatform().copyFromLocal( inputFileLower );
199        getPlatform().copyFromLocal( inputFileUpper );
200    
201        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
202        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
203    
204        Map sources = new HashMap();
205    
206        sources.put( "lower", sourceLower );
207        sources.put( "upper", sourceUpper );
208    
209        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE );
210    
211        Function splitter = new RegexSplitter( Fields.UNKNOWN, " " );
212    
213        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
214        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
215    
216        Pipe splice = new HashJoin( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) );
217    
218        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
219    
220        flow.complete();
221    
222        validateLength( flow, 5 );
223    
224        List<Tuple> values = getSinkAsList( flow );
225    
226        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
227        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
228        }
229    
230      /**
231       * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with
232       * a new stream using an outerjoin.
233       *
234       * @throws Exception
235       */
236      @Test
237      public void testJoinFilteredBranch() throws Exception
238        {
239        getPlatform().copyFromLocal( inputFileLower );
240        getPlatform().copyFromLocal( inputFileUpper );
241    
242        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
243        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
244    
245        Map sources = new HashMap();
246    
247        sources.put( "lower", sourceLower );
248        sources.put( "upper", sourceUpper );
249    
250        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinfilteredbranch" ), SinkMode.REPLACE );
251    
252        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
253    
254        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
255        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
256        pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all
257        pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
258    
259        Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() );
260    
261        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
262    
263        flow.complete();
264    
265        validateLength( flow, 5 );
266    
267        List<Tuple> values = getSinkAsList( flow );
268    
269        assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) );
270        assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) );
271        }
272    
273      @Test
274      public void testJoinSelf() throws Exception
275        {
276        getPlatform().copyFromLocal( inputFileLower );
277    
278        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
279        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
280    
281        Map sources = new HashMap();
282    
283        sources.put( "lower", sourceLower );
284        sources.put( "upper", sourceUpper );
285    
286        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinself" ), SinkMode.REPLACE );
287    
288        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
289    
290        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
291        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
292    
293        Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
294    
295        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
296    
297        flow.complete();
298    
299        validateLength( flow, 5 );
300    
301        List<Tuple> values = getSinkAsList( flow );
302    
303        assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
304        assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
305        }
306    
307      /**
308       * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join
309       *
310       * @throws Exception when
311       */
312      @Test
313      public void testJoinAfterEvery() throws Exception
314        {
315        getPlatform().copyFromLocal( inputFileLower );
316        getPlatform().copyFromLocal( inputFileUpper );
317    
318        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
319        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
320    
321        Map sources = new HashMap();
322    
323        sources.put( "lower", sourceLower );
324        sources.put( "upper", sourceUpper );
325    
326        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE );
327    
328        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
329    
330        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
331        pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
332        pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL );
333    
334        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
335        pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
336        pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL );
337    
338        Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
339    
340        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
341    
342        flow.complete();
343    
344        validateLength( flow, 5, null );
345    
346        List<Tuple> values = getSinkAsList( flow );
347    
348        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
349        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
350        }
351    
352      @Test
353      public void testJoinInnerSingleField() throws Exception
354        {
355        getPlatform().copyFromLocal( inputFileLowerOffset );
356        getPlatform().copyFromLocal( inputFileUpper );
357    
358        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
359        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
360    
361        Map sources = new HashMap();
362    
363        sources.put( "lower", sourceLower );
364        sources.put( "upper", sourceUpper );
365    
366        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joininnersingle" ), SinkMode.REPLACE );
367    
368        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) );
369        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) );
370    
371        Pipe join = new HashJoin( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
372    
373        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
374    
375        flow.complete();
376    
377        validateLength( flow, 3, null );
378    
379        Set<Tuple> results = new HashSet<Tuple>();
380    
381        results.add( new Tuple( "1\t1" ) );
382        results.add( new Tuple( "5\t5" ) );
383    
384        List<Tuple> actual = getSinkAsList( flow );
385    
386        results.removeAll( actual );
387    
388        assertEquals( 0, results.size() );
389        }
390    
391      /**
392       * 1 a1
393       * 1 a2
394       * 1 a3
395       * 2 b1
396       * 3 c1
397       * 4 d1
398       * 4 d2
399       * 4 d3
400       * 5 e1
401       * 5 e2
402       * 5 e3
403       * 7 g1
404       * 7 g2
405       * 7 g3
406       * 7 g4
407       * 7 g5
408       * null h1
409       * <p/>
410       * 1 A1
411       * 1 A2
412       * 1 A3
413       * 2 B1
414       * 2 B2
415       * 2 B3
416       * 4 D1
417       * 6 F1
418       * 6 F2
419       * null H1
420       * <p/>
421       * 1  a1      1       A1
422       * 1  a1      1       A2
423       * 1  a1      1       A3
424       * 1  a2      1       A1
425       * 1  a2      1       A2
426       * 1  a2      1       A3
427       * 1  a3      1       A1
428       * 1  a3      1       A2
429       * 1  a3      1       A3
430       * 2  b1      2       B1
431       * 2  b1      2       B2
432       * 2  b1      2       B3
433       * 4  d1      4       D1
434       * 4  d2      4       D1
435       * 4  d3      4       D1
436       * null h1  null  H1
437       *
438       * @throws Exception
439       */
440      @Test
441      public void testJoinInner() throws Exception
442        {
443        HashSet<Tuple> results = new HashSet<Tuple>();
444    
445        results.add( new Tuple( "1", "a1", "1", "A1" ) );
446        results.add( new Tuple( "1", "a1", "1", "A2" ) );
447        results.add( new Tuple( "1", "a1", "1", "A3" ) );
448        results.add( new Tuple( "1", "a2", "1", "A1" ) );
449        results.add( new Tuple( "1", "a2", "1", "A2" ) );
450        results.add( new Tuple( "1", "a2", "1", "A3" ) );
451        results.add( new Tuple( "1", "a3", "1", "A1" ) );
452        results.add( new Tuple( "1", "a3", "1", "A2" ) );
453        results.add( new Tuple( "1", "a3", "1", "A3" ) );
454        results.add( new Tuple( "2", "b1", "2", "B1" ) );
455        results.add( new Tuple( "2", "b1", "2", "B2" ) );
456        results.add( new Tuple( "2", "b1", "2", "B3" ) );
457        results.add( new Tuple( "4", "d1", "4", "D1" ) );
458        results.add( new Tuple( "4", "d2", "4", "D1" ) );
459        results.add( new Tuple( "4", "d3", "4", "D1" ) );
460        results.add( new Tuple( null, "h1", null, "H1" ) );
461    
462        handleJoins( "joininner", new InnerJoin(), results );
463        }
464    
465      /**
466       * /**
467       * 1 a1
468       * 1 a2
469       * 1 a3
470       * 2 b1
471       * 3 c1
472       * 4 d1
473       * 4 d2
474       * 4 d3
475       * 5 e1
476       * 5 e2
477       * 5 e3
478       * 7 g1
479       * 7 g2
480       * 7 g3
481       * 7 g4
482       * 7 g5
483       * null h1
484       * <p/>
485       * 1 A1
486       * 1 A2
487       * 1 A3
488       * 2 B1
489       * 2 B2
490       * 2 B3
491       * 4 D1
492       * 6 F1
493       * 6 F2
494       * null H1
495       * <p/>
496       * 1  a1      1       A1
497       * 1  a1      1       A2
498       * 1  a1      1       A3
499       * 1  a2      1       A1
500       * 1  a2      1       A2
501       * 1  a2      1       A3
502       * 1  a3      1       A1
503       * 1  a3      1       A2
504       * 1  a3      1       A3
505       * 2  b1      2       B1
506       * 2  b1      2       B2
507       * 2  b1      2       B3
508       * 3  c1      null    null
509       * 4  d1      4       D1
510       * 4  d2      4       D1
511       * 4  d3      4       D1
512       * 5  e1      null    null
513       * 5  e2      null    null
514       * 5  e3      null    null
515       * null       null    6       F1
516       * null       null    6       F2
517       * 7  g1      null    null
518       * 7  g2      null    null
519       * 7  g3      null    null
520       * 7  g4      null    null
521       * 7  g5      null    null
522       * null h1  null  H1
523       *
524       * @throws Exception
525       */
526      @Test
527      public void testJoinOuter() throws Exception
528        {
529        // skip if hadoop cluster mode, outer joins don't behave the same
530        if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
531          return;
532    
533        Set<Tuple> results = new HashSet<Tuple>();
534    
535        results.add( new Tuple( "1", "a1", "1", "A1" ) );
536        results.add( new Tuple( "1", "a1", "1", "A2" ) );
537        results.add( new Tuple( "1", "a1", "1", "A3" ) );
538        results.add( new Tuple( "1", "a2", "1", "A1" ) );
539        results.add( new Tuple( "1", "a2", "1", "A2" ) );
540        results.add( new Tuple( "1", "a2", "1", "A3" ) );
541        results.add( new Tuple( "1", "a3", "1", "A1" ) );
542        results.add( new Tuple( "1", "a3", "1", "A2" ) );
543        results.add( new Tuple( "1", "a3", "1", "A3" ) );
544        results.add( new Tuple( "2", "b1", "2", "B1" ) );
545        results.add( new Tuple( "2", "b1", "2", "B2" ) );
546        results.add( new Tuple( "2", "b1", "2", "B3" ) );
547        results.add( new Tuple( "3", "c1", null, null ) );
548        results.add( new Tuple( "4", "d1", "4", "D1" ) );
549        results.add( new Tuple( "4", "d2", "4", "D1" ) );
550        results.add( new Tuple( "4", "d3", "4", "D1" ) );
551        results.add( new Tuple( "5", "e1", null, null ) );
552        results.add( new Tuple( "5", "e2", null, null ) );
553        results.add( new Tuple( "5", "e3", null, null ) );
554        results.add( new Tuple( null, null, "6", "F1" ) );
555        results.add( new Tuple( null, null, "6", "F2" ) );
556        results.add( new Tuple( "7", "g1", null, null ) );
557        results.add( new Tuple( "7", "g2", null, null ) );
558        results.add( new Tuple( "7", "g3", null, null ) );
559        results.add( new Tuple( "7", "g4", null, null ) );
560        results.add( new Tuple( "7", "g5", null, null ) );
561        results.add( new Tuple( null, "h1", null, "H1" ) );
562    
563        handleJoins( "joinouter", new OuterJoin(), results );
564        }
565    
566      /**
567       * 1 a1
568       * 1 a2
569       * 1 a3
570       * 2 b1
571       * 3 c1
572       * 4 d1
573       * 4 d2
574       * 4 d3
575       * 5 e1
576       * 5 e2
577       * 5 e3
578       * 7 g1
579       * 7 g2
580       * 7 g3
581       * 7 g4
582       * 7 g5
583       * null h1
584       * <p/>
585       * 1 A1
586       * 1 A2
587       * 1 A3
588       * 2 B1
589       * 2 B2
590       * 2 B3
591       * 4 D1
592       * 6 F1
593       * 6 F2
594       * null H1
595       * <p/>
596       * 1  a1      1       A1
597       * 1  a1      1       A2
598       * 1  a1      1       A3
599       * 1  a2      1       A1
600       * 1  a2      1       A2
601       * 1  a2      1       A3
602       * 1  a3      1       A1
603       * 1  a3      1       A2
604       * 1  a3      1       A3
605       * 2  b1      2       B1
606       * 2  b1      2       B2
607       * 2  b1      2       B3
608       * 3  c1      null    null
609       * 4  d1      4       D1
610       * 4  d2      4       D1
611       * 4  d3      4       D1
612       * 5  e1      null    null
613       * 5  e2      null    null
614       * 5  e3      null    null
615       * 7  g1      null    null
616       * 7  g2      null    null
617       * 7  g3      null    null
618       * 7  g4      null    null
619       * 7  g5      null    null
620       * null h1    null    H1
621       *
622       * @throws Exception
623       */
624      @Test
625      public void testJoinInnerOuter() throws Exception
626        {
627        Set<Tuple> results = new HashSet<Tuple>();
628    
629        results.add( new Tuple( "1", "a1", "1", "A1" ) );
630        results.add( new Tuple( "1", "a1", "1", "A2" ) );
631        results.add( new Tuple( "1", "a1", "1", "A3" ) );
632        results.add( new Tuple( "1", "a2", "1", "A1" ) );
633        results.add( new Tuple( "1", "a2", "1", "A2" ) );
634        results.add( new Tuple( "1", "a2", "1", "A3" ) );
635        results.add( new Tuple( "1", "a3", "1", "A1" ) );
636        results.add( new Tuple( "1", "a3", "1", "A2" ) );
637        results.add( new Tuple( "1", "a3", "1", "A3" ) );
638        results.add( new Tuple( "2", "b1", "2", "B1" ) );
639        results.add( new Tuple( "2", "b1", "2", "B2" ) );
640        results.add( new Tuple( "2", "b1", "2", "B3" ) );
641        results.add( new Tuple( "3", "c1", null, null ) );
642        results.add( new Tuple( "4", "d1", "4", "D1" ) );
643        results.add( new Tuple( "4", "d2", "4", "D1" ) );
644        results.add( new Tuple( "4", "d3", "4", "D1" ) );
645        results.add( new Tuple( "5", "e1", null, null ) );
646        results.add( new Tuple( "5", "e2", null, null ) );
647        results.add( new Tuple( "5", "e3", null, null ) );
648        results.add( new Tuple( "7", "g1", null, null ) );
649        results.add( new Tuple( "7", "g2", null, null ) );
650        results.add( new Tuple( "7", "g3", null, null ) );
651        results.add( new Tuple( "7", "g4", null, null ) );
652        results.add( new Tuple( "7", "g5", null, null ) );
653        results.add( new Tuple( null, "h1", null, "H1" ) );
654    
655        handleJoins( "joininnerouter", new LeftJoin(), results );
656        }
657    
658      /**
659       * 1 a1
660       * 1 a2
661       * 1 a3
662       * 2 b1
663       * 3 c1
664       * 4 d1
665       * 4 d2
666       * 4 d3
667       * 5 e1
668       * 5 e2
669       * 5 e3
670       * 7 g1
671       * 7 g2
672       * 7 g3
673       * 7 g4
674       * 7 g5
675       * null h1
676       * <p/>
677       * 1 A1
678       * 1 A2
679       * 1 A3
680       * 2 B1
681       * 2 B2
682       * 2 B3
683       * 4 D1
684       * 6 F1
685       * 6 F2
686       * null H1
687       * <p/>
688       * 1  a1      1       A1
689       * 1  a1      1       A2
690       * 1  a1      1       A3
691       * 1  a2      1       A1
692       * 1  a2      1       A2
693       * 1  a2      1       A3
694       * 1  a3      1       A1
695       * 1  a3      1       A2
696       * 1  a3      1       A3
697       * 2  b1      2       B1
698       * 2  b1      2       B2
699       * 2  b1      2       B3
700       * 4  d1      4       D1
701       * 4  d2      4       D1
702       * 4  d3      4       D1
703       * null       null    6       F1
704       * null       null    6       F2
705       * null h1    null    H1
706       *
707       * @throws Exception
708       */
709      @Test
710      public void testJoinOuterInner() throws Exception
711        {
712        // skip if hadoop cluster mode, outer joins don't behave the same
713        if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
714          return;
715    
716        Set<Tuple> results = new HashSet<Tuple>();
717    
718        results.add( new Tuple( "1", "a1", "1", "A1" ) );
719        results.add( new Tuple( "1", "a1", "1", "A2" ) );
720        results.add( new Tuple( "1", "a1", "1", "A3" ) );
721        results.add( new Tuple( "1", "a2", "1", "A1" ) );
722        results.add( new Tuple( "1", "a2", "1", "A2" ) );
723        results.add( new Tuple( "1", "a2", "1", "A3" ) );
724        results.add( new Tuple( "1", "a3", "1", "A1" ) );
725        results.add( new Tuple( "1", "a3", "1", "A2" ) );
726        results.add( new Tuple( "1", "a3", "1", "A3" ) );
727        results.add( new Tuple( "2", "b1", "2", "B1" ) );
728        results.add( new Tuple( "2", "b1", "2", "B2" ) );
729        results.add( new Tuple( "2", "b1", "2", "B3" ) );
730        results.add( new Tuple( "4", "d1", "4", "D1" ) );
731        results.add( new Tuple( "4", "d2", "4", "D1" ) );
732        results.add( new Tuple( "4", "d3", "4", "D1" ) );
733        results.add( new Tuple( null, null, "6", "F1" ) );
734        results.add( new Tuple( null, null, "6", "F2" ) );
735        results.add( new Tuple( null, "h1", null, "H1" ) );
736    
737        handleJoins( "joinouterinner", new RightJoin(), results );
738        }
739    
740      private void handleJoins( String path, Joiner joiner, Set<Tuple> results ) throws Exception
741        {
742        getPlatform().copyFromLocal( inputFileLhsSparse );
743        getPlatform().copyFromLocal( inputFileRhsSparse );
744    
745        Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class );
746        Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse );
747        Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse );
748    
749        Map sources = new HashMap();
750    
751        sources.put( "lower", sourceLower );
752        sources.put( "upper", sourceUpper );
753    
754        Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE );
755    
756        Pipe pipeLower = new Pipe( "lower" );
757        Pipe pipeUpper = new Pipe( "upper" );
758    
759        Fields declaredFields = new Fields( "num", "char", "num2", "char2" );
760        Fields groupingFields = new Fields( "num" );
761    
762        Pipe splice = new HashJoin( pipeLower, groupingFields, pipeUpper, groupingFields, declaredFields, joiner );
763    
764        splice = new Each( splice, Fields.ALL, new Identity(), Fields.RESULTS );
765    
766        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
767    
768        flow.complete();
769    
770        validateLength( flow, results.size() );
771    
772        List<Tuple> actual = getSinkAsList( flow );
773    
774        results.removeAll( actual );
775    
776        assertEquals( 0, results.size() );
777        }
778    
779      /**
780       * 1 a
781       * 5 b
782       * 6 c
783       * 5 b
784       * 5 e
785       * <p/>
786       * 1 A
787       * 2 B
788       * 3 C
789       * 4 D
790       * 5 E
791       * <p/>
792       * 1 a
793       * 2 b
794       * 3 c
795       * 4 d
796       * 5 e
797       * <p/>
798       * 1  a       1       A  1  a
799       * -  -   2   B  2  b
800       * -  -   3   C  3  c
801       * -  -   4   D  4  d
802       * 5  b       5   E  5  e
803       * 5  e       5   E  5  e
804       *
805       * @throws Exception
806       */
807      @Test
808      public void testJoinMixed() throws Exception
809        {
810        // skip if hadoop cluster mode, outer joins don't behave the same
811        if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
812          return;
813    
814        getPlatform().copyFromLocal( inputFileLowerOffset );
815        getPlatform().copyFromLocal( inputFileLower );
816        getPlatform().copyFromLocal( inputFileUpper );
817    
818        Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
819        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
820        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
821    
822        Map sources = new HashMap();
823    
824        sources.put( "loweroffset", sourceLowerOffset );
825        sources.put( "lower", sourceLower );
826        sources.put( "upper", sourceUpper );
827    
828        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinmixed" ), SinkMode.REPLACE );
829    
830        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
831    
832        Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter );
833        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
834        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
835    
836        Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower );
837        Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) );
838    
839        MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} );
840        Pipe splice = new HashJoin( pipes, fields, Fields.size( 6 ), join );
841    
842        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
843    
844        flow.complete();
845    
846        validateLength( flow, 6 );
847    
848        Set<Tuple> results = new HashSet<Tuple>();
849    
850        results.add( new Tuple( "1\ta\t1\tA\t1\ta" ) );
851        results.add( new Tuple( "null\tnull\t2\tB\t2\tb" ) );
852        results.add( new Tuple( "null\tnull\t3\tC\t3\tc" ) );
853        results.add( new Tuple( "null\tnull\t4\tD\t4\td" ) );
854        results.add( new Tuple( "5\tb\t5\tE\t5\te" ) );
855        results.add( new Tuple( "5\te\t5\tE\t5\te" ) );
856    
857        List<Tuple> actual = getSinkAsList( flow );
858    
859        results.removeAll( actual );
860    
861        assertEquals( 0, results.size() );
862        }
863    
864      @Test
865      public void testJoinDiffFields() throws Exception
866        {
867        getPlatform().copyFromLocal( inputFileLower );
868        getPlatform().copyFromLocal( inputFileUpper );
869    
870        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
871        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
872    
873        Map sources = new HashMap();
874    
875        sources.put( "lower", sourceLower );
876        sources.put( "upper", sourceUpper );
877    
878        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE );
879    
880        Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
881        Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
882    
883        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
884        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
885    
886        Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
887    
888        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
889    
890        flow.complete();
891    
892        validateLength( flow, 5 );
893    
894        List<Tuple> actual = getSinkAsList( flow );
895    
896        assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
897        assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
898        }
899    
900      @Test
901      public void testJoinGroupBy() throws Exception
902        {
903        getPlatform().copyFromLocal( inputFileLower );
904        getPlatform().copyFromLocal( inputFileUpper );
905    
906        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
907        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
908    
909        Map sources = new HashMap();
910    
911        sources.put( "lower", sourceLower );
912        sources.put( "upper", sourceUpper );
913    
914        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupby" ), SinkMode.REPLACE );
915    
916        Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
917        Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
918    
919        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
920        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
921    
922        Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
923    
924        Pipe groupby = new GroupBy( pipe, new Fields( "numA" ) );
925    
926        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby );
927    
928        flow.complete();
929    
930        validateLength( flow, 5, null );
931    
932        List<Tuple> actual = getSinkAsList( flow );
933    
934        assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
935        assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
936        }
937    
938      @Test
939      public void testJoinSamePipe() throws Exception
940        {
941        getPlatform().copyFromLocal( inputFileLower );
942    
943        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
944    
945        Map sources = new HashMap();
946    
947        sources.put( "lower", source );
948    
949        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE );
950    
951        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
952    
953        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
954    
955        Pipe pipe = new HashJoin( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) );
956    
957        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
958    
959        flow.complete();
960    
961        validateLength( flow, 5, null );
962    
963        List<Tuple> actual = getSinkAsList( flow );
964    
965        assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
966        assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
967        }
968    
969      @Test
970      public void testJoinSamePipe2() throws Exception
971        {
972        getPlatform().copyFromLocal( inputFileLower );
973    
974        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
975    
976        Map sources = new HashMap();
977    
978        sources.put( "lower", source );
979    
980        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE );
981    
982        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
983    
984        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
985    
986        Pipe join = new HashJoin( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
987    
988        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
989    
990        flow.complete();
991    
992        validateLength( flow, 5, null );
993    
994        List<Tuple> actual = getSinkAsList( flow );
995    
996        assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
997        assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
998        }
999    
1000      @Test
1001      public void testJoinSamePipe3() throws Exception
1002        {
1003        getPlatform().copyFromLocal( inputFileLower );
1004    
1005        Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
1006    
1007        Map sources = new HashMap();
1008    
1009        sources.put( "lower", source );
1010    
1011        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE );
1012    
1013        Pipe pipe = new Pipe( "lower" );
1014    
1015        Pipe lhs = new Pipe( "lhs", pipe );
1016        Pipe rhs = new Pipe( "rhs", pipe );
1017    
1018        Pipe join = new HashJoin( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1019    
1020        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
1021    
1022        flow.complete();
1023    
1024        validateLength( flow, 5, null );
1025    
1026        List<Tuple> actual = getSinkAsList( flow );
1027    
1028        assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1029        assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1030        }
1031    
1032      /**
1033       * Same source as rightmost
1034       * <p/>
1035       * should be a single job as the same file accumulates into the joins
1036       *
1037       * @throws Exception
1038       */
1039      @Test
1040      public void testJoinAroundJoinRightMost() throws Exception
1041        {
1042        getPlatform().copyFromLocal( inputFileLower );
1043        getPlatform().copyFromLocal( inputFileUpper );
1044    
1045        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1046        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1047    
1048        Map sources = new HashMap();
1049    
1050        sources.put( "lower", sourceLower );
1051        sources.put( "upper1", sourceUpper );
1052        sources.put( "upper2", sourceUpper );
1053    
1054        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinrightmost" ), SinkMode.REPLACE );
1055    
1056        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1057    
1058        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1059        Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1060        Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1061    
1062        Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1063    
1064        splice1 = new Each( splice1, new Identity() );
1065    
1066        Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1067    
1068        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1069    
1070    //    flow.writeDOT( "joinaroundrightmost.dot" );
1071    
1072        if( getPlatform().isMapReduce() )
1073          assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1074    
1075        flow.complete();
1076    
1077        validateLength( flow, 5, null );
1078    
1079        List<Tuple> actual = getSinkAsList( flow );
1080    
1081        assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1082        assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1083        }
1084    
1085      /**
1086       * Same source as leftmost
1087       *
1088       * @throws Exception
1089       */
1090      @Test
1091      public void testJoinAroundJoinLeftMost() throws Exception
1092        {
1093        getPlatform().copyFromLocal( inputFileLower );
1094        getPlatform().copyFromLocal( inputFileUpper );
1095    
1096        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1097        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1098    
1099        Map sources = new HashMap();
1100    
1101        sources.put( "lower", sourceLower );
1102        sources.put( "upper1", sourceUpper );
1103        sources.put( "upper2", sourceUpper );
1104    
1105        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinleftmost" ), SinkMode.REPLACE );
1106    
1107        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1108    
1109        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1110        Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1111        Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1112    
1113        Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1114    
1115        splice1 = new Each( splice1, new Identity() );
1116    
1117        Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1118    
1119        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1120    
1121    //    flow.writeDOT( "joinaroundleftmost.dot" );
1122    
1123        if( getPlatform().isMapReduce() )
1124          assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1125    
1126        flow.complete();
1127    
1128        validateLength( flow, 5, null );
1129    
1130        List<Tuple> actual = getSinkAsList( flow );
1131    
1132        assertTrue( actual.contains( new Tuple( "1\tA\t1\tA\t1\ta" ) ) );
1133        assertTrue( actual.contains( new Tuple( "2\tB\t2\tB\t2\tb" ) ) );
1134        }
1135    
1136      /**
1137       * Upper as leftmost and rightmost forcing two jobs
1138       *
1139       * @throws Exception
1140       */
1141      @Test
1142      public void testJoinAroundJoinRightMostSwapped() throws Exception
1143        {
1144        getPlatform().copyFromLocal( inputFileLower );
1145        getPlatform().copyFromLocal( inputFileUpper );
1146    
1147        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1148        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1149    
1150        Map sources = new HashMap();
1151    
1152        sources.put( "lower", sourceLower );
1153        sources.put( "upper1", sourceUpper );
1154        sources.put( "upper2", sourceUpper );
1155    
1156        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinswapped" ), SinkMode.REPLACE );
1157    
1158        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1159    
1160        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1161        Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1162        Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1163    
1164        Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1165    
1166        splice1 = new Each( splice1, new Identity() );
1167    
1168        // upper2 becomes leftmost, forcing a tap between the joins
1169        Pipe splice2 = new HashJoin( pipeUpper2, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1170    
1171        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1172    
1173        if( getPlatform().isMapReduce() )
1174          assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1175    
1176        flow.complete();
1177    
1178        validateLength( flow, 5, null );
1179    
1180        List<Tuple> actual = getSinkAsList( flow );
1181    
1182        assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\tA" ) ) );
1183        assertTrue( actual.contains( new Tuple( "2\tB\t2\tb\t2\tB" ) ) );
1184        }
1185    
1186      @Test
1187      public void testJoinGroupByJoin() throws Exception
1188        {
1189        getPlatform().copyFromLocal( inputFileLower );
1190        getPlatform().copyFromLocal( inputFileUpper );
1191        getPlatform().copyFromLocal( inputFileJoined );
1192    
1193        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1194        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1195        Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined );
1196    
1197        Map sources = new HashMap();
1198    
1199        sources.put( "lower", sourceLower );
1200        sources.put( "upper", sourceUpper );
1201        sources.put( "joined", sourceJoined );
1202    
1203        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupbyjoin" ), SinkMode.REPLACE );
1204    
1205        Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1206        Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1207        Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" );
1208    
1209        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1210        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1211        Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined );
1212    
1213        Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1214    
1215        pipe = new GroupBy( pipe, new Fields( "numA" ) );
1216    
1217        pipe = new HashJoin( pipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) );
1218    
1219        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
1220    
1221        if( getPlatform().isMapReduce() )
1222          assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1223    
1224        flow.complete();
1225    
1226        validateLength( flow, 5, null );
1227    
1228        List<Tuple> actual = getSinkAsList( flow );
1229    
1230        assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\tA" ) ) );
1231        assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tb\tB" ) ) );
1232        }
1233    
1234      /**
1235       * here the same file is fed into the same HashJoin.
1236       * <p/>
1237       * This is three jobs.
1238       * <p/>
1239       * a temp tap is inserted before the accumulated branch for two reasons on the common HashJoin
1240       * <p/>
1241       * it is assumed the accumulated side is filtered down, so pushing to disk will preserve io
1242       * if accumulated side was streamed instead via a fork, only part of the file will accumulate into the HashJoin
1243       * <p/>
1244       * /-T-\ <-- accumulated
1245       * T      HJ
1246       * \---/ <-- streamed
1247       *
1248       * @throws Exception
1249       */
1250      @Test
1251      public void testJoinSameSourceIntoJoin() throws Exception
1252        {
1253        getPlatform().copyFromLocal( inputFileLower );
1254        getPlatform().copyFromLocal( inputFileUpper );
1255    
1256        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1257        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1258    
1259        Map sources = new HashMap();
1260    
1261        sources.put( "lower", sourceLower );
1262        sources.put( "upper1", sourceUpper );
1263        sources.put( "upper2", sourceUpper );
1264    
1265        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoin" ), SinkMode.REPLACE );
1266    
1267        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1268    
1269        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1270        Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1271        Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1272    
1273        Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1274    
1275        splice1 = new Each( splice1, new Identity() );
1276    
1277        Pipe splice2 = new HashJoin( pipeLower, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1278    
1279        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1280    
1281    //    flow.writeDOT( "joinsamesourceintojoin.dot" );
1282    
1283        if( getPlatform().isMapReduce() )
1284          assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
1285    
1286        flow.complete();
1287    
1288        validateLength( flow, 5, null );
1289    
1290        List<Tuple> actual = getSinkAsList( flow );
1291    
1292        assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1293        assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1294        }
1295    
1296      /**
1297       * Tests that two independent streamed sources with loadable tributaries properly plan into a GroupBy
1298       * without loading unused sources
1299       *
1300       * @throws Exception
1301       */
1302      @Test
1303      public void testJoinsIntoGroupBy() throws Exception
1304        {
1305        getPlatform().copyFromLocal( inputFileLower );
1306        getPlatform().copyFromLocal( inputFileUpper );
1307    
1308        getPlatform().copyFromLocal( inputFileLhs );
1309        getPlatform().copyFromLocal( inputFileRhs );
1310    
1311        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1312        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1313    
1314        Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1315        Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1316    
1317        Map sources = new HashMap();
1318    
1319        sources.put( "lower", sourceLower );
1320        sources.put( "upper", sourceUpper );
1321        sources.put( "lhs", sourceLhs );
1322        sources.put( "rhs", sourceRhs );
1323    
1324        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintogroupby" ), SinkMode.REPLACE );
1325    
1326        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1327    
1328        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1329        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1330    
1331        Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1332        Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1333    
1334        Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1335    
1336        upperLower = new Each( upperLower, new Identity() );
1337    
1338        Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1339    
1340        lhsRhs = new Each( lhsRhs, new Identity() );
1341    
1342        Pipe grouped = new GroupBy( "merging", Pipe.pipes( upperLower, lhsRhs ), new Fields( "num1" ) );
1343    
1344        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1345    
1346        if( getPlatform().isMapReduce() )
1347          assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1348    
1349        flow.complete();
1350    
1351        validateLength( flow, 42, null );
1352    
1353        List<Tuple> actual = getSinkAsList( flow );
1354    
1355        assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1356        assertTrue( actual.contains( new Tuple( "5\te\t5\tE" ) ) );
1357        }
1358    
1359      @Test
1360      public void testJoinSamePipeAroundGroupBy() throws Exception
1361        {
1362        getPlatform().copyFromLocal( inputFileLower );
1363    
1364        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1365        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipearoundgroupby" ), SinkMode.REPLACE );
1366    
1367        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1368    
1369        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1370    
1371        Pipe lhsPipe = new Each( new Pipe( "lhs", pipeLower ), new Identity() );
1372    
1373        Pipe rhsPipe = new Each( new Pipe( "rhs", pipeLower ), new Identity() );
1374    
1375        rhsPipe = new GroupBy( rhsPipe, new Fields( "num" ) );
1376    
1377        rhsPipe = new Each( rhsPipe, new Identity() );
1378    
1379        Pipe pipe = new HashJoin( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1380    
1381        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
1382    
1383        flow.complete();
1384    
1385        validateLength( flow, 5, null );
1386    
1387        List<Tuple> actual = getSinkAsList( flow );
1388    
1389        assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1390        assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1391        }
1392    
1393      /**
1394       * This test results in two MR jobs because one join feeds into the accumulated side of the second. A mapper
1395       * can only stream on branch at a time forcing a temp file between the mappers. see next test for swapped join
1396       *
1397       * @throws Exception
1398       */
1399      @Test
1400      public void testJoinsIntoCoGroupLhs() throws Exception
1401        {
1402        getPlatform().copyFromLocal( inputFileLower );
1403        getPlatform().copyFromLocal( inputFileUpper );
1404    
1405        getPlatform().copyFromLocal( inputFileLhs );
1406        getPlatform().copyFromLocal( inputFileRhs );
1407    
1408        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1409        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1410    
1411        Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1412        Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1413    
1414        Map sources = new HashMap();
1415    
1416        sources.put( "lower", sourceLower );
1417        sources.put( "upper", sourceUpper );
1418        sources.put( "lhs", sourceLhs );
1419        sources.put( "rhs", sourceRhs );
1420    
1421        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhs" ), SinkMode.REPLACE );
1422    
1423        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1424    
1425        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1426        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1427    
1428        Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1429        Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1430    
1431        Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1432    
1433        upperLower = new Each( upperLower, new Identity() );
1434    
1435        Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1436    
1437        lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1438    
1439        Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) );
1440    
1441        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1442    
1443        if( getPlatform().isMapReduce() )
1444          assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1445    
1446        flow.complete();
1447    
1448        validateLength( flow, 37, null );
1449    
1450        List<Tuple> actual = getSinkAsList( flow );
1451    
1452        assertTrue( actual.contains( new Tuple( "1\ta\t1\ta\t1\tA\t1\tA" ) ) );
1453        assertTrue( actual.contains( new Tuple( "5\ta\t5\te\t5\tE\t5\tA" ) ) );
1454        }
1455    
1456      /**
1457       * This test results in one MR jobs because one join feeds into the streamed side of the second.
1458       *
1459       * @throws Exception
1460       */
1461      @Test
1462      public void testJoinsIntoCoGroupLhsSwappedJoin() throws Exception
1463        {
1464        getPlatform().copyFromLocal( inputFileLower );
1465        getPlatform().copyFromLocal( inputFileUpper );
1466    
1467        getPlatform().copyFromLocal( inputFileLhs );
1468        getPlatform().copyFromLocal( inputFileRhs );
1469    
1470        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1471        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1472    
1473        Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1474        Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1475    
1476        Map sources = new HashMap();
1477    
1478        sources.put( "lower", sourceLower );
1479        sources.put( "upper", sourceUpper );
1480        sources.put( "lhs", sourceLhs );
1481        sources.put( "rhs", sourceRhs );
1482    
1483        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhsswappedjoin" ), SinkMode.REPLACE );
1484    
1485        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1486    
1487        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1488        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1489    
1490        Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1491        Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1492    
1493        Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1494    
1495        upperLower = new Each( upperLower, new Identity() );
1496    
1497        Pipe lhsUpperLower = new HashJoin( upperLower, new Fields( "numUpperLower" ), pipeLhs, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower", "numLhs", "charLhs" ) );
1498    
1499        lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1500    
1501        Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) );
1502    
1503        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1504    
1505        if( getPlatform().isMapReduce() )
1506          assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1507    
1508        flow.complete();
1509    
1510        validateLength( flow, 37, null );
1511    
1512        List<Tuple> actual = getSinkAsList( flow );
1513    
1514        assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) );
1515        assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) );
1516        }
1517    
1518      @Test
1519      public void testJoinsIntoCoGroupRhs() throws Exception
1520        {
1521        getPlatform().copyFromLocal( inputFileLower );
1522        getPlatform().copyFromLocal( inputFileUpper );
1523    
1524        getPlatform().copyFromLocal( inputFileLhs );
1525        getPlatform().copyFromLocal( inputFileRhs );
1526    
1527        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1528        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1529    
1530        Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1531        Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1532    
1533        Map sources = new HashMap();
1534    
1535        sources.put( "lower", sourceLower );
1536        sources.put( "upper", sourceUpper );
1537        sources.put( "lhs", sourceLhs );
1538        sources.put( "rhs", sourceRhs );
1539    
1540        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouprhs" ), SinkMode.REPLACE );
1541    
1542        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1543    
1544        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1545        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1546    
1547        Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1548        Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1549    
1550        Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1551    
1552        upperLower = new Each( upperLower, new Identity() );
1553    
1554        Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1555    
1556        lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1557    
1558        Pipe grouped = new CoGroup( "cogrouping", pipeRhs, new Fields( "num" ), lhsUpperLower, new Fields( "numLhs" ) );
1559    
1560        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1561    
1562        if( getPlatform().isMapReduce() )
1563          assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1564    
1565        flow.complete();
1566    
1567        validateLength( flow, 37, null );
1568    
1569        List<Tuple> actual = getSinkAsList( flow );
1570    
1571        assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\ta\t1\tA" ) ) );
1572        assertTrue( actual.contains( new Tuple( "5\tE\t5\te\t5\te\t5\tE" ) ) );
1573        }
1574    
1575      @Test
1576      public void testJoinsIntoCoGroup() throws Exception
1577        {
1578        getPlatform().copyFromLocal( inputFileLower );
1579        getPlatform().copyFromLocal( inputFileUpper );
1580    
1581        getPlatform().copyFromLocal( inputFileLhs );
1582        getPlatform().copyFromLocal( inputFileRhs );
1583    
1584        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1585        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1586    
1587        Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1588        Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1589    
1590        Map sources = new HashMap();
1591    
1592        sources.put( "lower", sourceLower );
1593        sources.put( "upper", sourceUpper );
1594        sources.put( "lhs", sourceLhs );
1595        sources.put( "rhs", sourceRhs );
1596    
1597        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogroup" ), SinkMode.REPLACE );
1598    
1599        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1600    
1601        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1602        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1603    
1604        Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1605        Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1606    
1607        Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower1", "charUpperLower1", "numUpperLower2", "charUpperLower2" ) );
1608    
1609        upperLower = new Each( upperLower, new Identity() );
1610    
1611        Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "numLhsRhs1", "charLhsRhs1", "numLhsRhs2", "charLhsRhs2" ) );
1612    
1613        lhsRhs = new Each( lhsRhs, new Identity() );
1614    
1615        Pipe grouped = new CoGroup( "cogrouping", upperLower, new Fields( "numUpperLower1" ), lhsRhs, new Fields( "numLhsRhs1" ) );
1616    
1617        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1618    
1619        if( getPlatform().isMapReduce() )
1620          assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1621    
1622        flow.complete();
1623    
1624        validateLength( flow, 37, null );
1625    
1626        List<Tuple> actual = getSinkAsList( flow );
1627    
1628        assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) );
1629        assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) );
1630        }
1631    
1632      public static class AllComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable
1633        {
1634    
1635        @Override
1636        public int compare( Comparable lhs, Comparable rhs )
1637          {
1638          return lhs.toString().compareTo( rhs.toString() );
1639          }
1640    
1641        @Override
1642        public int hashCode( Comparable value )
1643          {
1644          if( value == null )
1645            return 0;
1646    
1647          return value.toString().hashCode();
1648          }
1649        }
1650    
1651      /**
1652       * Tests Hasher being honored even if default comparator is null.
1653       *
1654       * @throws Exception
1655       */
1656      @Test
1657      public void testJoinWithHasher() throws Exception
1658        {
1659        getPlatform().copyFromLocal( inputFileLower );
1660        getPlatform().copyFromLocal( inputFileUpper );
1661    
1662        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1663        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1664    
1665        Map sources = new HashMap();
1666    
1667        sources.put( "lower", sourceLower );
1668        sources.put( "upper", sourceUpper );
1669    
1670        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinhasher" ), SinkMode.REPLACE );
1671    
1672        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1673    
1674        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1675    
1676        pipeLower = new Each( pipeLower, new Fields( "num" ), new ExpressionFunction( Fields.ARGS, "Integer.parseInt( num )", String.class ), Fields.REPLACE );
1677    
1678        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1679    
1680        Fields num = new Fields( "num" );
1681        num.setComparator( "num", new AllComparator() );
1682    
1683        Pipe splice = new HashJoin( pipeLower, num, pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
1684    
1685        Map<Object, Object> properties = getProperties();
1686    
1687        Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1688    
1689        flow.complete();
1690    
1691        validateLength( flow, 5 );
1692    
1693        List<Tuple> values = getSinkAsList( flow );
1694    
1695        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1696        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1697        }
1698    
1699      @Test
1700      public void testJoinNone() throws Exception
1701        {
1702        getPlatform().copyFromLocal( inputFileLower );
1703        getPlatform().copyFromLocal( inputFileUpper );
1704    
1705        Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1706        Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1707    
1708        Map sources = new HashMap();
1709    
1710        sources.put( "lower", sourceLower );
1711        sources.put( "upper", sourceUpper );
1712    
1713        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE );
1714    
1715        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1716    
1717        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1718        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1719    
1720        Pipe splice = new HashJoin( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) );
1721    
1722        Map<Object, Object> properties = getProperties();
1723    
1724        Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1725    
1726        flow.complete();
1727    
1728        validateLength( flow, 25 );
1729    
1730        List<Tuple> values = getSinkAsList( flow );
1731    
1732        assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1733        assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) );
1734        assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1735        }
1736    
1737      /**
1738       * When run against a cluster a Merge before a GroupBy can hide the streamed/accumulated nature of a branch.
1739       *
1740       * commented code is for troubleshooting.
1741       * @throws Exception
1742       */
1743      @Test
1744      public void testJoinMergeGroupBy() throws Exception
1745        {
1746        getPlatform().copyFromLocal( inputFileNums10 );
1747        getPlatform().copyFromLocal( inputFileNums20 );
1748    
1749        Tap lhsTap = getPlatform().getTextFile( new Fields( "id" ), inputFileNums10 );
1750        Tap rhsTap = getPlatform().getTextFile( new Fields( "id2" ), inputFileNums20 );
1751    
1752        Pipe lhs = new Pipe( "lhs" );
1753        Pipe rhs = new Pipe( "rhs" );
1754    
1755    //    Pipe joined = new CoGroup( messages, new Fields( "id" ), people, new Fields( "id2" ) );
1756        Pipe joined = new HashJoin( lhs, new Fields( "id" ), rhs, new Fields( "id2" ) );
1757    
1758        Pipe pruned = new Each( joined, new Fields( "id2" ), new Identity(), Fields.RESULTS );
1759    //    pruned = new Checkpoint( pruned );
1760        Pipe merged = new Merge( pruned, rhs );
1761        Pipe grouped = new GroupBy( merged, new Fields( "id2" ) );
1762    //    Pipe grouped = new GroupBy( Pipe.pipes(  pruned, people  ), new Fields( "id2" ) );
1763        Aggregator count = new Count( new Fields( "count" ) );
1764        Pipe counted = new Every( grouped, count );
1765    
1766        String testJoinMerge = "testJoinMergeGroupBy/" + ( ( joined instanceof CoGroup ) ? "cogroup" : "hashjoin" );
1767        Tap sink = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", null, getOutputPath( testJoinMerge ), SinkMode.REPLACE );
1768    
1769        FlowDef flowDef = FlowDef.flowDef()
1770          .addSource( rhs, rhsTap )
1771          .addSource( lhs, lhsTap )
1772          .addTailSink( counted, sink );
1773    
1774        Flow flow = getPlatform().getFlowConnector().connect( flowDef );
1775    
1776    //    flow.writeDOT( "joinmerge.dot" );
1777    //    flow.writeStepsDOT( "joinmerge-steps.dot" );
1778    
1779        flow.complete();
1780    
1781        validateLength( flow, 20 );
1782    
1783        List<Tuple> values = getSinkAsList( flow );
1784        List<Tuple> expected = new ArrayList<Tuple>();
1785    
1786        expected.add( new Tuple( "1", "2" ) );
1787        expected.add( new Tuple( "10", "2" ) );
1788        expected.add( new Tuple( "11", "1" ) );
1789        expected.add( new Tuple( "12", "1" ) );
1790        expected.add( new Tuple( "13", "1" ) );
1791        expected.add( new Tuple( "14", "1" ) );
1792        expected.add( new Tuple( "15", "1" ) );
1793        expected.add( new Tuple( "16", "1" ) );
1794        expected.add( new Tuple( "17", "1" ) );
1795        expected.add( new Tuple( "18", "1" ) );
1796        expected.add( new Tuple( "19", "1" ) );
1797        expected.add( new Tuple( "2", "2" ) );
1798        expected.add( new Tuple( "20", "1" ) );
1799        expected.add( new Tuple( "3", "2" ) );
1800        expected.add( new Tuple( "4", "2" ) );
1801        expected.add( new Tuple( "5", "2" ) );
1802        expected.add( new Tuple( "6", "2" ) );
1803        expected.add( new Tuple( "7", "2" ) );
1804        expected.add( new Tuple( "8", "2" ) );
1805        expected.add( new Tuple( "9", "2" ) );
1806    
1807        Collections.sort(values);
1808        Collections.sort(expected);
1809    
1810        assertEquals( expected, values );
1811        }
1812      }