001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.collect;
022    
023    import java.util.ArrayList;
024    import java.util.List;
025    import java.util.Properties;
026    
027    import cascading.property.Props;
028    import cascading.util.Util;
029    
030    /**
031     * Class SpillableProps is a fluent interface for building properties to be passed to a
032     * {@link cascading.flow.FlowConnector} before creating new {@link cascading.flow.Flow} instances.
033     *
034     * @see SpillableTupleList
035     * @see SpillableTupleMap
036     */
037    public class SpillableProps extends Props
038      {
039      /**
040       * Whether to enable compression of the spills or not, on by default.
041       *
042       * @see Boolean#parseBoolean(String)
043       */
044      public static final String SPILL_COMPRESS = "cascading.spill.compress";
045    
046      /** A comma delimited list of possible codecs to try. This is platform dependent. */
047      public static final String SPILL_CODECS = "cascading.spill.codecs";
048    
049      /** Number of tuples to hold in memory before spilling them to disk. */
050      public static final String LIST_THRESHOLD = "cascading.spill.list.threshold";
051    
052      /** The total number of tuple values (not keys) to attempt to keep in memory. */
053      public static final String MAP_THRESHOLD = "cascading.spill.map.threshold";
054    
055      /**
056       * The initial hash map capacity.
057       *
058       * @see java.util.HashMap
059       */
060      public static final String MAP_CAPACITY = "cascading.spill.map.capacity";
061    
062      /**
063       * The initial hash map load factor.
064       *
065       * @see java.util.HashMap
066       */
067      public static final String MAP_LOADFACTOR = "cascading.spill.map.loadfactor";
068    
069      public static final int defaultListThreshold = 10 * 1000;
070    
071      public static final int defaultMapThreshold = 10 * 1000;
072      public static final int defaultMapInitialCapacity = 100 * 1000;
073      public static final float defaultMapLoadFactor = 0.75f;
074    
075      boolean compressSpill = true;
076      List<String> codecs = new ArrayList<String>();
077    
078      int listSpillThreshold = defaultListThreshold;
079    
080      int mapSpillThreshold = defaultMapThreshold;
081      int mapInitialCapacity = defaultMapInitialCapacity;
082      float mapLoadFactor = defaultMapLoadFactor;
083    
084      /**
085       * Creates a new SpillableProps instance.
086       *
087       * @return SpillableProps instance
088       */
089      public static SpillableProps spillableProps()
090        {
091        return new SpillableProps();
092        }
093    
094      public SpillableProps()
095        {
096        }
097    
098      public boolean isCompressSpill()
099        {
100        return compressSpill;
101        }
102    
103      /**
104       * Method setCompressSpill either enables or disables spill compression. Enabled by default.
105       * <p/>
106       * Spill compression relies on properly configured and available codecs. See {@link #setCodecs(java.util.List)}.
107       *
108       * @param compressSpill type boolean
109       * @return this
110       */
111      public SpillableProps setCompressSpill( boolean compressSpill )
112        {
113        this.compressSpill = compressSpill;
114    
115        return this;
116        }
117    
118      public List<String> getCodecs()
119        {
120        return codecs;
121        }
122    
123      /**
124       * Method setCodecs sets list of possible codec class names to use. They will be loaded in order, if available.
125       * <p/>
126       * This is platform dependent.
127       *
128       * @param codecs type list
129       * @return this
130       */
131      public SpillableProps setCodecs( List<String> codecs )
132        {
133        this.codecs = codecs;
134    
135        return this;
136        }
137    
138      /**
139       * Method addCodecs adds a list of possible codec class names to use. They will be loaded in order, if available.
140       * <p/>
141       * This is platform dependent.
142       *
143       * @param codecs type list
144       */
145      public SpillableProps addCodecs( List<String> codecs )
146        {
147        this.codecs.addAll( codecs );
148    
149        return this;
150        }
151    
152      /**
153       * Method addCodec adds a codec class names to use.
154       * <p/>
155       * This is platform dependent.
156       *
157       * @param codec type String
158       */
159      public SpillableProps addCodec( String codec )
160        {
161        this.codecs.add( codec );
162    
163        return this;
164        }
165    
166      public int getListSpillThreshold()
167        {
168        return listSpillThreshold;
169        }
170    
171      /**
172       * Method setListSpillThreshold sets the number of tuples to hold in memory before spilling them to disk.
173       *
174       * @param listSpillThreshold of type int
175       * @return this
176       */
177      public SpillableProps setListSpillThreshold( int listSpillThreshold )
178        {
179        this.listSpillThreshold = listSpillThreshold;
180    
181        return this;
182        }
183    
184      public int getMapSpillThreshold()
185        {
186        return mapSpillThreshold;
187        }
188    
189      /**
190       * Method setMapSpillThreshold the total number of tuple values (not keys) to attempt to keep in memory.
191       * <p/>
192       * The default implementation cannot spill Map keys to disk.
193       *
194       * @param mapSpillThreshold of type int
195       * @return this
196       */
197      public SpillableProps setMapSpillThreshold( int mapSpillThreshold )
198        {
199        this.mapSpillThreshold = mapSpillThreshold;
200    
201        return this;
202        }
203    
204      public int getMapInitialCapacity()
205        {
206        return mapInitialCapacity;
207        }
208    
209      /**
210       * Method setMapInitialCapacity sets the default capacity to be used by the backing Map implementation.
211       *
212       * @param mapInitialCapacity type int
213       * @return this
214       */
215      public SpillableProps setMapInitialCapacity( int mapInitialCapacity )
216        {
217        this.mapInitialCapacity = mapInitialCapacity;
218    
219        return this;
220        }
221    
222      public float getMapLoadFactor()
223        {
224        return mapLoadFactor;
225        }
226    
227      /**
228       * Method setMapLoadFactor sets the default load factor to be used by the backing Map implementation.
229       *
230       * @param mapLoadFactor type float
231       * @return this
232       */
233      public SpillableProps setMapLoadFactor( float mapLoadFactor )
234        {
235        this.mapLoadFactor = mapLoadFactor;
236    
237        return this;
238        }
239    
240      @Override
241      protected void addPropertiesTo( Properties properties )
242        {
243        for( String codec : codecs )
244          {
245          String codecs = (String) properties.get( SPILL_CODECS );
246    
247          properties.put( SPILL_CODECS, Util.join( ",", Util.removeNulls( codecs, codec ) ) );
248          }
249    
250        properties.setProperty( SPILL_COMPRESS, Boolean.toString( compressSpill ) );
251        properties.setProperty( LIST_THRESHOLD, Integer.toString( listSpillThreshold ) );
252    
253        properties.setProperty( MAP_THRESHOLD, Integer.toString( mapSpillThreshold ) );
254        properties.setProperty( MAP_CAPACITY, Integer.toString( mapInitialCapacity ) );
255        properties.setProperty( MAP_LOADFACTOR, Float.toString( mapLoadFactor ) );
256        }
257      }