001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tuple.collect; 022 023 import java.util.Collection; 024 import java.util.HashMap; 025 026 import cascading.flow.FlowProcess; 027 import cascading.tuple.Tuple; 028 029 import static cascading.tuple.collect.SpillableProps.defaultMapInitialCapacity; 030 import static cascading.tuple.collect.SpillableProps.defaultMapLoadFactor; 031 032 /** 033 * SpillableTupleMap is a HashMap that will allow for multiple values per key, and if the number of values for a given 034 * key reach a specific threshold, they will be spilled to disk using a {@link SpillableTupleList} instance. Only 035 * values are spilled, keys are not spilled and too many keys can result in a {@link OutOfMemoryError}. 036 * <p/> 037 * The {@link cascading.tuple.collect.SpillableProps#MAP_THRESHOLD} value sets the number of total values that this map will 038 * strive to keep in memory regardless of the number of keys. This is achieved by dynamically calculating the threshold 039 * used by each child SpillableTupleList instance using 040 * {@code threshold = Min( list_threshold, map_threshold / current_num_keys ) }. 041 * <p/> 042 * To set the list threshold, see {@link cascading.tuple.collect.SpillableProps} fluent helper class. 043 * <p/> 044 * This class is used by the {@link cascading.pipe.HashJoin} pipe, to set properties specific to a given 045 * join instance, see the {@link cascading.pipe.HashJoin#getConfigDef()} method. 046 * 047 * @see cascading.tuple.hadoop.collect.HadoopSpillableTupleMap 048 */ 049 public abstract class SpillableTupleMap extends HashMap<Tuple, Collection<Tuple>> implements Spillable 050 { 051 /** The total number of tuple values (not keys) to attempt to keep in memory. */ 052 @Deprecated 053 public static final String MAP_THRESHOLD = SpillableProps.MAP_THRESHOLD; 054 055 /** 056 * The initial hash map capacity. 057 * 058 * @see java.util.HashMap 059 */ 060 @Deprecated 061 public static final String MAP_CAPACITY = SpillableProps.MAP_CAPACITY; 062 063 /** 064 * The initial hash map load factor. 065 * 066 * @see java.util.HashMap 067 */ 068 @Deprecated 069 public static final String MAP_LOADFACTOR = SpillableProps.MAP_LOADFACTOR; 070 071 private int mapThreshold; 072 private int initListThreshold; 073 private Spillable.SpillListener spillListener = Spillable.SpillListener.NULL; 074 075 public static int getMapThreshold( FlowProcess flowProcess, int defaultValue ) 076 { 077 String value = (String) flowProcess.getProperty( SpillableProps.MAP_THRESHOLD ); 078 079 if( value == null || value.length() == 0 ) 080 return defaultValue; 081 082 return Integer.parseInt( value ); 083 } 084 085 public static int getMapCapacity( FlowProcess flowProcess, int defaultValue ) 086 { 087 String value = (String) flowProcess.getProperty( SpillableProps.MAP_CAPACITY ); 088 089 if( value == null || value.length() == 0 ) 090 return defaultValue; 091 092 return Integer.parseInt( value ); 093 } 094 095 public static float getMapLoadFactor( FlowProcess flowProcess, float defaultValue ) 096 { 097 String value = (String) flowProcess.getProperty( SpillableProps.MAP_LOADFACTOR ); 098 099 if( value == null || value.length() == 0 ) 100 return defaultValue; 101 102 return Float.parseFloat( value ); 103 } 104 105 public SpillableTupleMap( int mapThreshold, int initListThreshold ) 106 { 107 super( defaultMapInitialCapacity, defaultMapLoadFactor ); 108 this.mapThreshold = mapThreshold; 109 this.initListThreshold = initListThreshold; 110 } 111 112 public SpillableTupleMap( int initialCapacity, float loadFactor, int mapThreshold, int initListThreshold ) 113 { 114 super( initialCapacity, loadFactor ); 115 this.mapThreshold = mapThreshold; 116 this.initListThreshold = initListThreshold; 117 } 118 119 protected int getMapThreshold() 120 { 121 return mapThreshold; 122 } 123 124 public int getInitListThreshold() 125 { 126 return initListThreshold; 127 } 128 129 @Override 130 public Collection<Tuple> get( Object object ) 131 { 132 Collection<Tuple> value = super.get( object ); 133 134 if( value == null ) 135 { 136 value = createTupleCollection( (Tuple) object ); 137 138 super.put( (Tuple) object, value ); 139 } 140 141 return value; 142 } 143 144 protected abstract Collection<Tuple> createTupleCollection( Tuple object ); 145 146 @Override 147 public void setGrouping( Tuple group ) 148 { 149 } 150 151 @Override 152 public Tuple getGrouping() 153 { 154 return null; 155 } 156 157 @Override 158 public void setSpillStrategy( SpillStrategy spillStrategy ) 159 { 160 } 161 162 @Override 163 public int spillCount() 164 { 165 return 0; 166 } 167 168 public Spillable.SpillListener getSpillListener() 169 { 170 return spillListener; 171 } 172 173 public void setSpillListener( Spillable.SpillListener spillListener ) 174 { 175 this.spillListener = spillListener; 176 } 177 }