001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.util;
022
023import java.io.IOException;
024import java.util.Comparator;
025import java.util.Iterator;
026
027import cascading.CascadingException;
028import cascading.tuple.Tuple;
029import cascading.tuple.io.TuplePair;
030import org.apache.tez.runtime.library.api.KeyValuesReader;
031
032/**
033 *
034 */
035public class SecondarySortKeyValuesReader extends KeyValuesReader
036  {
037  private KeyValuesReader parent;
038  private Comparator<Tuple> groupComparator;
039  private Tuple currentKey;
040  private Iterable<Object> currentValues;
041  private boolean isNewKey = false;
042  private TuplePair currentKeyPair;
043
044  public SecondarySortKeyValuesReader( KeyValuesReader parent, Comparator<Tuple> groupComparator )
045    {
046    this.parent = parent;
047    this.groupComparator = groupComparator;
048    }
049
050  @Override
051  public boolean next() throws IOException
052    {
053    // next for forwarding to the next key
054    // so we must keep iterating across keys in the parent until we find a new
055    // grouping key without its accompanied sort values
056
057    if( parent != null && isNewKey )
058      {
059      isNewKey = false; // allow next next() to advance underlying iterator
060      return true;
061      }
062
063    boolean advanced = advance();
064
065    while( !isNewKey && advanced )
066      advanced = advance();
067
068    isNewKey = false;
069
070    return advanced;
071    }
072
073  protected boolean advance() throws IOException
074    {
075    if( parent == null )
076      return false;
077
078    boolean next = parent.next();
079
080    if( !next )
081      {
082      parent = null;
083      return false;
084      }
085
086    currentKeyPair = (TuplePair) parent.getCurrentKey();
087
088    isNewKey = currentKey == null || groupComparator.compare( currentKey, currentKeyPair.getLhs() ) != 0;
089    currentKey = currentKeyPair.getLhs();
090    currentValues = parent.getCurrentValues();
091
092    return true;
093    }
094
095  @Override
096  public Object getCurrentKey() throws IOException
097    {
098    return currentKeyPair;
099    }
100
101  @Override
102  public Iterable<Object> getCurrentValues() throws IOException
103    {
104    return new Iterable<Object>()
105    {
106    @Override
107    public Iterator<Object> iterator()
108      {
109      final Iterator<Object>[] iterator = new Iterator[]{currentValues.iterator()};
110
111      return new Iterator<Object>()
112      {
113      @Override
114      public boolean hasNext()
115        {
116        boolean hasNext = iterator[ 0 ].hasNext();
117
118        if( hasNext )
119          return true;
120
121        if( !advanceSafe() )
122          return false;
123
124        if( isNewKey )
125          return false;
126
127        iterator[ 0 ] = currentValues.iterator();
128
129        return hasNext();
130        }
131
132      @Override
133      public Object next()
134        {
135        return iterator[ 0 ].next();
136        }
137
138      @Override
139      public void remove()
140        {
141        iterator[ 0 ].remove();
142        }
143
144      protected boolean advanceSafe()
145        {
146        try
147          {
148          return advance();
149          }
150        catch( IOException exception )
151          {
152          throw new CascadingException( "unable to advance values iterator", exception );
153          }
154        }
155      };
156      }
157    };
158    }
159  }