zoukankan      html  css  js  c++  java
  • 一种高并发流控程序的简单轻量实现

    实现一个流控程序,控制客户端每秒调用某个远程服务不超过N次,客户端是会多线程并发调用。

    [java] view plaincopyprint?
    1. import java.util.Date;  
    2. import java.util.concurrent.ExecutorService;  
    3. import java.util.concurrent.Executors;  
    4. import java.util.concurrent.Semaphore;  
    5. import java.util.concurrent.TimeUnit;  
    6. import java.util.concurrent.atomic.AtomicInteger;  
    7.   
    8. public class FlowConcurrentController {  
    9.     // 每秒并发访问控制数量   
    10.     final static int MAX_QPS = 10;  
    11.     // 并发控制信号量   
    12.     final static Semaphore semaphore = new Semaphore(MAX_QPS);  
    13.     // 监控每秒并发访问次数(理论上accessCount.get() <= 10)   
    14.     final static AtomicInteger accessCount = new AtomicInteger(0);  
    15.   
    16.     // 模拟远程访问   
    17.     private static void remoteCall(int i, int j) {  
    18.         System.out.println(String.format("%s - %s: %d %d"new Date(), Thread.currentThread(), i, j));  
    19.     }  
    20.   
    21.     private static void releaseWork() { // 每秒release一次   
    22.         // release semaphore thread   
    23.         Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {  
    24.             @Override  
    25.             public void run() {  
    26.                 semaphore.release(accessCount.get());  
    27.                 accessCount.set(0);  
    28.             }  
    29.         }, 10001000, TimeUnit.MILLISECONDS);  
    30.     }  
    31.   
    32.     // 模拟并发访问控制   
    33.     private static void simulateAccess(final int m, final int n)  
    34.             throws Exception { // m : 线程数;n : 调用数   
    35.         ExecutorService pool = Executors.newFixedThreadPool(100);  
    36.         for (int i = m; i > 0; i--) {  
    37.             final int x = i;  
    38.             pool.submit(new Runnable() {  
    39.                 @Override  
    40.                 public void run() {  
    41.                     for (int j = n; j > 0; j--) {  
    42.                         try {  
    43.                             Thread.sleep(5);  
    44.                         } catch (InterruptedException e) {  
    45.                             e.printStackTrace();  
    46.                         }  
    47.   
    48.                         semaphore.acquireUninterruptibly(1);  
    49.                         accessCount.incrementAndGet();  
    50.                         remoteCall(x, j);  
    51.                     }  
    52.                 }  
    53.             });  
    54.         }  
    55.   
    56.         pool.shutdown();  
    57.         pool.awaitTermination(1, TimeUnit.HOURS);  
    58.     }  
    59.   
    60.     public static void main(String[] args) throws Exception {  
    61.         // 开启releaseWork   
    62.         releaseWork();  
    63.           
    64.         // 开始模拟lots of concurrent calls: 100 * 1000   
    65.         simulateAccess(1001000);  
    66.     }  
    67.   
    68. }  
     

    上面的代码中存在一个小问题,就是accessCount的释放后,存在负数的情况,也就是说高并发的情况下每秒会存在>MAX_QPS次的并发访问次数,还不能做到非常精确控制。

    期待大家更加简单和轻量的方式。

    转自:http://www.cdtarena.com/javapx/201304/8327.html

  • 相关阅读:
    6月15日学习日志
    6月14日学习日志
    6月13日学习日志
    6月12日学习日志
    给建民哥的意见
    6月10日学习日志
    6月9日学习日志
    6月8日学习日志
    梦断代码读书笔记3
    第二次冲刺(六)
  • 原文地址:https://www.cnblogs.com/cdtarena/p/3026485.html
Copyright © 2011-2022 走看看