zoukankan      html  css  js  c++  java
  • java多线程基本概述(六)——简单生产者消费者模式

      在线程里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。下面实现一个简单的生产者消费者模式:

    1.一个消费者一个生产者循环消费生产

    package soarhu;
    
    import java.util.ArrayList;
    import java.util.List;
    
    class Service{
    
        private final Object lock;
        private List<String> list = new ArrayList<>();
    
        public Service(Object lock) {
            this.lock = lock;
        }
    
        void waiting(){
            synchronized (lock){
                    try {
                        while(list.size()==0){
                            lock.wait();
                        }
                        String value = list.remove(0);
                        System.out.println("consume: "+value);
                        lock.notifyAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
            }
        }
    
        void notifying(){
            synchronized (lock){
                try {
                    while(list.size()!=0){
                        lock.wait();
                    }
                    String value=System.currentTimeMillis()+"";
                    list.add(value);
                    System.out.println("produce: "+value);
                    lock.notifyAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    
    public class Test {
        public static void main(String[] args) throws InterruptedException {
                Object o = new Object();
                Service service = new Service(o);
                for (int i = 0; i < 1; i++) {  //这里的数量为1
                    new Thread(){
                        @Override
                        public void run() {
                            while (true) {
                                service.notifying();
                            }
                        }
                    }.start();
                }
                Thread.sleep(1000);
                for (int i = 0; i < 1; i++) { //此时数量改为1
                    new Thread(){
                        @Override
                        public void run() {
                            while (true) {
                                service.waiting();
                            }
                        }
                    }.start();
                }
        }
    }

    输出结果

    produce: 1492495274866
    consume: 1492495274866
    produce: 1492495274866
    consume: 1492495274866
    produce: 1492495274866
    consume: 1492495274866
    produce: 1492495274866
    consume: 1492495274866
    produce: 1492495274866
    consume: 1492495274866
    produce: 1492495274866
    consume: 1492495274866
    produce: 1492495274866
    consume: 1492495274866
    produce: 1492495274866
    consume: 1492495274866
    produce: 1492495274866
    consume: 1492495274866
    produce: 1492495274866
    consume: 1492495274866
    produce: 1492495274866
    consume: 1492495274866
    ..............
    ..........
    .........

     这里可能有个踩坑的地方需要注意,代码改为如下后:

     1  void notifying(){
     2         synchronized (lock){
     3             try {
     4                 int size = list.size();
     5                 System.out.println("before produce size: "+size);
     6                 while(size!=0){
     7                     System.out.println(Thread.currentThread().getName()+" PRODUCE WAITING..."+size);
     8                     lock.wait();
     9                 }
    10                 String value=System.currentTimeMillis()+"";
    11                 list.add(value);
    12                 System.out.println(Thread.currentThread().getName()+" produce: "+value);
    13                 lock.notifyAll();
    14                 System.out.println("after produce size: "+list.size());
    15             } catch (InterruptedException e) {
    16                 e.printStackTrace();
    17             }
    18         }
    19     }

    输出结果:死锁

    before produce size: 0
    Thread-0 produce: 1492507566267
    after produce size: 1
    before produce size: 1
    Thread-0 PRODUCE WAITING...1
    Thread-1 consume: 1492507566267
    Thread-1 CONSUME WAITING...
    Thread-0 PRODUCE WAITING...1

    之所以会产生死锁是因为:list.size()的的判断条件没有再while循环中,那么再次求size的大小时,size的值并不时最新的,因为生产者线程从wait()苏醒后走的是while条件。那么将list.size()放置到while()条件中即可正常。

     其中管道流是一种特殊的流,用于再不同的线程间直接通过管道传送数据。一个线程发送数据到管道,另一个线程从输入管道读取数据。

    例如:

    package soarhu;
    
    
    import java.io.IOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    
    class Service{
    
        void readMethod(PipedInputStream inputStream){
            try {
                System.out.println("read: ");
                byte[] bytes = new byte[20];
                int readLength = inputStream.read(bytes);
                while (readLength!=-1){
                    readLength = inputStream.read(bytes);
                }
                System.out.println("read finish......");
                inputStream.close();
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    
        void writeMethod(PipedOutputStream outStream){
            try {
                System.out.println("write: ");
                for (int i = 0; i < 30; i++) {
                    String outData = ""+(i+1);
                    outStream.write(outData.getBytes());
                    System.out.println("write "+outData+" -> "+Thread.currentThread().getName());
                }
                System.out.println("write finish......");
                outStream.close();
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }
    
    
    public class Test {
        public static void main(String[] args) throws Exception {
                Service o = new Service();
                PipedInputStream inputStream = new PipedInputStream();
                PipedOutputStream outputStream = new PipedOutputStream();
                outputStream.connect(inputStream);
                for (int i = 0; i < 1; i++) {
                    new Thread(){
                        @Override
                        public void run() {
                            o.readMethod(inputStream);
                        }
                    }.start();
                }
                Thread.sleep(5000);
                for (int i = 0; i < 1; i++) {
                    new Thread(){
                        @Override
                        public void run() {
                            o.writeMethod(outputStream);
                        }
                    }.start();
                }
        }
    }
    read: 
    write: 
    write 1 -> Thread-1
    write 2 -> Thread-1
    write 3 -> Thread-1
    write 4 -> Thread-1
    write 5 -> Thread-1
    write 6 -> Thread-1
    write 7 -> Thread-1
    write 8 -> Thread-1
    write 9 -> Thread-1
    write 10 -> Thread-1
    write 11 -> Thread-1
    write 12 -> Thread-1
    write 13 -> Thread-1
    write 14 -> Thread-1
    write 15 -> Thread-1
    write 16 -> Thread-1
    write 17 -> Thread-1
    write 18 -> Thread-1
    write 19 -> Thread-1
    write 20 -> Thread-1
    write 21 -> Thread-1
    write 22 -> Thread-1
    write 23 -> Thread-1
    write 24 -> Thread-1
    write 25 -> Thread-1
    write 26 -> Thread-1
    write 27 -> Thread-1
    write 28 -> Thread-1
    write 29 -> Thread-1
    write 30 -> Thread-1
    write finish......
    read finish......
    
    Process finished with exit code 0

     使用队列:

    package tij;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Created by huaox on 2017/4/20.
     *
     */
    
    class Apple{
    
        private static  AtomicInteger task = new AtomicInteger(1) ;
        private final int id = task.getAndIncrement();
    
        @Override
        public String toString() {
            return "Apple{" +"id=" + id +'}';
        }
    }
    
    class Producer implements Runnable {
        private static AtomicInteger count = new AtomicInteger(0);
        private final BlockingQueue<Apple> queue;
        Producer(BlockingQueue<Apple> queue) { this.queue = queue; }
        public void run() {
            try {
                while(!Thread.currentThread().isInterrupted()) {
                    //产生一个Apple需要1秒钟
                    Thread.sleep(1000);
                    queue.put(produce());
                    count.incrementAndGet();
                    if (count.get()==10){
                        System.out.println("produce apple is all "+count);
                        System.exit(0);
                    }
                }
            } catch (InterruptedException ex) {
                System.out.println("interrupted in produce");
            }
        }
        private Apple produce() {
            Apple  apple= new Apple();
            System.out.println("produce apple: "+apple+"   "+System.currentTimeMillis());
            return apple;
        }
    }
    
    class Consumer implements Runnable {
        private final BlockingQueue<Apple> queue;
        Consumer(BlockingQueue<Apple> q) { queue = q; }
        public void run() {
            try {
                while(!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(2000);//消费一个产品需要2秒钟
                    consume(queue.take());
                }
            } catch (InterruptedException ex) { System.out.println("interrupted in produce");}
        }
        private void consume(Apple x) {
            System.out.println("consume apple: "+x+"   "+System.currentTimeMillis());
        }
    }
    
    class Setup {
        private static final int P_SIZE=2;//生产者个数
        private static final int C_SIZE=6;//消费者个数
        public static void main(String[] args) {
            BlockingQueue<Apple> queue = new LinkedBlockingDeque<>();
            System.out.println("job starting");
           /* for (int i = 0; i < C_SIZE; i++) {
                new Thread(new Consumer(queue)).start();
            }
            for (int i = 0; i < P_SIZE; i++) {
                new Thread(new Producer(queue)).start();
            }*/
            ExecutorService service = Executors.newCachedThreadPool();
            for (int i = 0; i < C_SIZE; i++) {
                service.execute(new Consumer(queue));
            }
            for (int i = 0; i < P_SIZE; i++) {
                service.execute(new Producer(queue));
            }
        }
    }

    结果:

    job starting
    produce apple: Apple{id=1}   1492694951683
    produce apple: Apple{id=2}   1492694951683
    produce apple: Apple{id=4}   1492694952684
    produce apple: Apple{id=3}   1492694952684
    consume apple: Apple{id=1}   1492694952684
    consume apple: Apple{id=4}   1492694952684
    consume apple: Apple{id=3}   1492694952684
    consume apple: Apple{id=2}   1492694952684
    produce apple: Apple{id=6}   1492694953684
    produce apple: Apple{id=5}   1492694953684
    consume apple: Apple{id=6}   1492694953684
    consume apple: Apple{id=5}   1492694953684
    produce apple: Apple{id=7}   1492694954686
    produce apple: Apple{id=8}   1492694954686
    consume apple: Apple{id=7}   1492694954686
    consume apple: Apple{id=8}   1492694954686
    produce apple: Apple{id=9}   1492694955688
    produce apple: Apple{id=10}   1492694955688
    consume apple: Apple{id=9}   1492694955688
    consume apple: Apple{id=10}   1492694955688
    produce apple is all 10
    
    Process finished with exit code 0
    package tij;
    
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Created by huaox on 2017/4/20.
     *
     */
    class Toast{
        public enum Status{DRY,BUTTERED,JAMMED}
        private Status status = Status.DRY;
        private final int id;
        Toast(int id){this.id = id;}
        void butter(){status=Status.BUTTERED;}
        void jam(){status=Status.JAMMED;}
        Status getStatus(){return status;}
        int getId(){return id;}
    
        @Override
        public String toString() {
            return "Toast[" +"status=" + status +", id=" + id +']';
        }
    }
    class ToastQueue<T> extends LinkedBlockingQueue<T>{}
    
    class Toaster implements Runnable{//生产吐司
        private ToastQueue<Toast> toastQueue;
        private AtomicInteger count = new AtomicInteger(0);
        private Random random = new Random(47);
    
        Toaster(ToastQueue<Toast> toastQueue) {
            this.toastQueue = toastQueue;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()){
                    TimeUnit.MILLISECONDS.sleep(100+random.nextInt(500));
                    Toast toast = new Toast(count.getAndIncrement());
                    System.out.println(toast);
                    toastQueue.put(toast);
                }
            } catch (InterruptedException e) {
                System.out.println("toaster interrupted! ");
            }
            System.out.println("toaster off");
        }
    }
    //给吐司抹黄油
    class Butter implements Runnable{
        private ToastQueue<Toast> dryQueue,butterQueue;
    
        Butter(ToastQueue<Toast> dryQueue, ToastQueue<Toast> butterQueue) {
            this.dryQueue = dryQueue;
            this.butterQueue = butterQueue;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()){
                    Toast take = dryQueue.take();//阻塞直到上一个步骤做好了
                    take.butter();
                    System.out.println(take);
                    butterQueue.put(take);
                }
            } catch (InterruptedException e) {
                System.out.println("butter interrupted! ");
            }
            System.out.println("butter off");
        }
    }
    
    //给吐司涂酱
    class Jam implements Runnable{
        private ToastQueue<Toast> butterQueue,finishQueue;
    
        Jam(ToastQueue<Toast> butterQueue, ToastQueue<Toast> finishQueue) {
            this.finishQueue = finishQueue;
            this.butterQueue = butterQueue;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()){
                    Toast take = butterQueue.take();//阻塞直到上一个步骤做好了
                    take.jam();
                    System.out.println(take);
                    finishQueue.put(take);
                }
            } catch (InterruptedException e) {
                System.out.println("jam interrupted! ");
            }
            System.out.println("jam off");
        }
    }
    
    class Easter implements Runnable{
    
        private ToastQueue<Toast> finishQueue;
        private AtomicInteger count = new AtomicInteger(0);
    
        Easter(ToastQueue<Toast> finishQueue) {
            this.finishQueue = finishQueue;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()){
                    Toast toast = finishQueue.take();
                    //验证吐司是不是按顺序来的,并且是不是涂酱了
                    if(toast.getId()!=count.getAndIncrement() || toast.getStatus()!=Toast.Status.JAMMED){
                        System.out.println("ERROR!!");
                        System.exit(1);
    
                    }else{
                        System.out.println("CHOMP "+toast);
                    }
                }
            } catch (InterruptedException e) {
                System.out.println("Eater interrupted");
            }
            System.out.println("eater off");
        }
    }
    
    public class ToastOMatic {
    
        public static void main(String[] args) throws InterruptedException {
            ToastQueue<Toast> dryQueue = new ToastQueue<>(),
                    butterQueue = new ToastQueue<>(),
                    finishQueue = new ToastQueue<>();
            ExecutorService service = Executors.newCachedThreadPool();
            service.execute(new Toaster(dryQueue));
            service.execute(new Butter(dryQueue,butterQueue));
            service.execute(new Jam(butterQueue,finishQueue));
            service.execute(new Easter(finishQueue));
    
            TimeUnit.SECONDS.sleep(7);
            service.shutdownNow();
        }
    }
    Connected to the target VM, address: '127.0.0.1:2223', transport: 'socket'
    Toast[status=DRY, id=0]
    Toast[status=BUTTERED, id=0]
    Toast[status=JAMMED, id=0]
    CHOMP Toast[status=JAMMED, id=0]
    Toast[status=DRY, id=1]
    Toast[status=BUTTERED, id=1]
    Toast[status=JAMMED, id=1]
    CHOMP Toast[status=JAMMED, id=1]
    Toast[status=DRY, id=2]
    Toast[status=BUTTERED, id=2]
    Toast[status=JAMMED, id=2]
    CHOMP Toast[status=JAMMED, id=2]
    Toast[status=DRY, id=3]
    Toast[status=BUTTERED, id=3]
    Toast[status=JAMMED, id=3]
    CHOMP Toast[status=JAMMED, id=3]
    Toast[status=DRY, id=4]
    Toast[status=BUTTERED, id=4]
    Toast[status=JAMMED, id=4]
    CHOMP Toast[status=JAMMED, id=4]
    Toast[status=DRY, id=5]
    Toast[status=BUTTERED, id=5]
    Toast[status=JAMMED, id=5]
    CHOMP Toast[status=JAMMED, id=5]
    Toast[status=DRY, id=6]
    Toast[status=BUTTERED, id=6]
    Toast[status=JAMMED, id=6]
    CHOMP Toast[status=JAMMED, id=6]
    Toast[status=DRY, id=7]
    Toast[status=BUTTERED, id=7]
    Toast[status=JAMMED, id=7]
    CHOMP Toast[status=JAMMED, id=7]
    Toast[status=DRY, id=8]
    Toast[status=BUTTERED, id=8]
    Toast[status=JAMMED, id=8]
    CHOMP Toast[status=JAMMED, id=8]
    Toast[status=DRY, id=9]
    Toast[status=BUTTERED, id=9]
    Toast[status=JAMMED, id=9]
    CHOMP Toast[status=JAMMED, id=9]
    Toast[status=DRY, id=10]
    Toast[status=BUTTERED, id=10]
    Toast[status=JAMMED, id=10]
    CHOMP Toast[status=JAMMED, id=10]
    Disconnected from the target VM, address: '127.0.0.1:2223', transport: 'socket'
    Toast[status=DRY, id=11]
    Toast[status=BUTTERED, id=11]
    Toast[status=JAMMED, id=11]
    CHOMP Toast[status=JAMMED, id=11]
  • 相关阅读:
    内存对齐规则
    ATL窗口
    ATL的GUI程序设计(4)
    ATL的GUI程序设计(4)
    ATL的GUI程序设计(3)
    ATL的GUI程序设计(3)
    VMware Workstation 9.0 安装苹果Mac OS X10.9系统
    高级UIKit-03(NSFileManager、NSFileHandle)
    高级UIKit-02(文件操作)
    高级UIKit-01(总结基础UIKit)
  • 原文地址:https://www.cnblogs.com/soar-hu/p/6727849.html
Copyright © 2011-2022 走看看