zoukankan      html  css  js  c++  java
  • 使用并发工具实现 RPC 调用流量控制

    前言

    RPC 服务中,每个服务的容量都是有限的,即资源有限,只能承受住给定的网络请求,所以,在设计 RPC 框架的时候,一定要考虑流量控制这个问题。而 Java 中,实现流量控制有很多中方式,今天说 2 种。

    Semaphore 实现流控

    代码:

      static Semaphore semaphore = new Semaphore(100);
    
      public static void main(String[] args) {
    
        Executor timeTask = Executors.newScheduledThreadPool(1);
        ((ScheduledExecutorService) timeTask).scheduleAtFixedRate(
            () -> semaphore.release(100 - semaphore.availablePermits()), 1000, 1000,
            TimeUnit.MILLISECONDS);
    
        Executor pool = Executors.newFixedThreadPool(100);
    
        for (int i = 0; i < 100; i++) {
          final int num = i;
          pool.execute(() -> {
            for (; ; ) {
              for (int j = 0; j < 200; j++) {
                if (semaphore.tryAcquire()) {
                  callRpc(num, j);
                } else {
                  System.err.println("call fail");
                }
              }
            }
          });
        }
      }
    
      private static void callRpc(int num, int j) {
        System.out.println(String.format("%s - %s: %d %d", new Date(), Thread.currentThread(), num, j));
      }
    

    代码中,我们模拟了 100 个线程,每个线程无限调用 RPC。

    同时使用另一个定时任务,定时更新 Semaphore 可用许可为 100。

    客户端线程调用时,会尝试获取信号量,当获取成功时,才会调用调用 RPC,反之,打印失败。

    这个小程序实现了每秒钟限制 100 个请求的 RPC 的流量控制。

    AtomicInteger 实现流控

    代码:

      static AtomicInteger count = new AtomicInteger();
    
      public static void main(String[] args) {
        Executor timeTask = Executors.newScheduledThreadPool(1);
        ((ScheduledExecutorService) timeTask).scheduleAtFixedRate(new Runnable() {
          @Override
          public void run() {
            count.getAndSet(100);
          }
        }, 1000, 1000, TimeUnit.MILLISECONDS);
    
        Executor pool = Executors.newFixedThreadPool(100);
    
        for (int i = 0; i < 100; i++) {
          final int num = i;
          pool.execute(() -> {
            for (; ; ) {
              for (int j = 0; j < 200; j++) {
                if (count.get() >= 0) {// 快速判断,否则大量的 CAS 操作将会定时任务更新计数器 count
                  if (count.decrementAndGet() >= 0) {
                    callRpc(num, j);
                  }
                }
              }
            }
          });
        }
      }
    
      private static void callRpc(int num, int j) {
        System.out.println(String.format("%s - %s: %d %d", new Date(), Thread.currentThread(), num, j));
      }
    

    这段代码和上面的类似,只是使用的 API 不同,这里使用的是 CAS。通过对 CAS 递减,达到流控的目的。

    注意,这里有一个双重判断,先判断 count.get() >= 0,为什么呢?

    如果直接使用 decrementAndGet 方法,则会使用 CAS,100 个线程并发使用 CAS ,将会导致定时任务的 CAS 操作不够及时。

    所以,先判断,是否小于0 ,如果小于0了,就不必尝试 CAS,避免影响定时任务。

  • 相关阅读:
    Scala课程01
    深入分析面向对象中的对象概念(转)
    代码审查时,发现功能实现的原因,而不仅仅是挑毛病(转)
    独立开发者复盘:手游研发犯过的8个错误(转)
    HTTPS背后的加密算法(转)
    How to recover from 'programmers burnout(转)
    数据流图的画法
    Filter及FilterChain的使用具体解释
    SimpleDateFormat使用具体解释
    TCP/IP协议,HTTP协议
  • 原文地址:https://www.cnblogs.com/stateis0/p/9062080.html
Copyright © 2011-2022 走看看