001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop.util;
022
023import java.io.IOException;
024import java.util.Comparator;
025
026import cascading.CascadingException;
027import cascading.tuple.StreamComparator;
028import cascading.tuple.hadoop.TupleSerialization;
029import cascading.tuple.io.TupleInputStream;
030
031/**
032 *
033 */
034public class DelegatingTupleElementComparator implements StreamComparator<TupleInputStream>, Comparator<Object>
035  {
036  final TupleSerialization tupleSerialization;
037  Comparator<Object> objectComparator = null;
038  StreamComparator<TupleInputStream> streamComparator = null;
039
040  public DelegatingTupleElementComparator( TupleSerialization tupleSerialization )
041    {
042    this.tupleSerialization = tupleSerialization;
043    }
044
045  @Override
046  public int compare( Object lhs, Object rhs )
047    {
048    if( objectComparator == null )
049      {
050      if( lhs == null && rhs == null )
051        return 0;
052
053      objectComparator = getComparator( lhs, rhs );
054      }
055
056    return objectComparator.compare( lhs, rhs );
057    }
058
059  private Comparator<Object> getComparator( Object lhs, Object rhs )
060    {
061    Class type = lhs != null ? lhs.getClass() : null;
062
063    type = type == null && rhs != null ? rhs.getClass() : type;
064
065    Comparator comparator = tupleSerialization.getComparator( type );
066
067    if( comparator instanceof StreamComparator )
068      return new TupleElementStreamComparator( (StreamComparator) comparator );
069
070    return new TupleElementComparator( comparator );
071    }
072
073  @Override
074  public int compare( TupleInputStream lhsStream, TupleInputStream rhsStream )
075    {
076    if( streamComparator == null )
077      streamComparator = getComparator( lhsStream );
078
079    return streamComparator.compare( lhsStream, rhsStream );
080    }
081
082  private StreamComparator getComparator( TupleInputStream lhsStream )
083    {
084    try
085      {
086      lhsStream.mark( 4 * 1024 );
087
088      Comparator foundComparator = lhsStream.getComparatorFor( lhsStream.readToken() );
089
090      // grab the configured default comparator, its ok if its null, just wasn't configured externally
091      if( foundComparator == null )
092        foundComparator = tupleSerialization.getDefaultComparator();
093
094      if( foundComparator instanceof StreamComparator )
095        return new TupleElementStreamComparator( (StreamComparator) foundComparator );
096      else
097        return new TupleElementComparator( foundComparator );
098      }
099    catch( IOException exception )
100      {
101      throw new CascadingException( exception );
102      }
103    finally
104      {
105      try
106        {
107        lhsStream.reset();
108        }
109      catch( IOException exception )
110        {
111        throw new CascadingException( exception );
112        }
113      }
114    }
115
116  }