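  • [Java Concurrent] A simple multithreaded producer-consumer example using queues

    This example implements a producer-consumer scenario in a multithreaded environment using BlockingQueue.

    Toast: the object that is produced and consumed.

    ToastQueue: extends LinkedBlockingQueue and serves as intermediate storage for Toast objects.

    Producer: produces Toast objects and puts them into the queue initialToastQ.

    Processor: processes Toast objects. It takes each produced Toast from initialToastQ, processes it, and puts it into the queue finishedToastQ.

    Consumer: consumes Toast objects. It takes each processed Toast from finishedToastQ.

    ThreadHelper: a utility class that prints a message together with the name of the current thread.

    ProducerConsumerDemo: runs the scenario.

    Implementation:

    Toast implementation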

    public class Toast {
        
        private int id;
        
        public Toast(int id){
            this.id = id;
        }
    
        public String toString(){
            return " toast#" + id;
        }
    }

    ToastQueue implementation

    import java.util.concurrent.LinkedBlockingQueue;
    
    public class ToastQueue extends LinkedBlockingQueue<Toast> {
        private static final long serialVersionUID = 1L;
    }
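
As declared, ToastQueue inherits the no-argument LinkedBlockingQueue constructor, so its capacity is Integer.MAX_VALUE and put() practically never blocks. Because the Producer (100 ms per toast) is faster than the Processor (180 ms per toast), initialToastQ keeps growing while the demo runs. The following is a minimal sketch, not part of the original example, of how a bounded queue pushes back on a fast producer. The class name BoundedQueueSketch, the helper fill, the capacity of 2, and the 10 ms timeout are all assumptions chosen for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueSketch {

    // Hypothetical helper: tries to add `attempts` items to a queue with the
    // given capacity and returns how many of them were accepted.
    public static int fill(int capacity, int attempts) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer(e, timeout, unit) waits up to 10 ms for free space and
            // returns false if the queue is still full; put() would block
            // here until a consumer removes an element.
            if (queue.offer("toast#" + i, 10, TimeUnit.MILLISECONDS)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        // Nothing drains the queue, so only the first 2 offers succeed.
        System.out.println("accepted " + fill(2, 5) + " of 5");
    }
}
```

With a bounded queue, a Producer that calls put() would simply slow down to the pace of the Processor instead of accumulating unprocessed toast.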

    Producer: produces Toast objects in a loop

    import java.util.concurrent.TimeUnit;
    
    public class Producer implements Runnable {
    
        private ToastQueue toastQueue;
        private int count;
        
        public Producer(ToastQueue toastQueue){
            this.toastQueue = toastQueue;
            this.count = 0;
        }
        
        @Override
        public void run() {
            try {
                while (true){
                    TimeUnit.MILLISECONDS.sleep(100);
                    
                    Toast toast = new Toast(count);
                    count++;
                    toastQueue.put(toast);
                    ThreadHelper.print(" produced " + toast);
                }
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    Processor: takes a Toast from initialToastQ, processes it, and puts it into finishedToastQ.

    import java.util.concurrent.TimeUnit;
    
    public class Processor implements Runnable {
    
        private ToastQueue initialToastQ;
        private ToastQueue finishedToastQ;
        
        
        public Processor(ToastQueue initialToastQ, ToastQueue finishedToastQ){
            this.initialToastQ = initialToastQ;
            this.finishedToastQ = finishedToastQ;
        }
        
        @Override
        public void run() {
            try {
                while (true){
                    Toast toast = initialToastQ.take();
                    
                    ThreadHelper.print(" processing " + toast);
    
                    TimeUnit.MILLISECONDS.sleep(180);
                    
                    finishedToastQ.put(toast);
                }
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    Consumer: consumes Toast objects

    public class Consumer implements Runnable {
    
        private ToastQueue finishedToastQ;
    
        public Consumer(ToastQueue finishedToastQ){
            this.finishedToastQ = finishedToastQ;
        }
        
        @Override
        public void run() {
            try {
                while (true){
                    Toast toast = finishedToastQ.take();
                    ThreadHelper.print(" consumed " + toast);
                }
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    ThreadHelper: thread utility class

    public class ThreadHelper {    
        public static void print(String msg){
            System.out.println("[" + Thread.currentThread().getName() + " ] " + msg);
        }
    }

    Demo of producing, processing, and consuming toast

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class ProducerConsumerDemo {
    
        public static void main(String[] args) throws InterruptedException{
            
            ToastQueue initialToastQ = new ToastQueue();
            ToastQueue finishedToastQ = new ToastQueue();
            
            ExecutorService exec = Executors.newCachedThreadPool();
            exec.execute(new Producer(initialToastQ));
            exec.execute(new Processor(initialToastQ, finishedToastQ));
            exec.execute(new Consumer(finishedToastQ));
    
            TimeUnit.SECONDS.sleep(2);
            exec.shutdownNow();
        }
    }
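
Note that shutdownNow() only interrupts the worker threads and returns immediately; it does not wait for them to finish. If main needs to know that the workers have really stopped, awaitTermination can be called afterwards. The sketch below is an assumption-laden illustration rather than part of the original demo: ShutdownSketch and stopBlockedWorker are hypothetical names, and the single worker stands in for the Consumer blocked on take().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    // Starts one worker that blocks on take(), interrupts it with
    // shutdownNow(), and reports whether the pool terminated in time.
    public static boolean stopBlockedWorker() throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(() -> {
            try {
                while (true) {
                    queue.take(); // blocks forever: nothing is ever put
                }
            } catch (InterruptedException e) {
                // Expected at shutdown: leave the loop and let the task end.
            }
        });

        TimeUnit.MILLISECONDS.sleep(100); // give the worker time to block
        exec.shutdownNow();               // interrupts the worker
        // Wait up to 1 second for the worker to finish.
        return exec.awaitTermination(1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("terminated: " + stopBlockedWorker());
    }
}
```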

    Output:

    [pool-1-thread-2 ]  processing  toast#0
    [pool-1-thread-1 ]  produced  toast#0
    [pool-1-thread-1 ]  produced  toast#1
    [pool-1-thread-2 ]  processing  toast#1
    [pool-1-thread-3 ]  consumed  toast#0
    [pool-1-thread-1 ]  produced  toast#2
    [pool-1-thread-1 ]  produced  toast#3
    [pool-1-thread-2 ]  processing  toast#2
    [pool-1-thread-3 ]  consumed  toast#1
    [pool-1-thread-1 ]  produced  toast#4
    [pool-1-thread-1 ]  produced  toast#5
    [pool-1-thread-2 ]  processing  toast#3
    [pool-1-thread-3 ]  consumed  toast#2
    [pool-1-thread-1 ]  produced  toast#6
    [pool-1-thread-1 ]  produced  toast#7
    [pool-1-thread-2 ]  processing  toast#4
    [pool-1-thread-3 ]  consumed  toast#3
    [pool-1-thread-1 ]  produced  toast#8
    [pool-1-thread-2 ]  processing  toast#5
    [pool-1-thread-3 ]  consumed  toast#4
    [pool-1-thread-1 ]  produced  toast#9
    [pool-1-thread-1 ]  produced  toast#10
    [pool-1-thread-2 ]  processing  toast#6
    [pool-1-thread-3 ]  consumed  toast#5
    [pool-1-thread-1 ]  produced  toast#11
    [pool-1-thread-1 ]  produced  toast#12
    [pool-1-thread-2 ]  processing  toast#7
    [pool-1-thread-3 ]  consumed  toast#6
    [pool-1-thread-1 ]  produced  toast#13
    [pool-1-thread-1 ]  produced  toast#14
    [pool-1-thread-2 ]  processing  toast#8
    [pool-1-thread-3 ]  consumed  toast#7
    [pool-1-thread-1 ]  produced  toast#15
    [pool-1-thread-1 ]  produced  toast#16
    [pool-1-thread-2 ]  processing  toast#9
    [pool-1-thread-3 ]  consumed  toast#8
    [pool-1-thread-1 ]  produced  toast#17
    [pool-1-thread-2 ]  processing  toast#10
    [pool-1-thread-3 ]  consumed  toast#9
    [pool-1-thread-1 ]  produced  toast#18
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at concurrencyProducerConsumer.Consumer.run(Consumer.java:15)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at concurrencyProducerConsumer.Producer.run(Producer.java:19)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at concurrencyProducerConsumer.Processor.run(Processor.java:24)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
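
The three stack traces are not failures. exec.shutdownNow() interrupts each worker, the blocking call it is in (take() or sleep()) throws InterruptedException, and each catch block prints it with printStackTrace(). A common alternative, sketched here under assumptions, is to treat the interrupt as a normal stop signal: restore the interrupt flag and return quietly. QuietConsumerSketch, its consumed counter, and the demo method are hypothetical names used only for this illustration, and String stands in for Toast to keep the block self-contained.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QuietConsumerSketch implements Runnable {

    private final BlockingQueue<String> queue;
    private int consumed = 0; // written only by the consumer thread

    public QuietConsumerSketch(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                queue.take();
                consumed++;
            }
        } catch (InterruptedException e) {
            // Interruption is the stop signal: restore the flag for any
            // caller that checks it, and end the loop without a stack trace.
            Thread.currentThread().interrupt();
        }
    }

    // Hypothetical driver: consumes three items, then stops the consumer.
    public static int demo() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("toast#0");
        queue.put("toast#1");
        queue.put("toast#2");

        QuietConsumerSketch consumer = new QuietConsumerSketch(queue);
        Thread worker = new Thread(consumer);
        worker.start();

        while (!queue.isEmpty()) {
            Thread.sleep(10); // wait until all three items have been taken
        }
        worker.interrupt();   // plays the role of shutdownNow()
        worker.join();        // join makes the final count visible here
        return consumer.consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumed " + demo()); // prints "consumed 3"
    }
}
```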

    References

    Page 868, Producer-consumers and queues, Thinking in Java

  • Original article: https://www.cnblogs.com/TonyYPZhang/p/5561179.html