001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop;
022
023import java.util.ArrayList;
024import java.util.LinkedHashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Properties;
028
029import cascading.property.Props;
030import cascading.tuple.Tuple;
031import cascading.util.Util;
032
033/**
034 * Class TupleSerializationProps is a fluent interface for building properties to be passed to a
035 * {@link cascading.flow.FlowConnector} before creating new {@link cascading.flow.Flow} instances.
036 * <p/>
037 * See {@link TupleSerialization} for details on these properties.
038 *
039 * @see TupleSerialization
040 */
041public class TupleSerializationProps extends Props
042  {
043  public static final String SERIALIZATION_TOKENS = "cascading.serialization.tokens";
044  public static final String SERIALIZATION_COMPARISON_BITWISE_PREVENT = "cascading.serialization.comparison.bitwise.prevent";
045  public static final String IGNORE_TYPES = "cascading.serialization.types.ignored";
046  public static final String REQUIRE_TYPES = "cascading.serialization.types.required";
047  public static final String HADOOP_IO_SERIALIZATIONS = "io.serializations";
048
049  Map<Integer, String> serializationTokens = new LinkedHashMap<Integer, String>();
050  List<String> hadoopSerializations = new ArrayList<String>();
051  Boolean ignoreTypes;
052  Boolean requireTypes;
053  Boolean preventBitWiseComparisons;
054
055  /**
056   * Adds the given token and className pair as a serialization token property. During object serialization and deserialization,
057   * the given token will be used instead of the className when an instance of the className is encountered.
058   *
059   * @param properties of type Map
060   * @param token      of type int
061   * @param className  of type String
062   */
063  public static void addSerializationToken( Map<Object, Object> properties, int token, String className )
064    {
065    String tokens = getSerializationTokens( properties );
066
067    properties.put( SERIALIZATION_TOKENS, Util.join( ",", Util.removeNulls( tokens, token + "=" + className ) ) );
068    }
069
070  /**
071   * Returns the serialization tokens property.
072   *
073   * @param properties of type Map
074   * @return returns a String
075   */
076  public static String getSerializationTokens( Map<Object, Object> properties )
077    {
078    return (String) properties.get( SERIALIZATION_TOKENS );
079    }
080
081  /**
082   * Adds the given className as a Hadoop IO serialization class.
083   *
084   * @param properties of type Map
085   * @param className  of type String
086   */
087  public static void addSerialization( Map<Object, Object> properties, String className )
088    {
089    String serializations = (String) properties.get( HADOOP_IO_SERIALIZATIONS );
090
091    properties.put( HADOOP_IO_SERIALIZATIONS, Util.join( ",", Util.removeNulls( serializations, className ) ) );
092    }
093
094  /**
095   * Creates a new TupleSerializationProps instance.
096   *
097   * @return TupleSerializationProps instance
098   */
099  public static TupleSerializationProps tupleSerializationProps()
100    {
101    return new TupleSerializationProps();
102    }
103
104  public TupleSerializationProps()
105    {
106    }
107
108  public Map<Integer, String> getSerializationTokens()
109    {
110    return serializationTokens;
111    }
112
113  /**
114   * Method setSerializationTokens sets the given integer tokens and classNames Map as a serialization properties.
115   * <p/>
116   * During object serialization and deserialization, the given tokens will be used instead of the className when an
117   * instance of the className is encountered.
118   *
119   * @param serializationTokens Map of Integer tokens and String classnames
120   * @return this
121   */
122  public TupleSerializationProps setSerializationTokens( Map<Integer, String> serializationTokens )
123    {
124    this.serializationTokens = serializationTokens;
125
126    return this;
127    }
128
129  /**
130   * Method addSerializationTokens adds the given integer tokens and classNames Map as a serialization properties.
131   * <p/>
132   * During object serialization and deserialization, the given tokens will be used instead of the className when an
133   * instance of the className is encountered.
134   *
135   * @param serializationTokens Map of Integer tokens and String classnames
136   * @return this
137   */
138  public TupleSerializationProps addSerializationTokens( Map<Integer, String> serializationTokens )
139    {
140    this.serializationTokens.putAll( serializationTokens );
141
142    return this;
143    }
144
145  /**
146   * Method addSerializationToken adds the given integer token and classNames as a serialization properties.
147   * <p/>
148   * During object serialization and deserialization, the given tokens will be used instead of the className when an
149   * instance of the className is encountered.
150   *
151   * @param token                  type int
152   * @param serializationClassName type String
153   * @return this
154   */
155  public TupleSerializationProps addSerializationToken( int token, String serializationClassName )
156    {
157    this.serializationTokens.put( token, serializationClassName );
158
159    return this;
160    }
161
162  public List<String> getHadoopSerializations()
163    {
164    return hadoopSerializations;
165    }
166
167  /**
168   * Method setHadoopSerializations sets the Hadoop serialization classNames to be used as properties.
169   *
170   * @param hadoopSerializationClassNames List of classNames
171   * @return this
172   */
173  public TupleSerializationProps setHadoopSerializations( List<String> hadoopSerializationClassNames )
174    {
175    this.hadoopSerializations = hadoopSerializationClassNames;
176
177    return this;
178    }
179
180  /**
181   * Method addHadoopSerializations adds the Hadoop serialization classNames to be used as properties.
182   *
183   * @param hadoopSerializationClassNames List of classNames
184   * @return this
185   */
186  public TupleSerializationProps addHadoopSerializations( List<String> hadoopSerializationClassNames )
187    {
188    this.hadoopSerializations.addAll( hadoopSerializationClassNames );
189
190    return this;
191    }
192
193  /**
194   * Method addHadoopSerialization adds a Hadoop serialization className to be used as properties.
195   *
196   * @param hadoopSerializationClassName List of classNames
197   * @return this
198   */
199  public TupleSerializationProps addHadoopSerialization( String hadoopSerializationClassName )
200    {
201    this.hadoopSerializations.add( hadoopSerializationClassName );
202
203    return this;
204    }
205
206  public Boolean getIgnoreTypes()
207    {
208    return ignoreTypes;
209    }
210
211  /**
212   * Method setIgnoreTypes forces the {@link TupleSerialization} class to ignore any and all
213   * declared types causing the serialization to write each type or {@link SerializationToken}
214   * per {@link Tuple} element.
215   * <p/>
216   * This disables the declared type optimizations.
217   * <p/>
218   * See {@link #setRequireTypes(Boolean)} to force a failure if field type information is missing.
219   *
220   * @param ignoreTypes
221   * @return
222   */
223  public TupleSerializationProps setIgnoreTypes( Boolean ignoreTypes )
224    {
225    this.ignoreTypes = ignoreTypes;
226
227    return this;
228    }
229
230  public Boolean getRequireTypes()
231    {
232    return requireTypes;
233    }
234
235  /**
236   * Method setRequireTypes forces {@link TupleSerialization} to fail if field types are not declared.
237   * <p/>
238   * This ensures the field type optimizations are leveraged.
239   * <p/>
240   * See {@link #setIgnoreTypes(Boolean)} to force field type information to be discarded.
241   *
242   * @param requireTypes
243   * @return
244   */
245  public TupleSerializationProps setRequireTypes( Boolean requireTypes )
246    {
247    this.requireTypes = requireTypes;
248
249    return this;
250    }
251
252  /**
253   * Method preventBitWiseComparison will enable/disable bitwise comparisons of grouping keys
254   * during ordered partitioning ({@link cascading.pipe.GroupBy} and {@link cascading.pipe.CoGroup}).
255   * <p/>
256   * If natural ordering of grouping/join keys is required, disable bit wise comparisons. They are enabled
257   * by default (subject to the below conditions).
258   * <p/>
259   * Bit wise comparisons will only apply if the {@link cascading.tuple.Fields} used in the grouping/join are
260   * declared and no custom {@link java.util.Comparator} instances are provided on the grouping/key Fields, or
261   * no secondary sorting is being performed on a GroupBy.
262   *
263   * @param preventBitWiseComparisons set to true to disable bit wise comparisons
264   * @return this
265   */
266  public TupleSerializationProps preventBitWiseComparison( boolean preventBitWiseComparisons )
267    {
268    this.preventBitWiseComparisons = preventBitWiseComparisons;
269
270    return this;
271    }
272
273  public boolean getPreventBitWiseComparisons()
274    {
275    return preventBitWiseComparisons;
276    }
277
278  @Override
279  protected void addPropertiesTo( Properties properties )
280    {
281    for( Map.Entry<Integer, String> entry : serializationTokens.entrySet() )
282      addSerializationToken( properties, entry.getKey(), entry.getValue() );
283
284    for( String hadoopSerialization : hadoopSerializations )
285      addSerialization( properties, hadoopSerialization );
286
287    if( ignoreTypes != null )
288      properties.put( IGNORE_TYPES, ignoreTypes.toString() );
289
290    if( requireTypes != null )
291      properties.put( REQUIRE_TYPES, requireTypes.toString() );
292
293    if( preventBitWiseComparisons != null )
294      properties.put( SERIALIZATION_COMPARISON_BITWISE_PREVENT, preventBitWiseComparisons.toString() );
295    }
296  }