  • [Java Concurrent] A simple producer-consumers / queue example of cooperating threads

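    This example implements the producer-consumer scenario in a multithreaded environment using a BlockingQueue.

    Toast: the object that is produced and consumed.

    ToastQueue: extends LinkedBlockingQueue and serves as the intermediate store for Toast objects.

    Producer: produces Toast objects and puts each one into the queue initialToastQ.

    Processor: processes Toast objects. It takes each produced Toast from initialToastQ, processes it, and puts it into the queue finishedToastQ.

    Consumer: consumes Toast objects, taking each processed Toast from finishedToastQ.

    ThreadHelper: a utility class that prints thread-related information.

    ProducerConsumerDemo: demonstrates the scenario.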
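
    The data flow described above (producer, then initialToastQ, then processor, then finishedToastQ, then consumer) can be sketched in a compact form. This is only an illustration under simplifying assumptions: it uses Integer values instead of Toast, plain threads instead of an executor, and a fixed item count so that every thread ends on its own. The class PipelineFlowSketch and the helper runPipeline are hypothetical names that do not appear in the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PipelineFlowSketch {

    // Hypothetical helper: pushes n integers through two queues
    // and returns them in the order the last stage received them.
    public static List<Integer> runPipeline(int n) throws InterruptedException {
        BlockingQueue<Integer> initialQ = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> finishedQ = new LinkedBlockingQueue<>();
        List<Integer> consumed = new ArrayList<>();

        // Stage 1: put n items into the first queue
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) initialQ.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Stage 2: move each item from the first queue to the second
        Thread processor = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) finishedQ.put(initialQ.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Stage 3: take each item from the second queue
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) consumed.add(finishedQ.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        processor.start();
        consumer.start();
        // join() makes the consumer's writes to the list visible to the caller
        producer.join();
        processor.join();
        consumer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPipeline(5));
    }
}
```

    Because each stage is a single thread and both queues are FIFO, the items leave the pipeline in the order they entered it. take() blocks while a queue is empty, so no stage needs explicit wait/notify code.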

    Code implementation:

    Toast implementation

    public class Toast {

        // Sequence number assigned by the Producer
        private final int id;

        public Toast(int id){
            this.id = id;
        }

        @Override
        public String toString(){
            return " toast#" + id;
        }
    }

    ToastQueue implementation

    import java.util.concurrent.LinkedBlockingQueue;
    
    public class ToastQueue extends LinkedBlockingQueue<Toast> {
        private static final long serialVersionUID = 1L;
    }
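
    ToastQueue relies on the no-argument LinkedBlockingQueue constructor, so its capacity is Integer.MAX_VALUE and put() practically never blocks. The sketch below contrasts that with a bounded queue. It assumes Integer elements for brevity; QueueCapacitySketch and fillUntilFull are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueCapacitySketch {

    // Hypothetical helper: counts how many elements a bounded queue
    // accepts before offer() starts refusing them.
    public static int fillUntilFull(int capacity) {
        LinkedBlockingQueue<Integer> bounded = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        // offer() returns false instead of blocking when the queue is full
        while (bounded.offer(accepted)) {
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        // No-arg constructor: the capacity is Integer.MAX_VALUE
        LinkedBlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
        System.out.println(unbounded.remainingCapacity() == Integer.MAX_VALUE);

        System.out.println(fillUntilFull(3));

        // The timed offer() waits up to the timeout for free space, then gives up
        LinkedBlockingQueue<Integer> full = new LinkedBlockingQueue<>(1);
        full.put(1);
        System.out.println(full.offer(2, 50, TimeUnit.MILLISECONDS));
    }
}
```

    With a bounded queue, put() would block a fast producer until a slower stage catches up. Whether that back-pressure is wanted here is a design choice the original example does not make.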

    Producer produces Toast objects in a loop

    import java.util.concurrent.TimeUnit;
    
    public class Producer implements Runnable {
    
        private ToastQueue toastQueue;
        private int count;
        
        public Producer(ToastQueue toastQueue){
            this.toastQueue = toastQueue;
            this.count = 0;
        }
        
        @Override
        public void run() {
            try {
                while (true){
                    TimeUnit.MILLISECONDS.sleep(100);
                    
                    Toast toast = new Toast(count);
                    count++;
                    toastQueue.put(toast);
                    ThreadHelper.print(" produced " + toast);
                }
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    Processor takes a Toast from initialToastQ, processes it, and puts it into finishedToastQ.

    import java.util.concurrent.TimeUnit;
    
    public class Processor implements Runnable {
    
        private ToastQueue initialToastQ;
        private ToastQueue finishedToastQ;
        
        
        public Processor(ToastQueue initialToastQ, ToastQueue finishedToastQ){
            this.initialToastQ = initialToastQ;
            this.finishedToastQ = finishedToastQ;
        }
        
        @Override
        public void run() {
            try {
                while (true){
                    Toast toast = initialToastQ.take();
                    
                    ThreadHelper.print(" processing " + toast);
    
                    TimeUnit.MILLISECONDS.sleep(180);
                    
                    finishedToastQ.put(toast);
                }
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    Consumer consumes Toast objects

    public class Consumer implements Runnable {
    
        private ToastQueue finishedToastQ;
    
        public Consumer(ToastQueue finishedToastQ){
            this.finishedToastQ = finishedToastQ;
        }
        
        @Override
        public void run() {
            try {
                while (true){
                    Toast toast = finishedToastQ.take();
                    ThreadHelper.print(" consumed " + toast);
                }
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
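
    The Consumer above blocks in take() until it is interrupted. As a possible alternative, a timed poll() lets a consumer stop on its own once the queue has been idle for a while. This is a sketch of that variant, not the approach used in the original example; TimedPollSketch and drainUntilIdle are hypothetical names, and String elements stand in for Toast.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedPollSketch {

    // Hypothetical helper: consumes elements until nothing arrives
    // within the timeout, then returns how many were consumed.
    public static int drainUntilIdle(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        int consumed = 0;
        // poll(timeout, unit) returns null if no element arrives in time
        while (queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) != null) {
            consumed++;
        }
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("toast#0");
        queue.put("toast#1");
        System.out.println(drainUntilIdle(queue, 50));
    }
}
```

    The timeout has to be longer than the slowest upstream stage (180 ms for the Processor in this example), otherwise the consumer would give up while work is still in flight.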

    ThreadHelper: thread helper class

    public class ThreadHelper {    
        public static void print(String msg){
            System.out.println("[" + Thread.currentThread().getName() + " ] " + msg);
        }
    }

    Demonstrates the toast production, processing, and consumption scenario

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class ProducerConsumerDemo {
    
        public static void main(String[] args) throws InterruptedException{
            
            ToastQueue initialToastQ = new ToastQueue();
            ToastQueue finishedToastQ = new ToastQueue();
            
            ExecutorService exec = Executors.newCachedThreadPool();
            exec.execute(new Producer(initialToastQ));
            exec.execute(new Processor(initialToastQ, finishedToastQ));
            exec.execute(new Consumer(finishedToastQ));
    
            TimeUnit.SECONDS.sleep(2);
            exec.shutdownNow();
        }
    }
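
    The delays in the code give a rough idea of what a run should print: the demo runs for 2 seconds, the Producer sleeps 100 ms per Toast, and the Processor sleeps 180 ms per Toast. The arithmetic below is only an upper-bound estimate that ignores scheduling overhead and the time spent in the queues; ThroughputEstimateSketch and maxIterations are hypothetical names.

```java
public class ThroughputEstimateSketch {

    // Upper bound on how many fixed-delay iterations fit into the run time
    public static long maxIterations(long runMillis, long delayMillis) {
        return runMillis / delayMillis;
    }

    public static void main(String[] args) {
        long runMillis = 2000;                           // main sleeps 2 seconds before shutdownNow()
        long produced = maxIterations(runMillis, 100);   // Producer: 100 ms per Toast
        long processed = maxIterations(runMillis, 180);  // Processor: 180 ms per Toast

        System.out.println("produced at most about " + produced);
        System.out.println("processed at most about " + processed);
        // The difference is the backlog that piles up in initialToastQ
        System.out.println("backlog roughly " + (produced - processed));
    }
}
```

    Since the Processor is slower than the Producer, initialToastQ keeps growing for as long as the demo runs. The unbounded ToastQueue absorbs that backlog silently.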

    Output:

    [pool-1-thread-2 ]  processing  toast#0
    [pool-1-thread-1 ]  produced  toast#0
    [pool-1-thread-1 ]  produced  toast#1
    [pool-1-thread-2 ]  processing  toast#1
    [pool-1-thread-3 ]  consumed  toast#0
    [pool-1-thread-1 ]  produced  toast#2
    [pool-1-thread-1 ]  produced  toast#3
    [pool-1-thread-2 ]  processing  toast#2
    [pool-1-thread-3 ]  consumed  toast#1
    [pool-1-thread-1 ]  produced  toast#4
    [pool-1-thread-1 ]  produced  toast#5
    [pool-1-thread-2 ]  processing  toast#3
    [pool-1-thread-3 ]  consumed  toast#2
    [pool-1-thread-1 ]  produced  toast#6
    [pool-1-thread-1 ]  produced  toast#7
    [pool-1-thread-2 ]  processing  toast#4
    [pool-1-thread-3 ]  consumed  toast#3
    [pool-1-thread-1 ]  produced  toast#8
    [pool-1-thread-2 ]  processing  toast#5
    [pool-1-thread-3 ]  consumed  toast#4
    [pool-1-thread-1 ]  produced  toast#9
    [pool-1-thread-1 ]  produced  toast#10
    [pool-1-thread-2 ]  processing  toast#6
    [pool-1-thread-3 ]  consumed  toast#5
    [pool-1-thread-1 ]  produced  toast#11
    [pool-1-thread-1 ]  produced  toast#12
    [pool-1-thread-2 ]  processing  toast#7
    [pool-1-thread-3 ]  consumed  toast#6
    [pool-1-thread-1 ]  produced  toast#13
    [pool-1-thread-1 ]  produced  toast#14
    [pool-1-thread-2 ]  processing  toast#8
    [pool-1-thread-3 ]  consumed  toast#7
    [pool-1-thread-1 ]  produced  toast#15
    [pool-1-thread-1 ]  produced  toast#16
    [pool-1-thread-2 ]  processing  toast#9
    [pool-1-thread-3 ]  consumed  toast#8
    [pool-1-thread-1 ]  produced  toast#17
    [pool-1-thread-2 ]  processing  toast#10
    [pool-1-thread-3 ]  consumed  toast#9
    [pool-1-thread-1 ]  produced  toast#18
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at concurrencyProducerConsumer.Consumer.run(Consumer.java:15)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at concurrencyProducerConsumer.Producer.run(Producer.java:19)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at concurrencyProducerConsumer.Processor.run(Processor.java:24)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
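
    The three stack traces at the end of the output are expected: shutdownNow() interrupts the worker threads, each blocked take() or sleep() throws InterruptedException, and the catch blocks print it with e.printStackTrace(). If the traces are unwanted, one option is to treat the interrupt as a normal stop signal. The sketch below shows that idea on a single worker; QuietShutdownSketch, QuietConsumer, and runAndStop are hypothetical names, and Integer elements stand in for Toast.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QuietShutdownSketch {

    // Hypothetical worker: takes elements until it is interrupted
    static class QuietConsumer implements Runnable {
        private final BlockingQueue<Integer> queue;
        volatile boolean exitedCleanly = false;

        QuietConsumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.take();
                }
            } catch (InterruptedException e) {
                // Expected after shutdownNow(): restore the flag instead of printing a trace
                Thread.currentThread().interrupt();
            }
            exitedCleanly = true;
        }
    }

    // Returns true if the worker stopped within the timeout after shutdownNow()
    public static boolean runAndStop() throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        QuietConsumer consumer = new QuietConsumer(queue);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(consumer);

        queue.put(1);
        TimeUnit.MILLISECONDS.sleep(100);

        exec.shutdownNow();  // interrupts the worker blocked in take()
        boolean terminated = exec.awaitTermination(1, TimeUnit.SECONDS);
        return terminated && consumer.exitedCleanly;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stopped cleanly: " + runAndStop());
    }
}
```

    awaitTermination() is not called in the original demo. It is added here so that the caller can tell when the worker has actually finished.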

    References

    Page 868, Producer-consumers and queue, Thinking in Java

  • Original article: https://www.cnblogs.com/TonyYPZhang/p/5561179.html