001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.util;
022
023import java.util.Comparator;
024
025import cascading.tuple.Fields;
026import cascading.tuple.Tuple;
027
028/**
029 *
030 */
031public class SparseTupleComparator implements Comparator<Tuple>
032  {
033  private final static Comparator DEFAULT = new NaturalComparator();
034
035  private static class NaturalComparator implements Comparator<Object>
036    {
037    @Override
038    public int compare( Object lhs, Object rhs )
039      {
040      if( lhs == null && rhs == null )
041        return 0;
042      else if( lhs == null )
043        return -1;
044      else if( rhs == null )
045        return 1;
046      else
047        return ( (Comparable) lhs ).compareTo( rhs ); // guaranteed to not be null
048      }
049    }
050
051  final Comparator[] comparators;
052  final int[] posMap;
053
054  public SparseTupleComparator( Fields valuesField, Fields sortFields )
055    {
056    this( valuesField, sortFields, null );
057    }
058
059  public SparseTupleComparator( Fields groupFields, Comparator defaultComparator )
060    {
061    this( groupFields, groupFields, defaultComparator );
062    }
063
064  public SparseTupleComparator( Fields valuesFields, Fields sortFields, Comparator defaultComparator )
065    {
066    if( defaultComparator == null )
067      defaultComparator = DEFAULT;
068
069    int size = valuesFields != null && !valuesFields.isUnknown() ? valuesFields.size() : sortFields.size();
070    comparators = new Comparator[ size ];
071    posMap = new int[ size ];
072
073    Comparator[] sortFieldComparators = sortFields.getComparators(); // returns a copy
074
075    for( int i = 0; i < sortFields.size(); i++ )
076      {
077      Comparable field = sortFields.get( i );
078      int pos = valuesFields != null ? valuesFields.getPos( field ) : i;
079
080      comparators[ i ] = sortFieldComparators[ i ];
081      posMap[ i ] = pos;
082
083      if( comparators[ i ] == null )
084        comparators[ i ] = defaultComparator;
085      }
086    }
087
088  public Comparator[] getComparators()
089    {
090    return comparators;
091    }
092
093  @Override
094  public int compare( Tuple lhs, Tuple rhs )
095    {
096    for( int i = 0; i < comparators.length; i++ )
097      {
098      Comparator comparator = comparators[ i ];
099
100      if( comparator == null )
101        continue;
102
103      int pos = posMap[ i ];
104      int c = comparator.compare( lhs.getObject( pos ), rhs.getObject( pos ) );
105
106      if( c != 0 )
107        return c;
108      }
109
110    return 0;
111    }
112  }