zoukankan      html  css  js  c++  java
  • Java多线程导致CPU占用100%解决及线程池正确关闭方式

    简介

    情景:1000万表数据导入内存数据库,按分页大小10000查询,多线程,15条线程跑。
    使用了ExecutorService executor = Executors.newFixedThreadPool(15)
    本地跑了一段时间后,发现电脑CPU逐渐升高,最后CPU占用100%卡死,内存使用也高达80%。

    排查问题

    Debug 发现虽然创建了定长15的线程池,但是因为数据量大,在For中循环分页查询的List会持续加入LinkedBlockingQueue()
    队列中每一个等待的任务,又加载了1万的数据。所以不管是线程数的CPU抢占,还是内存的消耗都是极高。
    所以是不是能够控制等待队列LinkedBlockingQueue的上限就可以了。

    解决办法

    使用AtomicLong 统计线程是否完成,再执行executor.submit()提交新的任务导队列中。
    伪代码如下:

    private AtomicLong threadNum = new AtomicLong(0);
    
    public void init() throws Exception {
    	ExecutorService executor = Executors.newFixedThreadPool(15);
    
    	Integer total = accountMapper.selectCount(new QueryWrapper<>());
    	Integer pageSize = 10000;  // 页大小
    	Integer pageCount = (total + pageSize -1) / pageSize; // 总页数
    
    	for (Integer start = 1; start <= pageCount; start++) {
    
    		List<Account> list = accountMapper.selectPage(new Page<>(start, pageSize), query).getRecords();
    
    		//等待线程任务完成,设置30,可令运行线程数为15,等待队列线程数为15
    		while (threadNum.get() >= 30){
    			Thread.sleep(5000);
    		}
    
    		//开启1个线程+1
    		threadNum.incrementAndGet();
    	
    		executor.submit(() -> {
    
    			try {
    				// 处理业务
    				dealMessage(list);
    				// 任务完成 -1
    				threadNum.decrementAndGet();
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    
    		});
    
    	}
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.DAYS);
    
    }
    

    效果就是CPU保持在15~45%之间,内存占用也只有45%。

    目前只想到这样的方式,控制等待队列LinkedBlockingQueue的上限,还有更好的方式请告知,感谢!


    2021-02-03-分割线
    最近又用到了多线程开发,发现了还是有很多方式控制的。简单的使用java的Semaphore令牌限流控制也能实现。


    多线程:

    1. 线程池必须关闭,main主线程才能结束(接口才会返回)finally { executorService.shutdown(); }
    2. 主线程等待保证多线程所有子线程任务执行完毕,再结束。 -> executorService.awaitTermination(1, TimeUnit.DAYS);
    3. semaphore 令牌限流控制fixedThread线程池,本例子就是最多同时拥有2个线程进行工作
    4. fixedThread.execute() fixedThread.submit() 的差别除了后者可以返回结果外,后者还会catch掉异常信息,无法抛到主线程中。
    
    public static void main(String[] args) {
        final List<String> tableNames = new ArrayList<>();
        tableNames.add("a");
        tableNames.add("b");
        tableNames.add("c");
        tableNames.add("d");
        tableNames.add("e");
        tableNames.add("f");
    
        final Semaphore semaphore = new Semaphore(2);
        final ExecutorService fixedThread = Executors.newCachedThreadPool();
        for (final String tableName : tableNames) {
            //阻塞,获取令牌
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //do
            fixedThread.execute(() -> {  //can throw ex log
                final ExecutorService executorService = Executors.newCachedThreadPool();
                try {
                    executorService.submit(() -> { //can't throw ex log
                        //int i = 1/0;
                        System.out.println("tableName2:" + tableName);
                    });
                    //int i = 1/0;
                    System.out.println("tableName:" + tableName);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    executorService.shutdown();
                    try {
                        executorService.awaitTermination(1, TimeUnit.DAYS);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    semaphore.release();
                    System.out.println("semaphore.release");
                }
            });
        }
        // 记得关闭线程池
        fixedThread.shutdown();
        try {
            fixedThread.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("主线程...");
    }
    
    

    打印结果

    
    tableName:b
    tableName2:b
    tableName:a
    tableName2:a
    semaphore.release
    semaphore.release
    tableName:d
    tableName2:d
    tableName:c
    semaphore.release
    tableName:e
    tableName2:c
    semaphore.release
    tableName:f
    tableName2:e
    semaphore.release
    tableName2:f
    semaphore.release
    主线程...
    
    
  • 相关阅读:
    cs11_c++_lab4a
    cs11_c++_lab3
    cs11_c++_lab2
    cs11_c++_lab1
    Oracle 11.2.4.0 ACTIVE DATAGUARD 单实例安装(COPY创建备库)
    无备份恢复(归档模式)
    rman datafile恢复(归档模式)
    GHOST(幽灵)重大漏洞
    Windows Linux 之间rsync同步CODE文件
    Centos 6.5 SNMP客户端安装及配置版本net-snmp-5.7.3
  • 原文地址:https://www.cnblogs.com/levi125/p/13914883.html
Copyright © 2011-2022 走看看