001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop;
022    
023    import java.util.Iterator;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.pipe.joiner.JoinerClosure;
027    import cascading.tuple.Fields;
028    import cascading.tuple.Tuple;
029    import cascading.tuple.util.TupleBuilder;
030    import cascading.tuple.util.TupleViews;
031    
032    /** Class GroupClosure is used internally to represent groups of tuples during grouping. */
033    public class HadoopGroupByClosure extends JoinerClosure
034      {
035      protected Tuple grouping;
036      protected Iterator values;
037    
038      public HadoopGroupByClosure( FlowProcess flowProcess, Fields[] groupingFields, Fields[] valueFields )
039        {
040        super( flowProcess, groupingFields, valueFields );
041        }
042    
043      public Tuple getGrouping()
044        {
045        return grouping;
046        }
047    
048      public int size()
049        {
050        return 1;
051        }
052    
053      @Override
054      public Iterator getIterator( int pos )
055        {
056        if( pos != 0 )
057          throw new IllegalArgumentException( "invalid group position: " + pos );
058    
059        return makeIterator( 0, values );
060        }
061    
062      @Override
063      public boolean isEmpty( int pos )
064        {
065        return values != null;
066        }
067    
068      protected Iterator<Tuple> makeIterator( final int pos, final Iterator values )
069        {
070        return new Iterator<Tuple>()
071        {
072        final int cleanPos = valueFields.length == 1 ? 0 : pos; // support repeated pipes
073        TupleBuilder[] valueBuilder = new TupleBuilder[ valueFields.length ];
074    
075        {
076        for( int i = 0; i < valueFields.length; i++ )
077          valueBuilder[ i ] = makeBuilder( valueFields[ i ], joinFields[ i ] );
078        }
079    
080        private TupleBuilder makeBuilder( final Fields valueField, final Fields joinField )
081          {
082          if( valueField.isUnknown() && joinField.hasRelativePos() )
083            return new TupleBuilder()
084            {
085            @Override
086            public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
087              {
088              Fields fields = joinFields[ cleanPos ];
089    
090              fields = Fields.size( valueTuple.size() ).select( fields );
091    
092              valueTuple.set( valueFields[ cleanPos ], fields, groupTuple );
093    
094              return valueTuple;
095              }
096            };
097    
098          if( valueField.isUnknown() || joinField.isNone() )
099            return new TupleBuilder()
100            {
101            @Override
102            public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
103              {
104              valueTuple.set( valueFields[ cleanPos ], joinFields[ cleanPos ], groupTuple );
105    
106              return valueTuple;
107              }
108            };
109    
110          return new TupleBuilder()
111          {
112          Tuple result = TupleViews.createOverride( valueField, joinField );
113    
114          @Override
115          public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
116            {
117            return TupleViews.reset( result, valueTuple, groupTuple );
118            }
119          };
120          }
121    
122        public boolean hasNext()
123          {
124          return values.hasNext();
125          }
126    
127        public Tuple next()
128          {
129          Tuple tuple = (Tuple) values.next();
130    
131          return valueBuilder[ cleanPos ].makeResult( tuple, grouping );
132          }
133    
134        public void remove()
135          {
136          throw new UnsupportedOperationException( "remove not supported" );
137          }
138        };
139        }
140    
141      public void reset( Tuple grouping, Iterator values )
142        {
143        this.grouping = grouping;
144        this.values = values;
145        }
146    
147      @Override
148      public Tuple getGroupTuple( Tuple keysTuple )
149        {
150        return keysTuple;
151        }
152      }