001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.stream; 022 023 import java.util.Arrays; 024 import java.util.Collection; 025 import java.util.Iterator; 026 027 import cascading.flow.FlowProcess; 028 import cascading.pipe.joiner.JoinerClosure; 029 import cascading.tuple.Fields; 030 import cascading.tuple.Tuple; 031 import cascading.tuple.Tuples; 032 import cascading.tuple.util.TupleViews; 033 034 /** 035 * 036 */ 037 public class MemoryCoGroupClosure extends JoinerClosure 038 { 039 private Collection<Tuple>[] collections; 040 private final int numSelfJoins; 041 private final Tuple emptyTuple; 042 private Tuple joinedTuple = new Tuple(); // is discarded 043 044 private Tuple[] joinedTuplesArray; 045 private TupleBuilder joinedBuilder; 046 047 public MemoryCoGroupClosure( FlowProcess flowProcess, int numSelfJoins, Fields[] groupingFields, Fields[] valueFields ) 048 { 049 super( flowProcess, groupingFields, valueFields ); 050 this.numSelfJoins = numSelfJoins; 051 this.emptyTuple = Tuple.size( groupingFields[ 0 ].size() ); 052 053 this.joinedTuplesArray = new Tuple[ size() ]; 054 this.joinedBuilder = makeJoinedBuilder( groupingFields ); 055 } 056 057 @Override 058 public int size() 059 { 060 return Math.max( joinFields.length, numSelfJoins + 1 ); 061 } 062 063 public void reset( Collection<Tuple>[] collections ) 064 { 065 this.collections = collections; 066 } 067 068 @Override 069 public Iterator<Tuple> getIterator( int pos ) 070 { 071 if( numSelfJoins != 0 ) 072 return collections[ 0 ].iterator(); 073 else 074 return collections[ pos ].iterator(); 075 } 076 077 @Override 078 public boolean isEmpty( int pos ) 079 { 080 if( numSelfJoins != 0 ) 081 return collections[ 0 ].isEmpty(); 082 else 083 return collections[ pos ].isEmpty(); 084 } 085 086 @Override 087 public Tuple getGroupTuple( Tuple keysTuple ) 088 { 089 Tuples.asModifiable( joinedTuple ); 090 091 for( int i = 0; i < collections.length; i++ ) 092 joinedTuplesArray[ i ] = collections[ i ].isEmpty() ? emptyTuple : keysTuple; 093 094 joinedTuple = joinedBuilder.makeResult( joinedTuplesArray ); 095 096 return joinedTuple; 097 } 098 099 static interface TupleBuilder 100 { 101 Tuple makeResult( Tuple[] tuples ); 102 } 103 104 private TupleBuilder makeJoinedBuilder( final Fields[] joinFields ) 105 { 106 final Fields[] fields = isSelfJoin() ? new Fields[ size() ] : joinFields; 107 108 if( isSelfJoin() ) 109 Arrays.fill( fields, 0, fields.length, joinFields[ 0 ] ); 110 111 return new TupleBuilder() 112 { 113 Tuple result = TupleViews.createComposite( fields ); 114 115 @Override 116 public Tuple makeResult( Tuple[] tuples ) 117 { 118 return TupleViews.reset( result, tuples ); 119 } 120 }; 121 } 122 }