001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop.util;
022
023import java.io.Serializable;
024import java.util.Arrays;
025import java.util.Comparator;
026
027import cascading.tuple.Hasher;
028import cascading.tuple.StreamComparator;
029import cascading.tuple.hadoop.io.BufferedInputStream;
030import org.apache.hadoop.io.WritableComparator;
031
032/**
033 * Class BytesComparator is used to compare arrays of bytes.
034 * <p/>
035 * Note that BytesComparator implements {@link Hasher}, but for the Hasher interface to be applied during grouping,
036 * sorting or joining, it must be set on a {@link cascading.tuple.Fields} instance via
037 * {@link cascading.tuple.Fields#setComparator(Comparable, java.util.Comparator)}.
038 */
039public class BytesComparator implements StreamComparator<BufferedInputStream>, Hasher<byte[]>, Comparator<byte[]>, Serializable
040  {
041  @Override
042  public int compare( byte[] lhs, byte[] rhs )
043    {
044    if( lhs == rhs )
045      return 0;
046
047    return WritableComparator.compareBytes( lhs, 0, lhs.length, rhs, 0, rhs.length );
048    }
049
050  @Override
051  public int compare( BufferedInputStream lhsStream, BufferedInputStream rhsStream )
052    {
053    byte[] lhs = lhsStream.getBuffer();
054    int lhsPos = lhsStream.getPosition();
055    int lhsLen = readLen( lhs, lhsPos );
056
057    lhsStream.skip( lhsLen + 4 );
058
059    byte[] rhs = rhsStream.getBuffer();
060    int rhsPos = rhsStream.getPosition();
061    int rhsLen = readLen( rhs, rhsPos );
062
063    rhsStream.skip( rhsLen + 4 );
064
065    return WritableComparator.compareBytes( lhs, lhsPos + 4, lhsLen, rhs, rhsPos + 4, rhsLen );
066    }
067
068  private int readLen( byte[] buffer, int off )
069    {
070    return ( ( buffer[ off ] & 0xff ) << 24 ) +
071      ( ( buffer[ off + 1 ] & 0xff ) << 16 ) +
072      ( ( buffer[ off + 2 ] & 0xff ) << 8 ) +
073      ( buffer[ off + 3 ] & 0xff );
074    }
075
076  @Override
077  public int hashCode( byte[] value )
078    {
079    return Arrays.hashCode( value );
080    }
081  }