/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.pipe.joiner;

import java.beans.ConstructorProperties;
import java.util.Arrays;
import java.util.Iterator;

import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.Tuples;
import cascading.tuple.util.TupleViews;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class InnerJoin will return an {@link Iterator} that will iterate over a given {@link Joiner} and return tuples that
 * represent an inner join of the CoGrouper internal grouped tuple collections.
 * <p>
 * Joins are performed based on the equality of the join keys. In the case of null values, Java treats two
 * null values as equivalent. SQL does not treat null values as equal. To produce SQL like results in a given
 * join, a new {@link java.util.Comparator} will need to be used on the joined values to prevent null from
 * equaling null. As a convenience, see the {@link cascading.util.NullNotEquivalentComparator} class.
 */
public class InnerJoin extends BaseJoiner
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( InnerJoin.class );

  /** Creates an InnerJoin without passing a field declaration to the super class. */
  public InnerJoin()
    {
    }

  /**
   * Creates an InnerJoin, handing the given field declaration to the super class constructor.
   *
   * @param fieldDeclaration the fields forwarded to {@code BaseJoiner}
   */
  @ConstructorProperties({"fieldDeclaration"})
  public InnerJoin( Fields fieldDeclaration )
    {
    super( fieldDeclaration );
    }

  /**
   * Returns a new {@link JoinIterator} over the given closure.
   *
   * @param closure the closure supplying one tuple iterator per join position
   * @return an iterator over the joined tuples
   */
  public Iterator<Tuple> getIterator( JoinerClosure closure )
    {
    return new JoinIterator( closure );
    }

  /**
   * Always returns -1.
   *
   * @return -1 (NOTE(review): presumably the marker for "no fixed number of joins" -- confirm against the
   * declaring super type, which is not visible in this file)
   */
  public int numJoins()
    {
    return -1;
    }

  /**
   * Iterates over every combination of the values held by the closure, taking one value from each position.
   * <p>
   * The right most position advances fastest: on each call to {@link #next()} the last position that still has a
   * value is advanced, and every exhausted position to its right is restarted by fetching a new iterator from
   * {@link #getIterator(int)}.
   * <p>
   * The Tuple returned by {@link #next()} is re-used between calls.
   */
  public static class JoinIterator implements Iterator<Tuple>
    {
    /** The closure providing the per-position iterators and value fields. */
    final JoinerClosure closure;
    /** The current iterator for each join position. */
    Iterator[] iterators;
    /** The most recently read value for each position, {@code null} until the first call to {@link #next()}. */
    Tuple[] lastValues;

    /** Strategy used to combine the current values into a single result, chosen in {@link #init()}. */
    TupleBuilder resultBuilder;
    Tuple result = new Tuple(); // will be replaced

    /**
     * Creates a new iterator over the given closure and initializes it by calling {@link #init()}.
     *
     * @param closure the closure supplying one tuple iterator per join position
     */
    public JoinIterator( JoinerClosure closure )
      {
      this.closure = closure;

      LOG.debug( "cogrouped size: {}", closure.size() );

      init();
      }

    /**
     * Fetches the initial iterator for every position and selects the result builder.
     * <p>
     * If any of the closure value fields is unknown, results are flattened into a single re-used Tuple.
     * Otherwise a composite view over the value fields is created once and reset for every result.
     */
    protected void init()
      {
      iterators = new Iterator[ closure.size() ];

      for( int i = 0; i < closure.size(); i++ )
        iterators[ i ] = getIterator( i );

      boolean isUnknown = false;

      for( Fields fields : closure.getValueFields() )
        isUnknown |= fields.isUnknown();

      if( isUnknown )
        resultBuilder = new TupleBuilder()
          {
          Tuple result = new Tuple(); // is re-used

          @Override
          public Tuple makeResult( Tuple[] tuples )
            {
            result.clear();

            // flatten the results into one Tuple
            for( Tuple lastValue : tuples )
              result.addAll( lastValue );

            return result;
            }
          };
      else
        resultBuilder = new TupleBuilder()
          {
          Tuple result;

            {
            // handle self join.
            Fields[] fields = closure.getValueFields();

            if( closure.isSelfJoin() )
              {
              // use the first value fields for every position
              fields = new Fields[ closure.size() ];

              Arrays.fill( fields, closure.getValueFields()[ 0 ] );
              }

            result = TupleViews.createComposite( fields );
            }

          @Override
          public Tuple makeResult( Tuple[] tuples )
            {
            return TupleViews.reset( result, tuples );
            }
          };
      }

    /**
     * Returns the iterator for the given position, as provided by the closure.
     * <p>
     * Also called from {@link #next()} to restart a position once its iterator is exhausted.
     *
     * @param i the join position
     * @return the closure iterator for position {@code i}
     */
    protected Iterator getIterator( int i )
      {
      return closure.getIterator( i );
      }

    /**
     * Allocates {@link #lastValues} and reads the first value from every iterator.
     *
     * @return the newly filled {@link #lastValues} array
     */
    private Tuple[] initLastValues()
      {
      lastValues = new Tuple[ iterators.length ];

      for( int i = 0; i < iterators.length; i++ )
        lastValues[ i ] = (Tuple) iterators[ i ].next();

      return lastValues;
      }

    /**
     * Before the first call to {@link #next()}, returns {@code true} only if every iterator has a value.
     * After that, returns {@code true} if any iterator has a remaining value.
     *
     * @return {@code true} if another joined tuple is available
     */
    @Override
    public final boolean hasNext()
      {
      // if this is the first pass, and there is an iterator without a next value,
      // then we have no next element
      if( lastValues == null )
        {
        for( Iterator iterator : iterators )
          {
          if( !iterator.hasNext() )
            return false;
          }

        return true;
        }

      for( Iterator iterator : iterators )
        {
        if( iterator.hasNext() )
          return true;
        }

      return false;
      }

    /**
     * Returns the next combination of values.
     * <p>
     * This method does not itself check {@link #hasNext()}. If it is called after every iterator is exhausted,
     * all positions are restarted via {@link #getIterator(int)} and iteration begins again rather than failing,
     * so callers are expected to guard each call with {@link #hasNext()}.
     *
     * @return the joined result, the same Tuple instance may be returned on every call
     */
    @Override
    public Tuple next()
      {
      if( lastValues == null )
        return makeResult( initLastValues() );

      // walk from the right most position, advancing the first one that still has a value
      for( int i = iterators.length - 1; i >= 0; i-- )
        {
        if( iterators[ i ].hasNext() )
          {
          lastValues[ i ] = (Tuple) iterators[ i ].next();
          break;
          }

        // reset to first
        iterators[ i ] = getIterator( i );
        lastValues[ i ] = (Tuple) iterators[ i ].next();
        }

      return makeResult( lastValues );
      }

    /**
     * Builds the result for the given values through {@link #resultBuilder} and stores it in {@link #result}.
     *
     * @param lastValues the current value for every position
     * @return the combined result
     */
    private Tuple makeResult( Tuple[] lastValues )
      {
      // NOTE(review): applied to the previously returned tuple before it is rebuilt; presumably this re-enables
      // modification of the re-used instance -- confirm against Tuples.asModifiable
      Tuples.asModifiable( result );

      result = resultBuilder.makeResult( lastValues );

      if( LOG.isTraceEnabled() )
        LOG.trace( "tuple: {}", result.print() );

      return result;
      }

    /**
     * Removal is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove()
      {
      // fail loudly, as the Iterator contract requires, instead of silently ignoring the request
      throw new UnsupportedOperationException( "remove is not supported by join iterators" );
      }
    }

  /** Combines the current value of every join position into a single result Tuple. */
  static interface TupleBuilder
    {
    /**
     * @param tuples the current value for every position
     * @return the combined result
     */
    Tuple makeResult( Tuple[] tuples );
    }
  }