001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.util.Arrays;
024    import java.util.Collection;
025    import java.util.Iterator;
026    
027    import cascading.flow.FlowProcess;
028    import cascading.pipe.joiner.JoinerClosure;
029    import cascading.tuple.Fields;
030    import cascading.tuple.Tuple;
031    import cascading.tuple.Tuples;
032    import cascading.tuple.util.TupleViews;
033    
034    /**
035     *
036     */
037    public class MemoryCoGroupClosure extends JoinerClosure
038      {
039      private Collection<Tuple>[] collections;
040      private final int numSelfJoins;
041      private final Tuple emptyTuple;
042      private Tuple joinedTuple = new Tuple(); // is discarded
043    
044      private Tuple[] joinedTuplesArray;
045      private TupleBuilder joinedBuilder;
046    
047      public MemoryCoGroupClosure( FlowProcess flowProcess, int numSelfJoins, Fields[] groupingFields, Fields[] valueFields )
048        {
049        super( flowProcess, groupingFields, valueFields );
050        this.numSelfJoins = numSelfJoins;
051        this.emptyTuple = Tuple.size( groupingFields[ 0 ].size() );
052    
053        this.joinedTuplesArray = new Tuple[ size() ];
054        this.joinedBuilder = makeJoinedBuilder( groupingFields );
055        }
056    
057      @Override
058      public int size()
059        {
060        return Math.max( joinFields.length, numSelfJoins + 1 );
061        }
062    
063      public void reset( Collection<Tuple>[] collections )
064        {
065        this.collections = collections;
066        }
067    
068      @Override
069      public Iterator<Tuple> getIterator( int pos )
070        {
071        if( numSelfJoins != 0 )
072          return collections[ 0 ].iterator();
073        else
074          return collections[ pos ].iterator();
075        }
076    
077      @Override
078      public boolean isEmpty( int pos )
079        {
080        if( numSelfJoins != 0 )
081          return collections[ 0 ].isEmpty();
082        else
083          return collections[ pos ].isEmpty();
084        }
085    
086      @Override
087      public Tuple getGroupTuple( Tuple keysTuple )
088        {
089        Tuples.asModifiable( joinedTuple );
090    
091        for( int i = 0; i < collections.length; i++ )
092          joinedTuplesArray[ i ] = collections[ i ].isEmpty() ? emptyTuple : keysTuple;
093    
094        joinedTuple = joinedBuilder.makeResult( joinedTuplesArray );
095    
096        return joinedTuple;
097        }
098    
099      static interface TupleBuilder
100        {
101        Tuple makeResult( Tuple[] tuples );
102        }
103    
104      private TupleBuilder makeJoinedBuilder( final Fields[] joinFields )
105        {
106        final Fields[] fields = isSelfJoin() ? new Fields[ size() ] : joinFields;
107    
108        if( isSelfJoin() )
109          Arrays.fill( fields, 0, fields.length, joinFields[ 0 ] );
110    
111        return new TupleBuilder()
112        {
113        Tuple result = TupleViews.createComposite( fields );
114    
115        @Override
116        public Tuple makeResult( Tuple[] tuples )
117          {
118          return TupleViews.reset( result, tuples );
119          }
120        };
121        }
122      }