001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.util;
022
023import java.io.Serializable;
024import java.util.Comparator;
025import java.util.List;
026
027import cascading.tuple.Fields;
028import cascading.tuple.Hasher;
029import cascading.tuple.Tuple;
030import cascading.util.Murmur3;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 *
036 */
037public class TupleHasher implements Serializable
038  {
039  private static final Logger LOG = LoggerFactory.getLogger( TupleHasher.class );
040
041  private static HashFunction DEFAULT_HASH_FUNCTION = new HashFunction();
042  private static Hasher DEFAULT_HASHER = new ObjectHasher();
043
044  protected HashFunction hashFunction = DEFAULT_HASH_FUNCTION;
045  private Hasher[] hashers;
046
047  public TupleHasher()
048    {
049    }
050
051  public TupleHasher( Comparator defaultComparator, Comparator[] comparators )
052    {
053    initialize( defaultComparator, comparators );
054    }
055
056  public static Comparator[] merge( Fields[] keyFields )
057    {
058    Comparator[] comparators = new Comparator[ keyFields[ 0 ].size() ];
059
060    for( Fields keyField : keyFields )
061      {
062      if( keyField == null )
063        continue;
064
065      for( int i = 0; i < keyField.getComparators().length; i++ )
066        {
067        Comparator comparator = keyField.getComparators()[ i ];
068
069        if( !( comparator instanceof Hasher ) )
070          continue;
071
072        if( comparators[ i ] != null && !comparators[ i ].equals( comparator ) )
073          LOG.warn( "two unequal Hasher instances for the same key field position found: {}, and: {}", comparators[ i ], comparator );
074
075        comparators[ i ] = comparator;
076        }
077      }
078
079    return comparators;
080    }
081
082  public static boolean isNull( Comparator[] comparators )
083    {
084    int count = 0;
085
086    for( Comparator comparator : comparators )
087      {
088      if( comparator == null )
089        count++;
090      }
091
092    if( count == comparators.length )
093      return true;
094
095    return false;
096    }
097
098  protected void initialize( Comparator defaultComparator, Comparator[] comparators )
099    {
100    Hasher defaultHasher = DEFAULT_HASHER;
101
102    if( defaultComparator instanceof Hasher )
103      defaultHasher = (Hasher) defaultComparator;
104
105    hashers = new Hasher[ comparators.length ];
106
107    for( int i = 0; i < comparators.length; i++ )
108      {
109      Comparator comparator = comparators[ i ];
110
111      if( comparator instanceof Hasher )
112        hashers[ i ] = (Hasher) comparator;
113      else
114        hashers[ i ] = defaultHasher;
115      }
116    }
117
118  public final int hashCode( Tuple tuple )
119    {
120    return getHashFunction().hash( tuple, hashers );
121    }
122
123  protected HashFunction getHashFunction()
124    {
125    return hashFunction;
126    }
127
128  private static class ObjectHasher implements Hasher<Object>, Serializable
129    {
130    @Override
131    public int hashCode( Object value )
132      {
133      if( value == null )
134        return 0;
135
136      return value.hashCode();
137      }
138    }
139
140  static class WrappedTuple extends Tuple
141    {
142    private final TupleHasher tupleHasher;
143
144    public WrappedTuple( TupleHasher tupleHasher, Tuple input )
145      {
146      super( Tuple.elements( input ) );
147      this.tupleHasher = tupleHasher;
148      }
149
150    @Override
151    public int hashCode()
152      {
153      return tupleHasher.hashCode( this );
154      }
155    }
156
157  /**
158   * Wraps the given Tuple in a subtype, that uses the provided Hasher for hashCode calculations. If the given Hasher
159   * is {@code null} the input Tuple will be returned.
160   *
161   * @param tupleHasher A TupleHasher instance.
162   * @param input       A Tuple instance.
163   * @return A tuple using the provided TupleHasher for hashCode calculations if the TupleHasher is not null.
164   */
165  public static Tuple wrapTuple( TupleHasher tupleHasher, Tuple input )
166    {
167    if( tupleHasher == null )
168      return input;
169
170    return new WrappedTuple( tupleHasher, input );
171    }
172
173  public static class HashFunction implements Serializable
174    {
175    public int hash( Tuple tuple, Hasher[] hashers )
176      {
177      int hash = Murmur3.SEED;
178
179      List<Object> elements = Tuple.elements( tuple );
180
181      for( int i = 0; i < elements.size(); i++ )
182        {
183        Object element = elements.get( i );
184
185        int hashCode = hashers[ i % hashers.length ].hashCode( element );
186
187        hash = Murmur3.mixH1( hash, Murmur3.mixK1( hashCode ) );
188        }
189
190      return Murmur3.fmix( hash, elements.size() );
191      }
192    }
193  }