/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.operation.function;

import java.beans.ConstructorProperties;
import java.util.Arrays;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class UnGroup is a {@link Function} that will 'un-group' data from a given dataset.
 * <p/>
 * That is, for the given field positions, this function will emit a new Tuple for every value. For example:
 * <p/>
 * <pre>
 * A, x, y
 * B, x, z
 * C, y, z
 * </pre>
 * <p/>
 * to:
 * <p/>
 * <pre>
 * A, x
 * A, y
 * B, x
 * B, z
 * C, y
 * C, z
 * </pre>
 * <p/>
 * An instance works in one of two modes, fixed by the constructor used: either an explicit array of
 * value selectors is given (one output Tuple per selector), or only a value count is given and the
 * non-group values are consumed in consecutive runs of that many values.
 */
public class UnGroup extends BaseOperation implements Function
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( UnGroup.class );

  /** Field groupFieldSelector */
  private Fields groupFieldSelector;
  /** Field resultFieldSelectors, remains null when the instance was created with a value count */
  private Fields[] resultFieldSelectors;
  /** Field size, the number of values per emitted Tuple when no result selectors are set */
  private int size = 1;

  /**
   * Constructor UnGroup creates a new UnGroup instance.
   * <p/>
   * The declared fields are sized to the group selector size plus the size of one value selector.
   *
   * @param groupSelector  of type Fields
   * @param valueSelectors of type Fields[]
   * @throws IllegalArgumentException if valueSelectors is null, has fewer than two elements, or its
   *                                  elements are not all the same size
   */
  @ConstructorProperties({"groupSelector", "valueSelectors"})
  public UnGroup( Fields groupSelector, Fields[] valueSelectors )
    {
    // '<= 1' keeps the historical rejection of a single selector and additionally rejects an empty
    // array, which previously escaped this guard and failed on valueSelectors[ 0 ] below
    if( valueSelectors == null || valueSelectors.length <= 1 )
      throw new IllegalArgumentException( "value selectors may not be empty" );

    int selectorSize = valueSelectors[ 0 ].size();

    for( int i = 1; i < valueSelectors.length; i++ )
      {
      if( selectorSize != valueSelectors[ i ].size() )
        throw new IllegalArgumentException( "all value selectors must be the same size" );
      }

    this.numArgs = groupSelector.size() + selectorSize * valueSelectors.length;
    this.groupFieldSelector = groupSelector;
    this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length ); // defensive copy
    this.fieldDeclaration = Fields.size( groupSelector.size() + selectorSize );
    }

  /**
   * Constructor UnGroup creates a new UnGroup instance.
   *
   * @param fieldDeclaration of type Fields
   * @param groupSelector    of type Fields
   * @param valueSelectors   of type Fields[]
   * @throws IllegalArgumentException if valueSelectors is null, has fewer than two elements, its
   *                                  elements are not all the same size, or the group selector size plus
   *                                  a value selector size does not equal the declared field size
   */
  @ConstructorProperties({"fieldDeclaration", "groupSelector", "valueSelectors"})
  public UnGroup( Fields fieldDeclaration, Fields groupSelector, Fields[] valueSelectors )
    {
    super( fieldDeclaration );

    // '<= 1' keeps the historical rejection of a single selector and additionally rejects an empty
    // array, which previously passed through and produced an operation with no value selectors
    if( valueSelectors == null || valueSelectors.length <= 1 )
      throw new IllegalArgumentException( "value selectors may not be empty" );

    numArgs = groupSelector.size();
    int selectorSize = -1; // -1 until the first selector has been seen

    for( Fields resultFieldSelector : valueSelectors )
      {
      int currentSize = resultFieldSelector.size();

      numArgs += currentSize;

      boolean differsFromPrevious = selectorSize != -1 && selectorSize != currentSize;
      boolean differsFromDeclared = fieldDeclaration.size() != groupSelector.size() + currentSize;

      if( differsFromPrevious || differsFromDeclared )
        throw new IllegalArgumentException( "all value selectors must be the same size, and this size plus group selector size must equal the declared field size" );

      selectorSize = currentSize;
      }

    this.groupFieldSelector = groupSelector;
    this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length ); // defensive copy
    }

  /**
   * Constructor UnGroup creates a new UnGroup instance. Where the numValues argument specifies the number
   * of values to include.
   *
   * @param fieldDeclaration of type Fields
   * @param groupSelector    of type Fields
   * @param numValues        of type int, must be greater than zero
   * @throws IllegalArgumentException if numValues is less than one
   */
  @ConstructorProperties({"fieldDeclaration", "groupSelector", "numValues"})
  public UnGroup( Fields fieldDeclaration, Fields groupSelector, int numValues )
    {
    super( fieldDeclaration );

    // numValues is the loop stride in useSize(); zero would never advance and a negative value would
    // walk backwards, so fail fast at construction time
    if( numValues < 1 )
      throw new IllegalArgumentException( "numValues must be greater than zero, got: " + numValues );

    this.groupFieldSelector = groupSelector;
    this.size = numValues;
    }

  /**
   * Method getGroupFieldSelector returns the selector for the fields repeated on every emitted Tuple.
   *
   * @return the groupFieldSelector (type Fields) of this UnGroup object.
   */
  public Fields getGroupFieldSelector()
    {
    return groupFieldSelector;
    }

  /**
   * Method getResultFieldSelectors returns the value selectors as produced by {@link Util#copy}.
   *
   * @return the resultFieldSelectors (type Fields[]) of this UnGroup object.
   */
  public Fields[] getResultFieldSelectors()
    {
    return Util.copy( resultFieldSelectors );
    }

  /**
   * Method getSize returns the number of values per emitted Tuple used when no value selectors are set.
   *
   * @return the size (type int) of this UnGroup object.
   */
  public int getSize()
    {
    return size;
    }

  /**
   * Emits the un-grouped Tuples for the current arguments, using the value selectors when they were
   * given at construction and the value count otherwise.
   *
   * @param flowProcess  of type FlowProcess, not used by this operation
   * @param functionCall of type FunctionCall, supplies the arguments and the output collector
   */
  @Override
  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
    if( resultFieldSelectors != null )
      useResultSelectors( functionCall.getArguments(), functionCall.getOutputCollector() );
    else
      useSize( functionCall.getArguments(), functionCall.getOutputCollector() );
    }

  /**
   * Removes the group values from a copy of the input Tuple, then steps through the remaining values
   * {@code size} at a time, emitting the group values followed by the values selected for each step.
   */
  private void useSize( TupleEntry input, TupleEntryCollector outputCollector )
    {
    LOG.debug( "using size: {}", size );

    Tuple tuple = new Tuple( input.getTuple() ); // make clone
    Tuple group = tuple.remove( input.getFields(), groupFieldSelector ); // tuple now holds only the values

    for( int i = 0; i < tuple.size(); i = i + size )
      {
      Tuple result = new Tuple( group );

      // NOTE(review): presumably selects positions i .. i + size - 1; confirm against Fields.offsetSelector
      result.addAll( tuple.get( Fields.offsetSelector( size, i ).getPos() ) );

      outputCollector.add( result );
      }
    }

  /**
   * Emits one Tuple per value selector, each made of the group values followed by the values that
   * selector picks from the input.
   */
  private void useResultSelectors( TupleEntry input, TupleEntryCollector outputCollector )
    {
    LOG.debug( "using result selectors: {}", resultFieldSelectors.length );

    for( Fields resultFieldSelector : resultFieldSelectors )
      {
      Tuple group = input.selectTupleCopy( groupFieldSelector ); // need a mutable copy

      input.selectInto( resultFieldSelector, group );

      outputCollector.add( group );
      }
    }

  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( !( object instanceof UnGroup ) )
      return false;
    if( !super.equals( object ) )
      return false;

    UnGroup unGroup = (UnGroup) object;

    if( size != unGroup.size )
      return false;
    if( groupFieldSelector != null ? !groupFieldSelector.equals( unGroup.groupFieldSelector ) : unGroup.groupFieldSelector != null )
      return false;
    if( !Arrays.equals( resultFieldSelectors, unGroup.resultFieldSelectors ) )
      return false;

    return true;
    }

  @Override
  public int hashCode()
    {
    int result = super.hashCode();
    result = 31 * result + ( groupFieldSelector != null ? groupFieldSelector.hashCode() : 0 );
    result = 31 * result + ( resultFieldSelectors != null ? Arrays.hashCode( resultFieldSelectors ) : 0 );
    result = 31 * result + size;
    return result;
    }
  }