zoukankan      html  css  js  c++  java
  • synchronized、lock、semaphore、blockingqueue实现生产消费场景。

    性能:

    数据量少时,Synchronized> Lock、Semaphore。

    数据量大时,Lock > Synchronized > Semaphore。

    Blockingqueue底层也是使用ReentrantLock + Condition。

    一、Synchronized方式

    synchronized作用于方法或者对象上,保证方法或者对象在某一时刻只能被一个线程占用。配合wait()、notify()进行线程间通讯。当库存不足时,消费者线程暂停等待。当生产者生产出新的商品时,通知消费者进行消费。

     1 package com.boot.demo;
     2 
     3 import java.util.concurrent.atomic.AtomicInteger;
     4 
     5 /**
     6  * @author braska
     7  * @date 2020/3/9
     8  **/
     9 public class SynchronizedTest {
    10 
    11     private AtomicInteger stock = new AtomicInteger(0);
    12 
    13     public synchronized void produce() {
    14         if (stock.get() < 5) {
    15             System.out.println(String.format("目前库存:%s", stock.addAndGet(1)));
    16             notify();
    17         } else {
    18             try {
    19                 System.out.println("仓库已满。");
    20                 wait();
    21             } catch (InterruptedException e) {
    22                 e.printStackTrace();
    23             }
    24         }
    25     }
    26 
    27     public synchronized void consume(String name) {
    28         if (stock.get() > 0) {
    29             System.out.println(String.format("%s在消费, 目前库存:%s", name,  stock.addAndGet(-1)));
    30             notify();
    31         } else {
    32             try {
    33                 System.out.println("库存已光。");
    34                 wait();
    35             } catch (InterruptedException e) {
    36                 e.printStackTrace();
    37             }
    38         }
    39     }
    40 
    41     static class Producer implements Runnable {
    42         private SynchronizedTest test;
    43 
    44         public Producer(SynchronizedTest test) {
    45             this.test = test;
    46         }
    47 
    48         @Override
    49         public void run() {
    50             while(true) {
    51                 try {
    52                     Thread.sleep(1000);
    53                 } catch (InterruptedException e) {
    54                     e.printStackTrace();
    55                 }
    56                 test.produce();
    57             }
    58         }
    59     }
    60 
    61     static class Consumer implements Runnable {
    62         private SynchronizedTest test;
    63         private String name;
    64 
    65         public Consumer(SynchronizedTest test, String name) {
    66             this.test = test;
    67             this.name = name;
    68         }
    69 
    70         @Override
    71         public void run() {
    72             while(true) {
    73                 try {
    74                     Thread.sleep(3000);
    75                 } catch (InterruptedException e) {
    76                     e.printStackTrace();
    77                 }
    78                 test.consume(name);
    79             }
    80         }
    81     }
    82 
    83 
    84     public static void main(String[] args) {
    85         SynchronizedTest test = new SynchronizedTest();
    86 
    87         Thread p1 = new Thread(new Producer(test));
    88         Thread c1 = new Thread(new Consumer(test, "消费者1"));
    89         Thread c2 = new Thread(new Consumer(test, "消费者2"));
    90 
    91         p1.start();
    92         c1.start();
    93         c2.start();
    94     }
    95 }
    View Code

    二、Semaphore方式

    简单点说,Semaphore构造函数定义开放的购物窗口比如new Semaphore(6),无参默认开放一个窗口。tryAcquire()表示消费者抢占购物窗口,当然一个消费者可以抢占多个窗口比如tryAcquire(2)。当窗口被占满时,后面的消费者需要等待前面的消费者消费完,然后离开(释放)release(2)。

     1 package com.boot.demo;
     2 
     3 import java.util.concurrent.Semaphore;
     4 import java.util.concurrent.atomic.AtomicInteger;
     5 
     6 /**
     7  * @author braska
     8  * @date 2020/3/9
     9  **/
    10 public class SemaphoreTest {
    11 
    12     Semaphore semaphore = new Semaphore(1);
    13     AtomicInteger stock = new AtomicInteger(0);
    14 
    15     public void produce() {
    16         if (stock.get() < 5) {
    17             System.out.println(String.format("目前库存:%s", stock.addAndGet(1)));
    18             semaphore.release();
    19         } else {
    20             System.out.println("仓库已满。");
    21         }
    22     }
    23 
    24     public void comsume(String name) {
    25         if (semaphore.tryAcquire()) {
    26             if (stock.get() > 0) {
    27                 System.out.println(String.format("%s正在消费,目前库存: %s", name, stock.addAndGet(-1)));
    28             } else {
    29                 System.out.println("库存已光。");
    30             }
    31         }
    32     }
    33 
    34     static class Producer implements Runnable{
    35 
    36         private SemaphoreTest deal;
    37         public Producer(SemaphoreTest deal) {
    38             this.deal = deal;
    39         }
    40 
    41         @Override
    42         public void run() {
    43             while (true)
    44             try {
    45                 Thread.sleep(1000);
    46                 deal.produce();
    47             } catch (InterruptedException e) {
    48                 e.printStackTrace();
    49             }
    50 
    51         }
    52     }
    53 
    54     static class Consumer implements Runnable {
    55         private SemaphoreTest deal;
    56         private String name;
    57         public Consumer(SemaphoreTest deal, String name) {
    58             this.deal = deal;
    59             this.name = name;
    60         }
    61 
    62         @Override
    63         public void run() {
    64             while (true)
    65             try {
    66                 Thread.sleep(3000);
    67                 deal.comsume(name);
    68             } catch (InterruptedException e) {
    69                 e.printStackTrace();
    70             }
    71         }
    72     }
    73 
    74 
    75     public static void main(String[] args) {
    76         SemaphoreTest deal = new SemaphoreTest();
    77 
    78         Thread p = new Thread(new Producer(deal));
    79 
    80         Thread c1 = new Thread(new Consumer(deal, "消费者1"));
    81         Thread c2 = new Thread(new Consumer(deal, "消费者2"));
    82 
    83         p.start();
    84         c1.start();
    85         c2.start();
    86     }
    87 }
    View Code

    三、Lock方式

    说起来同synchronized方式是一样的。锁住某一个对象或者执行体保证在某一时刻不被多个线程同时占用。区别:synchronized不需要手动释放;lock不是java内置特性。

     1 package com.boot.demo;
     2 
     3 import java.util.concurrent.atomic.AtomicInteger;
     4 import java.util.concurrent.locks.Lock;
     5 import java.util.concurrent.locks.ReentrantLock;
     6 
     7 /**
     8  * @author braska
     9  * @date 2020/3/9
    10  **/
    11 public class LockTest {
    12 
    13     private Lock lock = new ReentrantLock();
    14     private AtomicInteger stock = new AtomicInteger(0);
    15 
    16     public void produce() {
    17         lock.lock();
    18         if (stock.get() < 5) {
    19             System.out.println(String.format("目前库存:%s", stock.addAndGet(1)));
    20         } else {
    21             System.out.println("仓库已满。");
    22         }
    23         lock.unlock();
    24     }
    25 
    26     public void consume(String name) {
    27         lock.lock();
    28         if (stock.get() > 0) {
    29             System.out.println(String.format("%s正在消费,目前库存: %s", name, stock.addAndGet(-1)));
    30         } else {
    31             System.out.println("库存已光。");
    32         }
    33         lock.unlock();
    34     }
    35 
    36     static class Producer implements Runnable{
    37 
    38         private LockTest deal;
    39         public Producer(LockTest deal) {
    40             this.deal = deal;
    41         }
    42 
    43         @Override
    44         public void run() {
    45             while (true)
    46                 try {
    47                     Thread.sleep(1000);
    48                     deal.produce();
    49                 } catch (InterruptedException e) {
    50                     e.printStackTrace();
    51                 }
    52 
    53         }
    54     }
    55 
    56     static class Consumer implements Runnable {
    57         private LockTest deal;
    58         private String name;
    59         public Consumer(LockTest deal, String name) {
    60             this.deal = deal;
    61             this.name = name;
    62         }
    63 
    64         @Override
    65         public void run() {
    66             while (true)
    67                 try {
    68                     Thread.sleep(3000);
    69                     deal.consume(name);
    70                 } catch (InterruptedException e) {
    71                     e.printStackTrace();
    72                 }
    73         }
    74     }
    75 
    76 
    77     public static void main(String[] args) {
    78         LockTest deal = new LockTest();
    79 
    80         Thread p = new Thread(new Producer(deal));
    81 
    82         Thread c1 = new Thread(new Consumer(deal, "消费者1"));
    83         Thread c2 = new Thread(new Consumer(deal, "消费者2"));
    84 
    85         p.start();
    86         c1.start();
    87         c2.start();
    88     }
    89 }
    View Code

    四、BlockingQueue方式

    底层采用的也是lock机制。

     1 package com.boot.demo;
     2 
     3 import java.util.concurrent.ArrayBlockingQueue;
     4 import java.util.concurrent.BlockingQueue;
     5 import java.util.concurrent.atomic.AtomicInteger;
     6 
     7 /**
     8  * @author braska
     9  * @date 2020/3/9
    10  **/
    11 public class BlockingQueueTest {
    12 
    13     private BlockingQueue<Integer> queue = new ArrayBlockingQueue(5);
    14     private AtomicInteger stock = new AtomicInteger(0);
    15 
    16     public void produce() throws InterruptedException {
    17         try {
    18             queue.add(stock.addAndGet(1));
    19             System.out.println(String.format("目前库存:%s", stock.get()));
    20         } catch (Exception e) {
    21             stock.addAndGet(-1);
    22             System.out.println("仓库已满。");
    23         }
    24     }
    25 
    26     public void consume(String name) {
    27         Integer good = queue.poll();
    28         if (good != null) {
    29             System.out.println(String.format("%s正在消费,目前库存: %s", name, good));
    30             stock.addAndGet(-1);
    31         } else {
    32             System.out.println("库存已光。");
    33         }
    34     }
    35 
    36     static class Producer implements Runnable{
    37 
    38         private BlockingQueueTest deal;
    39         public Producer(BlockingQueueTest deal) {
    40             this.deal = deal;
    41         }
    42 
    43         @Override
    44         public void run() {
    45             while (true)
    46                 try {
    47                     Thread.sleep(1000);
    48                     deal.produce();
    49                 } catch (InterruptedException e) {
    50                     e.printStackTrace();
    51                 }
    52 
    53         }
    54     }
    55 
    56     static class Consumer implements Runnable {
    57         private BlockingQueueTest deal;
    58         private String name;
    59         public Consumer(BlockingQueueTest deal, String name) {
    60             this.deal = deal;
    61             this.name = name;
    62         }
    63 
    64         @Override
    65         public void run() {
    66             while (true)
    67                 try {
    68                     Thread.sleep(3000);
    69                     deal.consume(name);
    70                 } catch (InterruptedException e) {
    71                     e.printStackTrace();
    72                 }
    73         }
    74     }
    75 
    76 
    77     public static void main(String[] args) {
    78         BlockingQueueTest deal = new BlockingQueueTest();
    79 
    80         Thread p = new Thread(new Producer(deal));
    81 
    82         Thread c1 = new Thread(new Consumer(deal, "消费者1"));
    83         Thread c2 = new Thread(new Consumer(deal, "消费者2"));
    84 
    85         p.start();
    86         c1.start();
    87         c2.start();
    88     }
    89 }
    View Code
  • 相关阅读:
    CORS 跨域问题, 以及作为api server 的正确配置, 后台 nginx 配置
    angular2 各种开发种遇到的问题和设置
    angular2 cli 无法正确安装使用解决
    inline-block text-align: justify 实现自适应布局, 当子inline-block之间没有空格时失效及原因
    rails active record 使用default_scope is evil, 记一次 order not work 的排查
    java class jar 的加载问题
    es6 匿名函数求阶乘
    ruby 一些基础的语法, 各种杂物箱
    ruby 给对象添加新的方法
    javascript 核心语言笔记 7
  • 原文地址:https://www.cnblogs.com/braska/p/12449864.html
Copyright © 2011-2022 走看看