001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop;
022
023import java.util.Iterator;
024
025import cascading.flow.FlowProcess;
026import cascading.pipe.joiner.JoinerClosure;
027import cascading.tuple.Fields;
028import cascading.tuple.Tuple;
029import cascading.tuple.util.TupleBuilder;
030import cascading.tuple.util.TupleViews;
031
032/** Class GroupClosure is used internally to represent groups of tuples during grouping. */
033public class HadoopGroupByClosure extends JoinerClosure
034  {
035  protected Tuple grouping;
036  protected Iterator[] values;
037
038  public HadoopGroupByClosure( FlowProcess flowProcess, Fields[] groupingFields, Fields[] valueFields )
039    {
040    super( flowProcess, groupingFields, valueFields );
041    }
042
043  public Tuple getGrouping()
044    {
045    return grouping;
046    }
047
048  public int size()
049    {
050    return 1;
051    }
052
053  protected Iterator getValueIterator( int pos )
054    {
055    return values[ pos ];
056    }
057
058  @Override
059  public Iterator<Tuple> getIterator( int pos )
060    {
061    if( pos != 0 )
062      throw new IllegalArgumentException( "invalid group position: " + pos );
063
064    return makeIterator( 0, getValueIterator( 0 ) );
065    }
066
067  @Override
068  public boolean isEmpty( int pos )
069    {
070    return values != null;
071    }
072
073  protected Iterator<Tuple> makeIterator( final int pos, final Iterator values )
074    {
075    return new Iterator<Tuple>()
076    {
077    final int cleanPos = valueFields.length == 1 ? 0 : pos; // support repeated pipes
078    TupleBuilder[] valueBuilder = new TupleBuilder[ valueFields.length ];
079
080    {
081    for( int i = 0; i < valueFields.length; i++ )
082      valueBuilder[ i ] = makeBuilder( valueFields[ i ], joinFields[ i ] );
083    }
084
085    private TupleBuilder makeBuilder( final Fields valueField, final Fields joinField )
086      {
087      if( valueField.isUnknown() && joinField.hasRelativePos() )
088        return new TupleBuilder()
089        {
090        @Override
091        public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
092          {
093          Fields fields = joinFields[ cleanPos ];
094
095          fields = Fields.size( valueTuple.size() ).select( fields );
096
097          valueTuple.set( valueFields[ cleanPos ], fields, groupTuple );
098
099          return valueTuple;
100          }
101        };
102
103      if( valueField.isUnknown() || joinField.isNone() )
104        return new TupleBuilder()
105        {
106        @Override
107        public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
108          {
109          valueTuple.set( valueFields[ cleanPos ], joinFields[ cleanPos ], groupTuple );
110
111          return valueTuple;
112          }
113        };
114
115      return new TupleBuilder()
116      {
117      Tuple result = TupleViews.createOverride( valueField, joinField );
118
119      @Override
120      public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
121        {
122        return TupleViews.reset( result, valueTuple, groupTuple );
123        }
124      };
125      }
126
127    public boolean hasNext()
128      {
129      return values.hasNext();
130      }
131
132    public Tuple next()
133      {
134      Tuple tuple = (Tuple) values.next();
135
136      return valueBuilder[ cleanPos ].makeResult( tuple, grouping );
137      }
138
139    public void remove()
140      {
141      throw new UnsupportedOperationException( "remove not supported" );
142      }
143    };
144    }
145
146  public void reset( Tuple grouping, Iterator<Tuple>[] values )
147    {
148    this.grouping = grouping;
149    this.values = values;
150    }
151
152  @Override
153  public Tuple getGroupTuple( Tuple keysTuple )
154    {
155    return keysTuple;
156    }
157  }