001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.collect;
022    
023    import java.util.Collection;
024    import java.util.HashMap;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.tuple.Tuple;
028    
029    import static cascading.tuple.collect.SpillableProps.defaultMapInitialCapacity;
030    import static cascading.tuple.collect.SpillableProps.defaultMapLoadFactor;
031    
032    /**
033     * SpillableTupleMap is a HashMap that allows multiple values per key. If the number of values for a given
034     * key reaches a specific threshold, they will be spilled to disk using a {@link SpillableTupleList} instance. Only
035     * values are spilled; keys are never spilled, so too many keys can result in an {@link OutOfMemoryError}.
036     * <p/>
037     * The {@link cascading.tuple.collect.SpillableProps#MAP_THRESHOLD} value sets the number of total values that this map will
038     * strive to keep in memory regardless of the number of keys. This is achieved by dynamically calculating the threshold
039     * used by each child SpillableTupleList instance using
040     * {@code threshold = Min( list_threshold, map_threshold / current_num_keys ) }.
041     * <p/>
042     * To set the list threshold, see {@link cascading.tuple.collect.SpillableProps} fluent helper class.
043     * <p/>
044     * This class is used by the {@link cascading.pipe.HashJoin} pipe, to set properties specific to a given
045     * join instance, see the {@link cascading.pipe.HashJoin#getConfigDef()} method.
046     *
047     * @see cascading.tuple.hadoop.collect.HadoopSpillableTupleMap
048     */
049    public abstract class SpillableTupleMap extends HashMap<Tuple, Collection<Tuple>> implements Spillable
050      {
051      /** The total number of tuple values (not keys) to attempt to keep in memory. */
052      @Deprecated
053      public static final String MAP_THRESHOLD = SpillableProps.MAP_THRESHOLD;
054    
055      /**
056       * The initial hash map capacity.
057       *
058       * @see java.util.HashMap
059       */
060      @Deprecated
061      public static final String MAP_CAPACITY = SpillableProps.MAP_CAPACITY;
062    
063      /**
064       * The initial hash map load factor.
065       *
066       * @see java.util.HashMap
067       */
068      @Deprecated
069      public static final String MAP_LOADFACTOR = SpillableProps.MAP_LOADFACTOR;
070    
071      private int mapThreshold;
072      private int initListThreshold;
073      private Spillable.SpillListener spillListener = Spillable.SpillListener.NULL;
074    
075      public static int getMapThreshold( FlowProcess flowProcess, int defaultValue )
076        {
077        String value = (String) flowProcess.getProperty( SpillableProps.MAP_THRESHOLD );
078    
079        if( value == null || value.length() == 0 )
080          return defaultValue;
081    
082        return Integer.parseInt( value );
083        }
084    
085      public static int getMapCapacity( FlowProcess flowProcess, int defaultValue )
086        {
087        String value = (String) flowProcess.getProperty( SpillableProps.MAP_CAPACITY );
088    
089        if( value == null || value.length() == 0 )
090          return defaultValue;
091    
092        return Integer.parseInt( value );
093        }
094    
095      public static float getMapLoadFactor( FlowProcess flowProcess, float defaultValue )
096        {
097        String value = (String) flowProcess.getProperty( SpillableProps.MAP_LOADFACTOR );
098    
099        if( value == null || value.length() == 0 )
100          return defaultValue;
101    
102        return Float.parseFloat( value );
103        }
104    
105      public SpillableTupleMap( int mapThreshold, int initListThreshold )
106        {
107        super( defaultMapInitialCapacity, defaultMapLoadFactor );
108        this.mapThreshold = mapThreshold;
109        this.initListThreshold = initListThreshold;
110        }
111    
112      public SpillableTupleMap( int initialCapacity, float loadFactor, int mapThreshold, int initListThreshold )
113        {
114        super( initialCapacity, loadFactor );
115        this.mapThreshold = mapThreshold;
116        this.initListThreshold = initListThreshold;
117        }
118    
119      protected int getMapThreshold()
120        {
121        return mapThreshold;
122        }
123    
124      public int getInitListThreshold()
125        {
126        return initListThreshold;
127        }
128    
129      @Override
130      public Collection<Tuple> get( Object object )
131        {
132        Collection<Tuple> value = super.get( object );
133    
134        if( value == null )
135          {
136          value = createTupleCollection( (Tuple) object );
137    
138          super.put( (Tuple) object, value );
139          }
140    
141        return value;
142        }
143    
144      protected abstract Collection<Tuple> createTupleCollection( Tuple object );
145    
146      @Override
147      public void setGrouping( Tuple group )
148        {
149        }
150    
151      @Override
152      public Tuple getGrouping()
153        {
154        return null;
155        }
156    
157      @Override
158      public void setSpillStrategy( SpillStrategy spillStrategy )
159        {
160        }
161    
162      @Override
163      public int spillCount()
164        {
165        return 0;
166        }
167    
168      public Spillable.SpillListener getSpillListener()
169        {
170        return spillListener;
171        }
172    
173      public void setSpillListener( Spillable.SpillListener spillListener )
174        {
175        this.spillListener = spillListener;
176        }
177      }