/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.pipe.joiner.JoinerClosure;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.util.TupleBuilder;
import cascading.tuple.util.TupleViews;

/**
 * Class HadoopGroupByClosure is used internally to represent a single group of tuples during grouping.
 * <p/>
 * The closure always exposes exactly one stream of values (position {@code 0}). The current grouping
 * tuple and its values iterator are installed by calling {@link #reset(Tuple, Iterator)}.
 */
public class HadoopGroupByClosure extends JoinerClosure
  {
  /** Grouping tuple most recently supplied to {@link #reset(Tuple, Iterator)}; {@code null} before the first call. */
  protected Tuple grouping;
  /** Values iterator most recently supplied to {@link #reset(Tuple, Iterator)}; {@code null} before the first call. */
  protected Iterator values;

  /**
   * Creates a closure with no current group; call {@link #reset(Tuple, Iterator)} before reading from it.
   *
   * @param flowProcess    passed through to the {@link JoinerClosure} constructor
   * @param groupingFields grouping fields, passed to the super constructor and read here as {@code joinFields}
   * @param valueFields    value fields, passed to the super constructor
   */
  public HadoopGroupByClosure( FlowProcess flowProcess, Fields[] groupingFields, Fields[] valueFields )
    {
    super( flowProcess, groupingFields, valueFields );
    }

  /**
   * Returns the grouping tuple set by the last call to {@link #reset(Tuple, Iterator)}.
   *
   * @return the current grouping tuple, or {@code null} if {@code reset} has not been called
   */
  public Tuple getGrouping()
    {
    return grouping;
    }

  /**
   * Returns the number of value streams this closure exposes.
   *
   * @return always {@code 1}
   */
  public int size()
    {
    return 1;
    }

  /**
   * Returns an iterator over the values of the current group.
   *
   * @param pos the stream position; only {@code 0} is accepted
   * @return an iterator wrapping the values supplied to {@link #reset(Tuple, Iterator)}
   * @throws IllegalArgumentException if {@code pos} is not {@code 0}
   */
  @Override
  public Iterator getIterator( int pos )
    {
    if( pos != 0 )
      throw new IllegalArgumentException( "invalid group position: " + pos );

    return makeIterator( 0, values );
    }

  /**
   * Reports whether this closure currently has no values to iterate.
   * <p/>
   * The check is purely on whether a values iterator has been installed; the iterator itself
   * is not probed, so the answer does not change as values are consumed.
   *
   * @param pos the stream position (not consulted, this closure has a single stream)
   * @return {@code true} if no values iterator is set, {@code false} otherwise
   */
  @Override
  public boolean isEmpty( int pos )
    {
    return values == null;
    }

  /**
   * Wraps a raw values iterator so that every element is passed, together with the current
   * grouping tuple, through a {@link TupleBuilder} before being returned.
   *
   * @param pos    the stream position; treated as {@code 0} when there is only one set of value fields
   * @param values the underlying iterator; its elements are cast to {@link Tuple}
   * @return a read-only iterator ({@code remove()} is unsupported) over the built tuples
   */
  protected Iterator<Tuple> makeIterator( final int pos, final Iterator values )
    {
    return new Iterator<Tuple>()
    {
    final int cleanPos = valueFields.length == 1 ? 0 : pos; // support repeated pipes
    final TupleBuilder[] valueBuilder = new TupleBuilder[ valueFields.length ];

    {
    for( int i = 0; i < valueFields.length; i++ )
      valueBuilder[ i ] = makeBuilder( valueFields[ i ], joinFields[ i ] );
    }

    /**
     * Chooses how a value tuple and the grouping tuple are combined for one stream position.
     */
    private TupleBuilder makeBuilder( final Fields valueField, final Fields joinField )
      {
      if( valueField.isUnknown() || joinField.isNone() )
        return new TupleBuilder()
        {
        @Override
        public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
          {
          // updates the value tuple in place via Tuple#set and hands the same instance back
          valueTuple.set( valueField, joinField, groupTuple );

          return valueTuple;
          }
        };

      return new TupleBuilder()
      {
      // a single view instance, re-pointed at the current tuples on every call
      final Tuple result = TupleViews.createOverride( valueField, joinField );

      @Override
      public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
        {
        return TupleViews.reset( result, valueTuple, groupTuple );
        }
      };
      }

    @Override
    public boolean hasNext()
      {
      return values.hasNext();
      }

    @Override
    public Tuple next()
      {
      Tuple tuple = (Tuple) values.next();

      // 'grouping' is the enclosing closure's field, read at call time
      return valueBuilder[ cleanPos ].makeResult( tuple, grouping );
      }

    @Override
    public void remove()
      {
      throw new UnsupportedOperationException( "remove not supported" );
      }
    };
    }

  /**
   * Installs the grouping tuple and values iterator for the next group.
   *
   * @param grouping the grouping tuple subsequently returned by {@link #getGrouping()}
   * @param values   the iterator subsequently wrapped by {@link #getIterator(int)}
   */
  public void reset( Tuple grouping, Iterator values )
    {
    this.grouping = grouping;
    this.values = values;
    }

  /**
   * Returns the given keys tuple unchanged.
   *
   * @param keysTuple the keys tuple
   * @return {@code keysTuple} itself
   */
  @Override
  public Tuple getGroupTuple( Tuple keysTuple )
    {
    return keysTuple;
    }
  }