zoukankan      html  css  js  c++  java
  • 异步回调CompletableFuture

    在jdk7中,我们使用线程池可能会使用ExecutorService,默认有四种方式

    Executors.newSingleeThreadPool()

    Executors.newFixedThreadPool()

    Executors.newCacheThreadPool()

    Executors.newScheduledThreadPool()

    在jdk8中,CompletableFuture腾空出世,它简化了异步任务的写法,提供了很多异步任务的计算方式。

    demo

    public class CompletableFutureDemo {
        /* 有异常的
         *  ForkJoinPool.commonPool-worker-1    没有返回值的,Update mysql ok
            ForkJoinPool.commonPool-worker-1     completableFuture有返回值的。
             ****t null
             ****u java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
             ***exceptionallyjava.lang.ArithmeticException: / by zero
    
         */
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            
            CompletableFuture<Void> completableFuture =CompletableFuture.runAsync(()->{
                System.out.println(Thread.currentThread().getName()+"	没有返回值的,Update mysql ok");
            });
            completableFuture.get();
            //异步调用
            CompletableFuture<Integer> completableFuture2 =CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread().getName()+"	 completableFuture有返回值的。");
                //int age =10/0;
                return 2048;
            });
            
            completableFuture2.whenCompleteAsync((t,u)->{
                System.out.println("****t "+t);
                System.out.println("****u "+u);
            }).exceptionally(f->{
             System.out.println("***exceptionally" +f.getMessage());
             return 4444;
            }).get();
            
        }
        /*
         *  orkJoinPool.commonPool-worker-1    没有返回值的,Update mysql ok
            ForkJoinPool.commonPool-worker-1     completableFuture有返回值的。
             ****t 2048
             ****u null
         */
    }

    言归正传,现在生产上面出现的问题是,在流量激增的时候,响应很慢,慢慢的所有请求都无妨得到响应结果。

    解决方案:

    ①查看cpu和内存使用率

    cpu使用率很低,5%左右,内存使用一直不变,基本排除不是他们的问题。

    ①.查看gc

    看到full gc没有发生,young gc 虽然增加了一点,但是平均响应时间也就是50ms,也算正常了。

    ② 分析dump堆

    下了一个126M的dump包,有一个类占了60m。首先怀疑是不是内存溢出了,但是通过分析,这部分缓存是必需要做的,并且当时dump下来的包确实有点小,数据观测的不够准确。

    ③.查看栈信息

    发现有很多ForkJoinPool.commonPool-worker-线程正在等待,其实使用过CompletableFuture的同学就知道,它里面用的是ForkJoin池来实现的。有想了解线程池源码的可以去读读这篇文章。

    为什么这里会有这么多的线程在等待呢?生产上面的服务器使用的是一个两核的服务器,线程池里面只会是1个线程可以执行。为什么是一个,请看源码。

    @Test
    public void test12() throws InterruptedException { 先做一个单元测试
    CompletableFuture.runAsync(()->{ //在此处打断点
    System.out.println("111");
    });
    Thread.sleep(400000);
    }
    一步一步把代码贴出来,看官看好。

    public static CompletableFuture<Void> runAsync(Runnable runnable) { //运行线程的方法
    return asyncRunStage(asyncPool, runnable);
    }
    asyncPool是什么?看一下这个值的设置。

    private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    useCommonPool是什么?

    private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);
    public static int getCommonPoolParallelism() {
    return commonParallelism;
    }
    commonParallelism就是那个并发的线程数,它又是怎么来的呢?

    static {
    // initialize field offsets for CAS etc
    。。。。。。
    commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
    defaultForkJoinWorkerThreadFactory =
    new DefaultForkJoinWorkerThreadFactory();
    modifyThreadPermission = new RuntimePermission("modifyThread");

    common = java.security.AccessController.doPrivileged
    (new java.security.PrivilegedAction<ForkJoinPool>() {
    public ForkJoinPool run() { return makeCommonPool(); }}); //重点看makeCommonPool方法
    int par = common.config & SMASK; // 获取到par SMASK的值是 65535 也就是1111111111111111 &操作还是common.config本身,看样子还是要看看config是怎么来的
    commonParallelism = par > 0 ? par : 1; 想知道par是什么值,这个值为负数默认是1
    }
    private static ForkJoinPool makeCommonPool() {
    int parallelism = -1; //这个并发的线程数默认是-1
    ForkJoinWorkerThreadFactory factory = null;
    。。。。。。
    if (parallelism < 0 &&
    (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) //看到了吧,线程池中的处理线程数=电脑核数-1
    parallelism = 1;
    if (parallelism > MAX_CAP)
    parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
    "ForkJoinPool.commonPool-worker-"); //指定线程的名字
    }
    到此分析完毕,使用了逆推法。

    由于生产服务器电脑核数较小,而在CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑核数-1。这样等有大请求量过来,处理逻辑又很复杂,很多线程都在等待执行,慢慢拖垮了服务器。

    调整线程池的大小
    《Java并发编程实战》(http://mng.bz/979c)一书中,Brian Goetz和合著者们为线程池大小
    的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争
    稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正
    如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大
    小与处理器的利用率之比可以使用下面的公式进行估算:
    N threads = N CPU * U CPU * (1 + W/C)
    其中:
    ❑N CPU 是处理器的核的数目,可以通过 Runtime.getRuntime().availableProce-
    ssors() 得到
    ❑U CPU 是期望的CPU利用率(该值应该介于0和1之间)
    ❑W/C是等待时间与计算时间的比率

    这里太啰嗦了,一般的设置线程池的大小规则是

    如果服务是cpu密集型的,设置为电脑的核数

    如果服务是io密集型的,设置为电脑的核数*2

  • 相关阅读:
    hdu 1281 棋盘游戏(二分匹配)
    UVA 12545 Bits Equalizer
    算法之匈牙利算法
    I题 hdu 1234 开门人和关门人
    H题 hdu 2520 我是菜鸟,我怕谁
    G题 hdu 1466 计算直线的交点数
    F题 hdu 1431 素数回文
    E题hdu 1425 sort
    D题 hdu 1412 {A} + {B}
    有12个球,外形相同,其中一个小球的质量与其他11个不同,给一个天平,需要几次把这个小球找出来并且求出这个小球是比其他的轻还是重
  • 原文地址:https://www.cnblogs.com/fengyangcai/p/12914743.html
Copyright © 2011-2022 走看看