/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.operation.filter;

import java.beans.ConstructorProperties;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.ConcreteCall;
import cascading.operation.Filter;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import cascading.util.Util;

/**
 * Class Logic is the base class for logical {@link Filter} operations.
 * <p/>
 * A Logic instance holds two parallel arrays: {@code filters[ i ]} is paired with
 * {@code argumentSelectors[ i ]}. During {@link #prepare(FlowProcess, OperationCall)} one
 * {@link TupleEntry} and one {@link ConcreteCall} are created per pair and each child filter
 * is prepared; {@link #cleanup(FlowProcess, OperationCall)} forwards cleanup to each child.
 *
 * @see And
 * @see Or
 * @see Xor
 */
public abstract class Logic extends BaseOperation<Logic.Context> implements Filter<Logic.Context>
{
  /** Argument selectors, one per filter; {@code argumentSelectors[ i ]} belongs to {@code filters[ i ]}. */
  protected final Fields[] argumentSelectors;
  /** The child filters this logical operation is composed of. */
  protected final Filter[] filters;

  /** Varargs convenience that packs the given filters into an array. */
  private static Filter[] filters( Filter... filters )
  {
    return filters;
  }

  /** Per-call state built in {@code prepare} and stored on the {@link OperationCall}. */
  public class Context
  {
    /** One entry per argument selector, created by {@code getArgumentEntries()}. */
    TupleEntry[] argumentEntries;
    /** One call per child filter, handed to that filter's prepare and cleanup. */
    ConcreteCall[] calls;
  }

  /**
   * Creates an instance where every given filter is paired with {@link Fields#ALL}.
   *
   * @param filters the child filters, must not be null nor contain null
   * @throws IllegalArgumentException if the array or any element is null
   */
  @ConstructorProperties({"filters"})
  protected Logic( Filter... filters )
  {
    // must be checked here, before filters.length is read below
    if( filters == null )
      throw new IllegalArgumentException( "given filters array must not be null" );

    this.filters = filters;

    this.argumentSelectors = new Fields[ filters.length ];
    Arrays.fill( this.argumentSelectors, Fields.ALL );

    verify();

    this.numArgs = getFieldsSize();
  }

  /**
   * Creates an instance from exactly two selector/filter pairs.
   *
   * @param lhsArgumentSelector the selector paired with {@code lhsFilter}
   * @param lhsFilter           the first filter
   * @param rhsArgumentSelector the selector paired with {@code rhsFilter}
   * @param rhsFilter           the second filter
   * @throws IllegalArgumentException if any argument fails {@link #verify()}
   */
  @ConstructorProperties({"lhsArgumentSelector", "lhsFilter", "rhsArgumentSelector", "rhsFilter"})
  protected Logic( Fields lhsArgumentSelector, Filter lhsFilter, Fields rhsArgumentSelector, Filter rhsFilter )
  {
    this( Fields.fields( lhsArgumentSelector, rhsArgumentSelector ), filters( lhsFilter, rhsFilter ) );
  }

  /**
   * Creates an instance from parallel arrays of selectors and filters.
   *
   * @param argumentSelectors the selectors, {@code argumentSelectors[ i ]} is paired with {@code filters[ i ]}
   * @param filters           the child filters
   * @throws IllegalArgumentException if the arguments fail {@link #verify()}
   */
  @ConstructorProperties({"argumentSelectors", "filters"})
  protected Logic( Fields[] argumentSelectors, Filter[] filters )
  {
    this.argumentSelectors = argumentSelectors;
    this.filters = filters;

    verify();

    this.numArgs = getFieldsSize();
  }

  /**
   * Returns the argument selectors as produced by {@code Util.copy}.
   *
   * @return the argument selectors
   */
  public Fields[] getArgumentSelectors()
  {
    return Util.copy( argumentSelectors );
  }

  /**
   * Returns the child filters as produced by {@code Util.copy}.
   *
   * @return the child filters
   */
  public Filter[] getFilters()
  {
    return Util.copy( filters );
  }

  /**
   * Validates the selector and filter arrays.
   * <p/>
   * Both arrays must be non-null, of equal length, and free of null elements. Every selector
   * must either be ALL or be a 'defined' selector.
   *
   * @throws IllegalArgumentException if any of the above does not hold
   */
  protected void verify()
  {
    if( argumentSelectors == null )
      throw new IllegalArgumentException( "given argumentSelectors array must not be null" );

    if( filters == null )
      throw new IllegalArgumentException( "given filters array must not be null" );

    // selectors and filters are paired by index; prepare() reads argumentEntries[ i ] for every
    // filter index, so a mismatch would otherwise only surface later as an index error
    if( argumentSelectors.length != filters.length )
      throw new IllegalArgumentException( "given argumentSelectors and filters arrays must be the same length, got: " + argumentSelectors.length + " and " + filters.length );

    for( Fields field : argumentSelectors )
    {
      if( field == null )
        throw new IllegalArgumentException( "given argumentSelectors must not be null" );

      if( !field.isAll() && !field.isDefined() )
        throw new IllegalArgumentException( "given argumentSelectors must be ALL or 'defined' selectors, got: " + field.print() );
    }

    for( Filter filter : filters )
    {
      if( filter == null )
        throw new IllegalArgumentException( "given filters must not be null" );
    }
  }

