001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop.util;
022
023import java.io.IOException;
024
025import cascading.CascadingException;
026import cascading.tuple.Tuple;
027import cascading.tuple.hadoop.TupleSerializationProps;
028import org.apache.hadoop.conf.Configurable;
029import org.apache.hadoop.conf.Configuration;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 *
035 */
036public abstract class BaseTupleComparator extends DeserializerComparator<Tuple> implements Configurable
037  {
038  private static final Logger LOG = LoggerFactory.getLogger( BaseTupleComparator.class );
039
040  RawComparison actual;
041
042  @Override
043  protected boolean canPerformRawComparisons()
044    {
045    return !getConf().getBoolean( TupleSerializationProps.SERIALIZATION_COMPARISON_BITWISE_PREVENT, false );
046    }
047
048  @Override
049  public void setConf( Configuration conf )
050    {
051    super.setConf( conf );
052
053    if( conf == null )
054      return;
055
056    if( performRawComparison() )
057      LOG.info( "enabling raw byte comparison and ordering" );
058
059    if( performRawComparison() )
060      actual = getByteComparison();
061    else
062      actual = getStreamComparison();
063    }
064
065  protected RawComparison getStreamComparison()
066    {
067    return new StreamComparison();
068    }
069
070  protected abstract RawComparison getByteComparison();
071
072  public int compare( byte[] b1, int s1, int l1, byte[] b2, int s2, int l2 )
073    {
074    return actual.compare( b1, s1, l1, b2, s2, l2 );
075    }
076
077  public int compare( Tuple lhs, Tuple rhs )
078    {
079    return compareTuples( groupComparators, lhs, rhs );
080    }
081
082  protected interface RawComparison
083    {
084    int compare( byte[] b1, int s1, int l1, byte[] b2, int s2, int l2 );
085    }
086
087  class StreamComparison implements RawComparison
088    {
089    @Override
090    public int compare( byte[] b1, int s1, int l1, byte[] b2, int s2, int l2 )
091      {
092      try
093        {
094        lhsBuffer.reset( b1, s1, l1 );
095        rhsBuffer.reset( b2, s2, l2 );
096
097        return compareTuples( keyTypes, groupComparators );
098        }
099      catch( IOException exception )
100        {
101        throw new CascadingException( exception );
102        }
103      finally
104        {
105        lhsBuffer.clear();
106        rhsBuffer.clear();
107        }
108      }
109    }
110  }