/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.pipe.joiner.JoinerClosure;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.util.TupleBuilder;
import cascading.tuple.util.TupleViews;

/** Class HadoopGroupByClosure is used internally to represent groups of tuples during grouping.
*/ 033 public class HadoopGroupByClosure extends JoinerClosure 034 { 035 protected Tuple grouping; 036 protected Iterator values; 037 038 public HadoopGroupByClosure( FlowProcess flowProcess, Fields[] groupingFields, Fields[] valueFields ) 039 { 040 super( flowProcess, groupingFields, valueFields ); 041 } 042 043 public Tuple getGrouping() 044 { 045 return grouping; 046 } 047 048 public int size() 049 { 050 return 1; 051 } 052 053 @Override 054 public Iterator getIterator( int pos ) 055 { 056 if( pos != 0 ) 057 throw new IllegalArgumentException( "invalid group position: " + pos ); 058 059 return makeIterator( 0, values ); 060 } 061 062 @Override 063 public boolean isEmpty( int pos ) 064 { 065 return values != null; 066 } 067 068 protected Iterator<Tuple> makeIterator( final int pos, final Iterator values ) 069 { 070 return new Iterator<Tuple>() 071 { 072 final int cleanPos = valueFields.length == 1 ? 0 : pos; // support repeated pipes 073 TupleBuilder[] valueBuilder = new TupleBuilder[ valueFields.length ]; 074 075 { 076 for( int i = 0; i < valueFields.length; i++ ) 077 valueBuilder[ i ] = makeBuilder( valueFields[ i ], joinFields[ i ] ); 078 } 079 080 private TupleBuilder makeBuilder( final Fields valueField, final Fields joinField ) 081 { 082 if( valueField.isUnknown() && joinField.hasRelativePos() ) 083 return new TupleBuilder() 084 { 085 @Override 086 public Tuple makeResult( Tuple valueTuple, Tuple groupTuple ) 087 { 088 Fields fields = joinFields[ cleanPos ]; 089 090 fields = Fields.size( valueTuple.size() ).select( fields ); 091 092 valueTuple.set( valueFields[ cleanPos ], fields, groupTuple ); 093 094 return valueTuple; 095 } 096 }; 097 098 if( valueField.isUnknown() || joinField.isNone() ) 099 return new TupleBuilder() 100 { 101 @Override 102 public Tuple makeResult( Tuple valueTuple, Tuple groupTuple ) 103 { 104 valueTuple.set( valueFields[ cleanPos ], joinFields[ cleanPos ], groupTuple ); 105 106 return valueTuple; 107 } 108 }; 109 110 return new 
TupleBuilder() 111 { 112 Tuple result = TupleViews.createOverride( valueField, joinField ); 113 114 @Override 115 public Tuple makeResult( Tuple valueTuple, Tuple groupTuple ) 116 { 117 return TupleViews.reset( result, valueTuple, groupTuple ); 118 } 119 }; 120 } 121 122 public boolean hasNext() 123 { 124 return values.hasNext(); 125 } 126 127 public Tuple next() 128 { 129 Tuple tuple = (Tuple) values.next(); 130 131 return valueBuilder[ cleanPos ].makeResult( tuple, grouping ); 132 } 133 134 public void remove() 135 { 136 throw new UnsupportedOperationException( "remove not supported" ); 137 } 138 }; 139 } 140 141 public void reset( Tuple grouping, Iterator values ) 142 { 143 this.grouping = grouping; 144 this.values = values; 145 } 146 147 @Override 148 public Tuple getGroupTuple( Tuple keysTuple ) 149 { 150 return keysTuple; 151 } 152 }