zoukankan      html  css  js  c++  java
  • java多线程通信

    • 技术需求: 当生产者更新一条数据后,会立即通知消费者。
    • 原理分析图
    • 原理图
    • 这种需求可以看成一种消息队列
      我们可以利用多线程来开启两个队列,一个是生产者,另一个是消费者。

    利用synchronized 对象锁来实现线程原子性。

    package com.Thread.Test;
    
    
    
    //消息模型
    class Msg{
        public String data1;
        public String data2;
    
    }
    
    //生产者线程
    class Producer extends Thread{
    
        private Msg msg;
        private int count = 0;
    
        public Producer(Msg msg) {
            this.msg = msg;
        }
    
        @Override
        public void run() {
            while (true){
                synchronized (msg){
                    if(count == 0){
                        msg.data1 = "消息1";
                        msg.data2 = "状态1";
                    }else{
                        msg.data1 = "消息2";
                        msg.data2 = "状态2";
                    }
    
                    count = (count +1)%2;
                }
            }
        }
    }
    
    
    //消费者线程
    class Consumer extends Thread{
    
    
        private Msg msg;
    
        public Consumer(Msg msg) {
            this.msg = msg;
        }
    
        @Override
        public void run() {
    
            while (true){
                synchronized (msg){
                    System.out.println("data1:"+msg.data1+";data2:"+msg.data2);
                }
            }
    
        }
    }
    
    
    public class Main {
    
        public static void main(String[] args) throws InterruptedException {
    
    
            Msg msg = new Msg();
    
            Producer producer = new Producer(msg);
    
            Consumer consumer = new Consumer(msg);
    
            producer.start();
            Thread.sleep(300);
            consumer.start();
    
        }
    }
    
    

    这里有两个关键点

    • 为什么要使用对象锁?
    • 因为如果不利用对象锁的话,这两个线程是不安全的,因为JMM的原因使得线程不可见。
    • 并且对象锁一定是一致的,不然数据不会同步。
    • 这个解决方案是否可行?
    • 并不可以,因为两个线程存在抢占资源锁的情况,所以有可能生产者更新几次资源,但是消费者只显示一次,或者消费者重复显示几次的情况,并不符合我们的预期。
    • 基于以上两点,我们采用线程通信技术。

    多线程通信常用函数

    1. wait()函数
      该函数基于Object对象,他的作用是,暂时休眠该线程,并且释放锁资源
    2. notify()函数
      他的作用是唤醒线程池其他线程
    3. interrupt()函数
      将当前正在等待的线程【可以是wait的线程】,直接抛出异常,用来停止线程。

    这两个函数通常都是配套使用,并且一定用在synchronized锁对象的情况下


    利用上面两个函数进行线程通信

    package com.Thread.Test;
    
    
    
    //消息模型
    class Msg{
        public String data1;
        public String data2;
        //假定flag= false 的时候 生产者激活,反之消费者激活
        public boolean flag = false;
    }
    
    //生产者线程
    class Producer extends Thread{
    
        private Msg msg;
        private int count = 0;
    
        public Producer(Msg msg) {
            this.msg = msg;
        }
    
        @Override
        public void run() {
            while (true){
                synchronized (msg){
    //                此时生产者线程应该休眠
                    if(msg.flag){
                        try {
                            Thread.sleep(1000);
                            msg.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                    if(count == 0){
                        msg.data1 = "消息1";
                        msg.data2 = "状态1";
                    }else{
                        msg.data1 = "消息2";
                        msg.data2 = "状态2";
                    }
    
                    count = (count +1)%2;
    
                    msg.flag = true;
                    //通知其他线程
                    msg.notify();
                }
            }
        }
    }
    
    class Consumer extends Thread{
    
    
        private Msg msg;
    
        public Consumer(Msg msg) {
            this.msg = msg;
        }
    
        @Override
        public void run() {
    
            while (true){
                synchronized (msg){
                    if(!msg.flag){
    
                        try {
                            Thread.sleep(1000);
                            msg.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
    
                    }
                    System.out.println("data1:"+msg.data1+";data2:"+msg.data2);
                    msg.flag = false;
                    msg.notify();
                }
            }
    
        }
    }
    
    
    public class Main {
    
        public static void main(String[] args) throws InterruptedException {
    
    
            Msg msg = new Msg();
    
            Producer producer = new Producer(msg);
    
            Consumer consumer = new Consumer(msg);
    
            producer.start();
            consumer.start();
    
        }
    }
    

    利用Lock锁实现通信

    Lock锁与synchronized的不同之处在于,wait和notify函数对于lock是没有用的

    不多说,上代码

    package com.Thread.Test;
    
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    //消息模型
    class Msg2{
        public String data1;
        public String data2;
        //假定flag= false 的时候 生产者激活,反之消费者激活
        public boolean flag = false;
        public Lock lock = new ReentrantLock();
    }
    
    //生产者线程
    class Producer2 extends Thread {
    
        private Msg2 msg;
        private int count = 0;
        private Condition condition;
    
    
        public Producer2(Msg2 msg, Condition condition) {
            this.msg = msg;
            this.condition = condition;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    msg.lock.lock();
    //                此时生产者线程应该休眠
                    if (msg.flag) {
                        try {
                            Thread.sleep(1000);
                            condition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    if (count == 0) {
                        msg.data1 = "消息1";
                        msg.data2 = "状态1";
                    } else {
                        msg.data1 = "消息2";
                        msg.data2 = "状态2";
                    }
    
                    count = (count + 1) % 2;
    
                    msg.flag = true;
                    //通知其他线程
                    condition.signal();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    msg.lock.unlock();
                }
            }
        }
    }
    
    class Consumer2 extends Thread{
    
    
        private Msg2 msg;
    
        private Condition condition;
    
        public Consumer2(Msg2 msg, Condition condition) {
            this.msg = msg;
            this.condition = condition;
        }
    
        @Override
        public void run() {
    
            while (true){
                    try {
                        msg.lock.lock();
    
                        if(!msg.flag){
    
                            try {
                                Thread.sleep(1000);
                                condition.await();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
    
                        }
                        System.out.println("data1:"+msg.data1+";data2:"+msg.data2);
                        msg.flag = false;
                        condition.signal();
                    }catch (Exception e) {
                        e.printStackTrace();
                    }finally {
                        msg.lock.unlock();
                    }
    
    
                }
            }
    
    }
    
    
    
    public class LockTest {
    
        public static void main(String[] args) throws InterruptedException {
    
    
            Msg2 msg = new Msg2();
            //利用Condition来限定通信
            Condition newCondition = msg.lock.newCondition();
    
            Producer2 producer = new Producer2(msg,newCondition);
    
            Consumer2 consumer = new Consumer2(msg,newCondition);
    
            producer.start();
            consumer.start();
    
        }
    }
    

    如何优雅的停止线程(补充)

    首先,放弃Thread.stop()函数
    要知道,一些线程都是一些while循环的,即可能是while(true)格式的,这样停止他可以用这样的思路:
    设置一个boolean的flag,当flag = true的时候正常运行,flag = false的时候停止线程。

    这时应该考虑线程可见问题。需要将flag修改为 voliate格式。

    但是,如果当前的线程是synchronized锁,并且在wait状态下,flag修改对本线程没有什么影响。因为现在线程已经休眠了。
    那么可以利用interrupt函数让他抛出异常,然后在catch的代码块上面将flag修改。

  • 相关阅读:
    NSPrediccate 查询
    集合 不可变集合
    集合 不可变
    考核题 7
    考核题 6
    考核题 4
    练习题12
    练习题3
    iOS 实现在string任意位置添加新的表情
    在 ZBarSDK 中使用Block回调传值 Block在扫描成功后 变为空
  • 原文地址:https://www.cnblogs.com/adroitwolf/p/14309803.html
Copyright © 2011-2022 走看看