性能:
数据量少时,Synchronized > Lock、Semaphore。
数据量大时,Lock > Synchronized > Semaphore。
BlockingQueue的常用实现(如ArrayBlockingQueue)底层也是使用ReentrantLock + Condition。
一、Synchronized方式
synchronized作用于方法或者对象上,保证方法或者对象在某一时刻只能被一个线程占用。配合wait()、notify()进行线程间通讯。当库存不足时,消费者线程暂停等待。当生产者生产出新的商品时,通知消费者进行消费。
1 package com.boot.demo; 2 3 import java.util.concurrent.atomic.AtomicInteger; 4 5 /** 6 * @author braska 7 * @date 2020/3/9 8 **/ 9 public class SynchronizedTest { 10 11 private AtomicInteger stock = new AtomicInteger(0); 12 13 public synchronized void produce() { 14 if (stock.get() < 5) { 15 System.out.println(String.format("目前库存:%s", stock.addAndGet(1))); 16 notify(); 17 } else { 18 try { 19 System.out.println("仓库已满。"); 20 wait(); 21 } catch (InterruptedException e) { 22 e.printStackTrace(); 23 } 24 } 25 } 26 27 public synchronized void consume(String name) { 28 if (stock.get() > 0) { 29 System.out.println(String.format("%s在消费, 目前库存:%s", name, stock.addAndGet(-1))); 30 notify(); 31 } else { 32 try { 33 System.out.println("库存已光。"); 34 wait(); 35 } catch (InterruptedException e) { 36 e.printStackTrace(); 37 } 38 } 39 } 40 41 static class Producer implements Runnable { 42 private SynchronizedTest test; 43 44 public Producer(SynchronizedTest test) { 45 this.test = test; 46 } 47 48 @Override 49 public void run() { 50 while(true) { 51 try { 52 Thread.sleep(1000); 53 } catch (InterruptedException e) { 54 e.printStackTrace(); 55 } 56 test.produce(); 57 } 58 } 59 } 60 61 static class Consumer implements Runnable { 62 private SynchronizedTest test; 63 private String name; 64 65 public Consumer(SynchronizedTest test, String name) { 66 this.test = test; 67 this.name = name; 68 } 69 70 @Override 71 public void run() { 72 while(true) { 73 try { 74 Thread.sleep(3000); 75 } catch (InterruptedException e) { 76 e.printStackTrace(); 77 } 78 test.consume(name); 79 } 80 } 81 } 82 83 84 public static void main(String[] args) { 85 SynchronizedTest test = new SynchronizedTest(); 86 87 Thread p1 = new Thread(new Producer(test)); 88 Thread c1 = new Thread(new Consumer(test, "消费者1")); 89 Thread c2 = new Thread(new Consumer(test, "消费者2")); 90 91 p1.start(); 92 c1.start(); 93 c2.start(); 94 } 95 }
二、Semaphore方式
简单点说,Semaphore构造函数定义开放的购物窗口(许可)数量,比如new Semaphore(6);Semaphore没有无参构造函数,必须显式指定许可数。tryAcquire()表示消费者尝试抢占一个购物窗口,抢不到时立即返回false而不会阻塞(需要阻塞等待时使用acquire());当然一个消费者也可以一次抢占多个窗口,比如tryAcquire(2)。当窗口被占满时,后面的消费者需要等待前面的消费者消费完,然后离开(释放)release(2)。
1 package com.boot.demo; 2 3 import java.util.concurrent.Semaphore; 4 import java.util.concurrent.atomic.AtomicInteger; 5 6 /** 7 * @author braska 8 * @date 2020/3/9 9 **/ 10 public class SemaphoreTest { 11 12 Semaphore semaphore = new Semaphore(1); 13 AtomicInteger stock = new AtomicInteger(0); 14 15 public void produce() { 16 if (stock.get() < 5) { 17 System.out.println(String.format("目前库存:%s", stock.addAndGet(1))); 18 semaphore.release(); 19 } else { 20 System.out.println("仓库已满。"); 21 } 22 } 23 24 public void comsume(String name) { 25 if (semaphore.tryAcquire()) { 26 if (stock.get() > 0) { 27 System.out.println(String.format("%s正在消费,目前库存: %s", name, stock.addAndGet(-1))); 28 } else { 29 System.out.println("库存已光。"); 30 } 31 } 32 } 33 34 static class Producer implements Runnable{ 35 36 private SemaphoreTest deal; 37 public Producer(SemaphoreTest deal) { 38 this.deal = deal; 39 } 40 41 @Override 42 public void run() { 43 while (true) 44 try { 45 Thread.sleep(1000); 46 deal.produce(); 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 51 } 52 } 53 54 static class Consumer implements Runnable { 55 private SemaphoreTest deal; 56 private String name; 57 public Consumer(SemaphoreTest deal, String name) { 58 this.deal = deal; 59 this.name = name; 60 } 61 62 @Override 63 public void run() { 64 while (true) 65 try { 66 Thread.sleep(3000); 67 deal.comsume(name); 68 } catch (InterruptedException e) { 69 e.printStackTrace(); 70 } 71 } 72 } 73 74 75 public static void main(String[] args) { 76 SemaphoreTest deal = new SemaphoreTest(); 77 78 Thread p = new Thread(new Producer(deal)); 79 80 Thread c1 = new Thread(new Consumer(deal, "消费者1")); 81 Thread c2 = new Thread(new Consumer(deal, "消费者2")); 82 83 p.start(); 84 c1.start(); 85 c2.start(); 86 } 87 }
三、Lock方式
说起来同synchronized方式是一样的:锁住某一个对象或者执行体,保证在某一时刻不被多个线程同时占用。区别:synchronized是Java内置关键字,锁由JVM自动释放;Lock是java.util.concurrent.locks包提供的接口,不是语言内置特性,必须手动调用unlock()释放(通常放在finally块中)。
1 package com.boot.demo; 2 3 import java.util.concurrent.atomic.AtomicInteger; 4 import java.util.concurrent.locks.Lock; 5 import java.util.concurrent.locks.ReentrantLock; 6 7 /** 8 * @author braska 9 * @date 2020/3/9 10 **/ 11 public class LockTest { 12 13 private Lock lock = new ReentrantLock(); 14 private AtomicInteger stock = new AtomicInteger(0); 15 16 public void produce() { 17 lock.lock(); 18 if (stock.get() < 5) { 19 System.out.println(String.format("目前库存:%s", stock.addAndGet(1))); 20 } else { 21 System.out.println("仓库已满。"); 22 } 23 lock.unlock(); 24 } 25 26 public void consume(String name) { 27 lock.lock(); 28 if (stock.get() > 0) { 29 System.out.println(String.format("%s正在消费,目前库存: %s", name, stock.addAndGet(-1))); 30 } else { 31 System.out.println("库存已光。"); 32 } 33 lock.unlock(); 34 } 35 36 static class Producer implements Runnable{ 37 38 private LockTest deal; 39 public Producer(LockTest deal) { 40 this.deal = deal; 41 } 42 43 @Override 44 public void run() { 45 while (true) 46 try { 47 Thread.sleep(1000); 48 deal.produce(); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } 52 53 } 54 } 55 56 static class Consumer implements Runnable { 57 private LockTest deal; 58 private String name; 59 public Consumer(LockTest deal, String name) { 60 this.deal = deal; 61 this.name = name; 62 } 63 64 @Override 65 public void run() { 66 while (true) 67 try { 68 Thread.sleep(3000); 69 deal.consume(name); 70 } catch (InterruptedException e) { 71 e.printStackTrace(); 72 } 73 } 74 } 75 76 77 public static void main(String[] args) { 78 LockTest deal = new LockTest(); 79 80 Thread p = new Thread(new Producer(deal)); 81 82 Thread c1 = new Thread(new Consumer(deal, "消费者1")); 83 Thread c2 = new Thread(new Consumer(deal, "消费者2")); 84 85 p.start(); 86 c1.start(); 87 c2.start(); 88 } 89 }
四、BlockingQueue方式
底层采用的也是lock机制。
1 package com.boot.demo; 2 3 import java.util.concurrent.ArrayBlockingQueue; 4 import java.util.concurrent.BlockingQueue; 5 import java.util.concurrent.atomic.AtomicInteger; 6 7 /** 8 * @author braska 9 * @date 2020/3/9 10 **/ 11 public class BlockingQueueTest { 12 13 private BlockingQueue<Integer> queue = new ArrayBlockingQueue(5); 14 private AtomicInteger stock = new AtomicInteger(0); 15 16 public void produce() throws InterruptedException { 17 try { 18 queue.add(stock.addAndGet(1)); 19 System.out.println(String.format("目前库存:%s", stock.get())); 20 } catch (Exception e) { 21 stock.addAndGet(-1); 22 System.out.println("仓库已满。"); 23 } 24 } 25 26 public void consume(String name) { 27 Integer good = queue.poll(); 28 if (good != null) { 29 System.out.println(String.format("%s正在消费,目前库存: %s", name, good)); 30 stock.addAndGet(-1); 31 } else { 32 System.out.println("库存已光。"); 33 } 34 } 35 36 static class Producer implements Runnable{ 37 38 private BlockingQueueTest deal; 39 public Producer(BlockingQueueTest deal) { 40 this.deal = deal; 41 } 42 43 @Override 44 public void run() { 45 while (true) 46 try { 47 Thread.sleep(1000); 48 deal.produce(); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } 52 53 } 54 } 55 56 static class Consumer implements Runnable { 57 private BlockingQueueTest deal; 58 private String name; 59 public Consumer(BlockingQueueTest deal, String name) { 60 this.deal = deal; 61 this.name = name; 62 } 63 64 @Override 65 public void run() { 66 while (true) 67 try { 68 Thread.sleep(3000); 69 deal.consume(name); 70 } catch (InterruptedException e) { 71 e.printStackTrace(); 72 } 73 } 74 } 75 76 77 public static void main(String[] args) { 78 BlockingQueueTest deal = new BlockingQueueTest(); 79 80 Thread p = new Thread(new Producer(deal)); 81 82 Thread c1 = new Thread(new Consumer(deal, "消费者1")); 83 Thread c2 = new Thread(new Consumer(deal, "消费者2")); 84 85 p.start(); 86 c1.start(); 87 c2.start(); 88 } 89 }