001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop;
022
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Iterator;
026import java.util.NoSuchElementException;
027
028import cascading.flow.FlowProcess;
029import cascading.flow.hadoop.util.FalseCollection;
030import cascading.provider.FactoryLoader;
031import cascading.tuple.Fields;
032import cascading.tuple.Tuple;
033import cascading.tuple.Tuples;
034import cascading.tuple.collect.Spillable;
035import cascading.tuple.collect.SpillableTupleList;
036import cascading.tuple.collect.TupleCollectionFactory;
037import cascading.tuple.hadoop.collect.HadoopTupleCollectionFactory;
038import cascading.tuple.io.IndexTuple;
039import cascading.tuple.util.TupleViews;
040import org.apache.hadoop.conf.Configuration;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import static cascading.tuple.collect.TupleCollectionFactory.TUPLE_COLLECTION_FACTORY;
045
/** Class HadoopCoGroupClosure is used internally to represent the co-grouping results of multiple tuple streams. */
047public class HadoopCoGroupClosure extends HadoopGroupByClosure
048  {
049  /** Field LOG */
050  private static final Logger LOG = LoggerFactory.getLogger( HadoopCoGroupClosure.class );
051
052  public enum Spill
053    {
054      Num_Spills_Written, Num_Spills_Read, Num_Tuples_Spilled, Duration_Millis_Written
055    }
056
057  private class SpillListener implements Spillable.SpillListener
058    {
059    private final FlowProcess flowProcess;
060    private final Fields joinField;
061
062    public SpillListener( FlowProcess flowProcess, Fields joinField )
063      {
064      this.flowProcess = flowProcess;
065      this.joinField = joinField;
066      }
067
068    @Override
069    public void notifyWriteSpillBegin( Spillable spillable, int spillSize, String spillReason )
070      {
071      int numFiles = spillable.spillCount();
072
073      if( numFiles % 10 == 0 )
074        {
075        LOG.info( "spilling group: {}, on grouping: {}, num times: {}, with reason: {}",
076          new Object[]{joinField.printVerbose(), spillable.getGrouping().print(), numFiles + 1, spillReason} );
077
078        Runtime runtime = Runtime.getRuntime();
079        long freeMem = runtime.freeMemory() / 1024 / 1024;
080        long maxMem = runtime.maxMemory() / 1024 / 1024;
081        long totalMem = runtime.totalMemory() / 1024 / 1024;
082
083        LOG.info( "mem on spill (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );
084        }
085
086      LOG.info( "spilling {} tuples in list to file number {}", spillSize, numFiles + 1 );
087
088      flowProcess.increment( Spill.Num_Spills_Written, 1 );
089      flowProcess.increment( Spill.Num_Tuples_Spilled, spillSize );
090      }
091
092    @Override
093    public void notifyWriteSpillEnd( SpillableTupleList spillableTupleList, long duration )
094      {
095      flowProcess.increment( Spill.Duration_Millis_Written, duration );
096      }
097
098    @Override
099    public void notifyReadSpillBegin( Spillable spillable )
100      {
101      flowProcess.increment( Spill.Num_Spills_Read, 1 );
102      }
103    }
104
105  /** Field groups */
106  protected Collection<Tuple>[] collections;
107  protected final int numSelfJoins;
108
109  private Tuple[] joinedTuplesArray;
110  private final Tuple emptyTuple;
111  private TupleBuilder joinedBuilder;
112  private Tuple joinedTuple = new Tuple(); // is discarded
113
114  private final TupleCollectionFactory<Configuration> tupleCollectionFactory;
115
116  public HadoopCoGroupClosure( FlowProcess flowProcess, int numSelfJoins, Fields[] groupingFields, Fields[] valueFields )
117    {
118    super( flowProcess, groupingFields, valueFields );
119    this.numSelfJoins = numSelfJoins;
120
121    this.emptyTuple = Tuple.size( groupingFields[ 0 ].size() );
122
123    FactoryLoader loader = FactoryLoader.getInstance();
124
125    this.tupleCollectionFactory = loader.loadFactoryFrom( flowProcess, TUPLE_COLLECTION_FACTORY, HadoopTupleCollectionFactory.class );
126
127    initLists();
128    }
129
130  @Override
131  public int size()
132    {
133    return Math.max( joinFields.length, numSelfJoins + 1 );
134    }
135
136  @Override
137  public Iterator<Tuple> getIterator( int pos )
138    {
139    if( pos < 0 || pos >= collections.length )
140      throw new IllegalArgumentException( "invalid group position: " + pos );
141
142    return makeIterator( pos, collections[ pos ].iterator() );
143    }
144
145  @Override
146  public Tuple getGroupTuple( Tuple keysTuple )
147    {
148    Tuples.asModifiable( joinedTuple );
149
150    for( int i = 0; i < collections.length; i++ )
151      joinedTuplesArray[ i ] = collections[ i ].isEmpty() ? emptyTuple : keysTuple;
152
153    joinedTuple = joinedBuilder.makeResult( joinedTuplesArray );
154
155    return joinedTuple;
156    }
157
158  @Override
159  public boolean isEmpty( int pos )
160    {
161    return collections[ pos ].isEmpty();
162    }
163
164  @Override
165  public void reset( Tuple grouping, Iterator<Tuple>[] values )
166    {
167    super.reset( grouping, values );
168
169    build();
170    }
171
172  protected void build()
173    {
174    clearGroups();
175
176    if( collections[ 0 ] instanceof FalseCollection ) // force reset on FalseCollection
177      ( (FalseCollection) collections[ 0 ] ).reset( null );
178
179    while( values[ 0 ].hasNext() )
180      {
181      IndexTuple current = (IndexTuple) values[ 0 ].next();
182      int pos = current.getIndex();
183
184      // if this is the first (lhs) co-group, just use values iterator
185      // we are guaranteed all the remainder tuples in the iterator are from pos == 0
186      if( numSelfJoins == 0 && pos == 0 )
187        {
188        ( (FalseCollection) collections[ 0 ] ).reset( createIterator( current, values[ 0 ] ) );
189        break;
190        }
191
192      collections[ pos ].add( current.getTuple() ); // get the value tuple for this cogroup
193      }
194    }
195
196  protected void clearGroups()
197    {
198    for( Collection<Tuple> collection : collections )
199      {
200      collection.clear();
201
202      if( collection instanceof Spillable )
203        ( (Spillable) collection ).setGrouping( grouping );
204      }
205    }
206
207  protected void initLists()
208    {
209    collections = new Collection[ size() ];
210
211    // handle self joins
212    if( numSelfJoins != 0 )
213      {
214      Arrays.fill( collections, createTupleCollection( joinFields[ 0 ] ) );
215      }
216    else
217      {
218      collections[ 0 ] = new FalseCollection(); // we iterate this only once per grouping
219
220      for( int i = 1; i < joinFields.length; i++ )
221        collections[ i ] = createTupleCollection( joinFields[ i ] );
222      }
223
224    joinedBuilder = makeJoinedBuilder( joinFields );
225    joinedTuplesArray = new Tuple[ collections.length ];
226    }
227
228  static interface TupleBuilder
229    {
230    Tuple makeResult( Tuple[] tuples );
231    }
232
233  private TupleBuilder makeJoinedBuilder( final Fields[] joinFields )
234    {
235    final Fields[] fields = isSelfJoin() ? new Fields[ size() ] : joinFields;
236
237    if( isSelfJoin() )
238      Arrays.fill( fields, 0, fields.length, joinFields[ 0 ] );
239
240    return new TupleBuilder()
241    {
242    Tuple result = TupleViews.createComposite( fields );
243
244    @Override
245    public Tuple makeResult( Tuple[] tuples )
246      {
247      return TupleViews.reset( result, tuples );
248      }
249    };
250    }
251
252  protected Collection<Tuple> createTupleCollection( Fields joinField )
253    {
254    Collection<Tuple> collection = tupleCollectionFactory.create( flowProcess );
255
256    if( collection instanceof Spillable )
257      ( (Spillable) collection ).setSpillListener( createListener( joinField ) );
258
259    return collection;
260    }
261
262  private Spillable.SpillListener createListener( final Fields joinField )
263    {
264    return new SpillListener( flowProcess, joinField );
265    }
266
267  public Iterator<Tuple> createIterator( final IndexTuple current, final Iterator<IndexTuple> values )
268    {
269    return new Iterator<Tuple>()
270    {
271    IndexTuple value = current;
272
273    @Override
274    public boolean hasNext()
275      {
276      return value != null;
277      }
278
279    @Override
280    public Tuple next()
281      {
282      if( value == null && !values.hasNext() )
283        throw new NoSuchElementException();
284
285      Tuple result = value.getTuple();
286
287      if( values.hasNext() )
288        value = values.next();
289      else
290        value = null;
291
292      return result;
293      }
294
295    @Override
296    public void remove()
297      {
298      // unsupported
299      }
300    };
301    }
302  }