/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple.hadoop.util;

import java.io.IOException;
import java.util.Comparator;

import cascading.CascadingException;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tuple.Fields;
import cascading.tuple.StreamComparator;
import cascading.tuple.Tuple;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.io.BufferedInputStream;
import cascading.tuple.hadoop.io.HadoopTupleInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.RawComparator;

/**
 * Class DeserializerComparator is the base class for all Cascading comparator classes.
 * <p/>
 * On {@link #setConf(Configuration)} it creates a {@link TupleSerialization}, a left-hand and a right-hand
 * {@link HadoopTupleInputStream} (each over its own {@link BufferedInputStream}), and the array of group
 * comparators read from the {@code cascading.group.comparator} property. Two {@code compareTuples} helpers
 * are provided: one comparing {@link Tuple} instances, one comparing the current contents of the two streams.
 */
public abstract class DeserializerComparator<T> extends Configured implements RawComparator<T>
  {
  /** Buffer backing {@link #lhsStream}. */
  final BufferedInputStream lhsBuffer = new BufferedInputStream();
  /** Buffer backing {@link #rhsStream}. */
  final BufferedInputStream rhsBuffer = new BufferedInputStream();

  /** Serialization created from the current configuration; assigned in {@link #setConf(Configuration)}. */
  TupleSerialization tupleSerialization;

  /** Tuple stream reading from {@link #lhsBuffer}; assigned in {@link #setConf(Configuration)}. */
  HadoopTupleInputStream lhsStream;
  /** Tuple stream reading from {@link #rhsBuffer}; assigned in {@link #setConf(Configuration)}. */
  HadoopTupleInputStream rhsStream;

  /** Wrapped comparators for the {@code cascading.group.comparator} property. */
  Comparator[] groupComparators;

  /**
   * Stores the given configuration and (re)builds the serialization, the two tuple streams and the
   * group comparators from it.
   * <p/>
   * A {@code null} configuration is ignored entirely, leaving all state untouched.
   *
   * @param conf the configuration to initialize from, may be {@code null}
   */
  @Override
  public void setConf( Configuration conf )
    {
    // NOTE(review): presumably guards against the Configured constructor invoking setConf( null )
    // before this class's fields are initialized -- confirm against the Hadoop Configured source
    if( conf == null )
      return;

    super.setConf( conf );

    tupleSerialization = new TupleSerialization( conf );

    // get new readers so deserializers don't compete for the buffer
    lhsStream = new HadoopTupleInputStream( lhsBuffer, tupleSerialization.getElementReader() );
    rhsStream = new HadoopTupleInputStream( rhsBuffer, tupleSerialization.getElementReader() );

    groupComparators = deserializeComparatorsFor( "cascading.group.comparator" );
    groupComparators = delegatingComparatorsFor( groupComparators );
    }

  /**
   * Reads the field comparators stored under the given property name in the current configuration.
   *
   * @param name the configuration property name
   * @return the comparators as returned by {@link #getFieldComparatorsFrom(Configuration, String)}
   * @throws IllegalStateException if no configuration has been set
   */
  Comparator[] deserializeComparatorsFor( String name )
    {
    Configuration conf = getConf();

    if( conf == null )
      throw new IllegalStateException( "no conf set" );

    return getFieldComparatorsFrom( conf, name );
    }

  /**
   * Reads the field comparators stored under the given property name.
   * <p/>
   * If the property is not set, an array of {@code null} entries is returned whose length is taken from the
   * integer property {@code name + ".size"}, defaulting to 1. Otherwise the property value is deserialized
   * from Base64 into a {@link Fields} instance and its comparators are returned.
   *
   * @param conf the configuration to read from
   * @param name the configuration property name
   * @return the comparator array, possibly containing {@code null} entries
   * @throws CascadingException if the property value cannot be deserialized; the underlying
   *                            {@link IOException} is attached as the cause
   */
  public static Comparator[] getFieldComparatorsFrom( Configuration conf, String name )
    {
    String value = conf.get( name );

    if( value == null )
      return new Comparator[ conf.getInt( name + ".size", 1 ) ];

    try
      {
      return HadoopUtil.deserializeBase64( value, conf, Fields.class ).getComparators();
      }
    catch( IOException exception )
      {
      // keep the original failure as the cause so the root problem is not lost
      throw new CascadingException( "unable to deserialize comparators for: " + name, exception );
      }
    }

  /**
   * Wraps every entry of the given array in a tuple element comparator.
   * <ul>
   * <li>a {@link StreamComparator} is wrapped in a {@code TupleElementStreamComparator}</li>
   * <li>any other non-null comparator is wrapped in a {@code TupleElementComparator}</li>
   * <li>a {@code null} entry is replaced by a {@code DelegatingTupleElementComparator} built on the
   * current {@link #tupleSerialization}</li>
   * </ul>
   *
   * @param fieldComparators the comparators to wrap, entries may be {@code null}
   * @return a new array of the same length holding the wrapping comparators
   */
  Comparator[] delegatingComparatorsFor( Comparator[] fieldComparators )
    {
    Comparator[] comparators = new Comparator[ fieldComparators.length ];

    for( int i = 0; i < comparators.length; i++ )
      {
      if( fieldComparators[ i ] instanceof StreamComparator )
        comparators[ i ] = new TupleElementStreamComparator( (StreamComparator) fieldComparators[ i ] );
      else if( fieldComparators[ i ] != null )
        comparators[ i ] = new TupleElementComparator( fieldComparators[ i ] );
      else
        comparators[ i ] = new DelegatingTupleElementComparator( tupleSerialization );
      }

    return comparators;
    }

  /**
   * Compares two tuples, first by size and then element by element.
   *
   * @param comparators the comparators to apply, indexed by element position modulo the array length
   * @param lhs         the left-hand tuple
   * @param rhs         the right-hand tuple
   * @return the size difference if the sizes differ, otherwise the first non-zero element comparison,
   * or 0 if all elements compare equal
   * @throws CascadingException if comparing an element pair throws any exception
   */
  final int compareTuples( Comparator[] comparators, Tuple lhs, Tuple rhs )
    {
    int lhsLen = lhs.size();
    int rhsLen = rhs.size();

    int c = lhsLen - rhsLen;

    if( c != 0 )
      return c;

    for( int i = 0; i < lhsLen; i++ )
      {
      Object lhsObject = lhs.getObject( i );
      Object rhsObject = rhs.getObject( i );

      try
        {
        // hack to support comparators array length of 1: the index wraps around the array
        c = comparators[ i % comparators.length ].compare( lhsObject, rhsObject );
        }
      catch( Exception exception )
        {
        throw new CascadingException( "unable to compare object elements in position: " + i + " lhs: '" + lhsObject + "' rhs: '" + rhsObject + "'", exception );
        }

      if( c != 0 )
        return c;
      }

    return 0;
    }

  /**
   * Compares the tuples currently readable from {@link #lhsStream} and {@link #rhsStream}, first by
   * element count and then element by element.
   * <p/>
   * Every comparator used is cast to {@link StreamComparator}; a failing cast is reported like any other
   * comparison failure.
   *
   * @param comparators the comparators to apply, indexed by element position modulo the array length
   * @return the element count difference if the counts differ, otherwise the first non-zero element
   * comparison, or 0 if all elements compare equal
   * @throws IOException        declared for reading the element counts from the streams
   * @throws CascadingException if comparing an element pair throws any exception
   */
  final int compareTuples( Comparator[] comparators ) throws IOException
    {
    int lhsLen = lhsStream.getNumElements();
    int rhsLen = rhsStream.getNumElements();

    int c = lhsLen - rhsLen;

    if( c != 0 )
      return c;

    for( int i = 0; i < lhsLen; i++ )
      {
      try
        {
        // hack to support comparators array length of 1: the index wraps around the array
        c = ( (StreamComparator) comparators[ i % comparators.length ] ).compare( lhsStream, rhsStream );
        }
      catch( Exception exception )
        {
        throw new CascadingException( "unable to compare stream elements in position: " + i, exception );
        }

      if( c != 0 )
        return c;
      }

    return 0;
    }
  }