zoukankan      html  css  js  c++  java
  • 多线程之生产者消费者

      一个简单的生产者消费者案例:自定义同步容器,容器容量上限为10。可以在多线程中应用,并保证数据线程安全。

      对于生产者,消费者,最容易想到的就是利用wait和notify去实现,wait和notify一定是配合着synchronized去操作的,以下便是简单的一个实现:

    package com.cn.cfang;
    
    import java.util.LinkedList;
    /**
     * wait&notify : wait会释放锁,而notify则不会释放锁。
     *  wait/notify都是和while配合应用的。可以避免多线程并发判断逻辑失效问题:当进行while判断后,cpu时间片可能到达,则会让出资源,让其他线程执行,
     *      等再来到此线程的时候,如果不是while判断,则很可能会出现条件失效的情况下,依旧进入了方法执行,而while则会再检查条件是否满足。
     *  synchronized不建议放在方法上进行同步控制,这种效率一般比较低下,建议是同步其中部分需要的代码块
     * @author cfang
     * 2018年5月3日 下午4:32:36
     */
    public class TestContainer01<E> {
    
        private LinkedList<E> list = new LinkedList<E>();
        private int max = 10;
        private int count = 0;
        
        public synchronized int getCount(){
            return count;
        }
        
        public synchronized E get(){
            E e = null;
            while(list.size() == 0){
                try {
                    this.wait();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            e = list.removeFirst();
            count--;
            this.notifyAll();
            return e;
        }
        
        public synchronized void put(E e){
            while(list.size() == max){
                try {
                    this.wait();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            list.add(e);
            count++;
            this.notifyAll();
        }
        
        public static void main(String[] args) {
            TestContainer01<String> c = new TestContainer01<String>();
            for(int i = 0; i < 10; i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for(int j = 0; j < 5; j++){
                            System.out.println(Thread.currentThread().getName()+" || "+ c.get());
                        }
                    }
                }, "consumer:" + i).start();
            }
            
            for(int i = 0; i < 2; i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for(int j = 0; j < 25; j++){
                            c.put("producer value " + j); 
                        }
                    }
                }, "producer:" + i).start();
            }
        }
    }

      固定容量,最大限制10个,生产当达到阀值的时候,wait等待,notify消费者去进行消费。但是,此处有点小问题,不管是消费者,还是生产者,再进行notify的时候,都是唤醒其他所以wait线程,这样,会导致唤醒不明确,部分消费者或者生产者也会遭遇唤醒。那么改进,以下利用重入锁ReentrantLock的Condition去进行更细粒度的唤醒:

    package com.cn.cfang;
    
    import java.util.LinkedList;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * ReentrantLock必须进行手动unlock
     * 
     * @author cfang
     * 2018年5月3日 下午4:38:59
     */
    public class TestContainer02<E> {
    
        private volatile LinkedList<E> list = new LinkedList<E>();
        private int max = 10;
        private int count = 0;
        private ReentrantLock lock = new ReentrantLock();
        private Condition producer = lock.newCondition();
        private Condition consumer = lock.newCondition();
        
        public synchronized int getCount(){
            return count;
        }
        //消费者
        public E get(){
            E e = null;
            lock.lock();
            try {
                while(list.size() == 0){
                    System.out.println("无数据,消费者"+Thread.currentThread().getName()+" 进入等待队列");
                    //队列空,则让消费者进入等待队列,释放锁标记
                    consumer.await();
                }
                System.out.println("有数据,消费者"+Thread.currentThread().getName()+" 消费数据");
                e = list.removeFirst();
                count--;
                //消费者已消费数据,通知唤醒所以生产者进行生产活动
                producer.signalAll();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }finally{
                //lock锁必须进行手动释放
                lock.unlock();
            }
            return e;
        }
        //生产者
        public void put(E e){
            lock.lock();
            try {
                while(list.size() == max){
                    System.out.println("容量已满,生产者"+Thread.currentThread().getName()+" 进入等待队列");
                    //队列容量上线,让生产者进入等待队列,释放锁标记
                    producer.await();
                }
                System.out.println("生产者"+Thread.currentThread().getName()+" 生产数据:" + e);
                list.add(e);
                count++;
                //队列中已生产新数据,唤醒通知所有消费者进行消费操作
                consumer.signalAll();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            } finally{
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
            TestContainer02<String> c = new TestContainer02<String>();
            for(int i = 0; i < 10; i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for(int j = 0; j < 5; j++){
                            System.out.println(Thread.currentThread().getName()+" || "+ c.get());
                        }
                    }
                }, "consumer:" + i).start();
            }
            
            for(int i = 0; i < 2; i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for(int j = 0; j < 25; j++){
                            c.put("producer value " + j); 
                        }
                    }
                }, "producer:" + i).start();
            }
        }
    }

      

      对于wait和notify,wait会释放锁标记,而notify则不会释放锁标记。

    案例:给定一个容器,启动两个线程,A线程取数据,B线程往其中放数据,当放到第N个数据的时候,立即通知A线程继续执行。

    package com.cn.thread;
    
    /**
     * wait会释放锁,notify不会释放锁
     */
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    public class Test1 {
    
        private volatile List<String> list = new ArrayList<String>();
        
        private Object lock = new Object();
        
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        
        private void set() throws InterruptedException{
            synchronized(lock){
                for(int i = 0; i<10; i++){
                    list.add("1");
                    System.out.println(Thread.currentThread().getName() + " add done:" + i);
                    if(list.size() == 5){
                        lock.notify();
    //                    lock.wait();    //标记1
    //                    countDownLatch.countDown(); //标记2
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        
        private void get() throws InterruptedException{
    //        while(true){
    //            if(list.size() == 5){
    //                System.out.println("t2, get will start");
    //                throw new InterruptedException();
    //            }
    //        }
            synchronized(lock){
                if(list.size() < 5){
                    lock.wait();
    //                countDownLatch.await();  //标记2
                    System.out.println("t2, get will start");
                    lock.notify();
                    throw new InterruptedException();
                }
            }
        }
        
        public static void main(String[] args){
            Test1 test1 = new Test1();
            
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        test1.get();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t2").start();
            
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        test1.set();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t1").start();
            
        }
    }

     此时,虽然最终,t2线程也能抛出异常,但是却是等待t1结束才执行的,并不是到容器长度5的时候,立即执行的。虽然达到目的,但是控制不精确。

    解决方案:放开标记1的行代码,则在容器size=5的时候,t2会执行异常。

    另外,标记2代码是利用java并发包中的门闩CountDownLatch去实现,更简洁,明了。

    ps : 水平有限,个人理解表达有错误之处,望不吝赐教,谢谢。

  • 相关阅读:
    elasticsearch 中的Multi Match Query
    activiti 流程部署的各种方式
    elasticsearch 嵌套对象之嵌套类型
    elasticsearch Java High Level REST 相关操作封装
    elasticsearch 字段数据类型
    ubuntu 安装 docker
    elasticsearch 通过HTTP RESTful API 操作数据
    facenet 人脸识别(二)——创建人脸库搭建人脸识别系统
    POJ 3093 Margaritas(Kind of wine) on the River Walk (背包方案统计)
    墨卡托投影, GPS 坐标转像素, GPS 坐标转距离
  • 原文地址:https://www.cnblogs.com/eric-fang/p/8986380.html
Copyright © 2011-2022 走看看