/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple.hadoop.util;

import java.io.InputStream;
import java.util.Comparator;

import cascading.CascadingException;
import cascading.tuple.StreamComparator;
import cascading.tuple.hadoop.io.HadoopTupleInputStream;

/**
 * Compares a single tuple element of a declared type, either as plain objects or
 * directly from two {@link HadoopTupleInputStream} instances, by delegating to a
 * wrapped {@link StreamComparator}.
 * <p>
 * When the declared type is not a primitive, one token is read from each stream
 * before delegating; a token value of {@code 0} causes {@code null} to be handed to
 * the wrapped comparator in place of that side's stream (presumably the token marks
 * a null element -- confirm against the matching serializer).
 */
public class TypedTupleElementStreamComparator implements StreamComparator<HadoopTupleInputStream>, Comparator<Object>
  {
  /** Declared type of the element; only consulted via {@link Class#isPrimitive()}. */
  private final Class type;
  /** Delegate that performs the actual comparison for both compare variants. */
  final StreamComparator comparator;

  /**
   * Creates a comparator for elements of the given type.
   *
   * @param type       the declared element type
   * @param comparator the delegate comparator; it is cast to {@link Comparator} when
   *                   {@link #compare(Object, Object)} is called
   */
  public TypedTupleElementStreamComparator( Class type, StreamComparator comparator )
    {
    this.type = type;
    this.comparator = comparator;
    }

  /**
   * Compares two objects by treating the wrapped comparator as a plain {@link Comparator}.
   *
   * @param lhs the left hand side value
   * @param rhs the right hand side value
   * @return the result of the wrapped comparator
   * @throws ClassCastException if the wrapped comparator does not implement {@link Comparator}
   */
  @Override
  public int compare( Object lhs, Object rhs )
    {
    Comparator<Object> objectComparator = (Comparator<Object>) comparator;

    return objectComparator.compare( lhs, rhs );
    }

  /**
   * Compares the next element of two tuple streams using the wrapped stream comparator.
   *
   * @param lhsStream the left hand side tuple stream
   * @param rhsStream the right hand side tuple stream
   * @return the result of the wrapped comparator
   * @throws CascadingException wrapping any exception raised while reading or comparing
   */
  @Override
  public int compare( HadoopTupleInputStream lhsStream, HadoopTupleInputStream rhsStream )
    {
    try
      {
      InputStream lhsInput = lhsStream.getInputStream();
      InputStream rhsInput = rhsStream.getInputStream();

      // primitive types skip the leading token and are compared straight off the streams
      if( type.isPrimitive() )
        return comparator.compare( lhsInput, rhsInput );

      // both tokens are always consumed, left side first, before delegating
      boolean lhsIsNull = lhsStream.readToken() == 0;
      boolean rhsIsNull = rhsStream.readToken() == 0;

      return comparator.compare( lhsIsNull ? null : lhsInput, rhsIsNull ? null : rhsInput );
      }
    catch( Exception exception )
      {
      throw new CascadingException( "unable to compare Tuples, likely a CoGroup is being attempted on fields of " +
        "different types or custom comparators are incorrectly set on Fields", exception );
      }
    }
  }