zoukankan      html  css  js  c++  java
  • 线程池原理分析

    并发包

    (计数器)CountDownLatch
    CountDownLatch 类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

    public static void main(String[] args) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(2);
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ",子线程开始执行...");
                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName() + ",子线程结束执行...");
                }
            }).start();
            
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ",子线程开始执行...");
                    countDownLatch.countDown();//计数器值每次减去1
                    System.out.println(Thread.currentThread().getName() + ",子线程结束执行...");
                }
            }).start();
            countDownLatch.await();// 減去为0,恢复任务继续执行
            System.out.println("两个子线程执行完毕....");
            System.out.println("主线程继续执行.....");
            for (int i = 0; i <10; i++) {
                System.out.println("main,i:"+i);
            }
        }
    View Code

    (屏障)CyclicBarrier

    CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。

    CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。

    CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。

    class Writer extends Thread {
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier){
             this.cyclicBarrier=cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName() + ",正在写入数据");
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                // TODO: handle exception
            }
            System.out.println("线程" + Thread.currentThread().getName() + ",写入数据成功.....");
            
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
            }
            System.out.println("所有线程执行完毕..........");
        }
    
    }
    
    public class Test001 {
    
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier=new CyclicBarrier(5);
            for (int i = 0; i < 5; i++) {
                Writer writer = new Writer(cyclicBarrier);
                writer.start();
            }
        }
    
    }
    View Code

    (计数信号量)Semaphore

    Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。它的用法如下:

    availablePermits函数用来获取当前可用的资源数量

    wc.acquire(); //申请资源

    wc.release();// 释放资源

    import java.util.Random;
    import java.util.concurrent.*;
    
    class ThradDemo001 extends Thread {
        private String name;
        private Semaphore wc;
    
        public ThradDemo001(String name, Semaphore wc) {
            this.name = name;
            this.wc = wc;
        }
    
        public void run() {
            // 剩下的资源
            int availablePermits = wc.availablePermits();
            if (availablePermits > 0) {
                System.out.println(name + "天助我也,终于有茅坑了.....");
            } else {
                System.out.println(name + "怎么没有茅坑了...");
            }
            try {
                // 申请资源
                wc.acquire();
            } catch (InterruptedException e) {
    
            }
            System.out.println(name + "终于上厕所啦.爽啊" + ",剩下厕所:" + wc.availablePermits());
            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (Exception e) {
                // TODO: handle exception
            }
            System.out.println(name + "厕所上完啦!");
            // 释放资源
            wc.release();
        }
    }
    
    public class TestSemaphore {
    
        public static void main(String[] args) {
            Semaphore semaphore = new Semaphore(3);
            for (int i = 1; i <= 10; i++) {
                ThradDemo001 thradDemo001 = new ThradDemo001("" + i + "个人", semaphore);
                thradDemo001.start();
            }
        }
    
    } 
    View Code

    结果

    第1个人天助我也,终于有茅坑了.....
    第10个人天助我也,终于有茅坑了.....
    第10个人终于上厕所啦.爽啊,剩下厕所:1
    第9个人天助我也,终于有茅坑了.....
    第3个人天助我也,终于有茅坑了.....
    第5个人天助我也,终于有茅坑了.....
    第8个人天助我也,终于有茅坑了.....
    第6个人天助我也,终于有茅坑了.....
    第4个人天助我也,终于有茅坑了.....
    第2个人天助我也,终于有茅坑了.....
    第7个人天助我也,终于有茅坑了.....
    第9个人终于上厕所啦.爽啊,剩下厕所:0
    第1个人终于上厕所啦.爽啊,剩下厕所:2
    第1个人厕所上完啦!
    第3个人终于上厕所啦.爽啊,剩下厕所:0
    第3个人厕所上完啦!
    第5个人终于上厕所啦.爽啊,剩下厕所:0
    第10个人厕所上完啦!
    第8个人终于上厕所啦.爽啊,剩下厕所:0
    第5个人厕所上完啦!
    第6个人终于上厕所啦.爽啊,剩下厕所:0
    第9个人厕所上完啦!
    第4个人终于上厕所啦.爽啊,剩下厕所:0
    第6个人厕所上完啦!
    第2个人终于上厕所啦.爽啊,剩下厕所:0
    第4个人厕所上完啦!
    第7个人终于上厕所啦.爽啊,剩下厕所:0
    第7个人厕所上完啦!
    第8个人厕所上完啦!
    第2个人厕所上完啦!
    View Code

    阻塞队列与非阻塞队

    阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列.

    1.ArrayDeque, (数组双端队列) 
    2.PriorityQueue, (优先级队列) 
    3.ConcurrentLinkedQueue, (基于链表的并发队列) 
    4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 
    5.ArrayBlockingQueue, (基于数组的并发阻塞队列) 
    6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列) 
    7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列) 
    8.PriorityBlockingQueue, (带优先级的无界阻塞队列) 
    9.SynchronousQueue (并发同步阻塞队列)

    ConcurrentLinkedDeque

    ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列,通过无锁的方式,实现 了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue.它 是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先 加入的,尾是最近加入的,该队列不允许null元素。 ConcurrentLinkedQueue重要方法: add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中这俩个方法没有任何区别) poll() 和peek() 都是取头元素节点,区别在于前者会删除元素,后者不会。

    import java.util.concurrent.ConcurrentLinkedDeque;
    
    public class Test0009 {
    
         public static void main(String[] args) {
                ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
                q.offer("余胜军");
                q.offer("码云");
                q.offer("蚂蚁课堂");
                q.offer("张杰");
                q.offer("艾姐");
                //从头获取元素,删除该元素
                System.out.println(q.poll());
                //从头获取元素,不刪除该元素
                System.out.println(q.peek());
                //获取总长度
                System.out.println(q.size());
    
        }
        
    }

    输出

    余胜军
    码云
    4

    BlockingQueue模拟生产者与消费者

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    class ProducerThread implements Runnable {
        private BlockingQueue<String> blockingQueue;
        private AtomicInteger count = new AtomicInteger();
        private volatile boolean FLAG = true;
    
        public ProducerThread(BlockingQueue<String> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
        public void run() {
            System.out.println(Thread.currentThread().getName() + "生产者开始启动....");
            while (FLAG) {
                String data = count.incrementAndGet() + "";
                try {
                    boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
                    if (offer) {
                        System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "成功..");
                    } else {
                        System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "失败..");
                    }
                    Thread.sleep(1000);
                } catch (Exception e) {
    
                }
            }
            System.out.println(Thread.currentThread().getName() + ",生产者线程停止...");
        }
    
        public void stop() {
            this.FLAG = false;
        }
    
    }
    
    class ConsumerThread implements Runnable {
        private volatile boolean FLAG = true;
        private BlockingQueue<String> blockingQueue;
    
        public ConsumerThread(BlockingQueue<String> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
        public void run() {
            System.out.println(Thread.currentThread().getName() + "消费者开始启动....");
            while (FLAG) {
                try {
                    String data = blockingQueue.poll(2, TimeUnit.SECONDS);
                    if (data == null || data == "") {
                        FLAG = false;
                        System.out.println("消费者超过2秒时间未获取到消息.");
                        return;
                    }
                    System.out.println("消费者获取到队列信息成功,data:" + data);
    
                } catch (Exception e) {
                    // TODO: handle exception
                }
            }
        }
    
    }
    
    public class Test0008 {
    
        public static void main(String[] args) {
            BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
            ProducerThread producerThread = new ProducerThread(blockingQueue);
            ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
            Thread t1 = new Thread(producerThread);
            Thread t2 = new Thread(consumerThread);
            t1.start();
            t2.start();
            //10秒后 停止线程..
            try {
                Thread.sleep(10*1000);
                producerThread.stop();
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    
    }
    View Code

    输出

    Thread-0生产者开始启动....
    Thread-1消费者开始启动....
    消费者获取到队列信息成功,data:1
    Thread-0,生产队列1成功..
    Thread-0,生产队列2成功..
    消费者获取到队列信息成功,data:2
    Thread-0,生产队列3成功..
    消费者获取到队列信息成功,data:3
    Thread-0,生产队列4成功..
    消费者获取到队列信息成功,data:4
    Thread-0,生产队列5成功..
    消费者获取到队列信息成功,data:5
    Thread-0,生产队列6成功..
    消费者获取到队列信息成功,data:6
    Thread-0,生产队列7成功..
    消费者获取到队列信息成功,data:7
    Thread-0,生产队列8成功..
    消费者获取到队列信息成功,data:8
    Thread-0,生产队列9成功..
    消费者获取到队列信息成功,data:9
    Thread-0,生产队列10成功..
    消费者获取到队列信息成功,data:10
    Thread-0,生产者线程停止...
    消费者超过2秒时间未获取到消息.
    View Code

    线程池

    什么是线程池

    Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序
    都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
    第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
    第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,
    还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用
    线程池,必须对其实现原理了如指掌。

    线程池作用

    线程池是为突然大量爆发的线程设计的,通过有限的几个固定线程为大量的操作服务,减少了创建和销毁线程所需的时间,从而提高效率。

    如果一个线程的时间非常长,就没必要用线程池了(不是不能作长时间操作,而是不宜。),况且我们还不能控制线程池中线程的开始、挂起、和中止。

    线程池的分类

    ThreadPoolExecutor

    Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建在高并发及大数据量是非常消耗资源的,因为java提供了线程池。在jdk1.5以前的版本中,线程池的使用是及其简陋的,但是在JDK1.5后,有了很大的改善。JDK1.5之后加入了java.util.concurrent包,java.util.concurrent包的加入给予开发人员开发并发程序以及解决并发问题很大的帮助。这篇文章主要介绍下并发包下的Executor接口,Executor接口虽然作为一个非常旧的接口(JDK1.5 2004年发布),但是很多程序员对于其中的一些原理还是不熟悉,因此写这篇文章来介绍下Executor接口,同时巩固下自己的知识。如果文章中有出现错误,欢迎大家指出。

    Executor框架的最顶层实现是ThreadPoolExecutor类,Executors工厂类中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool方法其实也只是ThreadPoolExecutor的构造函数参数不同而已。通过传入不同的参数,就可以构造出适用于不同应用场景下的线程池,那么它的底层原理是怎样实现的呢,这篇就来介绍下ThreadPoolExecutor线程池的运行过程。

    corePoolSize: 核心池的大小。 当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
    maximumPoolSize: 线程池最大线程数,它表示在线程池中最多能创建多少个线程;
    keepAliveTime: 表示线程没有任务执行时最多保持多久时间会终止。
    unit: 参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    线程池四种创建方式

    Java通过Executors(jdk1.5并发包)提供四种线程池,分别为:

    newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

    newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

    newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    newCachedThreadPool

    创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:

        // 无限大小线程池 jvm自动回收
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            Thread.sleep(100);
            for (int i = 0; i < 10; i++) {
                final int temp = i;
                newCachedThreadPool.execute(new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(100);
                        } catch (Exception e) {
                            // TODO: handle exception
                        }
                        System.out.println(Thread.currentThread().getName() + ",i:" + temp);
    
                    }
                });
            }
    View Code

    总结: 线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

    newFixedThreadPool

    创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:

    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 10; i++) {
                final int temp = i;
                newFixedThreadPool.execute(new Runnable() {
    
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getId() + ",i:" + temp);
                            try {
                               Thread.sleep(2000);
                            } catch (Exception e) {
                                // TODO: handle exception
                            }
                    }
                });
            }

    总结:因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。

    定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()

    newScheduledThreadPool

    创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:

    public static void main(String[] args) {
            // 可以定时线程池
            ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(3);
            for (int i = 0; i < 10; i++) {
                final int temp = i;
                newScheduledThreadPool.schedule(new Runnable() {
                    public void run() {
                        System.out.println(Thread.currentThread().getName()+",i:" + temp);
                    }
                }, 3, TimeUnit.SECONDS);
            }
        }

    表示延迟3秒执行。

    newSingleThreadExecutor

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

        public static void main(String[] args) {
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 10; i++) {
                final int temp = i;
                newSingleThreadExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName() + ",i:" + temp);
                    }
                });
            }
        }

     

  • 相关阅读:
    安卓开发_浅谈TimePicker(时间选择器)
    eclipse显示代码行数
    Java数据解析---JSON
    Java数据解析---PULL
    Java数据解析---SAX
    统计机器学习(目录)
    FP Tree算法原理总结
    梯度下降(Gradient Descent)小结
    用scikit-learn和pandas学习线性回归
    用scikit-learn学习BIRCH聚类
  • 原文地址:https://www.cnblogs.com/wangzhanhua/p/10444117.html
Copyright © 2011-2022 走看看