001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.hadoop.util; 022 023import java.io.IOException; 024import java.util.Comparator; 025 026import cascading.CascadingException; 027import cascading.tuple.StreamComparator; 028import cascading.tuple.hadoop.TupleSerialization; 029import cascading.tuple.io.TupleInputStream; 030 031/** 032 * 033 */ 034public class DelegatingTupleElementComparator implements StreamComparator<TupleInputStream>, Comparator<Object> 035 { 036 final TupleSerialization tupleSerialization; 037 Comparator<Object> objectComparator = null; 038 StreamComparator<TupleInputStream> streamComparator = null; 039 040 public DelegatingTupleElementComparator( TupleSerialization tupleSerialization ) 041 { 042 this.tupleSerialization = tupleSerialization; 043 } 044 045 @Override 046 public int compare( Object lhs, Object rhs ) 047 { 048 if( objectComparator == null ) 049 { 050 if( lhs == null && rhs == null ) 051 return 0; 052 053 objectComparator = getComparator( lhs, rhs ); 054 } 055 056 return objectComparator.compare( lhs, rhs ); 057 } 058 059 private Comparator<Object> getComparator( Object lhs, Object rhs ) 060 { 061 Class type = lhs != null ? lhs.getClass() : null; 062 063 type = type == null && rhs != null ? rhs.getClass() : type; 064 065 Comparator comparator = tupleSerialization.getComparator( type ); 066 067 if( comparator instanceof StreamComparator ) 068 return new TupleElementStreamComparator( (StreamComparator) comparator ); 069 070 return new TupleElementComparator( comparator ); 071 } 072 073 @Override 074 public int compare( TupleInputStream lhsStream, TupleInputStream rhsStream ) 075 { 076 if( streamComparator == null ) 077 streamComparator = getComparator( lhsStream ); 078 079 return streamComparator.compare( lhsStream, rhsStream ); 080 } 081 082 private StreamComparator getComparator( TupleInputStream lhsStream ) 083 { 084 try 085 { 086 lhsStream.mark( 4 * 1024 ); 087 088 Comparator foundComparator = lhsStream.getComparatorFor( lhsStream.readToken() ); 089 090 // grab the configured default comparator, its ok if its null, just wasn't configured externally 091 if( foundComparator == null ) 092 foundComparator = tupleSerialization.getDefaultComparator(); 093 094 if( foundComparator instanceof StreamComparator ) 095 return new TupleElementStreamComparator( (StreamComparator) foundComparator ); 096 else 097 return new TupleElementComparator( foundComparator ); 098 } 099 catch( IOException exception ) 100 { 101 throw new CascadingException( exception ); 102 } 103 finally 104 { 105 try 106 { 107 lhsStream.reset(); 108 } 109 catch( IOException exception ) 110 { 111 throw new CascadingException( exception ); 112 } 113 } 114 } 115 116 }