001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.collect; 022 023import java.util.Collection; 024import java.util.HashMap; 025 026import cascading.flow.FlowProcess; 027import cascading.tuple.Tuple; 028 029import static cascading.tuple.collect.SpillableProps.defaultMapInitialCapacity; 030import static cascading.tuple.collect.SpillableProps.defaultMapLoadFactor; 031 032/** 033 * SpillableTupleMap is a HashMap that will allow for multiple values per key, and if the number of values for a given 034 * key reach a specific threshold, they will be spilled to disk using a {@link SpillableTupleList} instance. Only 035 * values are spilled, keys are not spilled and too many keys can result in a {@link OutOfMemoryError}. 036 * <p/> 037 * The {@link cascading.tuple.collect.SpillableProps#MAP_THRESHOLD} value sets the number of total values that this map will 038 * strive to keep in memory regardless of the number of keys. This is achieved by dynamically calculating the threshold 039 * used by each child SpillableTupleList instance using 040 * {@code threshold = Min( list_threshold, map_threshold / current_num_keys ) }. 041 * <p/> 042 * To set the list threshold, see {@link cascading.tuple.collect.SpillableProps} fluent helper class. 043 * <p/> 044 * This class is used by the {@link cascading.pipe.HashJoin} pipe, to set properties specific to a given 045 * join instance, see the {@link cascading.pipe.HashJoin#getConfigDef()} method. 046 * 047 * @see cascading.tuple.hadoop.collect.HadoopSpillableTupleMap 048 */ 049public abstract class SpillableTupleMap extends HashMap<Tuple, Collection<Tuple>> implements Spillable 050 { 051 private int mapThreshold; 052 private int initListThreshold; 053 private Spillable.SpillListener spillListener = Spillable.SpillListener.NULL; 054 055 public static int getMapThreshold( FlowProcess flowProcess, int defaultValue ) 056 { 057 String value = (String) flowProcess.getProperty( SpillableProps.MAP_THRESHOLD ); 058 059 if( value == null || value.length() == 0 ) 060 return defaultValue; 061 062 return Integer.parseInt( value ); 063 } 064 065 public static int getMapCapacity( FlowProcess flowProcess, int defaultValue ) 066 { 067 String value = (String) flowProcess.getProperty( SpillableProps.MAP_CAPACITY ); 068 069 if( value == null || value.length() == 0 ) 070 return defaultValue; 071 072 return Integer.parseInt( value ); 073 } 074 075 public static float getMapLoadFactor( FlowProcess flowProcess, float defaultValue ) 076 { 077 String value = (String) flowProcess.getProperty( SpillableProps.MAP_LOADFACTOR ); 078 079 if( value == null || value.length() == 0 ) 080 return defaultValue; 081 082 return Float.parseFloat( value ); 083 } 084 085 public SpillableTupleMap( int mapThreshold, int initListThreshold ) 086 { 087 super( defaultMapInitialCapacity, defaultMapLoadFactor ); 088 this.mapThreshold = mapThreshold; 089 this.initListThreshold = initListThreshold; 090 } 091 092 public SpillableTupleMap( int initialCapacity, float loadFactor, int mapThreshold, int initListThreshold ) 093 { 094 super( initialCapacity, loadFactor ); 095 this.mapThreshold = mapThreshold; 096 this.initListThreshold = initListThreshold; 097 } 098 099 protected int getMapThreshold() 100 { 101 return mapThreshold; 102 } 103 104 public int getInitListThreshold() 105 { 106 return initListThreshold; 107 } 108 109 @Override 110 public Collection<Tuple> get( Object object ) 111 { 112 Collection<Tuple> value = super.get( object ); 113 114 if( value == null ) 115 { 116 value = createTupleCollection( (Tuple) object ); 117 118 super.put( (Tuple) object, value ); 119 } 120 121 return value; 122 } 123 124 protected abstract Collection<Tuple> createTupleCollection( Tuple object ); 125 126 @Override 127 public void setGrouping( Tuple group ) 128 { 129 } 130 131 @Override 132 public Tuple getGrouping() 133 { 134 return null; 135 } 136 137 @Override 138 public void setSpillStrategy( SpillStrategy spillStrategy ) 139 { 140 } 141 142 @Override 143 public int spillCount() 144 { 145 return 0; 146 } 147 148 public Spillable.SpillListener getSpillListener() 149 { 150 return spillListener; 151 } 152 153 public void setSpillListener( Spillable.SpillListener spillListener ) 154 { 155 this.spillListener = spillListener; 156 } 157 }