001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.hadoop.util; 022 023import java.io.IOException; 024import java.util.Comparator; 025 026import cascading.CascadingException; 027import cascading.tuple.StreamComparator; 028import cascading.tuple.io.TupleInputStream; 029 030/** 031 * 032 */ 033public class TypedTupleElementComparator implements StreamComparator<TupleInputStream>, Comparator<Object> 034 { 035 private Class type; 036 037 Comparator comparator = new Comparator<Comparable>() 038 { 039 @Override 040 public int compare( Comparable lhs, Comparable rhs ) 041 { 042 if( lhs == null && rhs == null ) 043 return 0; 044 045 if( lhs == null ) 046 return -1; 047 048 if( rhs == null ) 049 return 1; 050 051 return lhs.compareTo( rhs ); // guaranteed to not be null 052 } 053 }; 054 055 public TypedTupleElementComparator() 056 { 057 } 058 059 public TypedTupleElementComparator( Class type, Comparator comparator ) 060 { 061 this.type = type; 062 063 if( comparator != null ) 064 this.comparator = comparator; 065 } 066 067 @Override 068 public int compare( Object lhs, Object rhs ) 069 { 070 return comparator.compare( lhs, rhs ); 071 } 072 073 @Override 074 public int compare( TupleInputStream lhsStream, TupleInputStream rhsStream ) 075 { 076 Object lhs; 077 Object rhs; 078 079 try 080 { 081 lhs = lhsStream.readType( type ); 082 rhs = rhsStream.readType( type ); 083 } 084 catch( IOException exception ) 085 { 086 throw new CascadingException( "unable to read element from underlying stream", exception ); 087 } 088 089 try 090 { 091 return comparator.compare( lhs, rhs ); 092 } 093 catch( Exception exception ) 094 { 095 throw new CascadingException( "unable to compare Tuples, likely a CoGroup is being attempted on fields of " + 096 "different types or custom comparators are incorrectly set on Fields, lhs: '" + lhs + "' rhs: '" + rhs + "'", exception ); 097 } 098 } 099 }