Storm常見模式——TimeCacheMap
Storm中使用一種叫做TimeCacheMap的數據結構,用于在內存中保存近期活躍的對象,它的實現非常地高效,而且可以自動刪除過期不再活躍的對象。
TimeCacheMap使用多個桶buckets來縮小鎖的粒度,以此換取高并發讀寫性能。下面我們來看看TimeCacheMap內部是如何實現的。
1. 實現原理
桶鏈表:鏈表中每個元素是一個HashMap,用于保存key,value格式的數據。
private LinkedList<HashMap<K, V>> _buckets;
鎖對象:用于對TimeCacheMap進行get/put等操作時上鎖保證原子性。
private final Object _lock = new Object();
后臺清理線程:負責超時后清理數據。
private Thread _cleaner;
超時回調接口:用于超時后進行函數回調,做一些其他處理。
public static interface ExpiredCallback<K, V> { public void expire(K key, V val); } private ExpiredCallback _callback;
有了以上數據結構,下面來看看構造函數的具體實現:
1、 首先,初始化指定個數的bucket,以鏈式鏈表形式存儲,每個bucket中放入空的HashMap;
2、 然后,設置清理線程,處理流程為:
a) 休眠expirationMillis / (numBuckets-1)毫秒時間(即:expirationSecs / (numBuckets-1)秒);
b) 對_lock對象上鎖,然后從buckets鏈表中移除最后一個元素;
c) 向buckets鏈表頭部新加入一個空的HashMap桶,解除_lock對象鎖;
d) 如果設置了callback函數,則進行回調。
public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) { if(numBuckets<2) { throw new IllegalArgumentException("numBuckets must be >= 2"); } _buckets = new LinkedList<HashMap<K, V>>(); for(int i=0; i<numBuckets; i++) { _buckets.add(new HashMap<K, V>()); } _callback = callback; final long expirationMillis = expirationSecs * 1000L; final long sleepTime = expirationMillis / (numBuckets-1); _cleaner = new Thread(new Runnable() { public void run() { try { while(true) { Map<K, V> dead = null; Time.sleep(sleepTime); synchronized(_lock) { dead = _buckets.removeLast(); _buckets.addFirst(new HashMap<K, V>()); } if(_callback!=null) { for(Entry<K, V> entry: dead.entrySet()) { _callback.expire(entry.getKey(), entry.getValue()); } } } } catch (InterruptedException ex) { } } }); _cleaner.setDaemon(true); _cleaner.start(); }
構造函數需要傳遞三個參數:expirationSecs:超時的時間,單位為秒;numBuckets:桶的個數;callback:超時回調函數。
為了方便使用,還提供了以下三種形式的構造函數,使用時可以根據需要選擇:
//this default ensures things expire at most 50% past the expiration time private static final int DEFAULT_NUM_BUCKETS = 3; public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) { this(expirationSecs, DEFAULT_NUM_BUCKETS, callback); } public TimeCacheMap(int expirationSecs, int numBuckets) { this(expirationSecs, numBuckets, null); } public TimeCacheMap(int expirationSecs) { this(expirationSecs, DEFAULT_NUM_BUCKETS); }
2. 性能分析
get操作:遍歷各個bucket,如果存在指定的key則返回,時間復雜度為O(numBuckets)
public V get(K key) { synchronized(_lock) { for(HashMap<K, V> bucket: _buckets) { if(bucket.containsKey(key)) { return bucket.get(key); } } return null; } }
put操作:將key,value放到_buckets的第一個桶中,然后遍歷其他numBuckets-1個桶,從HashMap中移除其中鍵為key的記錄,時間復雜度為O(numBuckets)
public void put(K key, V value) { synchronized(_lock) { Iterator<HashMap<K, V>> it = _buckets.iterator(); HashMap<K, V> bucket = it.next(); bucket.put(key, value); while(it.hasNext()) { bucket = it.next(); bucket.remove(key); } } }
remove操作:遍歷各個bucket,如果存在以key為鍵的記錄,直接刪除,時間復雜度為O(numBuckets)
public Object remove(K key) { synchronized(_lock) { for(HashMap<K, V> bucket: _buckets) { if(bucket.containsKey(key)) { return bucket.remove(key); } } return null; } }
containsKey操作:遍歷各個bucket,如果存在指定的key則返回true,否則返回false,時間復雜度為O(numBuckets)
public boolean containsKey(K key) { synchronized(_lock) { for(HashMap<K, V> bucket: _buckets) { if(bucket.containsKey(key)) { return true; } } return false; } }
size操作:遍歷各個bucket,累加各個bucket的HashMap的大小,時間復雜度為O (numBuckets)
public int size() { synchronized(_lock) { int size = 0; for(HashMap<K, V> bucket: _buckets) { size+=bucket.size(); } return size; } }
3. 超時時間
經過上面對put操作和_cleaner線程的分析,我們已經知道:
a) put操作將數據放到_buckets的第一個桶中,然后遍歷其他numBuckets-1個桶,從HashMap中移除其中鍵為key的記錄;
b) _cleaner線程每隔expirationSecs / (numBuckets-1)秒會把_buckets中最后一個桶中的數據從TimeCacheMap中移除掉。
因此,假設_cleaner線程剛剛清理數據,put函數調用發生將key放入桶中,那么一條數據的超時時間為:
expirationSecs / (numBuckets-1) * numBuckets = expirationSecs * (1 + 1 / (numBuckets-1))
然而,假設put函數調用剛剛執行結束,_cleaner線程就開始清理數據,那么一條數據的超時時間為:
expirationSecs / (numBuckets-1) * numBuckets - expirationSecs / (numBuckets-1) = expirationSecs
4. 總結
1、 TimeCacheMap的高效之處在于鎖的粒度小,O(1)時間內完成鎖操作,因此,大部分時間內都可以進行get和put操作。
2、 get,put,remove,containsKey和size操作都可以在O(numBuckets)時間內完成,其中numBuckets是桶的個數,默認為3。
3、 未更新數據的超時時間在expirationSecs和expirationSecs * (1 + 1 / (numBuckets-1))之間。
浙公網安備 33010602011771號