001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop.util;
022
023import java.util.Comparator;
024import java.util.List;
025
026import cascading.tuple.Hasher;
027import cascading.tuple.Tuple;
028import cascading.tuple.hadoop.TupleSerialization;
029import cascading.tuple.util.TupleHasher;
030import org.apache.hadoop.conf.Configurable;
031import org.apache.hadoop.conf.Configuration;
032
033/**
034 * Super class of all Hadoop partitioners.
035 * <p/>
036 * As of Cascading 2.7 the hashing used to calculate partitions has been changed to use Murmur3. Users that rely on the
037 * old behaviour should set {@link cascading.tuple.hadoop.util.HasherPartitioner#HASHER_PARTITIONER_USE_LEGACY_HASH} to
038 * {@code true}.
039 */
040public class HasherPartitioner extends TupleHasher implements Configurable
041  {
042  public final static String HASHER_PARTITIONER_USE_LEGACY_HASH = "cascading.tuple.hadoop.util.hasherpartitioner.uselegacyhash";
043
044  private static Comparator defaultComparator;
045
046  private Comparator[] comparators;
047  private Configuration conf;
048
049  @Override
050  public void setConf( Configuration conf )
051    {
052    if( this.conf != null )
053      return;
054
055    this.conf = conf;
056
057    defaultComparator = TupleSerialization.getDefaultComparator( defaultComparator, conf );
058
059    comparators = DeserializerComparator.getFieldComparatorsFrom( conf, "cascading.group.comparator" );
060
061    if( conf.getBoolean( HASHER_PARTITIONER_USE_LEGACY_HASH, false ) )
062      this.hashFunction = new LegacyHashFunction();
063
064    initialize( defaultComparator, comparators );
065    }
066
067  @Override
068  public Configuration getConf()
069    {
070    return conf;
071    }
072
073  static class LegacyHashFunction extends TupleHasher.HashFunction
074    {
075    @Override
076    public int hash( Tuple tuple, Hasher[] hashers )
077      {
078      int hash = 1;
079      List<Object> elements = Tuple.elements( tuple );
080      for( int i = 0; i < elements.size(); i++ )
081        {
082        Object element = elements.get( i );
083        hash = 31 * hash + ( element != null ? hashers[ i % hashers.length ].hashCode( element ) : 0 );
084        }
085      return hash;
086      }
087    }
088  }