/*** Eclipse Class Decompiler plugin, copyright (c) 2016 Chen Chao (cnfree2000@hotmail.com) ***/
package com.alibaba.jstorm.utils;
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
public class TimeCacheMap<K, V> implements TimeOutMap<K, V> {
private static final int DEFAULT_NUM_BUCKETS = 3;
private LinkedList<HashMap<K, V>> _buckets;
private final Object _lock;
private Thread _cleaner;
private ExpiredCallback _callback;
public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
this._lock = new Object();
if (numBuckets < 2) {
throw new IllegalArgumentException("numBuckets must be >= 2");
}
this._buckets = new LinkedList();
for (int i = 0; i < numBuckets; ++i) {
this._buckets.add(new HashMap());
}
this._callback = callback;
long expirationMillis = expirationSecs * 1000L;
long sleepTime = expirationMillis / (numBuckets - 1);
this._cleaner = new Thread(new Runnable(sleepTime) {
public void run() {
while (!(AsyncLoopRunnable.getShutdown().get())) {
Map dead = null;
JStormUtils.sleepMs(this.val$sleepTime);
synchronized (TimeCacheMap.this._lock) {
dead = (Map) TimeCacheMap.this._buckets.removeLast();
TimeCacheMap.this._buckets.addFirst(new HashMap());
}
if (TimeCacheMap.this._callback != null)
for (Map.Entry entry : dead.entrySet())
TimeCacheMap.this._callback.expire(entry.getKey(), entry.getValue());
}
}
});
this._cleaner.setDaemon(true);
this._cleaner.start();
}
public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) {
this(expirationSecs, 3, callback);
}
public TimeCacheMap(int expirationSecs) {
this(expirationSecs, 3);
}
public TimeCacheMap(int expirationSecs, int numBuckets) {
this(expirationSecs, numBuckets, null);
}
public boolean containsKey(K key) {
synchronized (this._lock) {
for (HashMap bucket : this._buckets) {
if (bucket.containsKey(key)) {
return true;
}
}
return false;
}
}
public V get(K key) {
synchronized (this._lock) {
for (HashMap bucket : this._buckets) {
if (bucket.containsKey(key)) {
return bucket.get(key);
}
}
return null;
}
}
public void putHead(K key, V value) {
synchronized (this._lock) {
((HashMap) this._buckets.getFirst()).put(key, value);
}
}
public void put(K key, V value) {
synchronized (this._lock) {
Iterator it = this._buckets.iterator();
HashMap bucket = (HashMap) it.next();
bucket.put(key, value);
while (it.hasNext()) {
bucket = (HashMap) it.next();
bucket.remove(key);
}
}
}
public Object remove(K key) {
synchronized (this._lock) {
for (HashMap bucket : this._buckets) {
if (bucket.containsKey(key)) {
return bucket.remove(key);
}
}
return null;
}
}
public int size() {
synchronized (this._lock) {
int size = 0;
for (HashMap bucket : this._buckets) {
size += bucket.size();
}
return size;
}
}
public void cleanup() {
this._cleaner.interrupt();
}
public Map<K, V> buildMap() {
Map ret = new HashMap();
synchronized (this._lock) {
for (HashMap bucket : this._buckets) {
ret.putAll(bucket);
}
return ret;
}
}
}
总体思路,linkList下默认带有3个hashmap,每次新加数据加到第一个hashmap内,同时删除后面map同样key的数据,里面一个线程定时清理过期数据,sleep后,删除list最后一个hashmap,新建一个空的hashmap放到linklist第一个的位置,下一个时间窗口添加数据就添加到该hashmap内,原有的第一个hashmap变为第二个,原有的第二个变为第三个,下次删除就清除最后一个hashmap, 依次循环。
sleep时间 如果默认为30秒参数, 根据代码公式,计算窗口移动时间为15秒, 第一个窗口最后一秒添加数据,30秒后删除,如果是第一个窗口第一秒添加,则需要45秒后删除
long expirationMillis = expirationSecs * 1000L;
long sleepTime = expirationMillis / (numBuckets - 1);
取数据则是遍历list下所有hashmap拿取数据