001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.collect;
022
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Properties;
026
027import cascading.property.Props;
028import cascading.util.Util;
029
030/**
031 * Class SpillableProps is a fluent interface for building properties to be passed to a
032 * {@link cascading.flow.FlowConnector} before creating new {@link cascading.flow.Flow} instances.
033 *
034 * @see SpillableTupleList
035 * @see SpillableTupleMap
036 */
037public class SpillableProps extends Props
038  {
039  /**
040   * Whether to enable compression of the spills or not, on by default.
041   *
042   * @see Boolean#parseBoolean(String)
043   */
044  public static final String SPILL_COMPRESS = "cascading.spill.compress";
045
046  /** A comma delimited list of possible codecs to try. This is platform dependent. */
047  public static final String SPILL_CODECS = "cascading.spill.codecs";
048
049  /** Number of tuples to hold in memory before spilling them to disk. */
050  public static final String LIST_THRESHOLD = "cascading.spill.list.threshold";
051
052  /** The total number of tuple values (not keys) to attempt to keep in memory. */
053  public static final String MAP_THRESHOLD = "cascading.spill.map.threshold";
054
055  /**
056   * The initial hash map capacity.
057   *
058   * @see java.util.HashMap
059   */
060  public static final String MAP_CAPACITY = "cascading.spill.map.capacity";
061
062  /**
063   * The initial hash map load factor.
064   *
065   * @see java.util.HashMap
066   */
067  public static final String MAP_LOADFACTOR = "cascading.spill.map.loadfactor";
068
069  public static final int defaultListThreshold = 10 * 1000;
070
071  public static final int defaultMapThreshold = 10 * 1000;
072  public static final int defaultMapInitialCapacity = 100 * 1000;
073  public static final float defaultMapLoadFactor = 0.75f;
074
075  boolean compressSpill = true;
076  List<String> codecs = new ArrayList<String>();
077
078  int listSpillThreshold = defaultListThreshold;
079
080  int mapSpillThreshold = defaultMapThreshold;
081  int mapInitialCapacity = defaultMapInitialCapacity;
082  float mapLoadFactor = defaultMapLoadFactor;
083
084  /**
085   * Creates a new SpillableProps instance.
086   *
087   * @return SpillableProps instance
088   */
089  public static SpillableProps spillableProps()
090    {
091    return new SpillableProps();
092    }
093
094  public SpillableProps()
095    {
096    }
097
098  public boolean isCompressSpill()
099    {
100    return compressSpill;
101    }
102
103  /**
104   * Method setCompressSpill either enables or disables spill compression. Enabled by default.
105   * <p/>
106   * Spill compression relies on properly configured and available codecs. See {@link #setCodecs(java.util.List)}.
107   *
108   * @param compressSpill type boolean
109   * @return this
110   */
111  public SpillableProps setCompressSpill( boolean compressSpill )
112    {
113    this.compressSpill = compressSpill;
114
115    return this;
116    }
117
118  public List<String> getCodecs()
119    {
120    return codecs;
121    }
122
123  /**
124   * Method setCodecs sets list of possible codec class names to use. They will be loaded in order, if available.
125   * <p/>
126   * This is platform dependent.
127   *
128   * @param codecs type list
129   * @return this
130   */
131  public SpillableProps setCodecs( List<String> codecs )
132    {
133    this.codecs = codecs;
134
135    return this;
136    }
137
138  /**
139   * Method addCodecs adds a list of possible codec class names to use. They will be loaded in order, if available.
140   * <p/>
141   * This is platform dependent.
142   *
143   * @param codecs type list
144   */
145  public SpillableProps addCodecs( List<String> codecs )
146    {
147    this.codecs.addAll( codecs );
148
149    return this;
150    }
151
152  /**
153   * Method addCodec adds a codec class names to use.
154   * <p/>
155   * This is platform dependent.
156   *
157   * @param codec type String
158   */
159  public SpillableProps addCodec( String codec )
160    {
161    this.codecs.add( codec );
162
163    return this;
164    }
165
166  public int getListSpillThreshold()
167    {
168    return listSpillThreshold;
169    }
170
171  /**
172   * Method setListSpillThreshold sets the number of tuples to hold in memory before spilling them to disk.
173   *
174   * @param listSpillThreshold of type int
175   * @return this
176   */
177  public SpillableProps setListSpillThreshold( int listSpillThreshold )
178    {
179    this.listSpillThreshold = listSpillThreshold;
180
181    return this;
182    }
183
184  public int getMapSpillThreshold()
185    {
186    return mapSpillThreshold;
187    }
188
189  /**
190   * Method setMapSpillThreshold the total number of tuple values (not keys) to attempt to keep in memory.
191   * <p/>
192   * The default implementation cannot spill Map keys to disk.
193   *
194   * @param mapSpillThreshold of type int
195   * @return this
196   */
197  public SpillableProps setMapSpillThreshold( int mapSpillThreshold )
198    {
199    this.mapSpillThreshold = mapSpillThreshold;
200
201    return this;
202    }
203
204  public int getMapInitialCapacity()
205    {
206    return mapInitialCapacity;
207    }
208
209  /**
210   * Method setMapInitialCapacity sets the default capacity to be used by the backing Map implementation.
211   *
212   * @param mapInitialCapacity type int
213   * @return this
214   */
215  public SpillableProps setMapInitialCapacity( int mapInitialCapacity )
216    {
217    this.mapInitialCapacity = mapInitialCapacity;
218
219    return this;
220    }
221
222  public float getMapLoadFactor()
223    {
224    return mapLoadFactor;
225    }
226
227  /**
228   * Method setMapLoadFactor sets the default load factor to be used by the backing Map implementation.
229   *
230   * @param mapLoadFactor type float
231   * @return this
232   */
233  public SpillableProps setMapLoadFactor( float mapLoadFactor )
234    {
235    this.mapLoadFactor = mapLoadFactor;
236
237    return this;
238    }
239
240  @Override
241  protected void addPropertiesTo( Properties properties )
242    {
243    for( String codec : codecs )
244      {
245      String codecs = (String) properties.get( SPILL_CODECS );
246
247      properties.put( SPILL_CODECS, Util.join( ",", Util.removeNulls( codecs, codec ) ) );
248      }
249
250    properties.setProperty( SPILL_COMPRESS, Boolean.toString( compressSpill ) );
251    properties.setProperty( LIST_THRESHOLD, Integer.toString( listSpillThreshold ) );
252
253    properties.setProperty( MAP_THRESHOLD, Integer.toString( mapSpillThreshold ) );
254    properties.setProperty( MAP_CAPACITY, Integer.toString( mapInitialCapacity ) );
255    properties.setProperty( MAP_LOADFACTOR, Float.toString( mapLoadFactor ) );
256    }
257  }