001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tuple.hadoop.util; 022 023 import java.io.InputStream; 024 import java.util.Comparator; 025 026 import cascading.CascadingException; 027 import cascading.tuple.StreamComparator; 028 import cascading.tuple.hadoop.io.HadoopTupleInputStream; 029 import cascading.tuple.hadoop.io.HadoopTupleOutputStream; 030 031 /** 032 * 033 */ 034 public class TupleElementStreamComparator implements StreamComparator<HadoopTupleInputStream>, Comparator<Object> 035 { 036 final StreamComparator comparator; 037 038 public TupleElementStreamComparator( StreamComparator comparator ) 039 { 040 this.comparator = comparator; 041 } 042 043 @Override 044 public int compare( Object lhs, Object rhs ) 045 { 046 return ( (Comparator<Object>) comparator ).compare( lhs, rhs ); 047 } 048 049 @Override 050 public int compare( HadoopTupleInputStream lhsStream, HadoopTupleInputStream rhsStream ) 051 { 052 try 053 { 054 // pop off element type, its assumed we know it as we have a stream comparator 055 // to delegate too 056 int lhsToken = lhsStream.readToken(); 057 int rhsToken = rhsStream.readToken(); 058 059 if( lhsToken == HadoopTupleOutputStream.WRITABLE_TOKEN ) 060 lhsStream.readString(); 061 062 if( rhsToken == HadoopTupleOutputStream.WRITABLE_TOKEN ) 063 rhsStream.readString(); 064 065 InputStream lhs = lhsToken == 0 ? null : lhsStream.getInputStream(); 066 InputStream rhs = rhsToken == 0 ? null : rhsStream.getInputStream(); 067 068 return comparator.compare( lhs, rhs ); 069 } 070 catch( Exception exception ) 071 { 072 throw new CascadingException( "unable to compare Tuples, likely a CoGroup is being attempted on fields of " + 073 "different types or custom comparators are incorrectly set on Fields", exception ); 074 } 075 } 076 }