zoukankan      html  css  js  c++  java
  • Java线程池实现

    电脑的CPU资源是有限的,任务的处理速度与线程数量之间并不是正相关。当线程数量过多,CPU要频繁的在不同线程切换,反而会引起处理性能的下降。线程池中最大的线程数,是考虑多种因素来事先设定的,比如硬件的条件,业务的类型等等。

    当我们向一个固定大小的的线程池中请求一个线程时,当线程池中没有空闲资源了,这个时候线程池如何处理这个请求?是拒绝请求还是排队请求?各种策略又是如何实现的呢?

    实际上,这些问题的处理并不复杂,底层的数据结构,就是队列(queue)。

    一、Java线程池介绍

    1,线程池的作用

    限制系统中执行线程的数量。
    减少了创建和销毁线程的次数,重复利用线程。

    2,主要的类

    Executor:执行线程的接口
    ExecutorSerivce: 线程池接口
    ThreadPoolExecutor :线程池类
    Executors:常用线程池工厂

    3,常用的线程池

    配置线程池是比较复杂的过程,所有可以使用现有的线程池工厂生成常用的线程池:

    1. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。为了合理利用资源,我们通常把定长池的长度设置为当前PC机获取cpu核心数:Runtime.getRuntime().availableProcessors():获取当前CPU核心数;
    2. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程;
    3. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行;
    4. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    public class MyThreadPool {
    
        public static void main(String [] args){
            int num = Runtime.getRuntime().availableProcessors();
            Executor executor = Executors.newFixedThreadPool(num);
            for (int i = 0 ; i<num ; i++){
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("我是一个子线程!!");
                    }
                });
            }
        }
    }
    

    点击并拖拽以移动

    我们再来看Executors.newFixedThreadPool(num),点进去,会发现就是new了一个LinkedBlockingQueue:

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    点击并拖拽以移动

    二、线程池和队列结合实现一个日志处理

    JDK自己的线程池底层不光是用队列实现的,我们也可以使用线程池和队列相结合,来实现一些功能。

    通常我们会把要执行的任务放入一个队列中,由线程池来执行,比如爬虫、日志。我们先来看一个线程池和队列结合实现日志记录的例子。

    import com.swagger.demo.Entity.LogContentEntity;
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Before;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    
    import javax.servlet.http.HttpServletRequest;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    @Configuration
    @Aspect
    @Component
    public class AopLogConfig implements Runnable {
    
        @Autowired
        private HttpServletRequest request;
    
        private LinkedBlockingQueue<LogContentEntity> logQueue;
    
        public AopLogConfig() {
            //Spring启动后,该对象创建时。初始化队列以及线程池。
            logQueue = new LinkedBlockingQueue<LogContentEntity>(3000);
            int num = Runtime.getRuntime().availableProcessors();
            ExecutorService  executor = Executors.newFixedThreadPool(num);
            for (int i = 0 ;i<num ;i++){
                executor.execute(this);
            }
        }
    
        @Before("execution(public * com.swagger.demo.controller..*.*(..))")
        public void doBefore(JoinPoint joinPoint) throws Exception{
    
            //日志记录的信息可自行修改
            LogContentEntity Log = new LogContentEntity();
            String method = request.getMethod();
            Log.setHttpMethod(method);
            String url = request.getRequestURL().toString();
            Log.setUrl(url);
            String ip = request.getRemoteAddr();
            Log.setIp(ip);
            Log.setContent("test Log Content");
            //将需要记录的日志对象放到队列中等待线程异步执行。
            logQueue.put(Log);
        }
    
        @Override
        public void run() {
            try{
                while(true){
                    //如果队列里没有,则会阻塞;
                    LogContentEntity take = logQueue.take();
                    //日志处理逻辑可自行修改;
                    System.out.println(take.toString());
                }
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    

    点击并拖拽以移动

    三、线程池+队列以优先级方式执行队列任务

    import java.util.concurrent.TimeUnit;
    
    public class MyPriorityTask implements Runnable, Comparable<MyPriorityTask> {
        private int priority;
        private String name;
    
        public MyPriorityTask(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    
        public void run() {
            System.out.printf("MyPriorityTask: %s Priority :%d
    ", name, priority);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public int compareTo(MyPriorityTask o) {
            if (this.getPriority() < o.getPriority()) {
                return 1;
            }
            if (this.getPriority() > o.getPriority()) {
                return -1;
            }
            return 0;
        }
    
        public int getPriority() {
            return priority;
        }
    }
    

    点击并拖拽以移动

    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
        public static void main(String[] args) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
            for (int i = 0; i < 100; i++) {
                MyPriorityTask task = new MyPriorityTask("Task " + i, 0);
                executor.execute(task);
                System.out.println(executor.getTaskCount());
            }
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int i = 101; i < 8; i++) {
                MyPriorityTask task = new MyPriorityTask("Task " + i, 1);
                executor.execute(task);
                System.out.println(executor.getTaskCount());
            }
            try {
                executor.awaitTermination(1, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("Main: End of the program.
    ");
    
        }
    }
    

    点击并拖拽以移动

    四、使用线程池的一些陷阱

    尽管线程池对于构建多线程应用是个很强大的机制,但它也不是没有缺点的。使用线程池构建的应用会面临其他多线程应用所面对的一样的并发风险,比如同步错误和死锁,此外线程池还有其他的一些特有缺陷,比如 线程池-关联 死锁,资源不足,还有线程泄漏。

    1.死锁

    任何多线程应用都会面临死锁的风险。彼此双方都在等待一个事件,而这个事件只能有对方提供,这样一对进程或者线程我们称之为死锁。死锁最简单的情况是线程 A 持有了对象 X 的独占锁,线程 A 在等待对象 Y 的锁,而线程 B 恰恰持有了对象 Y 的独占锁,线程 B 在等待对象 X 的锁。除非有某种办法能够打破这种锁等待(Java 锁机制不能支持这个),否则的话这一对死锁线程将会永久地等待下去。
    既然死锁是所有多线程编程都将面临的风险,线程池为我们引入了另一种死锁:线程池中所有线程都在阻塞等待队列中另一个任务的执行结果,但是另一个任务无法得到执行,因为池中压根儿就没用空闲的可用线程。这种情况在线程池用于一些相互影响对象的模拟实现中可能会出现,这些模拟对象彼此发送查询然后作为任务队列进行执行,发起查询的对象同步等待响应。

    2.资源不足

    线程池的优点之一是他们在大多数情况下比其他的调度机制具备更好的性能,比如我们上面所讨论的那几种。但这个取决于你有没有恰当地配置了线程池大小。线程占用大量的资源,包括内存和其他系统资源。除了线程对象所必须的内存之外,每个线程还需要两个执行调用栈,这个栈可能会很大。此外,JVM 可能还会为每个 Java 线程创建一个本地线程,这样将会占用额外的系统资源。最后,虽然线程之间切换的调度开销很小,大量的线程上下文切换也会影响到你的应用性能。
    如果线程池过大的话,这些众多线程所消耗的资源将会明显影响到系统性能。时间会浪费在线程之间的切换上,配置有比你实际需要更多的线程会引起资源不足的问题,因为池中线程所占用的资源如果用在其他任务上可能会更高效。除了这些线程本身所使用的资源之外,服务请求时所做的工作可能会需要额外资源,比如 JDBC 连接,套接字,或者文件。这些也是有限的资源,而且对它们进行过高并发请求的话可能会导致失效,比如无法分配一个 JDBC 连接。

    3.并发错误

    线程池以及其他队列机制依赖于 wait() 和 notify() 方法的使用,这可能会变得很棘手。如果编码不当的话,很可能会导致通知丢失,结果就是池中的线程都处于一个空闲的状态,而实际上队列中有任务需要处理。在使用这些工具的时候要打起十二万分的精神;即便是专家在用它们的时候也经常会失误。幸运的是,可以使用一些现成的实现,这些实现久经考验,比如下文将会讨论到的 你无须自行编码 实现的 java.util.concurrent 包。

    4.线程泄漏

    各种各样的线程池中存在的一个重大的危险就是线程泄漏,当一个线程被从线程池中移除去执行一个任务,任务执行结束之后却没有返还给线程池的时候,就会出现这种危险。出现这种情况的一种方式是当任务抛出一个 RuntimeException 或一个 Error 时。如果线程池类没有捕捉到这些,该线程将会傻傻地存在于线程池之中,而线程池的线程数量则会被永久地减一。当这种情况发生的次数足够多的时候,线程池最终将为空(无可用线程),而系统则会瘫痪,因为已经没有线程来处理任务了。
    瘫痪的任务,比如那些永久等待不保证可用资源或者等待已经回家了的用户输入的任务,也可以造成相等于线程泄漏一样的后果。如果一个线程永久地被这样一个任务所占用了的话,它造成的影响和从池中移除是一样的。像这样的任务应该要么给它们一个线程池之外的线程,要么控制一下它们的等待时间。

    5.请求过载

    服务器很可能会被铺天盖地而来的请求所淹没。这种情况下,我们可能并不想让每个进来的请求都放进我们的工作队列,因为等待执行的任务队列也可能会占用过多系统资源并导致资源不足。这时候要做什么就取决于你的决定了,比如你可以通过一个表示服务器暂时太忙的响应来拒绝这些请求。

    五、高效线程池使用指南

    你只需要遵循一些简单的指导方针,线程池就可以成为你构建服务应用的一个非常有效的方法:

    • 不要把同步等待其他任务执行结果的任务放进任务队列。这将导致上文所描述那种死锁,池中所有线程都在等待一个任务的执行结果,而队列中的这个任务无法得到执行因为所有线程都在使用中。
    • 可能长时间操作的任务放入线程池的时候要慎重。如果程序必须要等待一个资源,比如一个 I/O 的完成,定义一个最长等待时间,然后失败或稍后重新执行。这就保证了通过将一个线程从一个可能会完成的任务中释放出来而最终一些其他任务得到成功执行。
    • 理解你的任务。想要有效地调整线程池大小,你需要理解队列中那些任务要做的事情。它们是 CPU 密集型操作吗?它们会长时间占用 I/O 吗?你的答案会影响到你对你的应用的配置。如果这些任务来自不同的类、有着截然不同的特征,为不同类型的任务定制不同的工作队列也许更行得通,这样每个池都能够得到有据配置。

    线程池大小配置

    调整线程池的大小在很大程度上是一件避免两个错误的事情:拥有过多或过少的线程。幸运的是,对于大多数应用而言太多或太少之间的中间地带还是很宽广的。
    回顾应用中使用线程的两个主要优点:在等待一个诸如 I/O 之类的慢操作的时候进程能够继续进行,利用多个处理器的可用性。在一个 N 处理器主机上运行一个计算密集型的应用,通过设置线程数量为 N 增加额外的线程可能会提高吞吐量,但添加的额外线程超过 N 的话就没有什么好处了。确实,过多的线程甚至会降低性能因为会带来额外的上下文切换开销。
    线程池最佳大小取决于可用处理器的数量和工作队列中任务的性质。对于在一个 N-处理器 系统中一个的将持有完全计算密集型任务的工作队列,通常获得 CPU 最大利用率的话是配置线程池大小为 N 或 N + 1 个线程。
    对于可能要等待 I/O 完成的任务,比如,一个从 socket 中读取一个 HTTP 请求的任务 - 你需要增加线程池的线程的数量超出可用处理器的数量,因为所有的线程都在同一时间工作。通过分析,你可以为一个典型的请求估算出等待时间(WT)和服务时间(ST)之间的比率。比如我们称这个比率为 WT/ST,对于一个 N-处理器系统,你需要大约 N * (1 + WT/ST) 个线程来保持处理器得到充分利用。
    处理器利用率并非配置线程池大小的唯一依据。因为在线程池增长的时候,你可能会遇到调度器的局限性,内存可用性,或者其他系统资源,比如 socket 的数量,打开文件的处理,或者数据库连接等问题。

    六、总结

    1. 使用JDK的方法创建会产生OOM情况,主要原因是用LinkedBlockingQueue队列,该队列可以导致OOM。
    2. 线程可以使用用阿里巴巴推荐的方法,但是因为定线程数量,并且队列用的是ArrayBlockingQueue,所以效率较低,不过可以保证内存不会OOM。
    3. 无需自行编码。Doug Lea 写了一个杰出的开源并发工具包,java.util.concurrent,包含了互斥,信合,能够在并发访问下性能表现良好的集合类诸如队列和哈希表,以及一些工作队列的实现。这个包里的 PooledExecutor 类是一个高效的、被广泛使用的、基于工作队列的一个线程池的正确实现。不用再尝试着自己去写代码实现了,那样很容易出错,你可以考虑使用 java.util.concurrent 包里的一些工具。
    4. 线程池是构建服务器应用的很有用的一个工具。它的概念很简单,但在实现或者使用的时候需要注意一些问题,比如死锁,资源不足,以及 wait() 和 notify() 的复杂性。如果你发现自己的应用需要一个线程池,考虑一下使用 java.util.concurrent 包里的某个 Executor 类,比如 PooledExecutor,不要去从头写一个。如果你发现你在创建一些要处理简短任务的线程,你就应该考虑使用线程池了。

    我的微信公众号:架构真经(id:gentoo666),分享Java干货,高并发编程,热门技术教程,微服务及分布式技术,架构设计,区块链技术,人工智能,大数据,Java面试题,以及前沿热门资讯等。每日更新哦!

    img点击并拖拽以移动

    参考资料:

    1. https://blog.csdn.net/weixin_39770927/article/details/81360511
    2. https://blog.csdn.net/zhangqinfu/article/details/52931530
    3. https://blog.csdn.net/every__day/article/details/83900109
    4. https://blog.csdn.net/wwp231/article/details/52504687
    5. https://blog.csdn.net/qq360694660/article/details/78296919
    6. https://blog.csdn.net/defonds/article/details/43796951
  • 相关阅读:
    云架构和openstack的思考
    关于mysql中[Err] 1451 -Cannot delete or update a parent row: a foreign key constraint fails
    解决CentOS7-python-pip安装失败
    python 中使用 urllib2 伪造 http 报头的2个方法
    python urllib2 对 http 的 get,put,post,delete
    dpdk EAL: Error reading from file descriptor 23: Input/output error
    ovs-vsctl 命令详解
    openstack 虚机迁移 Unacceptable CPU info: CPU doesn't have compatibility
    ip route 命令详解
    influxDB 基本操作
  • 原文地址:https://www.cnblogs.com/anymk/p/11991740.html
Copyright © 2011-2022 走看看