/*** Eclipse Class Decompiler plugin, copyright (c) 2016 Chen Chao (cnfree2000@hotmail.com) ***/
package com.alibaba.jstorm.utils;

import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A map whose entries expire after a configurable amount of time.
 *
 * <p>Entries are kept in a list of {@code numBuckets} hash maps ("buckets"). New entries go into
 * the first bucket. A background daemon thread sleeps for
 * {@code expirationMillis / (numBuckets - 1)} milliseconds, then drops the last bucket and pushes a
 * fresh, empty bucket onto the front of the list. An entry therefore survives between
 * {@code numBuckets - 1} and {@code numBuckets} rotations, i.e. between {@code expirationSecs} and
 * {@code expirationSecs * numBuckets / (numBuckets - 1)} seconds after it was last written with
 * {@link #put(Object, Object)}.
 *
 * <p>Every access to the bucket list is guarded by a single private lock. The expiry callback is
 * invoked from the cleaner thread after the lock has been released.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class TimeCacheMap<K, V> implements TimeOutMap<K, V> {
    /** With 3 buckets an entry expires at most 50% later than the requested expiration time. */
    private static final int DEFAULT_NUM_BUCKETS = 3;

    /** Buckets ordered from newest (first) to oldest (last). Guarded by {@link #_lock}. */
    private final LinkedList<HashMap<K, V>> _buckets;

    /** Guards every read and write of {@link #_buckets} and of the maps it contains. */
    private final Object _lock = new Object();

    /** Background daemon thread that rotates the buckets. */
    private final Thread _cleaner;

    /** Optional listener notified for every entry of a dropped bucket; may be {@code null}. */
    private final ExpiredCallback<K, V> _callback;

    /**
     * Creates the map and starts its cleaner thread.
     *
     * @param expirationSecs minimum lifetime of an entry, in seconds
     * @param numBuckets number of buckets to rotate through; must be at least 2
     * @param callback invoked with the key and value of each expired entry, or {@code null} for
     *     no notification
     * @throws IllegalArgumentException if {@code numBuckets < 2}
     */
    public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        this._buckets = new LinkedList<HashMap<K, V>>();
        for (int i = 0; i < numBuckets; ++i) {
            this._buckets.add(new HashMap<K, V>());
        }
        this._callback = callback;

        // 1000L forces long arithmetic so large expirationSecs values do not overflow int.
        long expirationMillis = expirationSecs * 1000L;
        // Must be final so the anonymous Runnable below can capture it.
        final long sleepTime = expirationMillis / (numBuckets - 1);

        this._cleaner = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!AsyncLoopRunnable.getShutdown().get()) {
                    JStormUtils.sleepMs(sleepTime);

                    Map<K, V> dead;
                    synchronized (_lock) {
                        // Rotate: the oldest bucket expires, a new empty one becomes the head.
                        dead = _buckets.removeLast();
                        _buckets.addFirst(new HashMap<K, V>());
                    }

                    // Notify outside the lock so a slow callback cannot block map users.
                    if (_callback != null) {
                        for (Entry<K, V> entry : dead.entrySet()) {
                            _callback.expire(entry.getKey(), entry.getValue());
                        }
                    }
                }
            }
        });
        this._cleaner.setDaemon(true);
        this._cleaner.start();
    }

    /**
     * Creates a map with the default number of buckets and an expiry callback.
     *
     * @param expirationSecs minimum lifetime of an entry, in seconds
     * @param callback invoked for each expired entry, or {@code null}
     */
    public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) {
        this(expirationSecs, DEFAULT_NUM_BUCKETS, callback);
    }

    /**
     * Creates a map with the default number of buckets and no expiry callback.
     *
     * @param expirationSecs minimum lifetime of an entry, in seconds
     */
    public TimeCacheMap(int expirationSecs) {
        this(expirationSecs, DEFAULT_NUM_BUCKETS);
    }

    /**
     * Creates a map with no expiry callback.
     *
     * @param expirationSecs minimum lifetime of an entry, in seconds
     * @param numBuckets number of buckets to rotate through; must be at least 2
     * @throws IllegalArgumentException if {@code numBuckets < 2}
     */
    public TimeCacheMap(int expirationSecs, int numBuckets) {
        this(expirationSecs, numBuckets, null);
    }

    /**
     * Tells whether any bucket currently holds the key.
     *
     * @param key key to look up
     * @return {@code true} if some bucket contains {@code key}
     */
    public boolean containsKey(K key) {
        synchronized (this._lock) {
            for (HashMap<K, V> bucket : this._buckets) {
                if (bucket.containsKey(key)) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Returns the value from the newest bucket that contains the key.
     *
     * @param key key to look up
     * @return the mapped value, or {@code null} if no bucket contains {@code key}
     */
    public V get(K key) {
        synchronized (this._lock) {
            for (HashMap<K, V> bucket : this._buckets) {
                if (bucket.containsKey(key)) {
                    return bucket.get(key);
                }
            }
            return null;
        }
    }

    /**
     * Stores the mapping in the newest bucket without touching older buckets, so a stale mapping
     * for the same key may remain in an older bucket.
     *
     * @param key key to store
     * @param value value to associate with {@code key}
     */
    public void putHead(K key, V value) {
        synchronized (this._lock) {
            this._buckets.getFirst().put(key, value);
        }
    }

    /**
     * Stores the mapping in the newest bucket and removes the key from every older bucket, which
     * restarts the key's expiration timer.
     *
     * @param key key to store
     * @param value value to associate with {@code key}
     */
    public void put(K key, V value) {
        synchronized (this._lock) {
            Iterator<HashMap<K, V>> it = this._buckets.iterator();
            // The constructor guarantees at least two buckets, so next() is always valid here.
            HashMap<K, V> bucket = it.next();
            bucket.put(key, value);
            while (it.hasNext()) {
                bucket = it.next();
                bucket.remove(key);
            }
        }
    }

    /**
     * Removes the key from the newest bucket that contains it. Older buckets are not inspected
     * once a match is found.
     *
     * @param key key to remove
     * @return the value that was removed, or {@code null} if no bucket contained {@code key}
     */
    public Object remove(K key) {
        synchronized (this._lock) {
            for (HashMap<K, V> bucket : this._buckets) {
                if (bucket.containsKey(key)) {
                    return bucket.remove(key);
                }
            }
            return null;
        }
    }

    /**
     * Returns the total number of entries over all buckets. A key that is present in several
     * buckets (possible after {@link #putHead(Object, Object)}) is counted once per bucket.
     *
     * @return the sum of all bucket sizes
     */
    public int size() {
        synchronized (this._lock) {
            int size = 0;
            for (HashMap<K, V> bucket : this._buckets) {
                size += bucket.size();
            }
            return size;
        }
    }

    /**
     * Interrupts the cleaner thread.
     *
     * <p>NOTE(review): the cleaner loop only exits when {@code AsyncLoopRunnable.getShutdown()}
     * becomes true. Whether the interrupt actually stops the thread depends on how
     * {@code JStormUtils.sleepMs} handles {@code InterruptedException}, which is not visible
     * here -- confirm that this call terminates the thread rather than just cutting one sleep
     * short.
     */
    public void cleanup() {
        this._cleaner.interrupt();
    }

    /**
     * Copies the current contents into a new map. Buckets are merged from newest to oldest, so
     * when a key is present in several buckets the value from the oldest one is the one returned.
     *
     * @return a new {@link HashMap} snapshot that is independent of this cache
     */
    public Map<K, V> buildMap() {
        Map<K, V> ret = new HashMap<K, V>();
        synchronized (this._lock) {
            for (HashMap<K, V> bucket : this._buckets) {
                ret.putAll(bucket);
            }
            return ret;
        }
    }
}
总体思路:LinkedList 中默认包含 3 个 HashMap。每次通过 put 新增数据时,都写入第一个 HashMap,同时删除后面各个 HashMap 中相同 key 的数据。内部有一个清理线程定时清除过期数据:每次 sleep 结束后,移除 LinkedList 中最后一个 HashMap,并新建一个空的 HashMap 放到 LinkedList 的首位;下一个时间窗口内新增的数据就写入这个新的 HashMap,原来的第一个 HashMap 变为第二个,原来的第二个变为第三个;下次清理时再移除最后一个 HashMap,如此循环。
sleep 时间:以过期时间参数为 30 秒、默认 3 个 HashMap 为例,根据下面的代码公式,窗口移动间隔(sleepTime)为 15 秒。如果数据在第一个窗口的最后一秒添加,约 30 秒后被删除;如果在第一个窗口的第一秒添加,则约 45 秒后才被删除。
long expirationMillis = expirationSecs * 1000L;
long sleepTime = expirationMillis / (numBuckets - 1);
读取数据时,则依次遍历 LinkedList 中的所有 HashMap 进行查找。