001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.expression;
022
023import java.io.IOException;
024import java.lang.reflect.InvocationTargetException;
025import java.util.Arrays;
026
027import cascading.flow.FlowProcess;
028import cascading.management.annotation.Property;
029import cascading.management.annotation.PropertyDescription;
030import cascading.management.annotation.Visibility;
031import cascading.operation.BaseOperation;
032import cascading.operation.OperationCall;
033import cascading.operation.OperationException;
034import cascading.tuple.Fields;
035import cascading.tuple.Tuple;
036import cascading.tuple.TupleEntry;
037import cascading.tuple.Tuples;
038import cascading.tuple.coerce.Coercions;
039import cascading.tuple.type.CoercibleType;
040import cascading.tuple.util.TupleViews;
041import cascading.util.Util;
042import org.codehaus.commons.compiler.CompileException;
043import org.codehaus.janino.ScriptEvaluator;
044
045/**
046 *
047 */
048public abstract class ScriptOperation extends BaseOperation<ScriptOperation.Context>
049  {
050  /** Field expression */
051  protected final String block;
052  /** Field parameterTypes */
053  protected Class[] parameterTypes;
054  /** Field parameterNames */
055  protected String[] parameterNames;
056  /** returnType */
057  protected Class returnType = Object.class;
058
059  public ScriptOperation( int numArgs, Fields fieldDeclaration, String block )
060    {
061    super( numArgs, fieldDeclaration );
062    this.block = block;
063    this.returnType = fieldDeclaration.getTypeClass( 0 ) == null ? this.returnType : fieldDeclaration.getTypeClass( 0 );
064    }
065
066  public ScriptOperation( int numArgs, Fields fieldDeclaration, String block, Class returnType )
067    {
068    super( numArgs, fieldDeclaration );
069    this.block = block;
070    this.returnType = returnType == null ? this.returnType : returnType;
071    }
072
073  public ScriptOperation( int numArgs, Fields fieldDeclaration, String block, Class returnType, Class[] expectedTypes )
074    {
075    super( numArgs, fieldDeclaration );
076    this.block = block;
077    this.returnType = returnType == null ? this.returnType : returnType;
078
079    if( expectedTypes == null )
080      throw new IllegalArgumentException( "expectedTypes may not be null" );
081
082    this.parameterTypes = Arrays.copyOf( expectedTypes, expectedTypes.length );
083    }
084
085  public ScriptOperation( int numArgs, Fields fieldDeclaration, String block, Class returnType, String[] parameterNames, Class[] parameterTypes )
086    {
087    super( numArgs, fieldDeclaration );
088    this.parameterNames = parameterNames == null ? null : Arrays.copyOf( parameterNames, parameterNames.length );
089    this.block = block;
090    this.returnType = returnType == null ? this.returnType : returnType;
091    this.parameterTypes = Arrays.copyOf( parameterTypes, parameterTypes.length );
092
093    if( getParameterNamesInternal().length != getParameterTypesInternal().length )
094      throw new IllegalArgumentException( "parameterNames must be same length as parameterTypes" );
095    }
096
097  public ScriptOperation( int numArgs, String block, Class returnType )
098    {
099    super( numArgs );
100    this.block = block;
101    this.returnType = returnType == null ? this.returnType : returnType;
102    }
103
104  public ScriptOperation( int numArgs, String block, Class returnType, Class[] expectedTypes )
105    {
106    super( numArgs );
107    this.block = block;
108    this.returnType = returnType == null ? this.returnType : returnType;
109
110    if( expectedTypes == null || expectedTypes.length == 0 )
111      throw new IllegalArgumentException( "expectedTypes may not be null or empty" );
112
113    this.parameterTypes = Arrays.copyOf( expectedTypes, expectedTypes.length );
114    }
115
116  public ScriptOperation( int numArgs, String block, Class returnType, String[] parameterNames, Class[] parameterTypes )
117    {
118    super( numArgs );
119    this.parameterNames = parameterNames == null ? null : Arrays.copyOf( parameterNames, parameterNames.length );
120    this.block = block;
121    this.returnType = returnType == null ? this.returnType : returnType;
122    this.parameterTypes = Arrays.copyOf( parameterTypes, parameterTypes.length );
123
124    if( getParameterNamesInternal().length != getParameterTypesInternal().length )
125      throw new IllegalArgumentException( "parameterNames must be same length as parameterTypes" );
126    }
127
128  @Property(name = "source", visibility = Visibility.PRIVATE)
129  @PropertyDescription("The Java source to execute.")
130  public String getBlock()
131    {
132    return block;
133    }
134
135  private boolean hasParameterNames()
136    {
137    return parameterNames != null;
138    }
139
140  @Property(name = "parameterNames", visibility = Visibility.PUBLIC)
141  @PropertyDescription("The declared parameter names.")
142  public String[] getParameterNames()
143    {
144    return Util.copy( parameterNames );
145    }
146
147  private String[] getParameterNamesInternal()
148    {
149    if( parameterNames != null )
150      return parameterNames;
151
152    try
153      {
154      parameterNames = guessParameterNames();
155      }
156    catch( IOException exception )
157      {
158      throw new OperationException( "could not read expression: " + block, exception );
159      }
160    catch( CompileException exception )
161      {
162      throw new OperationException( "could not compile expression: " + block, exception );
163      }
164
165    return parameterNames;
166    }
167
168  protected String[] guessParameterNames() throws CompileException, IOException
169    {
170    throw new OperationException( "parameter names are required" );
171    }
172
173  private Fields getParameterFields()
174    {
175    return makeFields( getParameterNamesInternal() );
176    }
177
178  private boolean hasParameterTypes()
179    {
180    return parameterTypes != null;
181    }
182
183  @Property(name = "parameterTypes", visibility = Visibility.PUBLIC)
184  @PropertyDescription("The declared parameter types.")
185  public Class[] getParameterTypes()
186    {
187    return Util.copy( parameterTypes );
188    }
189
190  private Class[] getParameterTypesInternal()
191    {
192    if( !hasParameterNames() )
193      return parameterTypes;
194
195    if( hasParameterNames() && parameterNames.length == parameterTypes.length )
196      return parameterTypes;
197
198    if( parameterNames.length > 0 && parameterTypes.length != 1 )
199      throw new IllegalStateException( "wrong number of parameter types, expects: " + parameterNames.length );
200
201    Class[] types = new Class[ parameterNames.length ];
202
203    Arrays.fill( types, parameterTypes[ 0 ] );
204
205    parameterTypes = types;
206
207    return parameterTypes;
208    }
209
210  protected ScriptEvaluator getEvaluator( Class returnType, String[] parameterNames, Class[] parameterTypes )
211    {
212    try
213      {
214      return new ScriptEvaluator( block, returnType, parameterNames, parameterTypes );
215      }
216    catch( CompileException exception )
217      {
218      throw new OperationException( "could not compile script: " + block, exception );
219      }
220    }
221
222  private Fields makeFields( String[] parameters )
223    {
224    Comparable[] fields = new Comparable[ parameters.length ];
225
226    for( int i = 0; i < parameters.length; i++ )
227      {
228      String parameter = parameters[ i ];
229
230      if( parameter.startsWith( "$" ) )
231        fields[ i ] = parse( parameter ); // returns parameter if not a number after $
232      else
233        fields[ i ] = parameter;
234      }
235
236    return new Fields( fields );
237    }
238
239  private Comparable parse( String parameter )
240    {
241    try
242      {
243      return Integer.parseInt( parameter.substring( 1 ) );
244      }
245    catch( NumberFormatException exception )
246      {
247      return parameter;
248      }
249    }
250
251  @Override
252  public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
253    {
254    if( operationCall.getContext() == null )
255      operationCall.setContext( new Context() );
256
257    Context context = operationCall.getContext();
258
259    Fields argumentFields = operationCall.getArgumentFields();
260
261    if( hasParameterNames() && hasParameterTypes() )
262      {
263      context.parameterNames = getParameterNamesInternal();
264      context.parameterFields = argumentFields.select( getParameterFields() ); // inherit argument types
265      context.parameterTypes = getParameterTypesInternal();
266      }
267    else if( hasParameterTypes() )
268      {
269      context.parameterNames = toNames( argumentFields );
270      context.parameterFields = argumentFields.applyTypes( getParameterTypesInternal() );
271      context.parameterTypes = getParameterTypesInternal();
272      }
273    else
274      {
275      context.parameterNames = toNames( argumentFields );
276      context.parameterFields = argumentFields;
277      context.parameterTypes = argumentFields.getTypesClasses();
278
279      if( argumentFields.isNone() )
280        context.parameterTypes = new Class[ 0 ]; // to match names
281
282      if( context.parameterTypes == null )
283        throw new IllegalArgumentException( "field types may not be empty, incoming tuple stream should declare field types" );
284      }
285
286    context.parameterCoercions = Coercions.coercibleArray( context.parameterFields );
287    context.parameterArray = new Object[ context.parameterTypes.length ]; // re-use object array
288    context.scriptEvaluator = getEvaluator( getReturnType(), context.parameterNames, context.parameterTypes );
289    context.intermediate = TupleViews.createNarrow( argumentFields.getPos( context.parameterFields ) );
290    context.result = Tuple.size( 1 ); // re-use the output tuple
291    }
292
293  private String[] toNames( Fields argumentFields )
294    {
295    String[] names = new String[ argumentFields.size() ];
296
297    for( int i = 0; i < names.length; i++ )
298      {
299      Comparable comparable = argumentFields.get( i );
300      if( comparable instanceof String )
301        names[ i ] = (String) comparable;
302      else
303        names[ i ] = "$" + comparable;
304      }
305
306    return names;
307    }
308
309  public Class getReturnType()
310    {
311    return returnType;
312    }
313
314  /**
315   * Performs the actual expression evaluation.
316   *
317   * @param context
318   * @param input   of type TupleEntry
319   * @return Comparable
320   */
321  protected Object evaluate( Context context, TupleEntry input )
322    {
323    try
324      {
325      if( context.parameterTypes.length == 0 )
326        return context.scriptEvaluator.evaluate( null );
327
328      Tuple parameterTuple = TupleViews.reset( context.intermediate, input.getTuple() );
329      Object[] arguments = Tuples.asArray( parameterTuple, context.parameterCoercions, context.parameterTypes, context.parameterArray );
330
331      return context.scriptEvaluator.evaluate( arguments );
332      }
333    catch( InvocationTargetException exception )
334      {
335      throw new OperationException( "could not evaluate expression: " + block, exception.getTargetException() );
336      }
337    }
338
339  @Override
340  public boolean equals( Object object )
341    {
342    if( this == object )
343      return true;
344    if( !( object instanceof ExpressionOperation ) )
345      return false;
346    if( !super.equals( object ) )
347      return false;
348
349    ExpressionOperation that = (ExpressionOperation) object;
350
351    if( block != null ? !block.equals( that.block ) : that.block != null )
352      return false;
353    if( !Arrays.equals( parameterNames, that.parameterNames ) )
354      return false;
355    if( !Arrays.equals( parameterTypes, that.parameterTypes ) )
356      return false;
357
358    return true;
359    }
360
361  @Override
362  public int hashCode()
363    {
364    int result = super.hashCode();
365    result = 31 * result + ( block != null ? block.hashCode() : 0 );
366    result = 31 * result + ( parameterTypes != null ? Arrays.hashCode( parameterTypes ) : 0 );
367    result = 31 * result + ( parameterNames != null ? Arrays.hashCode( parameterNames ) : 0 );
368    return result;
369    }
370
371  public static class Context
372    {
373    private Class[] parameterTypes;
374    private ScriptEvaluator scriptEvaluator;
375    private Fields parameterFields;
376    private CoercibleType[] parameterCoercions;
377    private String[] parameterNames;
378    private Object[] parameterArray;
379    private Tuple intermediate;
380    protected Tuple result;
381    }
382  }