zoukankan      html  css  js  c++  java
  • [JAVA]流控及超流控后的延迟处理

    流控检查(每半秒累计,因此最小留空阀值只能做到每秒2条):

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.lang.Thread;
    
    /**
     * 流量控制
     * 
     * @author chenx
     */
    public class OverflowController {
    
    	private int maxSendCountPerSecend; // 该条链路上流控阀值
    	private Date sendTime = new Date();
    	private int sendCount = 0; // 该条链路上发送的数量
    
    	public OverflowController(int maxSendCountPerSecend) {
    		if (maxSendCountPerSecend < 2) {
    			maxSendCountPerSecend = 2;
    		}
    
    		this.maxSendCountPerSecend = maxSendCountPerSecend;
    	}
    
    	public int getMaxSendCountPerSecend() {
    		if (getMilliseconds(new Date()) >= 500) {
    			return maxSendCountPerSecend / 2;
    		}
    
    		return maxSendCountPerSecend - (maxSendCountPerSecend / 2);
    	}
    
    	/**
    	 * 是否超流控
    	 */
    	public boolean isOverflow(int sendNum) {
    		synchronized (this) {
    			Date now = new Date();
    			if (now.getTime() - sendTime.getTime() >= 500) {
    				sendTime = now;
    				sendCount = sendNum;
    			} else {
    				if (sendCount + sendNum > getMaxSendCountPerSecend()) {
    					return true;
    				} else {
    					sendCount += sendNum;
    				}
    			}
    
    			return false;
    		}
    	}
    
    	/**
    	 * 获取指定时间的毫秒数
    	 */
    	private int getMilliseconds(Date date) {
    		SimpleDateFormat df = new SimpleDateFormat("SSS");
    		return Integer.valueOf(df.format(date));
    	}
    
    	public static void main(String[] args) throws InterruptedException {
    		OverflowController oc = new OverflowController(50);
    		SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
    		for (int i = 0; i <= 100; i++) {
    			if (oc.isOverflow(1)) {
    				System.out.println(i + "-isOverflow-" + df.format(new Date()));
    			} else {
    				System.out.println(i + "-sendOk-" + df.format(new Date()));
    			}
    
    			Thread.sleep(10);
    		}
    	}
    }
    

    超流控后的延迟处理,由于java中没有.net的“延迟委托”一说:

    ThreadPool.RegisterWaitForSingleObject(
     WaitHandle waitObject,
          WaitOrTimerCallback callBack,
          Object state,
         int millisecondsTimeOutInterval,
         bool executeOnlyOnce

    )

    Java下需实现一个简单的延迟队列:

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class DelayEntry implements Delayed {
    
    	private int count;
    	private long dequeuedTimeMillis; // 出队列时间
    
    	public int getCount() {
    		return count;
    	}
    
    	public void setCount(int count) {
    		this.count = count;
    	}
    
    	public long getDequeuedTimeMillis() {
    		return dequeuedTimeMillis;
    	}
    
    	public DelayEntry(long delayMillis) {
    		dequeuedTimeMillis = System.currentTimeMillis() + delayMillis;
    	}
    
    	@Override
    	public int compareTo(Delayed o) {
    		DelayEntry de = (DelayEntry) o;
    		long timeout = dequeuedTimeMillis - de.dequeuedTimeMillis;
    		return timeout > 0 ? 1 : timeout < 0 ? -1 : 0;
    	}
    
    	@Override
    	public long getDelay(TimeUnit unit) {
    		return dequeuedTimeMillis - System.currentTimeMillis();
    	}
    }
    
    import java.util.concurrent.DelayQueue;
    
    public class DelayService {
    
    	public void run() {
    		DelayQueue<DelayEntry> queue = new DelayQueue<DelayEntry>();
    		DelayConsumer delayConsumer = new DelayConsumer(queue);
    		delayConsumer.start();
    
    		for (int i = 0; i < 100; i++) {
    			DelayEntry de = new DelayEntry(5000);
    			de.setCount(i);
    			System.out.println(System.currentTimeMillis() + "--------" + de.getCount());
    			queue.add(de);
    		}
    	}
    
    	class DelayConsumer extends Thread {
    		DelayQueue<DelayEntry> queue;
    		public DelayConsumer(DelayQueue<DelayEntry> queue) {
    			this.queue = queue;
    		}
    
    		public void run() {
    			while (true) {
    				try {
    					DelayEntry de = queue.take();
    					System.out.println("queue size=" + queue.size());
    					System.out.println(de.getCount());
    					System.out.println(System.currentTimeMillis());
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    
    	public static void main(String[] args) {
    		DelayService ds = new DelayService();
    		ds.run();
    	}
    }
    
  • 相关阅读:
    webRTC中语音降噪模块ANS细节详解(四)
    基于MCRAOMLSA的语音降噪(三):实现(续)
    基于MCRAOMLSA的语音降噪(一):原理
    VoIP语音处理流程和知识点梳理
    linux I/O内存访问
    十六、输入子系统驱动模型
    十三、【ADC】ADC读取S5p6818电源值
    put_user()和get_user()用户空间传递数据
    十四、【watchdog】看门狗
    十七、内核中的锁机制
  • 原文地址:https://www.cnblogs.com/CopyPaster/p/4194005.html
Copyright © 2011-2022 走看看