001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.filter;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Arrays;
025    import java.util.HashSet;
026    import java.util.Set;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.operation.BaseOperation;
030    import cascading.operation.ConcreteCall;
031    import cascading.operation.Filter;
032    import cascading.operation.OperationCall;
033    import cascading.tuple.Fields;
034    import cascading.tuple.TupleEntry;
035    import cascading.util.Util;
036    
037    /**
038     * Class Logic is the base class for logical {@link Filter} operations.
039     *
040     * @see And
041     * @see Or
042     * @see Xor
043     */
044    public abstract class Logic extends BaseOperation<Logic.Context> implements Filter<Logic.Context>
045      {
046      /** Field fields */
047      protected final Fields[] argumentSelectors;
048      /** Field filters */
049      protected final Filter[] filters;
050    
051      private static Filter[] filters( Filter... filters )
052        {
053        return filters;
054        }
055    
056      public class Context
057        {
058        TupleEntry[] argumentEntries;
059        ConcreteCall[] calls;
060        }
061    
062      @ConstructorProperties({"filters"})
063      protected Logic( Filter... filters )
064        {
065        this.filters = filters;
066    
067        if( filters == null )
068          throw new IllegalArgumentException( "given filters array must not be null" );
069    
070        this.argumentSelectors = new Fields[ filters.length ];
071        Arrays.fill( this.argumentSelectors, Fields.ALL );
072    
073        verify();
074    
075        this.numArgs = getFieldsSize();
076        }
077    
078      @ConstructorProperties({"lhsArgumentsSelector", "lhsFilter", "rhsArgumentSelector", "rhsFilter"})
079      protected Logic( Fields lhsArgumentSelector, Filter lhsFilter, Fields rhsArgumentSelector, Filter rhsFilter )
080        {
081        this( Fields.fields( lhsArgumentSelector, rhsArgumentSelector ), filters( lhsFilter, rhsFilter ) );
082        }
083    
084      @ConstructorProperties({"argumentSelectors", "filters"})
085      protected Logic( Fields[] argumentSelectors, Filter[] filters )
086        {
087        this.argumentSelectors = argumentSelectors;
088        this.filters = filters;
089    
090        verify();
091    
092        this.numArgs = getFieldsSize();
093        }
094    
095      public Fields[] getArgumentSelectors()
096        {
097        return Util.copy( argumentSelectors );
098        }
099    
100      public Filter[] getFilters()
101        {
102        return Util.copy( filters );
103        }
104    
105      protected void verify()
106        {
107        if( argumentSelectors == null )
108          throw new IllegalArgumentException( "given argumentSelectors array must not be null" );
109    
110        if( filters == null )
111          throw new IllegalArgumentException( "given filters array must not be null" );
112    
113        for( Fields field : argumentSelectors )
114          {
115          if( field == null )
116            throw new IllegalArgumentException( "given argumentSelectors must not be null" );
117    
118          if( !field.isAll() && !field.isDefined() )
119            throw new IllegalArgumentException( "given argumentSelectors must be ALL or 'defined' selectors, got: " + field.print() );
120          }
121    
122        for( Filter filter : filters )
123          {
124          if( filter == null )
125            throw new IllegalArgumentException( "given filters must not be null" );
126          }
127        }
128    
129      @Override
130      public void prepare( FlowProcess flowProcess, OperationCall operationCall )
131        {
132        Context context = new Context();
133    
134        context.argumentEntries = getArgumentEntries();
135        context.calls = new ConcreteCall[ filters.length ];
136    
137        for( int i = 0; i < filters.length; i++ )
138          {
139          Filter filter = filters[ i ];
140    
141          context.calls[ i ] = new ConcreteCall( (ConcreteCall) operationCall );
142          context.calls[ i ].setArguments( context.argumentEntries[ i ] );
143          context.calls[ i ].setArgumentFields( context.argumentEntries[ i ].getFields() );
144    
145          filter.prepare( flowProcess, context.calls[ i ] );
146          }
147    
148        operationCall.setContext( context );
149        }
150    
151      @Override
152      public void cleanup( FlowProcess flowProcess, OperationCall operationCall )
153        {
154        Context context = (Context) operationCall.getContext();
155        ConcreteCall[] calls = context.calls;
156    
157        for( int i = 0; i < filters.length; i++ )
158          {
159          Filter filter = filters[ i ];
160    
161          filter.cleanup( flowProcess, calls[ i ] );
162          }
163    
164        operationCall.setContext( null );
165        }
166    
167      protected int getFieldsSize()
168        {
169        Set<Comparable> pos = new HashSet<Comparable>();
170    
171        for( Fields field : argumentSelectors )
172          {
173          if( field.isSubstitution() ) // will be tested to be ALL in verify
174            return ANY;
175    
176          for( int i = 0; i < field.size(); i++ )
177            pos.add( field.get( i ) );
178          }
179    
180        return pos.size();
181        }
182    
183      private TupleEntry[] getArgumentEntries()
184        {
185        TupleEntry[] argumentEntries = new TupleEntry[ argumentSelectors.length ];
186    
187        for( int i = 0; i < argumentSelectors.length; i++ )
188          {
189          Fields argumentSelector = argumentSelectors[ i ];
190          argumentEntries[ i ] = new TupleEntry( Fields.asDeclaration( argumentSelector ), true );
191          }
192    
193        return argumentEntries;
194        }
195    
196      @Override
197      public boolean equals( Object object )
198        {
199        if( this == object )
200          return true;
201        if( !( object instanceof Logic ) )
202          return false;
203        if( !super.equals( object ) )
204          return false;
205    
206        Logic logic = (Logic) object;
207    
208        if( !Arrays.equals( argumentSelectors, logic.argumentSelectors ) )
209          return false;
210        if( !Arrays.equals( filters, logic.filters ) )
211          return false;
212    
213        return true;
214        }
215    
216      @Override
217      public int hashCode()
218        {
219        int result = super.hashCode();
220        result = 31 * result + ( argumentSelectors != null ? Arrays.hashCode( argumentSelectors ) : 0 );
221        result = 31 * result + ( filters != null ? Arrays.hashCode( filters ) : 0 );
222        return result;
223        }
224      }