zoukankan      html  css  js  c++  java
  • Jstorm TimeCacheMap源代码分析

    /*** Eclipse Class Decompiler plugin, copyright (c) 2016 Chen Chao (cnfree2000@hotmail.com) ***/
    package com.alibaba.jstorm.utils;
    
    import com.alibaba.jstorm.callback.AsyncLoopRunnable;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class TimeCacheMap<K, V> implements TimeOutMap<K, V> {
    	private static final int DEFAULT_NUM_BUCKETS = 3;
    	private LinkedList<HashMap<K, V>> _buckets;
    	private final Object _lock;
    	private Thread _cleaner;
    	private ExpiredCallback _callback;
    
    	public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
    		this._lock = new Object();
    
    		if (numBuckets < 2) {
    			throw new IllegalArgumentException("numBuckets must be >= 2");
    		}
    		this._buckets = new LinkedList();
    		for (int i = 0; i < numBuckets; ++i) {
    			this._buckets.add(new HashMap());
    		}
    
    		this._callback = callback;
    		long expirationMillis = expirationSecs * 1000L;
    		long sleepTime = expirationMillis / (numBuckets - 1);
    		this._cleaner = new Thread(new Runnable(sleepTime) {
    			public void run() {
    				while (!(AsyncLoopRunnable.getShutdown().get())) {
    					Map dead = null;
    					JStormUtils.sleepMs(this.val$sleepTime);
    					synchronized (TimeCacheMap.this._lock) {
    						dead = (Map) TimeCacheMap.this._buckets.removeLast();
    						TimeCacheMap.this._buckets.addFirst(new HashMap());
    					}
    					if (TimeCacheMap.this._callback != null)
    						for (Map.Entry entry : dead.entrySet())
    							TimeCacheMap.this._callback.expire(entry.getKey(), entry.getValue());
    				}
    			}
    		});
    		this._cleaner.setDaemon(true);
    		this._cleaner.start();
    	}
    
    	public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) {
    		this(expirationSecs, 3, callback);
    	}
    
    	public TimeCacheMap(int expirationSecs) {
    		this(expirationSecs, 3);
    	}
    
    	public TimeCacheMap(int expirationSecs, int numBuckets) {
    		this(expirationSecs, numBuckets, null);
    	}
    
    	public boolean containsKey(K key) {
    		synchronized (this._lock) {
    			for (HashMap bucket : this._buckets) {
    				if (bucket.containsKey(key)) {
    					return true;
    				}
    			}
    			return false;
    		}
    	}
    
    	public V get(K key) {
    		synchronized (this._lock) {
    			for (HashMap bucket : this._buckets) {
    				if (bucket.containsKey(key)) {
    					return bucket.get(key);
    				}
    			}
    			return null;
    		}
    	}
    
    	public void putHead(K key, V value) {
    		synchronized (this._lock) {
    			((HashMap) this._buckets.getFirst()).put(key, value);
    		}
    	}
    
    	public void put(K key, V value) {
    		synchronized (this._lock) {
    			Iterator it = this._buckets.iterator();
    			HashMap bucket = (HashMap) it.next();
    			bucket.put(key, value);
    			while (it.hasNext()) {
    				bucket = (HashMap) it.next();
    				bucket.remove(key);
    			}
    		}
    	}
    
    	public Object remove(K key) {
    		synchronized (this._lock) {
    			for (HashMap bucket : this._buckets) {
    				if (bucket.containsKey(key)) {
    					return bucket.remove(key);
    				}
    			}
    			return null;
    		}
    	}
    
    	public int size() {
    		synchronized (this._lock) {
    			int size = 0;
    			for (HashMap bucket : this._buckets) {
    				size += bucket.size();
    			}
    			return size;
    		}
    	}
    
    	public void cleanup() {
    		this._cleaner.interrupt();
    	}
    
    	public Map<K, V> buildMap() {
    		Map ret = new HashMap();
    		synchronized (this._lock) {
    			for (HashMap bucket : this._buckets) {
    				ret.putAll(bucket);
    			}
    			return ret;
    		}
    	}
    }
    

      总体思路,linkList下默认带有3个hashmap,每次新加数据加到第一个hashmap内,同时删除后面map同样key的数据,里面一个线程定时清理过期数据,sleep后,删除list最后一个hashmap,新建一个空的hashmap放到linklist第一个的位置,下一个时间窗口添加数据就添加到该hashmap内,原有的第一个hashmap变为第二个,原有的第二个变为第三个,下次删除就清除最后一个hashmap, 依次循环。

    sleep时间 如果默认为30秒参数, 根据代码公式,计算窗口移动时间为15秒, 第一个窗口最后一秒添加数据,30秒后删除,如果是第一个窗口第一秒添加,则需要45秒后删除

    long expirationMillis = expirationSecs * 1000L;
    long sleepTime = expirationMillis / (numBuckets - 1);

    取数据则是遍历list下所有hashmap拿取数据

  • 相关阅读:
    无序数组求第K大/第K小的数
    [洛谷][二分搜索]进击的奶牛
    [015]向下类型转换和向上类型转换
    [014]析构函数为虚函数的注意事项
    [013]函数重载--int*和void*的匹配优先级
    [012]链表笔记--在链表中插入一个节点
    [011]链表笔记--删除一个链表节点
    [002]链表笔记--编程实现一个单链表的创建/测长/打印
    [C++]对象的销毁机制
    [011]默认实参
  • 原文地址:https://www.cnblogs.com/chengxin1982/p/6425522.html
Copyright © 2011-2022 走看看