zoukankan      html  css  js  c++  java
  • 线程之间的通信方式

    前言 
    说到线程之间通信方式:依据我的理解 主要是有两种吧 
    1.是通过共享变量,线程之间通过该变量进行协作通信; 
    2.通过队列(本质上也是线程间共享同一块内存)来实现消费者和生产者的模式来进行通信;


    1.通过线程之间共享变量的方式

    • 这个就有必要说下 wait(),notify(),以及notifyAll() 这三个方法

      • 这三个方法都是属于Object的方法;所以所有类都可以继承这三方法; 
        • wait()方法使得当前线程必须要等待,等到另外一个线程调用notify()或者notifyAll()方法。
        • notify()方法会唤醒一个等待当前对象的锁的线程。而notifyAll()顾名思义;就是唤醒所有在等待中的方法;
        • wait()和notify()方法要求在调用时线程已经获得了对象的锁,因此对这两个方法的调用需要放在synchronized方法或synchronized块中。
    • 来看下下面这个实例吧 
      -通过wait() 和notifyAll() 来实现多个线程之间加减的demo

    package com.zeng.awaitNotify;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    /**
     * 通过共享一个变量,wait()+notify() 来践行通信
     * wait()和notify()方法要求在调用时线程已经获得了对象的锁,因此对这两个方法的调用需要放在synchronized方法或synchronized块中。
     *  
     * 针对两个线程的时候  没有问题
     * 针对线程一多的时候, 就必须要用notifyAll()
     * @author leo-zeng
     *  */
    public class NumberHolder {
    
        private int number;
    
        public synchronized void increase(){
            while(number !=0){
                try {
                    //若是nuber 不为0 时 等待
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //能执行到这里,说明 已经被唤醒了,并且为0
            number ++;
            System.out.println("我要递增:"+number);
            //通知在等待的线程
            notifyAll();
        }
    
        public synchronized void decrease(){
            while(number ==0){
                try {
                    //若是等于零的时候  等待唤醒
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //能执行到这里,说明 已经被唤醒了,并且不为0
            number --;
            System.out.println("我要递减:"+number);
            notifyAll();
        }
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(10);
    
            NumberHolder holder  =new NumberHolder();
    
            //执行任务
            pool.execute(new IncreaseThread(holder));
            pool.execute(new DecreaseThread(holder));
    
            pool.execute(new IncreaseThread(holder));
            pool.execute(new DecreaseThread(holder));
    
            pool.shutdown();
            try {
                pool.awaitTermination(300,TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 累加的类
     * @author leo
     *  */
    class IncreaseThread extends Thread{
    
        private NumberHolder numberHolder;
    
        public IncreaseThread(NumberHolder numberHolder) {
            this.numberHolder =numberHolder;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                //每次都有不多的延迟
                try {
                    Thread.sleep((long)Math.random()*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //执行新增操作
                numberHolder.increase();
            }
        }
    }
    class DecreaseThread extends Thread{
    
        private NumberHolder holder;
    
        public DecreaseThread(NumberHolder holder){
            this.holder =holder;
        }
        @Override
        public void run() {
            for (int i = 0; i <20; i++) {
                //每次都有不多的延迟
                try {
                    Thread.sleep((long)Math.random()*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //执行递减函数
                holder.decrease();
            }
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114

    注意一点:这里用while 不用 if 是因为保证可能多线程中,杜绝可能累加/递减会进行多次的可能。

    • 使用lock.newCondition().await() 和 signal() 方法实现线程之间交互 
      • 除了上述在synchronized代码块中使用 wait和 notify之外呢,其实在在java.util.concurrent包中,有两个很特殊的工具类,Condition和ReentrantLock,也可以同样实现线程间的交互协作。 
        • ReentrantLock(重入锁)和Condition 我在这里不想细说,有兴趣的可以去看些jdk源码。
        • 这里要介绍一下condition中的await()和signal() 方法; 
          我们这边先看demo 然后再来解释这两个的含义:
    package com.zeng.awaitNotify;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 通过共享变量来实现
     * @author leo-zeng
     *
     */
    public class Resource {
    
        private int i;
    
        private final ReentrantLock lock = new ReentrantLock();
    
        private final Condition condition =lock.newCondition();
    
        public void incr(){
            try {
                //上锁
                lock.lock();
                while(i!=0){
                    //叫停,等待唤醒的信号
                    condition.await();
                }
                //说明已经得到可以用的信号
                i++;
                System.out.println( "递增:"+i);
                //给其他添加信号
                condition.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                //解锁
                lock.unlock();
            }
        }
    
        public void decr(){
            try {
                //上锁
                lock.lock();
                while(i==0){
                    //叫停, 等待递增那边的信号
                    condition.await();
                }
                //i !=0  拿到那边的信号
                i--;
                System.out.println( "递减:"+i);
                //给其他添加信号
                condition.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                //解锁
                lock.unlock();
            }
        }
    
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(10);
            Resource ss = new Resource();
    
            threadPool.submit(new IncrThread(ss));
            threadPool.submit(new DecrThread(ss));
            threadPool.submit(new IncrThread(ss));
            threadPool.submit(new DecrThread(ss));
            threadPool.shutdown();
            try {
                threadPool.awaitTermination(300,TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    }
    class IncrThread extends Thread{
        private Resource resource;
    
        public IncrThread(Resource resource) {
            this.resource = resource;
        }
    
        @Override
        public void run() {
            for (int i = 0; i <20; i++) {
                //每次停顿一秒
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.incr();
            }
        }
    }
    
    class DecrThread extends Thread{
        private Resource resource;
    
        public DecrThread(Resource resource) {
            this.resource = resource;
        }
    
        @Override
        public void run() {
            for (int i = 0; i <20; i++) {
                //每次停顿一秒
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.decr();
            }
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121

    从demo中可以看到await()方法,是要先给A(累加)线程加锁,进入await之后会让线程沉睡,等待signal信号来叫醒,这是A线程解锁后会进入沉睡,运行B线程;b线程先加锁然后进行递减,当值为0值也会进行也会睡眠的,然后解锁,把锁给A。就这样来进行通信的。


    2.通过队列来实现线程的通信

    • 这里用的是java.util.concurrent包中linkedBlockingQueue 来进行线程间交互; 
      • java.util.concurrent.LinkedBlockingQueue 是一个基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行保护,只有在可能涉及多个节点的操作才同时对两个锁进行加锁。
      • 这里通过共享一个队列的信息,实现生产者和消费者
    package com.zeng.awaitNotify;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    /**
     * 通过linkedblockingQueue 构建线程间通信
     * @author leo-zeng
     *
     */
    public class LinkedBlockingQueueTest {
        public static void main(String[] args) {
            LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
            ExecutorService threadPool = Executors.newFixedThreadPool(10);
            threadPool.execute(new Producer(queue));
            threadPool.execute(new Consumer(queue));
    
    
            if(!threadPool.isShutdown()){
                threadPool.shutdown();
                try {
                    threadPool.awaitTermination(300, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
    
        }
    }
    class Producer extends Thread{
        private LinkedBlockingQueue<String> queue;
    
        public Producer(LinkedBlockingQueue<String> queue) {
            this.queue =queue;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                System.out.println("生产出:"+i);
                try {
                    Thread.sleep(100);
                    queue.put(new String("producer:"+i));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    class Consumer extends Thread{
        private LinkedBlockingQueue<?> queue;
    
        public Consumer(LinkedBlockingQueue<String> q) {
            this.queue =q;
        }
        @Override
        public void run() {
            while(true){
                try {
                    System.out.println("consumer 消费了:"+queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
  • 相关阅读:
    《Linux系统编程(第2版)》
    《深入网站开发和运维》
    《软件定义网络:SDN与OpenFlow解析》
    《大话重构》
    《程序员的修炼——从优秀到卓越》
    《Web性能权威指南》
    自定义项目脚手架- Maven Archetypes
    Intellij修改archetype Plugin配置
    ng-template寄宿方式
    ThoughtWorks持续集成平台GO开源了
  • 原文地址:https://www.cnblogs.com/efforts-will-be-lucky/p/7199980.html
Copyright © 2011-2022 走看看