001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.function;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Arrays;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.operation.BaseOperation;
028    import cascading.operation.Function;
029    import cascading.operation.FunctionCall;
030    import cascading.tuple.Fields;
031    import cascading.tuple.Tuple;
032    import cascading.tuple.TupleEntry;
033    import cascading.tuple.TupleEntryCollector;
034    import cascading.util.Util;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * Class UnGroup is a {@link Function} that will 'un-group' data from a given dataset.
040     * <p/>
041     * That is, for the given field positions, this function will emit a new Tuple for every value. For example:
042     * <p/>
043     * <pre>
044     * A, x, y
045     * B, x, z
046     * C, y, z
047     * </pre>
048     * <p/>
049     * to:
050     * <p/>
051     * <pre>
052     * A, x
053     * A, y
054     * B, x
055     * B, z
056     * C, y
057     * C, z
058     * </pre>
059     */
060    public class UnGroup extends BaseOperation implements Function
061      {
062      /** Field LOG */
063      private static final Logger LOG = LoggerFactory.getLogger( UnGroup.class );
064    
065      /** Field groupFieldSelector */
066      private Fields groupFieldSelector;
067      /** Field resultFieldSelectors */
068      private Fields[] resultFieldSelectors;
069      /** Field size */
070      private int size = 1;
071    
072      /**
073       * Constructor UnGroup creates a new UnGroup instance.
074       *
075       * @param groupSelector  of type Fields
076       * @param valueSelectors of type Fields[]
077       */
078      @ConstructorProperties({"groupSelector", "valueSelectors"})
079      public UnGroup( Fields groupSelector, Fields[] valueSelectors )
080        {
081        if( valueSelectors == null || valueSelectors.length == 1 )
082          throw new IllegalArgumentException( "value selectors may not be empty" );
083    
084        int size = valueSelectors[ 0 ].size();
085    
086        for( int i = 1; i < valueSelectors.length; i++ )
087          {
088          if( valueSelectors[ 0 ].size() != valueSelectors[ i ].size() )
089            throw new IllegalArgumentException( "all value selectors must be the same size" );
090    
091          size = valueSelectors[ i ].size();
092          }
093    
094        this.numArgs = groupSelector.size() + size * valueSelectors.length;
095        this.groupFieldSelector = groupSelector;
096        this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length );
097        this.fieldDeclaration = Fields.size( groupSelector.size() + size );
098        }
099    
100      /**
101       * Constructor UnGroup creates a new UnGroup instance.
102       *
103       * @param fieldDeclaration of type Fields
104       * @param groupSelector    of type Fields
105       * @param valueSelectors   of type Fields[]
106       */
107      @ConstructorProperties({"fieldDeclaration", "groupSelector", "valueSelectors"})
108      public UnGroup( Fields fieldDeclaration, Fields groupSelector, Fields[] valueSelectors )
109        {
110        super( fieldDeclaration );
111    
112        if( valueSelectors == null || valueSelectors.length == 1 )
113          throw new IllegalArgumentException( "value selectors may not be empty" );
114    
115        numArgs = groupSelector.size();
116        int selectorSize = -1;
117    
118        for( Fields resultFieldSelector : valueSelectors )
119          {
120          numArgs += resultFieldSelector.size();
121          int fieldSize = groupSelector.size() + resultFieldSelector.size();
122    
123          if( selectorSize != -1 && selectorSize != resultFieldSelector.size() )
124            throw new IllegalArgumentException( "all value selectors must be the same size, and this size plus group selector size must equal the declared field size" );
125    
126          selectorSize = resultFieldSelector.size();
127    
128          if( fieldDeclaration.size() != fieldSize )
129            throw new IllegalArgumentException( "all value selectors must be the same size, and this size plus group selector size must equal the declared field size" );
130          }
131    
132        this.groupFieldSelector = groupSelector;
133        this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length );
134        }
135    
136      /**
137       * Constructor UnGroup creates a new UnGroup instance. Where the numValues argument specifies the number
138       * of values to include.
139       *
140       * @param fieldDeclaration of type Fields
141       * @param groupSelector    of type Fields
142       * @param numValues        of type int
143       */
144      @ConstructorProperties({"fieldDeclaration", "groupSelector", "numValues"})
145      public UnGroup( Fields fieldDeclaration, Fields groupSelector, int numValues )
146        {
147        super( fieldDeclaration );
148        this.groupFieldSelector = groupSelector;
149        this.size = numValues;
150        }
151    
152      public Fields getGroupFieldSelector()
153        {
154        return groupFieldSelector;
155        }
156    
157      public Fields[] getResultFieldSelectors()
158        {
159        return Util.copy( resultFieldSelectors );
160        }
161    
162      public int getSize()
163        {
164        return size;
165        }
166    
167      @Override
168      public void operate( FlowProcess flowProcess, FunctionCall functionCall )
169        {
170        if( resultFieldSelectors != null )
171          useResultSelectors( functionCall.getArguments(), functionCall.getOutputCollector() );
172        else
173          useSize( functionCall.getArguments(), functionCall.getOutputCollector() );
174        }
175    
176      private void useSize( TupleEntry input, TupleEntryCollector outputCollector )
177        {
178        LOG.debug( "using size: {}", size );
179    
180        Tuple tuple = new Tuple( input.getTuple() ); // make clone
181        Tuple group = tuple.remove( input.getFields(), groupFieldSelector );
182    
183        for( int i = 0; i < tuple.size(); i = i + size )
184          {
185          Tuple result = new Tuple( group );
186          result.addAll( tuple.get( Fields.offsetSelector( size, i ).getPos() ) );
187    
188          outputCollector.add( result );
189          }
190        }
191    
192      private void useResultSelectors( TupleEntry input, TupleEntryCollector outputCollector )
193        {
194        LOG.debug( "using result selectors: {}", resultFieldSelectors.length );
195    
196        for( Fields resultFieldSelector : resultFieldSelectors )
197          {
198          Tuple group = input.selectTupleCopy( groupFieldSelector ); // need a mutable copy
199    
200          input.selectInto( resultFieldSelector, group );
201    
202          outputCollector.add( group );
203          }
204        }
205    
206      @Override
207      public boolean equals( Object object )
208        {
209        if( this == object )
210          return true;
211        if( !( object instanceof UnGroup ) )
212          return false;
213        if( !super.equals( object ) )
214          return false;
215    
216        UnGroup unGroup = (UnGroup) object;
217    
218        if( size != unGroup.size )
219          return false;
220        if( groupFieldSelector != null ? !groupFieldSelector.equals( unGroup.groupFieldSelector ) : unGroup.groupFieldSelector != null )
221          return false;
222        if( !Arrays.equals( resultFieldSelectors, unGroup.resultFieldSelectors ) )
223          return false;
224    
225        return true;
226        }
227    
228      @Override
229      public int hashCode()
230        {
231        int result = super.hashCode();
232        result = 31 * result + ( groupFieldSelector != null ? groupFieldSelector.hashCode() : 0 );
233        result = 31 * result + ( resultFieldSelectors != null ? Arrays.hashCode( resultFieldSelectors ) : 0 );
234        result = 31 * result + size;
235        return result;
236        }
237      }