/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.operation.function;

import java.beans.ConstructorProperties;
import java.util.Arrays;

import cascading.flow.FlowProcess;
import cascading.management.annotation.Property;
import cascading.management.annotation.PropertyDescription;
import cascading.management.annotation.Visibility;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class UnGroup is a {@link Function} that will 'un-group' data from a given dataset.
 * <p/>
 * That is, for the given field positions, this function will emit a new Tuple for every value. For example:
 * <p/>
 * <pre>
 * A, x, y
 * B, x, z
 * C, y, z
 * </pre>
 * <p/>
 * to:
 * <p/>
 * <pre>
 * A, x
 * A, y
 * B, x
 * B, z
 * C, y
 * C, z
 * </pre>
 * <p/>
 * An instance works in one of two modes, chosen by the constructor used: either an explicit array of
 * value selectors is given (one output Tuple is emitted per selector), or only a value count is given
 * (the non-group values are emitted in consecutive runs of that many values).
 */
public class UnGroup extends BaseOperation implements Function
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( UnGroup.class );

  /** Field groupFieldSelector */
  private Fields groupFieldSelector;
  /** Field resultFieldSelectors, remains null when the instance was created with a value count */
  private Fields[] resultFieldSelectors;
  /** Field size, the number of values per emitted Tuple when no result selectors are set */
  private int size = 1;

  /**
   * Constructor UnGroup creates a new UnGroup instance.
   * <p/>
   * The declared fields are sized as the group selector size plus the (common) value selector size.
   *
   * @param groupSelector  of type Fields
   * @param valueSelectors of type Fields[]
   * @throws IllegalArgumentException if valueSelectors is null or empty, or if the selectors differ in size
   */
  @ConstructorProperties({"groupSelector", "valueSelectors"})
  public UnGroup( Fields groupSelector, Fields[] valueSelectors )
    {
    verifyNotEmpty( valueSelectors );

    // every selector must be as wide as the first one; a distinct local name avoids shadowing the 'size' field
    int selectorSize = valueSelectors[ 0 ].size();

    for( int i = 1; i < valueSelectors.length; i++ )
      {
      if( selectorSize != valueSelectors[ i ].size() )
        throw new IllegalArgumentException( "all value selectors must be the same size" );
      }

    this.numArgs = groupSelector.size() + selectorSize * valueSelectors.length;
    this.groupFieldSelector = groupSelector;
    this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length );
    this.fieldDeclaration = Fields.size( groupSelector.size() + selectorSize );
    }

  /**
   * Constructor UnGroup creates a new UnGroup instance.
   *
   * @param fieldDeclaration of type Fields
   * @param groupSelector    of type Fields
   * @param valueSelectors   of type Fields[]
   * @throws IllegalArgumentException if valueSelectors is null or empty, if the selectors differ in size, or
   *                                  if the group selector size plus the value selector size does not equal
   *                                  the declared field size
   */
  @ConstructorProperties({"fieldDeclaration", "groupSelector", "valueSelectors"})
  public UnGroup( Fields fieldDeclaration, Fields groupSelector, Fields[] valueSelectors )
    {
    super( fieldDeclaration );

    verifyNotEmpty( valueSelectors );

    numArgs = groupSelector.size();
    int selectorSize = -1; // -1 marks "no selector seen yet"

    for( Fields resultFieldSelector : valueSelectors )
      {
      numArgs += resultFieldSelector.size();
      int fieldSize = groupSelector.size() + resultFieldSelector.size();

      if( selectorSize != -1 && selectorSize != resultFieldSelector.size() )
        throw new IllegalArgumentException( "all value selectors must be the same size, and this size plus group selector size must equal the declared field size" );

      selectorSize = resultFieldSelector.size();

      if( fieldDeclaration.size() != fieldSize )
        throw new IllegalArgumentException( "all value selectors must be the same size, and this size plus group selector size must equal the declared field size" );
      }

    this.groupFieldSelector = groupSelector;
    this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length );
    }

  /**
   * Constructor UnGroup creates a new UnGroup instance. Where the numValues argument specifies the number
   * of values to include.
   *
   * @param fieldDeclaration of type Fields
   * @param groupSelector    of type Fields
   * @param numValues        of type int, must be greater than zero
   * @throws IllegalArgumentException if numValues is less than one
   */
  @ConstructorProperties({"fieldDeclaration", "groupSelector", "numValues"})
  public UnGroup( Fields fieldDeclaration, Fields groupSelector, int numValues )
    {
    super( fieldDeclaration );

    // numValues is the loop stride in useSize(); zero or a negative value would never advance past the end
    if( numValues < 1 )
      throw new IllegalArgumentException( "numValues must be greater than zero, got: " + numValues );

    this.groupFieldSelector = groupSelector;
    this.size = numValues;
    }

  /**
   * Rejects a missing or empty value selector array.
   *
   * @param valueSelectors of type Fields[]
   * @throws IllegalArgumentException if valueSelectors is null or has no elements
   */
  private static void verifyNotEmpty( Fields[] valueSelectors )
    {
    if( valueSelectors == null || valueSelectors.length == 0 )
      throw new IllegalArgumentException( "value selectors may not be empty" );
    }

  /**
   * Method getGroupFieldSelector returns the group field selector given at construction.
   *
   * @return the groupFieldSelector (type Fields) of this UnGroup object.
   */
  @Property(name = "ungroupFieldSelector", visibility = Visibility.PRIVATE)
  @PropertyDescription("The fields to un-group.")
  public Fields getGroupFieldSelector()
    {
    return groupFieldSelector;
    }

  /**
   * Method getResultFieldSelectors returns the result of {@code Util.copy} applied to the internal
   * selector array, so callers are not handed the internal array itself.
   * <p/>
   * NOTE(review): the internal array is null for instances created with a value count; presumably
   * {@code Util.copy} tolerates null -- confirm.
   *
   * @return the resultFieldSelectors (type Fields[]) of this UnGroup object.
   */
  @Property(name = "resultFieldSelectors", visibility = Visibility.PRIVATE)
  @PropertyDescription("The result field selectors.")
  public Fields[] getResultFieldSelectors()
    {
    return Util.copy( resultFieldSelectors );
    }

  /**
   * Method getSize returns the number of values per emitted Tuple used when no result selectors are set.
   * It is 1 unless the instance was created with an explicit value count.
   *
   * @return the size (type int) of this UnGroup object.
   */
  public int getSize()
    {
    return size;
    }

  /**
   * Emits the un-grouped Tuples for the current argument Tuple, using the result selectors when they
   * were supplied at construction and the value count otherwise.
   *
   * @param flowProcess  of type FlowProcess
   * @param functionCall of type FunctionCall
   */
  @Override
  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
    if( resultFieldSelectors != null )
      useResultSelectors( functionCall.getArguments(), functionCall.getOutputCollector() );
    else
      useSize( functionCall.getArguments(), functionCall.getOutputCollector() );
    }

  /**
   * Emits one Tuple per consecutive run of {@code size} non-group values, each prefixed with the group values.
   * <p/>
   * NOTE(review): assumes the number of non-group values is a multiple of {@code size}; the behavior of the
   * final partial run depends on {@code Tuple.get} -- confirm.
   *
   * @param input           of type TupleEntry
   * @param outputCollector of type TupleEntryCollector
   */
  private void useSize( TupleEntry input, TupleEntryCollector outputCollector )
    {
    LOG.debug( "using size: {}", size );

    Tuple tuple = new Tuple( input.getTuple() ); // make clone
    Tuple group = tuple.remove( input.getFields(), groupFieldSelector ); // 'tuple' is iterated below as the remaining values

    for( int i = 0; i < tuple.size(); i = i + size )
      {
      Tuple result = new Tuple( group );
      result.addAll( tuple.get( Fields.offsetSelector( size, i ).getPos() ) );

      outputCollector.add( result );
      }
    }

  /**
   * Emits one Tuple per result selector: a copy of the group values followed by the values the
   * selector picks from the input.
   *
   * @param input           of type TupleEntry
   * @param outputCollector of type TupleEntryCollector
   */
  private void useResultSelectors( TupleEntry input, TupleEntryCollector outputCollector )
    {
    LOG.debug( "using result selectors: {}", resultFieldSelectors.length );

    for( Fields resultFieldSelector : resultFieldSelectors )
      {
      Tuple group = input.selectTupleCopy( groupFieldSelector ); // need a mutable copy

      input.selectInto( resultFieldSelector, group );

      outputCollector.add( group );
      }
    }

  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( !( object instanceof UnGroup ) )
      return false;
    if( !super.equals( object ) )
      return false;

    UnGroup unGroup = (UnGroup) object;

    if( size != unGroup.size )
      return false;
    if( groupFieldSelector != null ? !groupFieldSelector.equals( unGroup.groupFieldSelector ) : unGroup.groupFieldSelector != null )
      return false;
    if( !Arrays.equals( resultFieldSelectors, unGroup.resultFieldSelectors ) )
      return false;

    return true;
    }

  @Override
  public int hashCode()
    {
    int result = super.hashCode();
    result = 31 * result + ( groupFieldSelector != null ? groupFieldSelector.hashCode() : 0 );
    result = 31 * result + ( resultFieldSelectors != null ? Arrays.hashCode( resultFieldSelectors ) : 0 );
    result = 31 * result + size;
    return result;
    }
  }