001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.filter;
022
023import java.beans.ConstructorProperties;
024import java.util.Arrays;
025import java.util.HashSet;
026import java.util.Set;
027
028import cascading.flow.FlowProcess;
029import cascading.operation.BaseOperation;
030import cascading.operation.ConcreteCall;
031import cascading.operation.Filter;
032import cascading.operation.OperationCall;
033import cascading.tuple.Fields;
034import cascading.tuple.TupleEntry;
035import cascading.util.Util;
036
037/**
038 * Class Logic is the base class for logical {@link Filter} operations.
039 *
040 * @see And
041 * @see Or
042 * @see Xor
043 */
044public abstract class Logic extends BaseOperation<Logic.Context> implements Filter<Logic.Context>
045  {
046  /** Field fields */
047  protected final Fields[] argumentSelectors;
048  /** Field filters */
049  protected final Filter[] filters;
050
051  private static Filter[] filters( Filter... filters )
052    {
053    return filters;
054    }
055
056  public class Context
057    {
058    TupleEntry[] argumentEntries;
059    ConcreteCall[] calls;
060    }
061
062  @ConstructorProperties({"filters"})
063  protected Logic( Filter... filters )
064    {
065    this.filters = filters;
066
067    if( filters == null )
068      throw new IllegalArgumentException( "given filters array must not be null" );
069
070    this.argumentSelectors = new Fields[ filters.length ];
071    Arrays.fill( this.argumentSelectors, Fields.ALL );
072
073    verify();
074
075    this.numArgs = getFieldsSize();
076    }
077
078  @ConstructorProperties({"lhsArgumentsSelector", "lhsFilter", "rhsArgumentSelector", "rhsFilter"})
079  protected Logic( Fields lhsArgumentSelector, Filter lhsFilter, Fields rhsArgumentSelector, Filter rhsFilter )
080    {
081    this( Fields.fields( lhsArgumentSelector, rhsArgumentSelector ), filters( lhsFilter, rhsFilter ) );
082    }
083
084  @ConstructorProperties({"argumentSelectors", "filters"})
085  protected Logic( Fields[] argumentSelectors, Filter[] filters )
086    {
087    this.argumentSelectors = argumentSelectors;
088    this.filters = filters;
089
090    verify();
091
092    this.numArgs = getFieldsSize();
093    }
094
095  public Fields[] getArgumentSelectors()
096    {
097    return Util.copy( argumentSelectors );
098    }
099
100  public Filter[] getFilters()
101    {
102    return Util.copy( filters );
103    }
104
105  protected void verify()
106    {
107    if( argumentSelectors == null )
108      throw new IllegalArgumentException( "given argumentSelectors array must not be null" );
109
110    if( filters == null )
111      throw new IllegalArgumentException( "given filters array must not be null" );
112
113    for( Fields field : argumentSelectors )
114      {
115      if( field == null )
116        throw new IllegalArgumentException( "given argumentSelectors must not be null" );
117
118      if( !field.isAll() && !field.isDefined() )
119        throw new IllegalArgumentException( "given argumentSelectors must be ALL or 'defined' selectors, got: " + field.print() );
120      }
121
122    for( Filter filter : filters )
123      {
124      if( filter == null )
125        throw new IllegalArgumentException( "given filters must not be null" );
126      }
127    }
128
129  @Override
130  public void prepare( FlowProcess flowProcess, OperationCall operationCall )
131    {
132    Context context = new Context();
133
134    context.argumentEntries = getArgumentEntries();
135    context.calls = new ConcreteCall[ filters.length ];
136
137    for( int i = 0; i < filters.length; i++ )
138      {
139      Filter filter = filters[ i ];
140
141      context.calls[ i ] = new ConcreteCall( (ConcreteCall) operationCall );
142      context.calls[ i ].setArguments( context.argumentEntries[ i ] );
143      context.calls[ i ].setArgumentFields( context.argumentEntries[ i ].getFields() );
144
145      filter.prepare( flowProcess, context.calls[ i ] );
146      }
147
148    operationCall.setContext( context );
149    }
150
151  @Override
152  public void cleanup( FlowProcess flowProcess, OperationCall operationCall )
153    {
154    Context context = (Context) operationCall.getContext();
155    ConcreteCall[] calls = context.calls;
156
157    for( int i = 0; i < filters.length; i++ )
158      {
159      Filter filter = filters[ i ];
160
161      filter.cleanup( flowProcess, calls[ i ] );
162      }
163
164    operationCall.setContext( null );
165    }
166
167  protected int getFieldsSize()
168    {
169    Set<Comparable> pos = new HashSet<Comparable>();
170
171    for( Fields field : argumentSelectors )
172      {
173      if( field.isSubstitution() ) // will be tested to be ALL in verify
174        return ANY;
175
176      for( int i = 0; i < field.size(); i++ )
177        pos.add( field.get( i ) );
178      }
179
180    return pos.size();
181    }
182
183  private TupleEntry[] getArgumentEntries()
184    {
185    TupleEntry[] argumentEntries = new TupleEntry[ argumentSelectors.length ];
186
187    for( int i = 0; i < argumentSelectors.length; i++ )
188      {
189      Fields argumentSelector = argumentSelectors[ i ];
190      argumentEntries[ i ] = new TupleEntry( Fields.asDeclaration( argumentSelector ), true );
191      }
192
193    return argumentEntries;
194    }
195
196  @Override
197  public boolean equals( Object object )
198    {
199    if( this == object )
200      return true;
201    if( !( object instanceof Logic ) )
202      return false;
203    if( !super.equals( object ) )
204      return false;
205
206    Logic logic = (Logic) object;
207
208    if( !Arrays.equals( argumentSelectors, logic.argumentSelectors ) )
209      return false;
210    if( !Arrays.equals( filters, logic.filters ) )
211      return false;
212
213    return true;
214    }
215
216  @Override
217  public int hashCode()
218    {
219    int result = super.hashCode();
220    result = 31 * result + ( argumentSelectors != null ? Arrays.hashCode( argumentSelectors ) : 0 );
221    result = 31 * result + ( filters != null ? Arrays.hashCode( filters ) : 0 );
222    return result;
223    }
224  }