001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez; 022 023import java.util.Collection; 024import java.util.Iterator; 025 026import cascading.flow.FlowProcess; 027import cascading.flow.hadoop.HadoopCoGroupClosure; 028import cascading.flow.hadoop.util.LazySpillableTupleCollection; 029import cascading.flow.hadoop.util.ResettableCollection; 030import cascading.tuple.Fields; 031import cascading.tuple.Tuple; 032 033/** 034 * 035 */ 036public class TezCoGroupClosure extends HadoopCoGroupClosure 037 { 038 public TezCoGroupClosure( FlowProcess flowProcess, int numSelfJoins, Fields[] groupingFields, Fields[] valueFields ) 039 { 040 super( flowProcess, numSelfJoins, groupingFields, valueFields ); 041 } 042 043 protected void build() 044 { 045 clearGroups(); 046 047 for( int pos = 0; pos < values.length; pos++ ) 048 ( (ResettableCollection) collections[ pos ] ).reset( values[ pos ] ); 049 050 // todo: prevent an initial iteration to populate the lazy collection 051 if( numSelfJoins != 0 ) // force fill of lazy collection 052 { 053 Iterator<Tuple> iterator = collections[ 0 ].iterator(); 054 055 while( iterator.hasNext() ) 056 iterator.next(); // do nothing, populates the lazy collection 057 } 058 } 059 060 @Override 061 protected Collection<Tuple> createTupleCollection( Fields joinField ) 062 { 063 return new LazySpillableTupleCollection( super.createTupleCollection( joinField ) ); 064 } 065 }