001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Map;
026
027import cascading.flow.FlowProcess;
028import cascading.flow.FlowProps;
029import cascading.flow.planner.Scope;
030import cascading.flow.stream.duct.Duct;
031import cascading.flow.stream.duct.Grouping;
032import cascading.flow.stream.duct.Window;
033import cascading.flow.stream.graph.IORole;
034import cascading.flow.stream.graph.StreamGraph;
035import cascading.flow.stream.util.SparseTupleComparator;
036import cascading.pipe.Splice;
037import cascading.tuple.Fields;
038import cascading.tuple.Tuple;
039import cascading.tuple.TupleEntry;
040import cascading.tuple.TupleEntryChainIterator;
041import cascading.tuple.TupleEntryIterator;
042import cascading.tuple.Tuples;
043import cascading.tuple.util.TupleBuilder;
044import cascading.tuple.util.TupleHasher;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import static cascading.tuple.util.TupleViews.*;
049
050/**
051 *
052 */
053public abstract class GroupingSpliceGate extends SpliceGate<TupleEntry, Grouping<TupleEntry, TupleEntryIterator>> implements Window
054  {
055  private static final Logger LOG = LoggerFactory.getLogger( GroupingSpliceGate.class );
056
057  protected Map<Duct, Integer> ordinalMap;
058
059  protected Fields[] keyFields;
060  protected Fields[] sortFields;
061  protected Fields[] valuesFields;
062
063  protected Comparator<Tuple>[] groupComparators;
064  protected Comparator<Tuple>[] valueComparators;
065  protected TupleHasher groupHasher;
066  protected boolean nullsAreNotEqual;
067
068  protected TupleBuilder[] keyBuilder;
069  protected TupleBuilder[] valuesBuilder;
070  protected TupleBuilder[] sortBuilder;
071
072  protected Grouping<TupleEntry, TupleEntryIterator> grouping;
073  protected TupleEntry keyEntry;
074  protected TupleEntryChainIterator tupleEntryIterator;
075
/**
 * Creates a gate for the given splice, leaving the role to the superclass constructor.
 *
 * @param flowProcess the current flow process
 * @param splice      the splice this gate executes
 */
protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice )
  {
  super( flowProcess, splice );
  }

/**
 * Creates a gate for the given splice with an explicit role.
 *
 * @param flowProcess the current flow process
 * @param splice      the splice this gate executes
 * @param role        the role handed to the superclass; {@link #initialize()} later branches on it
 */
protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice, IORole role )
  {
  super( flowProcess, splice, role );
  }
