001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe.joiner;
022
import java.beans.ConstructorProperties;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.Tuples;
import cascading.tuple.util.TupleViews;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
033
034/**
035 * Class InnerJoin will return an {@link Iterator} that will iterate over a given {@link Joiner} and return tuples that represent
036 * and inner join of the CoGrouper internal grouped tuple collections.
037 * <p/>
038 * Joins perform based on the equality of the join keys. In the case of null values, Java treats two
039 * null values as equivalent. SQL does not treat null values as equal. To produce SQL like results in a given
040 * join, a new {@link java.util.Comparator} will need to be used on the joined values to prevent null from
041 * equaling null. As a convenience, see the {@link cascading.util.NullNotEquivalentComparator} class.
042 */
043public class InnerJoin extends BaseJoiner
044  {
045  /** Field LOG */
046  private static final Logger LOG = LoggerFactory.getLogger( InnerJoin.class );
047
048  public InnerJoin()
049    {
050    }
051
052  @ConstructorProperties({"fieldDeclaration"})
053  public InnerJoin( Fields fieldDeclaration )
054    {
055    super( fieldDeclaration );
056    }
057
058  public Iterator<Tuple> getIterator( JoinerClosure closure )
059    {
060    return new JoinIterator( closure );
061    }
062
063  public int numJoins()
064    {
065    return -1;
066    }
067
068  public static class JoinIterator implements Iterator<Tuple>
069    {
070    final JoinerClosure closure;
071    Iterator[] iterators;
072    Tuple[] lastValues;
073
074    TupleBuilder resultBuilder;
075    Tuple result = new Tuple(); // will be replaced
076
077    public JoinIterator( JoinerClosure closure )
078      {
079      this.closure = closure;
080
081      LOG.debug( "cogrouped size: {}", closure.size() );
082
083      init();
084      }
085
086    protected void init()
087      {
088      iterators = new Iterator[ closure.size() ];
089
090      for( int i = 0; i < closure.size(); i++ )
091        iterators[ i ] = getIterator( i );
092
093      boolean isUnknown = false;
094
095      for( Fields fields : closure.getValueFields() )
096        isUnknown |= fields.isUnknown();
097
098      if( isUnknown )
099        resultBuilder = new TupleBuilder()
100        {
101        Tuple result = new Tuple(); // is re-used
102
103        @Override
104        public Tuple makeResult( Tuple[] tuples )
105          {
106          result.clear();
107
108          // flatten the results into one Tuple
109          for( Tuple lastValue : tuples )
110            result.addAll( lastValue );
111
112          return result;
113          }
114        };
115      else
116        resultBuilder = new TupleBuilder()
117        {
118        Tuple result;
119
120        {
121        // handle self join.
122        Fields[] fields = closure.getValueFields();
123
124        if( closure.isSelfJoin() )
125          {
126          fields = new Fields[ closure.size() ];
127
128          Arrays.fill( fields, closure.getValueFields()[ 0 ] );
129          }
130
131        result = TupleViews.createComposite( fields );
132        }
133
134        @Override
135        public Tuple makeResult( Tuple[] tuples )
136          {
137          return TupleViews.reset( result, tuples );
138          }
139        };
140      }
141
142    protected Iterator getIterator( int i )
143      {
144      return closure.getIterator( i );
145      }
146
147    private Tuple[] initLastValues()
148      {
149      lastValues = new Tuple[ iterators.length ];
150
151      for( int i = 0; i < iterators.length; i++ )
152        lastValues[ i ] = (Tuple) iterators[ i ].next();
153
154      return lastValues;
155      }
156
157    public final boolean hasNext()
158      {
159      // if this is the first pass, and there is an iterator without a next value,
160      // then we have no next element
161      if( lastValues == null )
162        {
163        for( Iterator iterator : iterators )
164          {
165          if( !iterator.hasNext() )
166            return false;
167          }
168
169        return true;
170        }
171
172      for( Iterator iterator : iterators )
173        {
174        if( iterator.hasNext() )
175          return true;
176        }
177
178      return false;
179      }
180
181    public Tuple next()
182      {
183      if( lastValues == null )
184        return makeResult( initLastValues() );
185
186      for( int i = iterators.length - 1; i >= 0; i-- )
187        {
188        if( iterators[ i ].hasNext() )
189          {
190          lastValues[ i ] = (Tuple) iterators[ i ].next();
191          break;
192          }
193
194        // reset to first
195        iterators[ i ] = getIterator( i );
196        lastValues[ i ] = (Tuple) iterators[ i ].next();
197        }
198
199      return makeResult( lastValues );
200      }
201
202    private Tuple makeResult( Tuple[] lastValues )
203      {
204      Tuples.asModifiable( result );
205
206      result = resultBuilder.makeResult( lastValues );
207
208      if( LOG.isTraceEnabled() )
209        LOG.trace( "tuple: {}", result.print() );
210
211      return result;
212      }
213
214    public void remove()
215      {
216      // unsupported
217      }
218    }
219
220  static interface TupleBuilder
221    {
222    Tuple makeResult( Tuple[] tuples );
223    }
224  }