001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading;
022    
023    import java.io.Serializable;
024    import java.util.ArrayList;
025    import java.util.Collection;
026    import java.util.Comparator;
027    import java.util.HashMap;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.regex.Pattern;
031    
032    import cascading.cascade.Cascades;
033    import cascading.flow.Flow;
034    import cascading.operation.Debug;
035    import cascading.operation.Filter;
036    import cascading.operation.Function;
037    import cascading.operation.Identity;
038    import cascading.operation.Insert;
039    import cascading.operation.NoOp;
040    import cascading.operation.aggregator.Count;
041    import cascading.operation.aggregator.First;
042    import cascading.operation.expression.ExpressionFunction;
043    import cascading.operation.filter.And;
044    import cascading.operation.function.UnGroup;
045    import cascading.operation.regex.RegexFilter;
046    import cascading.operation.regex.RegexParser;
047    import cascading.operation.regex.RegexSplitter;
048    import cascading.pipe.Each;
049    import cascading.pipe.Every;
050    import cascading.pipe.GroupBy;
051    import cascading.pipe.Merge;
052    import cascading.pipe.Pipe;
053    import cascading.tap.MultiSourceTap;
054    import cascading.tap.SinkMode;
055    import cascading.tap.Tap;
056    import cascading.tuple.Fields;
057    import cascading.tuple.Hasher;
058    import cascading.tuple.Tuple;
059    import cascading.tuple.TupleEntryIterator;
060    import org.junit.Test;
061    
062    import static cascading.ComparePlatformsTest.NONDETERMINISTIC;
063    import static data.InputData.*;
064    
065    
066    public class FieldedPipesPlatformTest extends PlatformTestCase
067      {
068      public FieldedPipesPlatformTest()
069        {
070        super( true ); // leave cluster testing enabled
071        }
072    
073      @Test
074      public void testSimpleGroup() throws Exception
075        {
076        getPlatform().copyFromLocal( inputFileApache );
077    
078        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
079    
080        Pipe pipe = new Pipe( "test" );
081    
082        pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
083    
084        pipe = new GroupBy( pipe, new Fields( "ip" ) );
085    
086        pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
087    
088        Tap sink = getPlatform().getTextFile( getOutputPath( "simple" ), SinkMode.REPLACE );
089    
090        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
091    
092        flow.complete();
093    
094        validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
095        validateLength( flow, 8, null );
096        }
097    
098      @Test
099      public void testSimpleChain() throws Exception
100        {
101        getPlatform().copyFromLocal( inputFileApache );
102    
103        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
104    
105        Pipe pipe = new Pipe( "test" );
106    
107        pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
108    
109        pipe = new GroupBy( pipe, new Fields( "ip" ) );
110    
111        pipe = new Every( pipe, new Count( new Fields( "count1" ) ) );
112        pipe = new Every( pipe, new Count( new Fields( "count2" ) ) );
113        pipe = new Every( pipe, new Count( new Fields( "count3" ) ) );
114        pipe = new Every( pipe, new Count( new Fields( "count4" ) ) );
115    
116        Tap sink = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "simplechain" ), SinkMode.REPLACE );
117    
118        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
119    
120        flow.complete();
121    
122        validateLength( flow, 8, 5 );
123        }
124    
125      @Test
126      public void testChainEndingWithEach() throws Exception
127        {
128        getPlatform().copyFromLocal( inputFileApache );
129    
130        Pipe pipe = new Pipe( "test" );
131    
132        pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
133    
134        pipe = new GroupBy( pipe, new Fields( "ip" ) );
135    
136        pipe = new Every( pipe, new Count( new Fields( "count1" ) ) );
137        pipe = new Every( pipe, new Count( new Fields( "count2" ) ) );
138    
139        pipe = new Each( pipe, new Fields( "count1", "count2" ), new ExpressionFunction( new Fields( "sum" ), "count1 + count2", int.class ), Fields.ALL );
140    
141        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
142        Tap sink = getPlatform().getTextFile( getOutputPath( "chaineach" ), SinkMode.REPLACE );
143    
144        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
145    
146        flow.complete();
147    
148        validateLength( flow, 8, null );
149        }
150    
151      // also tests the RegexSplitter
152    
153      @Test
154      public void testNoGroup() throws Exception
155        {
156        getPlatform().copyFromLocal( inputFileApache );
157    
158        Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache );
159    
160        Pipe pipe = new Pipe( "test" );
161    
162        pipe = new Each( pipe, new RegexSplitter( "\\s+" ), new Fields( 1 ) );
163    
164        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "nogroup" ), SinkMode.REPLACE );
165    
166        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
167    
168        flow.complete();
169    
170        validateLength( flow, 10, null );
171    
172        List<Tuple> results = getSinkAsList( flow );
173    
174        assertTrue( results.contains( new Tuple( "75.185.76.245" ) ) );
175        }
176    
177      @Test
178      public void testCopy() throws Exception
179        {
180        getPlatform().copyFromLocal( inputFileApache );
181    
182        Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache );
183    
184        Pipe pipe = new Pipe( "test" );
185    
186        Tap sink = getPlatform().getTextFile( getOutputPath( "copy" ), SinkMode.REPLACE );
187    
188        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
189    
190        flow.complete();
191    
192        validateLength( flow, 10, null );
193        }
194    
195      @Test
196      public void testSimpleMerge() throws Exception
197        {
198        getPlatform().copyFromLocal( inputFileLower );
199        getPlatform().copyFromLocal( inputFileUpper );
200    
201        Tap sourceLower = getPlatform().getTextFile( inputFileLower );
202        Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );
203    
204        Map sources = new HashMap();
205    
206        sources.put( "lower", sourceLower );
207        sources.put( "upper", sourceUpper );
208    
209        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
210    
211        // using null pos so all fields are written
212        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "simplemerge" ), SinkMode.REPLACE );
213    
214        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
215        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
216    
217        Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );
218    
219        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
220    
221        flow.complete();
222    
223        validateLength( flow, 10 );
224    
225        Collection results = getSinkAsList( flow );
226    
227        assertTrue( "missing value", results.contains( new Tuple( "1\ta" ) ) );
228        assertTrue( "missing value", results.contains( new Tuple( "1\tA" ) ) );
229        assertTrue( "missing value", results.contains( new Tuple( "2\tb" ) ) );
230        assertTrue( "missing value", results.contains( new Tuple( "2\tB" ) ) );
231        assertTrue( "missing value", results.contains( new Tuple( "3\tc" ) ) );
232        assertTrue( "missing value", results.contains( new Tuple( "3\tC" ) ) );
233        }
234    
235      /**
236       * Specifically tests GroupBy will return the correct grouping fields to the following Every
237       *
238       * @throws Exception
239       */
240      @Test
241      public void testSimpleMergeThree() throws Exception
242        {
243        getPlatform().copyFromLocal( inputFileLower );
244        getPlatform().copyFromLocal( inputFileUpper );
245        getPlatform().copyFromLocal( inputFileLowerOffset );
246    
247        Tap sourceLower = getPlatform().getTextFile( inputFileLower );
248        Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );
249        Tap sourceLowerOffset = getPlatform().getTextFile( inputFileLowerOffset );
250    
251        Map sources = new HashMap();
252    
253        sources.put( "lower", sourceLower );
254        sources.put( "upper", sourceUpper );
255        sources.put( "offset", sourceLowerOffset );
256    
257        Tap sink = getPlatform().getTextFile( getOutputPath( "simplemergethree" ), SinkMode.REPLACE );
258    
259        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
260    
261        Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
262        Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
263        Pipe pipeOffset = new Each( new Pipe( "offset" ), new Fields( "line" ), splitter );
264    
265        Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper, pipeOffset ), new Fields( "num" ), new Fields( "char" ) );
266    
267        splice = new Every( splice, new Fields( "char" ), new First( new Fields( "first" ) ) );
268    
269        splice = new Each( splice, new Fields( "num", "first" ), new Identity() );
270    
271        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
272    
273        flow.complete();
274    
275        validateLength( flow, 6 );
276        }
277    
278      /**
279       * same test as MergePipesTest, but to test that chained groupby don't exhibit similar failures
280       *
281       * @throws Exception
282       */
283      @Test
284      public void testSameSourceMergeThreeChainGroup() throws Exception
285        {
286        getPlatform().copyFromLocal( inputFileLower );
287    
288        Tap sourceLower = getPlatform().getTextFile( inputFileLower );
289    
290        Map sources = new HashMap();
291    
292        sources.put( "split", sourceLower );
293    
294        Tap sink = getPlatform().getTextFile( getOutputPath( "samemergethreechaingroup" ), SinkMode.REPLACE );
295    
296        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
297    
298        Pipe pipe = new Pipe( "split" );
299    
300        Pipe pipeLower = new Each( new Pipe( "lower", pipe ), new Fields( "line" ), splitter );
301        Pipe pipeUpper = new Each( new Pipe( "upper", pipe ), new Fields( "line" ), splitter );
302        Pipe pipeOffset = new Each( new Pipe( "offset", pipe ), new Fields( "line" ), splitter );
303    
304        //put group before merge to test path counts
305        Pipe splice = new GroupBy( Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ) );
306    
307        // this group has its incoming paths counted, gated by the previous group
308        splice = new GroupBy( Pipe.pipes( splice, pipeOffset ), new Fields( "num" ) );
309    
310        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
311    
312        if( getPlatform().isMapReduce() )
313          assertEquals( "wrong num jobs", 2, flow.getFlowSteps().size() );
314    
315        flow.complete();
316    
317        validateLength( flow, 15 );
318        }
319    
320      @Test
321      public void testUnGroup() throws Exception
322        {
323        getPlatform().copyFromLocal( inputFileJoined );
324    
325        Tap source = getPlatform().getTextFile( inputFileJoined );
326        Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );
327    
328        Pipe pipe = new Pipe( "test" );
329    
330        pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );
331    
332        pipe = new Each( pipe, new UnGroup( new Fields( "num", "char" ), new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );
333    
334        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
335    
336        flow.complete();
337    
338        validateLength( flow, 10 );
339        }
340    
341      @Test
342      public void testUnGroupAnon() throws Exception
343        {
344        getPlatform().copyFromLocal( inputFileJoined );
345    
346        Tap source = getPlatform().getTextFile( inputFileJoined );
347        Tap sink = getPlatform().getTextFile( getOutputPath( "ungroupedanon" ), SinkMode.REPLACE );
348    
349        Pipe pipe = new Pipe( "test" );
350    
351        pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );
352    
353        pipe = new Each( pipe, new UnGroup( new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );
354    
355        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
356    
357        flow.complete();
358    
359        validateLength( flow, 10 );
360        }
361    
362      @Test
363      public void testUnGroupBySize() throws Exception
364        {
365        getPlatform().copyFromLocal( inputFileJoinedExtra );
366    
367        Tap source = getPlatform().getTextFile( inputFileJoinedExtra );
368        Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped_size" ), SinkMode.REPLACE );
369    
370        Pipe pipe = new Pipe( "test" );
371    
372        pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num1", "num2", "lower", "upper" ) ) );
373    
374        pipe = new Each( pipe, new UnGroup( new Fields( "num1", "num2", "char" ), new Fields( "num1", "num2" ), 1 ) );
375    
376        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
377    
378        flow.complete();
379    
380        List<Tuple> tuples  = asList( flow, sink );
381        assertEquals( 10, tuples.size() );
382    
383        List<Object> values = new ArrayList<Object>(  );
384        for (Tuple tuple: tuples)
385          values.add( tuple.getObject( 1 ) );
386    
387        assertTrue( values.contains( "1\t1\ta" ) );
388        assertTrue( values.contains( "1\t1\tA" ) );
389        assertTrue( values.contains( "2\t2\tb" ) );
390        assertTrue( values.contains( "2\t2\tB" ) );
391        assertTrue( values.contains( "3\t3\tc" ) );
392        assertTrue( values.contains( "3\t3\tC" ) );
393        assertTrue( values.contains( "4\t4\td" ) );
394        assertTrue( values.contains( "4\t4\tD" ) );
395        assertTrue( values.contains( "5\t5\te" ) );
396        assertTrue( values.contains( "5\t5\tE" ) );
397        }
398    
399      @Test
400      public void testFilter() throws Exception
401        {
402        getPlatform().copyFromLocal( inputFileApache );
403    
404        Tap source = getPlatform().getTextFile( inputFileApache );
405        Tap sink = getPlatform().getTextFile( getOutputPath( "filter" ), SinkMode.REPLACE );
406    
407        Pipe pipe = new Pipe( "test" );
408    
409        Filter filter = new RegexFilter( "^68.*" );
410    
411        pipe = new Each( pipe, new Fields( "line" ), filter );
412    
413        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
414    
415        flow.complete();
416    
417        validateLength( flow, 3 );
418        }
419    
420      @Test
421      public void testLogicFilter() throws Exception
422        {
423        getPlatform().copyFromLocal( inputFileApache );
424    
425        Tap source = getPlatform().getTextFile( inputFileApache );
426        Tap sink = getPlatform().getTextFile( getOutputPath( "logicfilter" ), SinkMode.REPLACE );
427    
428        Pipe pipe = new Pipe( "test" );
429    
430        Filter filter = new And( new RegexFilter( "^68.*$" ), new RegexFilter( "^1000.*$" ) );
431    
432        pipe = new Each( pipe, new Fields( "line" ), filter );
433    
434        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
435    
436        flow.complete();
437    
438        validateLength( flow, 3 );
439        }
440    
441      @Test
442      public void testFilterComplex() throws Exception
443        {
444        getPlatform().copyFromLocal( inputFileApache );
445    
446        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
447        Tap sink = getPlatform().getTextFile( getOutputPath( "filtercomplex" ), SinkMode.REPLACE );
448    
449        Pipe pipe = new Pipe( "test" );
450    
451        pipe = new Each( pipe, new Fields( "line" ), TestConstants.APACHE_COMMON_PARSER );
452    
453        pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) );
454        pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) );
455    
456        pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL );
457    
458        pipe = new GroupBy( pipe, new Fields( "value" ) );
459    
460        pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) );
461    
462        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
463    
464        flow.complete();
465    
466        validateLength( flow, 1, null );
467        }
468    
469      /**
470       * Intentionally filters all values out to test next mr job behaves
471       *
472       * @throws Exception
473       */
474      @Test
475      public void testFilterAll() throws Exception
476        {
477        getPlatform().copyFromLocal( inputFileApache );
478    
479        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
480        Tap sink = getPlatform().getTextFile( getOutputPath( "filterall" ), SinkMode.REPLACE );
481    
482        Pipe pipe = new Pipe( "test" );
483    
484        String regex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
485        Fields fieldDeclaration = new Fields( "ip", "time", "method", "event", "status", "size" );
486        int[] groups = {1, 2, 3, 4, 5, 6};
487        RegexParser function = new RegexParser( fieldDeclaration, regex, groups );
488        pipe = new Each( pipe, new Fields( "line" ), function );
489    
490        pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all
491    
492        pipe = new GroupBy( pipe, new Fields( "method" ) );
493    
494        pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL );
495    
496        pipe = new GroupBy( pipe, new Fields( "value" ) );
497    
498        pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) );
499    
500        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
501    
502        flow.complete();
503    
504        validateLength( flow, 0, null );
505        }
506    
507    //  public void testLimitFilter() throws Exception
508    //    {
509    //    copyFromLocal( inputFileApache );
510    //
511    //    Tap source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileApache );
512    //    Tap sink = new Lfs( new TextLine(), outputPath + "/limitfilter", true );
513    //
514    //    Pipe pipe = new Pipe( "test" );
515    //
516    //    Filter filter = new Limit( 7 );
517    //
518    //    pipe = new Each( pipe, new Fields( "line" ), filter );
519    //
520    //    Flow flow = new FlowConnector( getProperties() ).connect( source, sink, pipe );
521    //
522    ////    flow.writeDOT( "flow.dot" );
523    //
524    //    flow.complete();
525    //
526    //    validateLength( flow, 7, null );
527    //    }
528    
529      //
530    
531      @Test
532      public void testSplit() throws Exception
533        {
534        getPlatform().copyFromLocal( inputFileApache );
535    
536        // 46 192
537    
538        Tap source = getPlatform().getTextFile( inputFileApache );
539        Tap sink1 = getPlatform().getTextFile( getOutputPath( "split1" ), SinkMode.REPLACE );
540        Tap sink2 = getPlatform().getTextFile( getOutputPath( "split2" ), SinkMode.REPLACE );
541    
542        Pipe pipe = new Pipe( "split" );
543    
544        pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
545    
546        Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
547        Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );
548    
549        Map sources = new HashMap();
550        sources.put( "split", source );
551    
552        Map sinks = new HashMap();
553        sinks.put( "left", sink1 );
554        sinks.put( "right", sink2 );
555    
556        Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
557    
558        flow.complete();
559    
560        validateLength( flow, 1, "left" );
561        validateLength( flow, 2, "right" );
562        }
563    
564      /**
565       * verifies non-safe rules apply in the proper place
566       *
567       * @throws Exception
568       */
569      @Test
570      public void testSplitNonSafe() throws Exception
571        {
572        getPlatform().copyFromLocal( inputFileApache );
573    
574        // 46 192
575    
576        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
577        Tap sink1 = getPlatform().getTextFile( getOutputPath( "nonsafesplit1" ), SinkMode.REPLACE );
578        Tap sink2 = getPlatform().getTextFile( getOutputPath( "nonsafesplit2" ), SinkMode.REPLACE );
579    
580        Pipe pipe = new Pipe( "split" );
581    
582        // run job on non-safe operation, forces 3 mr jobs.
583        pipe = new Each( pipe, new TestFunction( new Fields( "ignore" ), new Tuple( 1 ), false ), new Fields( "line" ) );
584    
585        pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
586    
587        Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
588        Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );
589    
590        Map sources = new HashMap();
591        sources.put( "split", source );
592    
593        Map sinks = new HashMap();
594        sinks.put( "left", sink1 );
595        sinks.put( "right", sink2 );
596    
597        Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
598    
599        flow.complete();
600    
601        validateLength( flow, 1, "left" );
602        validateLength( flow, 2, "right" );
603        }
604    
605      @Test
606      public void testSplitSameSourceMerged() throws Exception
607        {
608        getPlatform().copyFromLocal( inputFileApache );
609    
610        // 46 192
611    
612        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
613        Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemerged" ), SinkMode.REPLACE );
614    
615        Pipe pipe = new Pipe( "split" );
616    
617        pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
618    
619        Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
620        Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );
621    
622        Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) );
623    
624        Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged );
625    
626        flow.complete();
627    
628        validateLength( flow, 3 );
629        }
630    
631      /**
632       * verifies not inserting Identity between groups works
633       *
634       * @throws Exception
635       */
636      @Test
637      public void testSplitOut() throws Exception
638        {
639        getPlatform().copyFromLocal( inputFileApache );
640    
641        Tap sourceLower = getPlatform().getTextFile( new Fields( "num", "line" ), inputFileApache );
642    
643        Map sources = new HashMap();
644    
645        sources.put( "lower1", sourceLower );
646    
647        // using null pos so all fields are written
648        Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitout1" ), SinkMode.REPLACE );
649        Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitout2" ), SinkMode.REPLACE );
650    
651        Map sinks = new HashMap();
652    
653        sinks.put( "output1", sink1 );
654        sinks.put( "output2", sink2 );
655    
656        Pipe pipeLower1 = new Pipe( "lower1" );
657    
658        Pipe left = new GroupBy( "output1", pipeLower1, new Fields( 0 ) );
659        Pipe right = new GroupBy( "output2", left, new Fields( 0 ) );
660    
661        Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, Pipe.pipes( left, right ) );
662    
663    //    flow.writeDOT( "spit.dot" );
664    
665        flow.complete();
666    
667        validateLength( flow, 10, "output1" );
668        validateLength( flow, 10, "output2" );
669    
670        assertEquals( 10, asSet( flow, sink1 ).size() );
671        assertEquals( 10, asSet( flow, sink2 ).size() );
672        }
673    
674      @Test
675      public void testSplitComplex() throws Exception
676        {
677        getPlatform().copyFromLocal( inputFileApache );
678    
679        // 46 192
680    
681        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
682        Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitcomp1" ), SinkMode.REPLACE );
683        Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitcomp2" ), SinkMode.REPLACE );
684    
685        Pipe pipe = new Pipe( "split" );
686    
687        pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
688    
689        pipe = new GroupBy( pipe, new Fields( "ip" ) );
690    
691        pipe = new Every( pipe, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
692    
693        pipe = new Each( pipe, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
694    
695        Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
696    
697        Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
698    
699        Map sources = Cascades.tapsMap( "split", source );
700        Map sinks = Cascades.tapsMap( Pipe.pipes( left, right ), Tap.taps( sink1, sink2 ) );
701    
702        Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
703    
704        flow.complete();
705    
706        validateLength( flow, 1, "left" );
707        validateLength( flow, 1, "right" );
708        }
709    
710      @Test
711      public void testConcatenation() throws Exception
712        {
713        getPlatform().copyFromLocal( inputFileLower );
714        getPlatform().copyFromLocal( inputFileUpper );
715    
716        Tap sourceLower = getPlatform().getTextFile( inputFileLower );
717        Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );
718    
719        Tap source = new MultiSourceTap( sourceLower, sourceUpper );
720    
721        Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
722    
723        // using null pos so all fields are written
724        Tap sink = getPlatform().getTextFile( getOutputPath( "complexconcat" ), SinkMode.REPLACE );
725    
726        Pipe pipe = new Each( new Pipe( "concat" ), new Fields( "line" ), splitter );
727    
728        Pipe splice = new GroupBy( pipe, new Fields( "num" ) );
729    
730        Flow countFlow = getPlatform().getFlowConnector().connect( source, sink, splice );
731    
732        countFlow.complete();
733    
734        validateLength( countFlow, 10, null );
735        }
736    
737      @Test
738      public void testGeneratorAggregator() throws Exception
739        {
740        getPlatform().copyFromLocal( inputFileApache );
741    
742        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
743    
744        Pipe pipe = new Pipe( "test" );
745    
746        pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
747    
748        pipe = new GroupBy( pipe, new Fields( "ip" ) );
749    
750        pipe = new Every( pipe, new TestAggregator( new Fields( "count1" ), new Fields( "ip" ), new Tuple( "first1" ), new Tuple( "first2" ) ) );
751        pipe = new Every( pipe, new TestAggregator( new Fields( "count2" ), new Fields( "ip" ), new Tuple( "second" ), new Tuple( "second2" ), new Tuple( "second3" ) ) );
752    
753        Tap sink = getPlatform().getTextFile( getOutputPath( "generatoraggregator" ), SinkMode.REPLACE );
754    
755        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
756    
757        flow.complete();
758    
759        validateLength( flow, 8 * 2 * 3, null );
760        }
761    
762      /**
763       * If the sinks have the same scheme as a temp tap, replace the temp tap
764       *
765       * @throws Exception
766       */
767      @Test
768      public void testChainedTaps() throws Exception
769        {
770        getPlatform().copyFromLocal( inputFileApache );
771    
772        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
773    
774        Pipe pipe = new Each( new Pipe( "first" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
775        pipe = new GroupBy( pipe, new Fields( "ip" ) );
776    
777        pipe = new Each( new Pipe( "second", pipe ), new Fields( "ip" ), new RegexFilter( "7" ) );
778        pipe = new GroupBy( pipe, new Fields( "ip" ) );
779    
780        pipe = new Each( new Pipe( "third", pipe ), new Fields( "ip" ), new RegexFilter( "6" ) );
781        pipe = new GroupBy( pipe, new Fields( "ip" ) );
782    
783        Tap sinkFirst = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/first" ), SinkMode.REPLACE );
784        Tap sinkSecond = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/second" ), SinkMode.REPLACE );
785        Tap sinkThird = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/third" ), SinkMode.REPLACE );
786    
787        Map<String, Tap> sinks = Cascades.tapsMap( new String[]{"first", "second",
788                                                                "third"}, Tap.taps( sinkFirst, sinkSecond, sinkThird ) );
789    
790        Flow flow = getPlatform().getFlowConnector().connect( source, sinks, pipe );
791    
792        if( getPlatform().isMapReduce() )
793          assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
794    
795        flow.complete();
796    
797        validateLength( flow, 3 );
798        }
799    
800      @Test
801      public void testReplace() throws Exception
802        {
803        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
804        Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "offset", "line" ), getOutputPath( "replace" ), SinkMode.REPLACE );
805    
806        Pipe pipe = new Pipe( "test" );
807    
808        Function parser = new RegexParser( new Fields( 0 ), "^[^ ]*" );
809        pipe = new Each( pipe, new Fields( "line" ), parser, Fields.REPLACE );
810        pipe = new Each( pipe, new Fields( "line" ), new Identity( Fields.ARGS ), Fields.REPLACE );
811        pipe = new Each( pipe, new Fields( "line" ), new Identity( new Fields( "line" ) ), Fields.REPLACE );
812    
813        pipe = new Each( pipe, new Debug( true ) );
814    
815        Flow flow = getPlatform().getFlowConnector( disableDebug() ).connect( source, sink, pipe );
816    
817        flow.complete();
818    
819        validateLength( flow, 10, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
820        }
821    
822      @Test
823      public void testSwap() throws Exception
824        {
825        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
826        Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ipaddress" ), getOutputPath( "swap" ), SinkMode.REPLACE );
827    
828        Pipe pipe = new Pipe( "test" );
829    
830        Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
831        pipe = new Each( pipe, new Fields( "line" ), parser, Fields.SWAP );
832        pipe = new GroupBy( pipe, new Fields( "ip" ) );
833        pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
834        pipe = new Each( pipe, new Fields( "ip" ), new Identity( new Fields( "ipaddress" ) ), Fields.SWAP );
835    
836        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
837    
838        flow.complete();
839    
840        validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
841        }
842    
843      @Test
844      public void testNone() throws Exception
845        {
846        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
847        Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ip" ), getOutputPath( "none" ), SinkMode.REPLACE );
848    
849        Pipe pipe = new Pipe( "test" );
850    
851        Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
852        pipe = new Each( pipe, new Fields( "line" ), parser, Fields.ALL );
853        pipe = new Each( pipe, new Fields( "line" ), new NoOp(), Fields.SWAP ); // declares Fields.NONE
854        pipe = new GroupBy( pipe, new Fields( "ip" ) );
855        pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
856        pipe = new Each( pipe, Fields.NONE, new Insert( new Fields( "ipaddress" ), "1.2.3.4" ), Fields.ALL );
857    
858        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
859    
860        flow.complete();
861    
862        validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
863        }
864    
865      /**
866       * this tests a merge on two pipes with the same source and name.
867       *
868       * @throws Exception
869       */
870      @Test
871      public void testSplitSameSourceMergedSameName() throws Exception
872        {
873        getPlatform().copyFromLocal( inputFileApache );
874    
875        // 46 192
876    
877        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
878        Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemergedsamename" ), SinkMode.REPLACE );
879    
880        Pipe pipe = new Pipe( "split" );
881    
882        pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
883    
884        Pipe left = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*46.*" ) );
885        Pipe right = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*102.*" ) );
886    
887        Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) );
888    
889        Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged );
890    
891        flow.complete();
892    
893        validateLength( flow, 3 );
894        }
895    
896      /**
897       * Catches failure to properly resolve the grouping fields as incoming to the second group-by
898       *
899       * @throws Exception
900       */
901      @Test
902      public void testGroupGroup() throws Exception
903        {
904        getPlatform().copyFromLocal( inputFileApache );
905    
906        Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
907    
908        Pipe pipe = new Pipe( "test" );
909    
910        pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
911    
912        pipe = new GroupBy( pipe, new Fields( "ip" ) );
913    
914        pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
915    
916        pipe = new GroupBy( pipe, new Fields( "ip" ), new Fields( "count" ) );
917    
918        Tap sink = getPlatform().getTextFile( getOutputPath( "groupgroup" ), SinkMode.REPLACE );
919    
920        Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
921    
922        flow.complete();
923    
924        validateLength( flow, 8, null );
925        }
926    
927    
928      public static class LowerComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable
929        {
930        @Override
931        public int compare( Comparable lhs, Comparable rhs )
932          {
933          return lhs.toString().toLowerCase().compareTo( rhs.toString().toLowerCase() );
934          }
935    
936        @Override
937        public int hashCode( Comparable value )
938          {
939          return value.toString().toLowerCase().hashCode();
940          }
941        }
942    
943      @Test
944      public void testGroupByInsensitive() throws Exception
945        {
946        getPlatform().copyFromLocal( inputFileLower );
947        getPlatform().copyFromLocal( inputFileUpper );
948    
949        Tap sourceLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
950        Tap sourceUpper = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileUpper );
951    
952        Map sources = new HashMap();
953    
954        sources.put( "lower", sourceLower );
955        sources.put( "upper", sourceUpper );
956    
957        Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "insensitivegrouping" + NONDETERMINISTIC ), SinkMode.REPLACE );
958    
959        Pipe pipeLower = new Pipe( "lower" );
960        Pipe pipeUpper = new Pipe( "upper" );
961    
962        Pipe merge = new Merge( pipeLower, pipeUpper );
963    
964        Fields charFields = new Fields( "char" );
965        charFields.setComparator( "char", new LowerComparator() );
966    
967        Pipe splice = new GroupBy( merge, charFields );
968    
969        splice = new Every( splice, new Fields( "char" ), new Count() );
970    
971        Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
972    
973        flow.complete();
974    
975        // we can't guarantee if the grouping key will be upper or lower
976        validateLength( flow, 5, 1, Pattern.compile( "^\\w+\\s2$" ) );
977        }
978      }