001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.fluid; 022 023import java.lang.reflect.InvocationHandler; 024import java.lang.reflect.Method; 025import java.lang.reflect.Proxy; 026import java.lang.reflect.Type; 027 028import cascading.fluid.api.operation.Operation.OperationBuilder; 029import cascading.fluid.api.subassembly.SubAssembly.SubAssemblyBuilder; 030import cascading.fluid.builder.AssemblyMethodHandler; 031import cascading.fluid.builder.ConcreteAssemblyHelper; 032import cascading.fluid.builder.LocalMethodLogger; 033import cascading.fluid.builder.OperationMethodHandler; 034import cascading.fluid.builder.SubAssemblyMethodHandler; 035import cascading.fluid.factory.Reflection; 036import cascading.fluid.internal.assembly.Assembly.AssemblyBuilder; 037import cascading.fluid.internal.assembly.Assembly.AssemblyGenerator; 038import cascading.fluid.internal.assembly.Assembly.AssemblyHelper; 039import cascading.fluid.internal.operation.Aggregator.AggregatorBuilder; 040import cascading.fluid.internal.operation.Buffer.BufferBuilder; 041import cascading.fluid.internal.operation.Filter.FilterBuilder; 042import cascading.fluid.internal.operation.Function.FunctionBuilder; 043import cascading.fluid.internal.operation.GroupAssertion.GroupAssertionBuilder; 044import cascading.fluid.internal.operation.Operation.OperationGenerator; 045import cascading.fluid.internal.operation.Operation.OperationHelper; 046import cascading.fluid.internal.operation.ValueAssertion.ValueAssertionBuilder; 047import cascading.fluid.internal.subassembly.AggregateBy.AggregateByBuilder; 048import cascading.fluid.internal.subassembly.SubAssembly.SubAssemblyGenerator; 049import cascading.fluid.internal.subassembly.SubAssembly.SubAssemblyHelper; 050import cascading.fluid.util.Version; 051import cascading.property.AppProps; 052import cascading.tuple.Fields; 053 054/** 055 * The Fluid class is the starting point for constructing new Pipe Assemblies via this API. 056 * </p></p> 057 * To get started, a new assembly builder must be created: 058 * <p></p> 059 * <pre> 060 * AssemblyBuilder.Start builder = Fluid.assembly(); 061 * </pre> 062 * <p></p> 063 * Next a branch must be started: 064 * <p></p> 065 * <pre> 066 * Pipe pipe = builder.startBranch( "rhs" ) 067 * .groupBy( Fields.ALL ) 068 * .every( Fields.ALL ).aggregator( new Count() ).outgoing( Fields.ALL ) 069 * .completeGroupBy() 070 * .each( Fields.ALL ).filter( new RegexFilter( "" ) ) 071 * .coerce().coerceFields( Fields.fields( "foo", int.class ) ).end() 072 * .completeBranch(); 073 * </pre> 074 * <p></p> 075 * Note {@code completeBranch()} is a factory, it will return a {@link cascading.pipe.Pipe} instance. Also note 076 * the assembly builder is stateful, and will keep the return Pipe as a known assembly tail. 077 * <p></p> 078 * {@link #fields(Comparable[])} is a convenience for {@code new Fields("...")}. Type Fields itself also has many fluent helper 079 * methods, for example {@code Fluid.fields( "average").applyTypes(long.class);} 080 * <p></p> 081 * Calling: 082 * <p></p> 083 * <pre> 084 * Pipe[] tails = assembly.completeAssembly(); 085 * </pre> 086 * Will return a {@code Pipe[]} with a single entry, the same called from the {@code completeBranch()} call 087 * previously. 088 * <p></p> 089 * To begin a join, two or more pipes will need to be created prior to the next call: 090 * </p></p> 091 * <pre> 092 * Pipe lhsUpperLower = assembly 093 * .startHashJoin() 094 * .lhs( pipeLhs ).lhsJoinFields( fields( "num" ) ) 095 * .rhs( upperLower ).rhsJoinFields( fields( "numUpperLower" ) ) 096 * .declaredFields( fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ) 097 * .createHashJoin(); 098 * </pre> 099 * <p></p> 100 * This is a factory and must be added to the assembly via: 101 * <p></p> 102 * <pre> 103 * lhsUpperLower = assembly 104 * .continueBranch( lhsUpperLower ) 105 * .each( Fields.ALL ).function 106 * ( 107 * function().Identity().fieldDeclaration( Fields.ALL ).end() 108 * ) 109 * .outgoing( Fields.RESULTS ) 110 * .completeBranch(); 111 * </pre> 112 * <p></p> 113 * If the two given pipes ({@code pipeLhs} and {@code pipeRhs}) were previously tails 114 * in the assembly, they will be no longer tails within the assembly, replaced by the result of {@code completeBranch()}. 115 * <p></p> 116 * Finally notice in the above code, {@code function()} is used to create a new {@link cascading.operation.Function} 117 * for use in the assembly. 118 * <p></p> 119 * In the first example at the top, {@code new Count()} was called. This call could have 120 * been replaced with {@code function().Count().end()}. 121 * <p></p> 122 * AggregateBy sub-classes also have factory helpers than can be used when adding a base 123 * {@link cascading.pipe.assembly.AggregateBy} to the pipe assembly. 124 * <p></p> 125 * <pre> 126 * Pipe rhs = builder 127 * .startBranch( "rhs" ) 128 * .aggregateBy() 129 * .groupingFields( fields( "grouping" ) ) 130 * .assemblies 131 * ( 132 * Fluid.aggregateBy().AverageBy().valueField( fields( "value" ) ).averageField( fields( "average" ) ).end(), 133 * Fluid.aggregateBy().SumBy().valueField( fields( "value" ) ).sumField( fields( "sum", long.class ) ).end() 134 * ) 135 * .end() // end aggregateBy builder 136 * .completeBranch(); 137 * </pre> 138 */ 139public class Fluid 140 { 141 static 142 { 143 AppProps.addApplicationFramework( null, Version.getName() + ":" + Version.getVersionString() ); 144 } 145 146 private Fluid() 147 { 148 } 149 150 /** 151 * Method fields is a convenience helper factory for creating a new {@link cascading.tuple.Fields} instance. 152 * <p></p> 153 * The Fields class is also fluent, so adding types to the result is as simple as calling 154 * {@link Fields#applyTypes(java.lang.reflect.Type...)}. 155 * 156 * @param fields is a set of integer ordinals or field names. 157 * @return a new Fields instance. 158 * @see cascading.tuple.Fields 159 */ 160 public static Fields fields( Comparable... fields ) 161 { 162 return new Fields( fields ); 163 } 164 165 /** 166 * Method fields is a convenience helper factory for creating a new {@link cascading.tuple.Fields} instance. 167 * 168 * @param field is a an integer ordinal or field name. 169 * @param type is the Type of the given field 170 * @return a new Fields instance. 171 * @see cascading.tuple.Fields 172 */ 173 public static Fields fields( Comparable field, Type type ) 174 { 175 return new Fields( field, type ); 176 } 177 178 /** 179 * Method assembly returns a new assembly builder. 180 * <p></p> 181 * An assembly is a collection of branches. 182 * <p></p> 183 * To add a new branch, call {@link cascading.fluid.api.assembly.Assembly.AssemblyBuilder.Start#startBranch(String)}. 184 * <p></p> 185 * To complete the current branch, call {@code completeBranch()} on the builder. 186 * <p></p> 187 * To complete the assembly, call {@link cascading.fluid.api.assembly.Assembly.AssemblyBuilder.Start#completeAssembly()}. 188 * 189 * @return a new Assembly builder instance 190 */ 191 public static cascading.fluid.api.assembly.Assembly.AssemblyBuilder.Start assembly() 192 { 193 AssemblyMethodHandler methodHandler = new AssemblyMethodHandler(); 194 AssemblyHelper helper = Reflection.create( AssemblyHelper.class, methodHandler, ConcreteAssemblyHelper.class ); 195 196 ( (ConcreteAssemblyHelper) helper ).setMethodHandler( methodHandler ); 197 198 AssemblyBuilder.Start<Void> builder = AssemblyGenerator.startAssembly( helper, new LocalMethodLogger() ); 199 return simpleProxy( cascading.fluid.api.assembly.Assembly.AssemblyBuilder.Start.class, builder ); 200 } 201 202 private static OperationBuilder.Start getOperationBuilder() 203 { 204 OperationHelper operationHelper = Reflection.create( OperationHelper.class, new OperationMethodHandler() ); 205 cascading.fluid.internal.operation.Operation.OperationBuilder.Start<Void> builder 206 = OperationGenerator.build( operationHelper, new LocalMethodLogger() ); 207 208 return simpleProxy( OperationBuilder.Start.class, builder ); 209 } 210 211 /** 212 * Method function returns a new {@link cascading.operation.Function} factory builder. 213 * <p></p> 214 * Unlike the assembly builder, a Function factory builder provides a simple api for constructing 215 * known Function types, that should ba added after an {@code each()} builder method is called. 216 * <p></p> 217 * Factory builders retain no internal state, and can be shared and re-used across assembly builders. 218 * 219 * @return a new Function builder instance 220 * @see cascading.operation.Function 221 */ 222 public static cascading.fluid.api.operation.Function.FunctionBuilder<Void> function() 223 { 224 FunctionBuilder.Start<Void> builder = getOperationBuilder().function(); 225 return simpleProxy( cascading.fluid.api.operation.Function.FunctionBuilder.class, builder ); 226 } 227 228 /** 229 * Method filter returns a new {@link cascading.operation.Filter} factory builder. 230 * <p></p> 231 * Unlike the assembly builder, a Filter factory builder provides a simple api for constructing 232 * known Filter types, that should ba added after an {@code each()} builder method is called. 233 * <p></p> 234 * Factory builders retain no internal state, and can be shared and re-used across assembly builders. 235 * 236 * @return a new Filter builder instance 237 * @see cascading.operation.Filter 238 */ 239 public static cascading.fluid.api.operation.Filter.FilterBuilder<Void> filter() 240 { 241 FilterBuilder.Start<Void> builder = getOperationBuilder().filter(); 242 return simpleProxy( cascading.fluid.api.operation.Filter.FilterBuilder.class, builder ); 243 } 244 245 /** 246 * Method aggregator returns a new {@link cascading.operation.Aggregator} factory builder. 247 * <p></p> 248 * Unlike the assembly builder, a Aggregator factory builder provides a simple api for constructing 249 * known Aggregator types, that should ba added after an {@code every()} builder method is called. 250 * <p></p> 251 * Factory builders retain no internal state, and can be shared and re-used across assembly builders. 252 * 253 * @return a new Aggregator builder instance 254 * @see cascading.operation.Aggregator 255 */ 256 public static cascading.fluid.api.operation.Aggregator.AggregatorBuilder<Void> aggregator() 257 { 258 AggregatorBuilder.Start<Void> builder = getOperationBuilder().aggregator(); 259 return simpleProxy( cascading.fluid.api.operation.Aggregator.AggregatorBuilder.class, builder ); 260 } 261 262 /** 263 * Method buffer returns a new {@link cascading.operation.Aggregator} factory builder. 264 * <p></p> 265 * Unlike the assembly builder, a Buffer factory builder provides a simple api for constructing 266 * known Buffer types, that should ba added after an {@code every()} builder method is called. 267 * <p></p> 268 * Factory builders retain no internal state, and can be shared and re-used across assembly builders. 269 * 270 * @return a new Buffer builder instance 271 * @see cascading.operation.Buffer 272 */ 273 public static cascading.fluid.api.operation.Buffer.BufferBuilder<Void> buffer() 274 { 275 BufferBuilder.Start<Void> builder = getOperationBuilder().buffer(); 276 return simpleProxy( cascading.fluid.api.operation.Buffer.BufferBuilder.class, builder ); 277 } 278 279 /** 280 * Method valueAssertion returns a new {@link cascading.operation.ValueAssertion} factory builder. 281 * <p></p> 282 * Unlike the assembly builder, a ValueAssertion factory builder provides a simple api for constructing 283 * known ValueAssertion types, that should ba added after an {@code each()} builder method is called. 284 * <p></p> 285 * Factory builders retain no internal state, and can be shared and re-used across assembly builders. 286 * 287 * @return a new ValueAssertion builder instance 288 * @see cascading.operation.ValueAssertion 289 */ 290 public static cascading.fluid.api.operation.ValueAssertion.ValueAssertionBuilder<Void> valueAssertion() 291 { 292 ValueAssertionBuilder.Start<Void> builder = getOperationBuilder().valueAssertion(); 293 return simpleProxy( cascading.fluid.api.operation.ValueAssertion.ValueAssertionBuilder.class, builder ); 294 } 295 296 /** 297 * Method groupAssertion returns a new {@link cascading.operation.GroupAssertion} factory builder. 298 * <p></p> 299 * Unlike the assembly builder, a GroupAssertion factory builder provides a simple api for constructing 300 * known GroupAssertion types, that should ba added after an {@code every()} builder method is called. 301 * <p></p> 302 * Factory builders retain no internal state, and can be shared and re-used across assembly builders. 303 * 304 * @return a new GroupAssertion builder instance 305 * @see cascading.operation.GroupAssertion 306 */ 307 public static cascading.fluid.api.operation.GroupAssertion.GroupAssertionBuilder<Void> groupAssertion() 308 { 309 GroupAssertionBuilder.Start<Void> builder = getOperationBuilder().groupAssertion(); 310 return simpleProxy( cascading.fluid.api.operation.GroupAssertion.GroupAssertionBuilder.class, builder ); 311 } 312 313 private static SubAssemblyBuilder.Start getSubAssemblyBuilder() 314 { 315 SubAssemblyHelper subassemblyHelper = Reflection.create( SubAssemblyHelper.class, new SubAssemblyMethodHandler() ); 316 cascading.fluid.internal.subassembly.SubAssembly.SubAssemblyBuilder.Start<Void> builder 317 = SubAssemblyGenerator.build( subassemblyHelper, new LocalMethodLogger() ); 318 319 return simpleProxy( SubAssemblyBuilder.Start.class, builder ); 320 } 321 322 /** 323 * Method aggregateBy returns a new {@link cascading.pipe.assembly.AggregateBy} factory builder. 324 * <p></p> 325 * Unlike the assembly builder, an AggregateBy factory builder provides a simple api for constructing 326 * known AggregateBy sub-types that should be passed to a parent {@link cascading.pipe.assembly.AggregateBy} for 327 * concurrent aggregation of multiple values. 328 * <p></p> 329 * Factory builders retain no internal state, and can be shared and re-used across assembly builders. 330 * 331 * @return a new AggregateBy builder instance 332 * @see cascading.pipe.assembly.AggregateBy 333 */ 334 public static cascading.fluid.api.subassembly.AggregateBy.AggregateByBuilder<Void> aggregateBy() 335 { 336 AggregateByBuilder.Start<Void> builder = getSubAssemblyBuilder().aggregateBy(); 337 return simpleProxy( cascading.fluid.api.subassembly.AggregateBy.AggregateByBuilder.class, builder ); 338 } 339 340 @SuppressWarnings("unchecked") 341 private static <T> T simpleProxy( final Class<?> proxyInterface, final Object target ) 342 { 343 return (T) Proxy.newProxyInstance( Thread.currentThread().getContextClassLoader(), new Class[]{ 344 proxyInterface}, new InvocationHandler() 345 { 346 public Object invoke( Object proxy, Method method, Object[] args ) throws Throwable 347 { 348 return method.invoke( target, args ); 349 } 350 } ); 351 } 352 }