zoukankan      html  css  js  c++  java
  • 多线程之生产者消费者模式

    最近在项目中需要使用使用多线程实现一种功能,和生产者消费者模式类似,因此,学习了下生产者消费者模式的多线程实现。在生产者消费者模式中,通常有两类线程,

    即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程则负责处理生产者提交的任务。生产者和消费者之间则通过共享内存缓冲区进行通信。

    在这里我们选择BlockingQueue做为共享内存缓冲区。

    首先,我们构建生产者生产的,和消费者需要处理的数据PCData,即相关任务数据。

    public class PCData {

    private final int intData;

    public PCData(int d){
    intData = d ;
    }

    public PCData(String d){
    intData = Integer.valueOf(d);
    }

    public int getData(){
    return intData;
    }

    @Override
    public String toString() {
    return "data:"+intData;
    }
    }

    接下来,实现生产者线程,它构建PCData对象,并放入BlockingQueue队列中。
    public class Producer implements Runnable {
    private volatile boolean isRunning = true;
    private BlockingQueue<PCData> queue;
    private static AtomicInteger count = new AtomicInteger(); //总数,原子操作
    private static final int SLEEPTIME = 1000;

    public Producer(BlockingQueue<PCData> queue){
    this.queue = queue;
    }

    @Override
    public void run() {
    PCData data = null;
    Random random = new Random();

    System.out.println("start produce id =" + Thread.currentThread().getId());
    while (isRunning){
    try {
    Thread.sleep(random.nextInt(SLEEPTIME));
    data = new PCData(count.incrementAndGet());
    System.out.println(data + "is put into queue" );
    if(!queue.offer(data, 2, TimeUnit.SECONDS)){
    System.err.println("failed to put data: " + data);
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    Thread.currentThread().interrupt();
    }
    }
    }

    public void stop(){
    isRunning = false;
    }
    }
    接下来,实现消费者,它从BlockingQueue队列中取出PCData对象进行处理:
    public class Comsumer implements Runnable {
    private BlockingQueue<PCData> queue;
    private static final int SLEEPTIME = 1000;

    public Comsumer(BlockingQueue<PCData> queue){
    this.queue = queue;
    }

    @Override
    public void run() {
    System.out.println("start Comsume id = " + Thread.currentThread().getId());

    Random random = new Random();

    try {
    while(true){
    PCData data = queue.take();
    if(null != data){
    int re = data.getData() * data.getData();
    System.out.println(MessageFormat.format("{0}*{1}={2}",data.getData(),
    data.getData(), re));
    Thread.sleep(random.nextInt(SLEEPTIME));
    }
    }
    }catch (InterruptedException e){
    e.printStackTrace();
    Thread.currentThread().interrupt();
    }
    }
    }
    最后,在主函数中,我们创建三个生产者和三个消费者,并让他们协作运行。
    public class MainTest {

    public static void main(String[] args) throws InterruptedException{
    BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(10);
    Producer pro1 = new Producer(queue);
    Producer pro2 = new Producer(queue);
    Producer pro3 = new Producer(queue);
    Comsumer coms1 = new Comsumer(queue);
    Comsumer coms2 = new Comsumer(queue);
    Comsumer coms3 = new Comsumer(queue);
    ExecutorService service = Executors.newCachedThreadPool();
    service.execute(pro1);
    service.execute(pro2);
    service.execute(pro3);
    service.execute(coms1);
    service.execute(coms2);
    service.execute(coms3);
    Thread.sleep(10*1000);
    pro1.stop();
    pro2.stop();
    pro3.stop();
    Thread.sleep(3000);
    service.shutdown();
    }
    }
  • 相关阅读:
    Bootstrap<基础十四> 按钮下拉菜单
    Bootstrap<基础十三> 按钮组
    Bootstrap <基础十二>下拉菜单(Dropdowns)
    Bootstrap<基础十一>字体图标(Glyphicons)
    Bootstrap<基础十> 响应式实用工具
    Bootstrap<基础九>辅助类
    Bootstrap <基础八>图片
    Bootstrap <基础七>按钮
    Bootstrap<基础六> 表单
    Bootstrap <基础五>表格
  • 原文地址:https://www.cnblogs.com/junjiang3/p/7096747.html
Copyright © 2011-2022 走看看