001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.collect;
022
023import java.util.Collection;
024import java.util.HashMap;
025
026import cascading.flow.FlowProcess;
027import cascading.tuple.Tuple;
028
029import static cascading.tuple.collect.SpillableProps.defaultMapInitialCapacity;
030import static cascading.tuple.collect.SpillableProps.defaultMapLoadFactor;
031
032/**
033 * SpillableTupleMap is a HashMap that will allow for multiple values per key, and if the number of values for a given
034 * key reach a specific threshold, they will be spilled to disk using a {@link SpillableTupleList} instance. Only
035 * values are spilled, keys are not spilled and too many keys can result in a {@link OutOfMemoryError}.
036 * <p/>
037 * The {@link cascading.tuple.collect.SpillableProps#MAP_THRESHOLD} value sets the number of total values that this map will
038 * strive to keep in memory regardless of the number of keys. This is achieved by dynamically calculating the threshold
039 * used by each child SpillableTupleList instance using
040 * {@code threshold = Min( list_threshold, map_threshold / current_num_keys ) }.
041 * <p/>
042 * To set the list threshold, see {@link cascading.tuple.collect.SpillableProps} fluent helper class.
043 * <p/>
044 * This class is used by the {@link cascading.pipe.HashJoin} pipe, to set properties specific to a given
045 * join instance, see the {@link cascading.pipe.HashJoin#getConfigDef()} method.
046 *
047 * @see cascading.tuple.hadoop.collect.HadoopSpillableTupleMap
048 */
049public abstract class SpillableTupleMap extends HashMap<Tuple, Collection<Tuple>> implements Spillable
050  {
051  private int mapThreshold;
052  private int initListThreshold;
053  private Spillable.SpillListener spillListener = Spillable.SpillListener.NULL;
054
055  public static int getMapThreshold( FlowProcess flowProcess, int defaultValue )
056    {
057    String value = (String) flowProcess.getProperty( SpillableProps.MAP_THRESHOLD );
058
059    if( value == null || value.length() == 0 )
060      return defaultValue;
061
062    return Integer.parseInt( value );
063    }
064
065  public static int getMapCapacity( FlowProcess flowProcess, int defaultValue )
066    {
067    String value = (String) flowProcess.getProperty( SpillableProps.MAP_CAPACITY );
068
069    if( value == null || value.length() == 0 )
070      return defaultValue;
071
072    return Integer.parseInt( value );
073    }
074
075  public static float getMapLoadFactor( FlowProcess flowProcess, float defaultValue )
076    {
077    String value = (String) flowProcess.getProperty( SpillableProps.MAP_LOADFACTOR );
078
079    if( value == null || value.length() == 0 )
080      return defaultValue;
081
082    return Float.parseFloat( value );
083    }
084
085  public SpillableTupleMap( int mapThreshold, int initListThreshold )
086    {
087    super( defaultMapInitialCapacity, defaultMapLoadFactor );
088    this.mapThreshold = mapThreshold;
089    this.initListThreshold = initListThreshold;
090    }
091
092  public SpillableTupleMap( int initialCapacity, float loadFactor, int mapThreshold, int initListThreshold )
093    {
094    super( initialCapacity, loadFactor );
095    this.mapThreshold = mapThreshold;
096    this.initListThreshold = initListThreshold;
097    }
098
099  protected int getMapThreshold()
100    {
101    return mapThreshold;
102    }
103
104  public int getInitListThreshold()
105    {
106    return initListThreshold;
107    }
108
109  @Override
110  public Collection<Tuple> get( Object object )
111    {
112    Collection<Tuple> value = super.get( object );
113
114    if( value == null )
115      {
116      value = createTupleCollection( (Tuple) object );
117
118      super.put( (Tuple) object, value );
119      }
120
121    return value;
122    }
123
124  protected abstract Collection<Tuple> createTupleCollection( Tuple object );
125
126  @Override
127  public void setGrouping( Tuple group )
128    {
129    }
130
131  @Override
132  public Tuple getGrouping()
133    {
134    return null;
135    }
136
137  @Override
138  public void setSpillStrategy( SpillStrategy spillStrategy )
139    {
140    }
141
142  @Override
143  public int spillCount()
144    {
145    return 0;
146    }
147
148  public Spillable.SpillListener getSpillListener()
149    {
150    return spillListener;
151    }
152
153  public void setSpillListener( Spillable.SpillListener spillListener )
154    {
155    this.spillListener = spillListener;
156    }
157  }