001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop.util;
022    
023    import java.util.Comparator;
024    import java.util.List;
025    
026    import cascading.tuple.Hasher;
027    import cascading.tuple.Tuple;
028    import cascading.tuple.hadoop.TupleSerialization;
029    import cascading.tuple.util.TupleHasher;
030    import org.apache.hadoop.mapred.JobConf;
031    import org.apache.hadoop.mapred.JobConfigurable;
032    
033    /**
034     * Super class of all Hadoop partitioners.
035     * <p/>
036     * As of Cascading 2.7 the hashing used to calculate partitions has been changed to use Murmur3. Users that rely on the
037     * old behaviour should set {@link cascading.tuple.hadoop.util.HasherPartitioner#HASHER_PARTITIONER_USE_LEGACY_HASH} to
038     * {@code true}.
039     */
040    public class HasherPartitioner extends TupleHasher implements JobConfigurable
041      {
042      public final static String HASHER_PARTITIONER_USE_LEGACY_HASH = "cascading.tuple.hadoop.util.hasherpartitioner.uselegacyhash";
043    
044      private static Comparator defaultComparator;
045    
046      private Comparator[] comparators;
047    
048      public void configure( JobConf jobConf )
049        {
050        defaultComparator = TupleSerialization.getDefaultComparator( defaultComparator, jobConf );
051    
052        comparators = DeserializerComparator.getFieldComparatorsFrom( jobConf, "cascading.group.comparator" );
053    
054        if( jobConf.getBoolean( HASHER_PARTITIONER_USE_LEGACY_HASH, false ) )
055          this.hashFunction = new LegacyHashFunction();
056    
057        initialize( defaultComparator, comparators );
058        }
059    
060      static class LegacyHashFunction extends TupleHasher.HashFunction
061        {
062        @Override
063        public int hash( Tuple tuple, Hasher[] hashers )
064          {
065          int hash = 1;
066          List<Object> elements = Tuple.elements( tuple );
067          for( int i = 0; i < elements.size(); i++ )
068            {
069            Object element = elements.get( i );
070            hash = 31 * hash + ( element != null ? hashers[ i % hashers.length ].hashCode( element ) : 0 );
071            }
072          return hash;
073          }
074        }
075      }