001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.fluid;
022
023import java.lang.reflect.InvocationHandler;
024import java.lang.reflect.Method;
025import java.lang.reflect.Proxy;
026import java.lang.reflect.Type;
027
028import cascading.fluid.api.operation.Operation.OperationBuilder;
029import cascading.fluid.api.subassembly.SubAssembly.SubAssemblyBuilder;
030import cascading.fluid.builder.AssemblyMethodHandler;
031import cascading.fluid.builder.ConcreteAssemblyHelper;
032import cascading.fluid.builder.LocalMethodLogger;
033import cascading.fluid.builder.OperationMethodHandler;
034import cascading.fluid.builder.SubAssemblyMethodHandler;
035import cascading.fluid.factory.Reflection;
036import cascading.fluid.internal.assembly.Assembly.AssemblyBuilder;
037import cascading.fluid.internal.assembly.Assembly.AssemblyGenerator;
038import cascading.fluid.internal.assembly.Assembly.AssemblyHelper;
039import cascading.fluid.internal.operation.Aggregator.AggregatorBuilder;
040import cascading.fluid.internal.operation.Buffer.BufferBuilder;
041import cascading.fluid.internal.operation.Filter.FilterBuilder;
042import cascading.fluid.internal.operation.Function.FunctionBuilder;
043import cascading.fluid.internal.operation.GroupAssertion.GroupAssertionBuilder;
044import cascading.fluid.internal.operation.Operation.OperationGenerator;
045import cascading.fluid.internal.operation.Operation.OperationHelper;
046import cascading.fluid.internal.operation.ValueAssertion.ValueAssertionBuilder;
047import cascading.fluid.internal.subassembly.AggregateBy.AggregateByBuilder;
048import cascading.fluid.internal.subassembly.SubAssembly.SubAssemblyGenerator;
049import cascading.fluid.internal.subassembly.SubAssembly.SubAssemblyHelper;
050import cascading.fluid.util.Version;
051import cascading.property.AppProps;
052import cascading.tuple.Fields;
053
054/**
055 * The Fluid class is the starting point for constructing new Pipe Assemblies via this API.
 * <p></p>
057 * To get started, a new assembly builder must be created:
058 * <p></p>
059 * <pre>
060 *   AssemblyBuilder.Start builder = Fluid.assembly();
061 * </pre>
062 * <p></p>
063 * Next a branch must be started:
064 * <p></p>
065 * <pre>
066 *  Pipe pipe = builder.startBranch( "rhs" )
067 *    .groupBy( Fields.ALL )
068 *      .every( Fields.ALL ).aggregator( new Count() ).outgoing( Fields.ALL )
069 *    .completeGroupBy()
070 *     .each( Fields.ALL ).filter( new RegexFilter( "" ) )
071 *       .coerce().coerceFields( Fields.fields( "foo", int.class ) ).end()
072 *  .completeBranch();
073 * </pre>
074 * <p></p>
075 * Note {@code completeBranch()} is a factory, it will return a {@link cascading.pipe.Pipe} instance. Also note
076 * the assembly builder is stateful, and will keep the return Pipe as a known assembly tail.
077 * <p></p>
078 * {@link #fields(Comparable[])} is a convenience for {@code new Fields("...")}. Type Fields itself also has many fluent helper
079 * methods, for example {@code Fluid.fields( "average").applyTypes(long.class);}
080 * <p></p>
081 * Calling:
082 * <p></p>
083 * <pre>
084 *  Pipe[] tails = assembly.completeAssembly();
085 * </pre>
 * Will return a {@code Pipe[]} with a single entry, the same Pipe returned by the previous
 * {@code completeBranch()} call.
088 * <p></p>
089 * To begin a join, two or more pipes will need to be created prior to the next call:
 * <p></p>
091 * <pre>
092 *  Pipe lhsUpperLower = assembly
093 *    .startHashJoin()
094 *      .lhs( pipeLhs ).lhsJoinFields( fields( "num" ) )
095 *      .rhs( upperLower ).rhsJoinFields( fields( "numUpperLower" ) )
096 *      .declaredFields( fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) )
097 *    .createHashJoin();
098 * </pre>
099 * <p></p>
100 * This is a factory and must be added to the assembly via:
101 * <p></p>
102 * <pre>
103 * lhsUpperLower = assembly
104 *  .continueBranch( lhsUpperLower )
105 *    .each( Fields.ALL ).function
106 *      (
107 *      function().Identity().fieldDeclaration( Fields.ALL ).end()
108 *      )
109 *    .outgoing( Fields.RESULTS )
110 *  .completeBranch();
111 * </pre>
112 * <p></p>
 * If the two given pipes ({@code pipeLhs} and {@code upperLower}) were previously tails
 * in the assembly, they will no longer be tails within the assembly, replaced by the result of {@code completeBranch()}.
115 * <p></p>
116 * Finally notice in the above code, {@code function()} is used to create a new {@link cascading.operation.Function}
117 * for use in the assembly.
118 * <p></p>
 * In the first example at the top, {@code new Count()} was called. This call could have
 * been replaced with {@code aggregator().Count().end()}.
121 * <p></p>
 * AggregateBy sub-classes also have factory helpers that can be used when adding a base
 * {@link cascading.pipe.assembly.AggregateBy} to the pipe assembly.
124 * <p></p>
125 * <pre>
126 * Pipe rhs = builder
127 *  .startBranch( "rhs" )
128 *    .aggregateBy()
129 *      .groupingFields( fields( "grouping" ) )
130 *      .assemblies
131 *        (
132 *          Fluid.aggregateBy().AverageBy().valueField( fields( "value" ) ).averageField( fields( "average" ) ).end(),
133 *          Fluid.aggregateBy().SumBy().valueField( fields( "value" ) ).sumField( fields( "sum", long.class ) ).end()
134 *        )
135 *    .end() // end aggregateBy builder
136 *  .completeBranch();
137 * </pre>
138 */
139public class Fluid
140  {
141  static
142    {
143    AppProps.addApplicationFramework( null, Version.getName() + ":" + Version.getVersionString() );
144    }
145
146  private Fluid()
147    {
148    }
149
150  /**
151   * Method fields is a convenience helper factory for creating a new {@link cascading.tuple.Fields} instance.
152   * <p></p>
153   * The Fields class is also fluent, so adding types to the result is as simple as calling
154   * {@link Fields#applyTypes(java.lang.reflect.Type...)}.
155   *
156   * @param fields is a set of integer ordinals or field names.
157   * @return a new Fields instance.
158   * @see cascading.tuple.Fields
159   */
160  public static Fields fields( Comparable... fields )
161    {
162    return new Fields( fields );
163    }
164
165  /**
166   * Method fields is a convenience helper factory for creating a new {@link cascading.tuple.Fields} instance.
167   *
168   * @param field is a an integer ordinal or field name.
169   * @param type  is the Type of the given field
170   * @return a new Fields instance.
171   * @see cascading.tuple.Fields
172   */
173  public static Fields fields( Comparable field, Type type )
174    {
175    return new Fields( field, type );
176    }
177
178  /**
179   * Method assembly returns a new assembly builder.
180   * <p></p>
181   * An assembly is a collection of branches.
182   * <p></p>
183   * To add a new branch, call {@link cascading.fluid.api.assembly.Assembly.AssemblyBuilder.Start#startBranch(String)}.
184   * <p></p>
185   * To complete the current branch, call {@code completeBranch()} on the builder.
186   * <p></p>
187   * To complete the assembly, call  {@link cascading.fluid.api.assembly.Assembly.AssemblyBuilder.Start#completeAssembly()}.
188   *
189   * @return a new Assembly builder instance
190   */
191  public static cascading.fluid.api.assembly.Assembly.AssemblyBuilder.Start assembly()
192    {
193    AssemblyMethodHandler methodHandler = new AssemblyMethodHandler();
194    AssemblyHelper helper = Reflection.create( AssemblyHelper.class, methodHandler, ConcreteAssemblyHelper.class );
195
196    ( (ConcreteAssemblyHelper) helper ).setMethodHandler( methodHandler );
197
198    AssemblyBuilder.Start<Void> builder = AssemblyGenerator.startAssembly( helper, new LocalMethodLogger() );
199    return simpleProxy( cascading.fluid.api.assembly.Assembly.AssemblyBuilder.Start.class, builder );
200    }
201
202  private static OperationBuilder.Start getOperationBuilder()
203    {
204    OperationHelper operationHelper = Reflection.create( OperationHelper.class, new OperationMethodHandler() );
205    cascading.fluid.internal.operation.Operation.OperationBuilder.Start<Void> builder
206      = OperationGenerator.build( operationHelper, new LocalMethodLogger() );
207
208    return simpleProxy( OperationBuilder.Start.class, builder );
209    }
210
211  /**
212   * Method function returns a new {@link cascading.operation.Function} factory builder.
213   * <p></p>
214   * Unlike the assembly builder, a Function factory builder provides a simple api for constructing
215   * known Function types, that should ba added after an {@code each()} builder method is called.
216   * <p></p>
217   * Factory builders retain no internal state, and can be shared and re-used across assembly builders.
218   *
219   * @return a new Function builder instance
220   * @see cascading.operation.Function
221   */
222  public static cascading.fluid.api.operation.Function.FunctionBuilder<Void> function()
223    {
224    FunctionBuilder.Start<Void> builder = getOperationBuilder().function();
225    return simpleProxy( cascading.fluid.api.operation.Function.FunctionBuilder.class, builder );
226    }
227
228  /**
229   * Method filter returns a new {@link cascading.operation.Filter} factory builder.
230   * <p></p>
231   * Unlike the assembly builder, a Filter factory builder provides a simple api for constructing
232   * known Filter types, that should ba added after an {@code each()} builder method is called.
233   * <p></p>
234   * Factory builders retain no internal state, and can be shared and re-used across assembly builders.
235   *
236   * @return a new Filter builder instance
237   * @see cascading.operation.Filter
238   */
239  public static cascading.fluid.api.operation.Filter.FilterBuilder<Void> filter()
240    {
241    FilterBuilder.Start<Void> builder = getOperationBuilder().filter();
242    return simpleProxy( cascading.fluid.api.operation.Filter.FilterBuilder.class, builder );
243    }
244
245  /**
246   * Method aggregator returns a new {@link cascading.operation.Aggregator} factory builder.
247   * <p></p>
248   * Unlike the assembly builder, a Aggregator factory builder provides a simple api for constructing
249   * known Aggregator types, that should ba added after an {@code every()} builder method is called.
250   * <p></p>
251   * Factory builders retain no internal state, and can be shared and re-used across assembly builders.
252   *
253   * @return a new Aggregator builder instance
254   * @see cascading.operation.Aggregator
255   */
256  public static cascading.fluid.api.operation.Aggregator.AggregatorBuilder<Void> aggregator()
257    {
258    AggregatorBuilder.Start<Void> builder = getOperationBuilder().aggregator();
259    return simpleProxy( cascading.fluid.api.operation.Aggregator.AggregatorBuilder.class, builder );
260    }
261
262  /**
263   * Method buffer returns a new {@link cascading.operation.Aggregator} factory builder.
264   * <p></p>
265   * Unlike the assembly builder, a Buffer factory builder provides a simple api for constructing
266   * known Buffer types, that should ba added after an {@code every()} builder method is called.
267   * <p></p>
268   * Factory builders retain no internal state, and can be shared and re-used across assembly builders.
269   *
270   * @return a new Buffer builder instance
271   * @see cascading.operation.Buffer
272   */
273  public static cascading.fluid.api.operation.Buffer.BufferBuilder<Void> buffer()
274    {
275    BufferBuilder.Start<Void> builder = getOperationBuilder().buffer();
276    return simpleProxy( cascading.fluid.api.operation.Buffer.BufferBuilder.class, builder );
277    }
278
279  /**
280   * Method valueAssertion returns a new {@link cascading.operation.ValueAssertion} factory builder.
281   * <p></p>
282   * Unlike the assembly builder, a ValueAssertion factory builder provides a simple api for constructing
283   * known ValueAssertion types, that should ba added after an {@code each()} builder method is called.
284   * <p></p>
285   * Factory builders retain no internal state, and can be shared and re-used across assembly builders.
286   *
287   * @return a new ValueAssertion builder instance
288   * @see cascading.operation.ValueAssertion
289   */
290  public static cascading.fluid.api.operation.ValueAssertion.ValueAssertionBuilder<Void> valueAssertion()
291    {
292    ValueAssertionBuilder.Start<Void> builder = getOperationBuilder().valueAssertion();
293    return simpleProxy( cascading.fluid.api.operation.ValueAssertion.ValueAssertionBuilder.class, builder );
294    }
295
296  /**
297   * Method groupAssertion returns a new {@link cascading.operation.GroupAssertion} factory builder.
298   * <p></p>
299   * Unlike the assembly builder, a GroupAssertion factory builder provides a simple api for constructing
300   * known GroupAssertion types, that should ba added after an {@code every()} builder method is called.
301   * <p></p>
302   * Factory builders retain no internal state, and can be shared and re-used across assembly builders.
303   *
304   * @return a new GroupAssertion builder instance
305   * @see cascading.operation.GroupAssertion
306   */
307  public static cascading.fluid.api.operation.GroupAssertion.GroupAssertionBuilder<Void> groupAssertion()
308    {
309    GroupAssertionBuilder.Start<Void> builder = getOperationBuilder().groupAssertion();
310    return simpleProxy( cascading.fluid.api.operation.GroupAssertion.GroupAssertionBuilder.class, builder );
311    }
312
313  private static SubAssemblyBuilder.Start getSubAssemblyBuilder()
314    {
315    SubAssemblyHelper subassemblyHelper = Reflection.create( SubAssemblyHelper.class, new SubAssemblyMethodHandler() );
316    cascading.fluid.internal.subassembly.SubAssembly.SubAssemblyBuilder.Start<Void> builder
317      = SubAssemblyGenerator.build( subassemblyHelper, new LocalMethodLogger() );
318
319    return simpleProxy( SubAssemblyBuilder.Start.class, builder );
320    }
321
322  /**
323   * Method aggregateBy returns a new {@link cascading.pipe.assembly.AggregateBy} factory builder.
324   * <p></p>
325   * Unlike the assembly builder, an AggregateBy factory builder provides a simple api for constructing
326   * known AggregateBy sub-types that should be passed to a parent {@link cascading.pipe.assembly.AggregateBy} for
327   * concurrent aggregation of multiple values.
328   * <p></p>
329   * Factory builders retain no internal state, and can be shared and re-used across assembly builders.
330   *
331   * @return a new AggregateBy builder instance
332   * @see cascading.pipe.assembly.AggregateBy
333   */
334  public static cascading.fluid.api.subassembly.AggregateBy.AggregateByBuilder<Void> aggregateBy()
335    {
336    AggregateByBuilder.Start<Void> builder = getSubAssemblyBuilder().aggregateBy();
337    return simpleProxy( cascading.fluid.api.subassembly.AggregateBy.AggregateByBuilder.class, builder );
338    }
339
340  @SuppressWarnings("unchecked")
341  private static <T> T simpleProxy( final Class<?> proxyInterface, final Object target )
342    {
343    return (T) Proxy.newProxyInstance( Thread.currentThread().getContextClassLoader(), new Class[]{
344      proxyInterface}, new InvocationHandler()
345    {
346    public Object invoke( Object proxy, Method method, Object[] args ) throws Throwable
347      {
348      return method.invoke( target, args );
349      }
350    } );
351    }
352  }