  /**
   * Builds the {@link Context} for this call and prepares every child filter.
   * <p/>
   * For each filter a new {@link ConcreteCall} is constructed from the given call, bound to the
   * matching argument entry and its fields, and passed to that filter's {@code prepare}. The
   * populated context is then stored on {@code operationCall}.
   *
   * @param flowProcess   the current flow process, forwarded to each child filter
   * @param operationCall the current call; it is cast to {@link ConcreteCall}
   */
  @Override
  public void prepare( FlowProcess flowProcess, OperationCall operationCall )
  {
    Context context = new Context();

    context.argumentEntries = getArgumentEntries();
    context.calls = new ConcreteCall[ filters.length ];

    for( int i = 0; i < filters.length; i++ )
    {
      Filter filter = filters[ i ];
      TupleEntry argumentEntry = context.argumentEntries[ i ];
      ConcreteCall call = new ConcreteCall( (ConcreteCall) operationCall );

      call.setArguments( argumentEntry );
      call.setArgumentFields( argumentEntry.getFields() );

      context.calls[ i ] = call;

      filter.prepare( flowProcess, call );
    }

    operationCall.setContext( context );
  }

  /**
   * Forwards cleanup to every child filter, using the call created for it in
   * {@link #prepare(FlowProcess, OperationCall)}, then clears the context on {@code operationCall}.
   *
   * @param flowProcess   the current flow process, forwarded to each child filter
   * @param operationCall the current call holding the {@link Context}
   */
  @Override
  public void cleanup( FlowProcess flowProcess, OperationCall operationCall )
  {
    Context context = (Context) operationCall.getContext();
    ConcreteCall[] calls = context.calls;

    for( int i = 0; i < filters.length; i++ )
    {
      Filter filter = filters[ i ];

      filter.cleanup( flowProcess, calls[ i ] );
    }

    operationCall.setContext( null );
  }

  /**
   * Computes the value assigned to {@code numArgs} by the constructors.
   *
   * @return {@code ANY} if any selector is a substitution, otherwise the number of distinct
   *         field positions or names found across all selectors
   */
  protected int getFieldsSize()
  {
    Set<Comparable> pos = new HashSet<Comparable>();

    for( Fields field : argumentSelectors )
    {
      if( field.isSubstitution() ) // will be tested to be ALL in verify
        return ANY;

      for( int i = 0; i < field.size(); i++ )
        pos.add( field.get( i ) );
    }

    return pos.size();
  }

  /**
   * Creates one {@link TupleEntry} per argument selector, declared with
   * {@code Fields.asDeclaration( selector )}.
   */
  private TupleEntry[] getArgumentEntries()
  {
    TupleEntry[] argumentEntries = new TupleEntry[ argumentSelectors.length ];

    for( int i = 0; i < argumentSelectors.length; i++ )
    {
      Fields argumentSelector = argumentSelectors[ i ];

      // NOTE(review): the meaning of the 'true' flag is not visible here -- confirm against TupleEntry
      argumentEntries[ i ] = new TupleEntry( Fields.asDeclaration( argumentSelector ), true );
    }

    return argumentEntries;
  }

  /**
   * Two Logic instances are equal when the superclass considers them equal and both their
   * selector arrays and filter arrays are element-wise equal.
   */
  @Override
  public boolean equals( Object object )
  {
    if( this == object )
      return true;
    if( !( object instanceof Logic ) )
      return false;
    if( !super.equals( object ) )
      return false;

    Logic logic = (Logic) object;

    if( !Arrays.equals( argumentSelectors, logic.argumentSelectors ) )
      return false;
    if( !Arrays.equals( filters, logic.filters ) )
      return false;

    return true;
  }

  /** Combines the superclass hash with the hashes of the selector and filter arrays. */
  @Override
  public int hashCode()
  {
    int result = super.hashCode();
    result = 31 * result + ( argumentSelectors != null ? Arrays.hashCode( argumentSelectors ) : 0 );
    result = 31 * result + ( filters != null ? Arrays.hashCode( filters ) : 0 );
    return result;
  }
}