085
086  @Override
087  public void bind( StreamGraph streamGraph )
088    {
089    super.bind( streamGraph );
090
091    setOrdinalMap( streamGraph );
092    }
093
094  protected synchronized void setOrdinalMap( StreamGraph streamGraph )
095    {
096    ordinalMap = streamGraph.getOrdinalMap( this );
097    }
098
099  protected TupleBuilder createNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
100    {
101    if( narrowFields.isNone() )
102      return new TupleBuilder()
103      {
104      @Override
105      public Tuple makeResult( Tuple input, Tuple output )
106        {
107        return Tuple.NULL;
108        }
109      };
110
111    if( incomingFields.isUnknown() )
112      return new TupleBuilder()
113      {
114      @Override
115      public Tuple makeResult( Tuple input, Tuple output )
116        {
117        return input.get( incomingFields, narrowFields );
118        }
119      };
120
121    if( narrowFields.isAll() ) // dubious this is ever reached
122      return new TupleBuilder()
123      {
124      @Override
125      public Tuple makeResult( Tuple input, Tuple output )
126        {
127        return input;
128        }
129      };
130
131    return createDefaultNarrowBuilder( incomingFields, narrowFields );
132    }
133
134  protected TupleBuilder createDefaultNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
135    {
136    return new TupleBuilder()
137    {
138    Tuple result = createNarrow( incomingFields.getPos( narrowFields ) );
139
140    @Override
141    public Tuple makeResult( Tuple input, Tuple output )
142      {
143      return reset( result, input );
144      }
145    };
146    }
147
148  protected TupleBuilder createNulledBuilder( final Fields incomingFields, final Fields keyField )
149    {
150    if( incomingFields.isUnknown() )
151      return new TupleBuilder()
152      {
153      @Override
154      public Tuple makeResult( Tuple input, Tuple output )
155        {
156        return Tuples.nulledCopy( incomingFields, input, keyField );
157        }
158      };
159
160    if( keyField.isNone() )
161      return new TupleBuilder()
162      {
163      @Override
164      public Tuple makeResult( Tuple input, Tuple output )
165        {
166        return input;
167        }
168      };
169
170    if( keyField.isAll() )
171      return new TupleBuilder()
172      {
173      Tuple nullTuple = Tuple.size( incomingFields.size() );
174
175      @Override
176      public Tuple makeResult( Tuple input, Tuple output )
177        {
178        return nullTuple;
179        }
180      };
181
182    return new TupleBuilder()
183    {
184    Tuple nullTuple = Tuple.size( keyField.size() );
185    Tuple result = createOverride( incomingFields, keyField );
186
187    @Override
188    public Tuple makeResult( Tuple baseTuple, Tuple output )
189      {
190      return reset( result, baseTuple, nullTuple );
191      }
192    };
193    }
194
195  @Override
196  public void initialize()
197    {
198    super.initialize();
199
200    int size = getNumDeclaredIncomingBranches(); // is the maximum ordinal value
201
202    // this is a merge, all fields have the same declaration
203    // filling out full array has implications on joiner/closure which should be resolved independently
204    if( role == IORole.source && splice.isGroupBy() )
205      size = 1;
206
207    keyFields = new Fields[ size ];
208    valuesFields = new Fields[ size ];
209
210    keyBuilder = new TupleBuilder[ size ];
211    valuesBuilder = new TupleBuilder[ size ];
212
213    if( splice.isSorted() )
214      {
215      sortFields = new Fields[ size ];
216      sortBuilder = new TupleBuilder[ size ];
217      }
218
219    Scope outgoingScope = outgoingScopes.get( 0 );
220
221    int numScopes = Math.min( size, incomingScopes.size() );
222    for( int i = 0; i < numScopes; i++ )
223      {
224      Scope incomingScope = incomingScopes.get( i );
225
226      // for GroupBy, incoming may have same name, but guaranteed to have same key/value/sort fields for merge
227      // arrays may be size 1, then ordinal should always be zero.
228      int ordinal = size == 1 ? 0 : incomingScope.getOrdinal();
229
230      keyFields[ ordinal ] = outgoingScope.getKeySelectors().get( incomingScope.getName() );
231      valuesFields[ ordinal ] = incomingScope.getIncomingSpliceFields();
232
233      keyBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] );
234      valuesBuilder[ ordinal ] = createNulledBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] );
235
236      if( sortFields != null )
237        {
238        sortFields[ ordinal ] = outgoingScope.getSortingSelectors().get( incomingScope.getName() );
239        sortBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), sortFields[ ordinal ] );
240        }
241
242      if( LOG.isDebugEnabled() )
243        {
244        LOG.debug( "incomingScope: {}, in pos: {}", incomingScope.getName(), ordinal );
245        LOG.debug( "keyFields: {}", printSafe( keyFields[ ordinal ] ) );
246        LOG.debug( "valueFields: {}", printSafe( valuesFields[ ordinal ] ) );
247
248        if( sortFields != null )
249          LOG.debug( "sortFields: {}", printSafe( sortFields[ ordinal ] ) );
250        }
251      }
252
253    if( role == IORole.sink )
254      return;
255
256    keyEntry = new TupleEntry( outgoingScope.getOutGroupingFields(), true );
257    tupleEntryIterator = new TupleEntryChainIterator( outgoingScope.getOutValuesFields() );
258
259    grouping = new Grouping<>();
260    grouping.key = keyEntry;
261    grouping.joinIterator = tupleEntryIterator;
262    }
263
264  protected void initComparators()
265    {
266    Comparator defaultComparator = (Comparator) flowProcess.newInstance( (String) flowProcess.getProperty( FlowProps.DEFAULT_ELEMENT_COMPARATOR ) );
267
268    Fields[] compareFields = new Fields[ getNumDeclaredIncomingBranches() ];
269    groupComparators = new Comparator[ getNumDeclaredIncomingBranches() ];
270
271    if( splice.isSorted() )
272      valueComparators = new Comparator[ getNumDeclaredIncomingBranches() ];
273
274    int size = splice.isGroupBy() ? 1 : getNumDeclaredIncomingBranches();
275
276    for( int i = 0; i < size; i++ )
277      {
278      Scope incomingScope = incomingScopes.get( i );
279
280      int pos = splice.isGroupBy() ? 0 : splice.getPipePos().get( incomingScope.getName() );
281
282      // we want the comparators
283      Fields groupFields = splice.getKeySelectors().get( incomingScope.getName() );
284
285      compareFields[ pos ] = groupFields; // used for finding hashers
286
287      if( groupFields.size() == 0 )
288        groupComparators[ pos ] = groupFields;
289      else
290        groupComparators[ pos ] = new SparseTupleComparator( Fields.asDeclaration( groupFields ), defaultComparator );
291
292      groupComparators[ pos ] = splice.isSortReversed() ? Collections.reverseOrder( groupComparators[ pos ] ) : groupComparators[ pos ];
293
294      if( sortFields != null )
295        {
296        // we want the comparators, so don't use sortFields array
297        Fields sortFields = splice.getSortingSelectors().get( incomingScope.getName() );
298        valueComparators[ pos ] = new SparseTupleComparator( valuesFields[ pos ], sortFields, defaultComparator );
299
300        if( splice.isSortReversed() )
301          valueComparators[ pos ] = Collections.reverseOrder( valueComparators[ pos ] );
302        }
303      }
304
305    nullsAreNotEqual = !areNullsEqual();
306
307    if( nullsAreNotEqual )
308      LOG.debug( "treating null values in Tuples at not equal during grouping" );
309
310    Comparator[] hashers = TupleHasher.merge( compareFields );
311    groupHasher = defaultComparator != null || !TupleHasher.isNull( hashers ) ? new TupleHasher( defaultComparator, hashers ) : null;
312    }
313
314  protected Comparator getKeyComparator()
315    {
316    if( groupComparators.length > 0 && groupComparators[ 0 ] != null )
317      return groupComparators[ 0 ];
318
319    return new Comparator<Comparable>()
320    {
321    @Override
322    public int compare( Comparable lhs, Comparable rhs )
323      {
324      return lhs.compareTo( rhs );
325      }
326    };
327    }
328
329  @Override
330  public void cleanup()
331    {
332    super.cleanup();
333
334    // close if top of stack
335    if( next == null )
336      flowProcess.closeTrapCollectors();
337    }
338
339  private boolean areNullsEqual()
340    {
341    try
342      {
343      Tuple tupleWithNull = Tuple.size( 1 );
344
345      return groupComparators[ 0 ].compare( tupleWithNull, tupleWithNull ) == 0;
346      }
347    catch( Exception exception )
348      {
349      return true; // assume we have an npe or something and they don't expect to see nulls
350      }
351    }
352
353  protected int getNumDeclaredIncomingBranches()
354    {
355    return splice.getPrevious().length;
356    }
357
358  /**
359   * This allows the tuple to honor the hasher and comparators, if any
360   *
361   * @param object the tuple to wrap
362   * @return a DelegatedTuple instance
363   */
364  protected final Tuple getDelegatedTuple( Tuple object )
365    {
366    if( groupHasher == null )
367      return object;
368
369    return new DelegatedTuple( object );
370    }
371
372  private String printSafe( Fields fields )
373    {
374    if( fields != null )
375      return fields.printVerbose();
376
377    return "";
378    }
379
380  @Override
381  public final boolean equals( Object object )
382    {
383    if( this == object )
384      return true;
385    if( !( object instanceof GroupingSpliceGate ) )
386      return false;
387
388    GroupingSpliceGate groupingSpliceGate = (GroupingSpliceGate) object;
389
390    if( splice != null ? splice != groupingSpliceGate.splice : groupingSpliceGate.splice != null )
391      return false;
392
393    return true;
394    }
395
396  @Override
397  public final int hashCode()
398    {
399    return splice != null ? System.identityHashCode( splice ) : 0;
400    }
401
402  @Override
403  public String toString()
404    {
405    final StringBuilder sb = new StringBuilder( "SpliceGate{" );
406    sb.append( "splice=" ).append( splice );
407    sb.append( ", role=" ).append( role );
408    sb.append( '}' );
409    return sb.toString();
410    }
411
412  protected class DelegatedTuple extends Tuple
413    {
414    public DelegatedTuple( Tuple wrapped )
415      {
416      // pass it in to prevent one being allocated
417      super( Tuple.elements( wrapped ) );
418      }
419
420    @Override
421    public boolean equals( Object object )
422      {
423      return compareTo( object ) == 0;
424      }
425
426    @Override
427    public int compareTo( Object other )
428      {
429      return groupComparators[ 0 ].compare( this, (Tuple) other );
430      }
431
432    @Override
433    public int hashCode()
434      {
435      return groupHasher.hashCode( this );
436      }
437    }
438  }