001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop.util;
022    
023    import java.io.IOException;
024    
025    import cascading.CascadingException;
026    import cascading.tuple.io.IndexTuple;
027    import org.apache.hadoop.conf.Configurable;
028    
029    public class IndexTupleCoGroupingComparator extends DeserializerComparator<IndexTuple> implements Configurable
030      {
031      public int compare( byte[] b1, int s1, int l1, byte[] b2, int s2, int l2 )
032        {
033        try
034          {
035          lhsBuffer.reset( b1, s1, l1 );
036          rhsBuffer.reset( b2, s2, l2 );
037    
038          int lhsIndex = lhsStream.readVInt();
039          int rhsIndex = rhsStream.readVInt();
040    
041          int c = compareTuples( groupComparators );
042    
043          if( c != 0 )
044            return c;
045    
046          return rhsIndex - lhsIndex;
047          }
048        catch( IOException exception )
049          {
050          throw new CascadingException( exception );
051          }
052        finally
053          {
054          lhsBuffer.clear();
055          rhsBuffer.clear();
056          }
057        }
058    
059      public int compare( IndexTuple lhs, IndexTuple rhs )
060        {
061        int c = compareTuples( groupComparators, lhs.getTuple(), rhs.getTuple() );
062    
063        if( c != 0 )
064          return c;
065    
066        // intentionally sort in reverse
067        return rhs.getIndex() - lhs.getIndex();
068        }
069      }