001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop.util;
022    
023    import java.io.IOException;
024    import java.util.Comparator;
025    
026    import cascading.CascadingException;
027    import cascading.tuple.StreamComparator;
028    import cascading.tuple.io.TupleInputStream;
029    
030    /**
031     *
032     */
033    public class TupleElementComparator implements StreamComparator<TupleInputStream>, Comparator<Object>
034      {
035      Comparator comparator = new Comparator<Comparable>()
036      {
037      @Override
038      public int compare( Comparable lhs, Comparable rhs )
039        {
040        if( lhs == null && rhs == null )
041          return 0;
042    
043        if( lhs == null )
044          return -1;
045    
046        if( rhs == null )
047          return 1;
048    
049        return lhs.compareTo( rhs ); // guaranteed to not be null
050        }
051      };
052    
053      public TupleElementComparator()
054        {
055        }
056    
057      public TupleElementComparator( Comparator comparator )
058        {
059        if( comparator != null )
060          this.comparator = comparator;
061        }
062    
063      @Override
064      public int compare( Object lhs, Object rhs )
065        {
066        return comparator.compare( lhs, rhs );
067        }
068    
069      @Override
070      public int compare( TupleInputStream lhsStream, TupleInputStream rhsStream )
071        {
072        Object lhs;
073        Object rhs;
074    
075        try
076          {
077          lhs = lhsStream.getNextElement();
078          rhs = rhsStream.getNextElement();
079          }
080        catch( IOException exception )
081          {
082          throw new CascadingException( "unable to read element from underlying stream", exception );
083          }
084    
085        try
086          {
087          return comparator.compare( lhs, rhs );
088          }
089        catch( Exception exception )
090          {
091          throw new CascadingException( "unable to compare Tuples, likely a CoGroup is being attempted on fields of " +
092            "different types or custom comparators are incorrectly set on Fields, lhs: '" + lhs + "' rhs: '" + rhs + "'", exception );
093          }
094        }
095      }