001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.hadoop; 022 023import java.util.Arrays; 024import java.util.Collection; 025import java.util.Iterator; 026import java.util.NoSuchElementException; 027 028import cascading.flow.FlowProcess; 029import cascading.flow.hadoop.util.FalseCollection; 030import cascading.provider.FactoryLoader; 031import cascading.tuple.Fields; 032import cascading.tuple.Tuple; 033import cascading.tuple.Tuples; 034import cascading.tuple.collect.Spillable; 035import cascading.tuple.collect.SpillableTupleList; 036import cascading.tuple.collect.TupleCollectionFactory; 037import cascading.tuple.hadoop.collect.HadoopTupleCollectionFactory; 038import cascading.tuple.io.IndexTuple; 039import cascading.tuple.util.TupleViews; 040import org.apache.hadoop.conf.Configuration; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import static cascading.tuple.collect.TupleCollectionFactory.TUPLE_COLLECTION_FACTORY; 045 046/** Class CoGroupClosure is used internally to represent co-grouping results of multiple tuple streams. */ 047public class HadoopCoGroupClosure extends HadoopGroupByClosure 048 { 049 /** Field LOG */ 050 private static final Logger LOG = LoggerFactory.getLogger( HadoopCoGroupClosure.class ); 051 052 public enum Spill 053 { 054 Num_Spills_Written, Num_Spills_Read, Num_Tuples_Spilled, Duration_Millis_Written 055 } 056 057 private class SpillListener implements Spillable.SpillListener 058 { 059 private final FlowProcess flowProcess; 060 private final Fields joinField; 061 062 public SpillListener( FlowProcess flowProcess, Fields joinField ) 063 { 064 this.flowProcess = flowProcess; 065 this.joinField = joinField; 066 } 067 068 @Override 069 public void notifyWriteSpillBegin( Spillable spillable, int spillSize, String spillReason ) 070 { 071 int numFiles = spillable.spillCount(); 072 073 if( numFiles % 10 == 0 ) 074 { 075 LOG.info( "spilling group: {}, on grouping: {}, num times: {}, with reason: {}", 076 new Object[]{joinField.printVerbose(), spillable.getGrouping().print(), numFiles + 1, spillReason} ); 077 078 Runtime runtime = Runtime.getRuntime(); 079 long freeMem = runtime.freeMemory() / 1024 / 1024; 080 long maxMem = runtime.maxMemory() / 1024 / 1024; 081 long totalMem = runtime.totalMemory() / 1024 / 1024; 082 083 LOG.info( "mem on spill (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem ); 084 } 085 086 LOG.info( "spilling {} tuples in list to file number {}", spillSize, numFiles + 1 ); 087 088 flowProcess.increment( Spill.Num_Spills_Written, 1 ); 089 flowProcess.increment( Spill.Num_Tuples_Spilled, spillSize ); 090 } 091 092 @Override 093 public void notifyWriteSpillEnd( SpillableTupleList spillableTupleList, long duration ) 094 { 095 flowProcess.increment( Spill.Duration_Millis_Written, duration ); 096 } 097 098 @Override 099 public void notifyReadSpillBegin( Spillable spillable ) 100 { 101 flowProcess.increment( Spill.Num_Spills_Read, 1 ); 102 } 103 } 104 105 /** Field groups */ 106 protected Collection<Tuple>[] collections; 107 protected final int numSelfJoins; 108 109 private Tuple[] joinedTuplesArray; 110 private final Tuple emptyTuple; 111 private TupleBuilder joinedBuilder; 112 private Tuple joinedTuple = new Tuple(); // is discarded 113 114 private final TupleCollectionFactory<Configuration> tupleCollectionFactory; 115 116 public HadoopCoGroupClosure( FlowProcess flowProcess, int numSelfJoins, Fields[] groupingFields, Fields[] valueFields ) 117 { 118 super( flowProcess, groupingFields, valueFields ); 119 this.numSelfJoins = numSelfJoins; 120 121 this.emptyTuple = Tuple.size( groupingFields[ 0 ].size() ); 122 123 FactoryLoader loader = FactoryLoader.getInstance(); 124 125 this.tupleCollectionFactory = loader.loadFactoryFrom( flowProcess, TUPLE_COLLECTION_FACTORY, HadoopTupleCollectionFactory.class ); 126 127 initLists(); 128 } 129 130 @Override 131 public int size() 132 { 133 return Math.max( joinFields.length, numSelfJoins + 1 ); 134 } 135 136 @Override 137 public Iterator<Tuple> getIterator( int pos ) 138 { 139 if( pos < 0 || pos >= collections.length ) 140 throw new IllegalArgumentException( "invalid group position: " + pos ); 141 142 return makeIterator( pos, collections[ pos ].iterator() ); 143 } 144 145 @Override 146 public Tuple getGroupTuple( Tuple keysTuple ) 147 { 148 Tuples.asModifiable( joinedTuple ); 149 150 for( int i = 0; i < collections.length; i++ ) 151 joinedTuplesArray[ i ] = collections[ i ].isEmpty() ? emptyTuple : keysTuple; 152 153 joinedTuple = joinedBuilder.makeResult( joinedTuplesArray ); 154 155 return joinedTuple; 156 } 157 158 @Override 159 public boolean isEmpty( int pos ) 160 { 161 return collections[ pos ].isEmpty(); 162 } 163 164 @Override 165 public void reset( Tuple grouping, Iterator<Tuple>[] values ) 166 { 167 super.reset( grouping, values ); 168 169 build(); 170 } 171 172 protected void build() 173 { 174 clearGroups(); 175 176 if( collections[ 0 ] instanceof FalseCollection ) // force reset on FalseCollection 177 ( (FalseCollection) collections[ 0 ] ).reset( null ); 178 179 while( values[ 0 ].hasNext() ) 180 { 181 IndexTuple current = (IndexTuple) values[ 0 ].next(); 182 int pos = current.getIndex(); 183 184 // if this is the first (lhs) co-group, just use values iterator 185 // we are guaranteed all the remainder tuples in the iterator are from pos == 0 186 if( numSelfJoins == 0 && pos == 0 ) 187 { 188 ( (FalseCollection) collections[ 0 ] ).reset( createIterator( current, values[ 0 ] ) ); 189 break; 190 } 191 192 collections[ pos ].add( current.getTuple() ); // get the value tuple for this cogroup 193 } 194 } 195 196 protected void clearGroups() 197 { 198 for( Collection<Tuple> collection : collections ) 199 { 200 collection.clear(); 201 202 if( collection instanceof Spillable ) 203 ( (Spillable) collection ).setGrouping( grouping ); 204 } 205 } 206 207 protected void initLists() 208 { 209 collections = new Collection[ size() ]; 210 211 // handle self joins 212 if( numSelfJoins != 0 ) 213 { 214 Arrays.fill( collections, createTupleCollection( joinFields[ 0 ] ) ); 215 } 216 else 217 { 218 collections[ 0 ] = new FalseCollection(); // we iterate this only once per grouping 219 220 for( int i = 1; i < joinFields.length; i++ ) 221 collections[ i ] = createTupleCollection( joinFields[ i ] ); 222 } 223 224 joinedBuilder = makeJoinedBuilder( joinFields ); 225 joinedTuplesArray = new Tuple[ collections.length ]; 226 } 227 228 static interface TupleBuilder 229 { 230 Tuple makeResult( Tuple[] tuples ); 231 } 232 233 private TupleBuilder makeJoinedBuilder( final Fields[] joinFields ) 234 { 235 final Fields[] fields = isSelfJoin() ? new Fields[ size() ] : joinFields; 236 237 if( isSelfJoin() ) 238 Arrays.fill( fields, 0, fields.length, joinFields[ 0 ] ); 239 240 return new TupleBuilder() 241 { 242 Tuple result = TupleViews.createComposite( fields ); 243 244 @Override 245 public Tuple makeResult( Tuple[] tuples ) 246 { 247 return TupleViews.reset( result, tuples ); 248 } 249 }; 250 } 251 252 protected Collection<Tuple> createTupleCollection( Fields joinField ) 253 { 254 Collection<Tuple> collection = tupleCollectionFactory.create( flowProcess ); 255 256 if( collection instanceof Spillable ) 257 ( (Spillable) collection ).setSpillListener( createListener( joinField ) ); 258 259 return collection; 260 } 261 262 private Spillable.SpillListener createListener( final Fields joinField ) 263 { 264 return new SpillListener( flowProcess, joinField ); 265 } 266 267 public Iterator<Tuple> createIterator( final IndexTuple current, final Iterator<IndexTuple> values ) 268 { 269 return new Iterator<Tuple>() 270 { 271 IndexTuple value = current; 272 273 @Override 274 public boolean hasNext() 275 { 276 return value != null; 277 } 278 279 @Override 280 public Tuple next() 281 { 282 if( value == null && !values.hasNext() ) 283 throw new NoSuchElementException(); 284 285 Tuple result = value.getTuple(); 286 287 if( values.hasNext() ) 288 value = values.next(); 289 else 290 value = null; 291 292 return result; 293 } 294 295 @Override 296 public void remove() 297 { 298 // unsupported 299 } 300 }; 301 } 302 }