001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.util.cache; 022 023import java.util.Collection; 024import java.util.LinkedHashMap; 025import java.util.Map; 026import java.util.Set; 027 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * Implementation of the {@link CascadingCache} interface backed by a {@link java.util.LinkedHashMap} configured to 033 * evict the least recently used key. 034 * <p/> 035 * That is, if duplicate keys are clustered near each other in the incoming tuple stream, this cache will provide the 036 * most benefit as keys that begin to occur less frequently or not at all will be evicted as the key capacity is reached. 037 * <p/> 038 * If the keys are very random, if not uniformly distributed in the stream, consider using the 039 * {@link cascading.util.cache.DirectMappedCache} to reduce the amount of hash and equality comparisons. 040 * <p/> 041 * This implementation is used by default by {@link cascading.pipe.assembly.Unique} and 042 * {@link cascading.pipe.assembly.AggregateBy} and their subclasses. 043 * 044 * @see cascading.pipe.assembly.Unique 045 * @see cascading.pipe.assembly.AggregateBy 046 * @see LRUHashMapCacheFactory 047 * @see DirectMappedCacheFactory 048 * @see DirectMappedCache 049 */ 050public class LRUHashMapCache<Key, Value> implements CascadingCache<Key, Value> 051 { 052 /** logger */ 053 private static final Logger LOG = LoggerFactory.getLogger( LRUHashMapCache.class ); 054 055 /** capacity of the map. */ 056 private int capacity; 057 058 /** call-back used, when entries are removed from the cache. */ 059 private CacheEvictionCallback callback = CacheEvictionCallback.NULL; 060 061 /** counts flushes. */ 062 private long flushes = 0; 063 064 private LinkedHashMap<Key, Value> backingMap; 065 066 @Override 067 public int getCapacity() 068 { 069 return capacity; 070 } 071 072 @Override 073 public void setCacheEvictionCallback( CacheEvictionCallback cacheEvictionCallback ) 074 { 075 this.callback = cacheEvictionCallback; 076 } 077 078 @Override 079 public void setCapacity( int capacity ) 080 { 081 this.capacity = capacity; 082 } 083 084 @Override 085 public void initialize() 086 { 087 this.backingMap = new LinkedHashMap<Key, Value>( capacity, 0.75f, true ) 088 { 089 @Override 090 protected boolean removeEldestEntry( Map.Entry<Key, Value> eldest ) 091 { 092 boolean doRemove = size() > capacity; 093 094 if( doRemove ) 095 { 096 callback.evict( eldest ); 097 098 if( flushes % getCapacity() == 0 ) // every multiple, write out data 099 { 100 Runtime runtime = Runtime.getRuntime(); 101 long freeMem = runtime.freeMemory() / 1024 / 1024; 102 long maxMem = runtime.maxMemory() / 1024 / 1024; 103 long totalMem = runtime.totalMemory() / 1024 / 1024; 104 105 LOG.info( "flushed keys num times: {}, with capacity: {}", flushes + 1, capacity ); 106 LOG.info( "mem on flush (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem ); 107 108 float percent = (float) totalMem / (float) maxMem; 109 110 if( percent < 0.80F ) 111 LOG.info( "total mem is {}% of max mem, to better utilize unused memory consider increasing the cache size", (int) ( percent * 100.0F ) ); 112 } 113 flushes++; 114 } 115 116 return doRemove; 117 } 118 }; 119 } 120 121 @Override 122 public int size() 123 { 124 return backingMap.size(); 125 } 126 127 @Override 128 public boolean isEmpty() 129 { 130 return backingMap.isEmpty(); 131 } 132 133 @Override 134 public boolean containsKey( Object key ) 135 { 136 return backingMap.containsKey( key ); 137 } 138 139 @Override 140 public boolean containsValue( Object value ) 141 { 142 return backingMap.containsValue( value ); 143 } 144 145 @Override 146 public Value get( Object key ) 147 { 148 return backingMap.get( key ); 149 } 150 151 @Override 152 public Value put( Key key, Value value ) 153 { 154 return backingMap.put( key, value ); 155 } 156 157 @Override 158 public Value remove( Object key ) 159 { 160 return backingMap.remove( key ); 161 } 162 163 @Override 164 public void putAll( Map<? extends Key, ? extends Value> m ) 165 { 166 backingMap.putAll( m ); 167 } 168 169 @Override 170 public void clear() 171 { 172 backingMap.clear(); 173 } 174 175 @Override 176 public Set<Key> keySet() 177 { 178 return backingMap.keySet(); 179 } 180 181 @Override 182 public Collection<Value> values() 183 { 184 return backingMap.values(); 185 } 186 187 @Override 188 public Set<Entry<Key, Value>> entrySet() 189 { 190 return backingMap.entrySet(); 191 } 192 }