一、线程的通信机制
1、wait/notify/notifyAll机制
3、Lock+Condition
二、wait/notify机制
该机制涉及三个方法,wait()/notify()/notifyAll()。三个方法均非Thread类中的方法,而是Object类中声明的方法。均为final方法,无法被重写。
wait():让当前线程(Thread.concurrentThread()方法所返回的线程)释放对象锁并进入等待(阻塞)状态,直到线程被唤醒。并且当前线程必须拥有此调用对象的锁。
notify():唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,等待当前线程释放锁后竞争锁,得到CPU的执行。如果有多个线程在等待这个对象的锁,只能唤醒其中一个线程。
notify():唤醒所有正在等待这个对象的锁的线程。
为何这三个方法不是Thread类声明中的方法,而是Object类中声明的方法?
用这几个方法必须拿到当前对象的monitor对象。monitor存在于引用指针中,而synchronized关键字可以获取monitor。
意:一个线程被唤醒不代表立即获取了对象的锁,只有等调用完了notify()/notifyAll()并退出synchronized块,释放对象锁后,其他线程才可获得执行
Demo:利用synchronized、wait、notifyAll实现生产者消,费者模型
Resource:共享资源池
package com.imooc.demo.thread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @Title: Resource * @Description: 公共资源类 * @date 2019/1/189:56 */ public class Resource { private static final Logger logger = LoggerFactory.getLogger(Resource.class); //当前资源池数 private int num = 0; //池中最大数 private int maxSize = 10; /** * 取资源 */ public synchronized void remove(){ //当有资源,消费者可以调用,无资源消费者等待 if (num > 0){ num--; logger.info("消费者:"+Thread.currentThread().getName()+"消耗了一个资源,池中剩余资源:"+num); //唤醒生产者 notifyAll(); }else { try { //消费者等待 wait(); logger.info("资源池已无资源,消费者开始等待"); } catch (InterruptedException e) { logger.error("remove wait error {}",e); } } } /** * 添加资源 */ public synchronized void add(){ //资源小于可以添加,满了生产者等待 if (num < maxSize){ num++; logger.info(Thread.currentThread().getName()+"生产了一个资源,池中还有资源:"+num); //唤醒消费者 notifyAll(); }else { try { //生产者等待 wait(); logger.info(Thread.currentThread().getName()+"进入等待"); } catch (InterruptedException e) { logger.error("add wait error {}",e); } } } }
ConsumerThread:消费者+ProducerThread:生产者
//消费者 import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @Title: ConsumerThread * @Description: 消费者 * @date 2019/1/1810:07 */ public class ConsumerThread extends Thread{ private static final Logger logger = LoggerFactory.getLogger(ConsumerThread.class); private Resource resource; public ConsumerThread(Resource resource){ this.resource = resource; } public void run(){ while (true){ try { Thread.sleep(1000); } catch (InterruptedException e) { logger.error("ConsumerThread error {}",e); } resource.remove(); } } } //生产者 import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @Title: ProducerThread * @Description: 生产者 * @date 2019/1/1810:08 */ public class ProducerThread extends Thread{ private static final Logger logger = LoggerFactory.getLogger(ProducerThread.class); private Resource resource; public ProducerThread(Resource resource){ this.resource = resource; } public void run(){ while (true){ try { Thread.sleep(2000); } catch (InterruptedException e) { logger.error("ProducerThread error {}",e); } resource.add(); } } }
Client:客户端
package com.imooc.demo.thread; /** * @Title: Client * @Description: 客户端 * @date 2019/1/1810:10 */ public class Client { public static void main(String[] args) { Resource resource =new Resource(); ProducerThread pt1 = new ProducerThread(resource); ProducerThread pt2 = new ProducerThread(resource); ProducerThread pt3 = new ProducerThread(resource); ConsumerThread ct1 = new ConsumerThread(resource); ConsumerThread ct2 = new ConsumerThread(resource); pt1.start(); pt2.start(); pt3.start(); ct1.start(); } }
三、Condition
在JDK5中出现,替代传统的Object的wait、notify实现线程间的协作。Condition是个接口,基本方法是await()、signal()/signalAll()
依赖于Lock接口,生成一个Condition的方式是lock.newCondition()。await()和signal()/signalAll()方法都必须在lock的保护之内
Demo:使用lock、condition、await、signalAll实现生产者、消费者模型
Resource
package com.imooc.demo.treadForCondition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * @Title: Resource * @Description: 资源池 * @date 2019/1/1810:31 */ public class Resource { private static final Logger logger = LoggerFactory.getLogger(com.imooc.demo.thread.Resource.class); private volatile int num = 0;//当前资源数量 private int maxSize = 10;//资源池中允许存放的资源数目 private Lock lock; //生产者和消费者的Condition 实例 private Condition producerCondition; private Condition consumerCondition; public Resource(Lock lock,Condition producerCondition,Condition consumerCondition){ this.lock = lock; this.producerCondition = producerCondition; this.consumerCondition = consumerCondition; } /** * 取资源 */ public synchronized void remove(){ lock.lock(); //当有资源,消费者可以调用,无资源消费者等待 try { if (num > 0){ num--; logger.info("消费者:"+Thread.currentThread().getName()+"消耗了一个资源,池中剩余资源:"+num); //唤醒生产者 consumerCondition.signalAll(); }else { try { //消费者等待 producerCondition.await(); logger.info("资源池已无资源,消费者开始等待"); } catch (InterruptedException e) { logger.error("remove wait error {}",e); } } } finally { lock.unlock(); } } /** * 添加资源 */ public synchronized void add(){ //资源小于可以添加,满了生产者等待 lock.lock(); try { if (num < maxSize){ num++; logger.info(Thread.currentThread().getName()+"生产了一个资源,池中还有资源:"+num); //唤醒消费者 producerCondition.signalAll(); }else { try { //生产者等待 consumerCondition.await(); logger.info(Thread.currentThread().getName()+"进入等待"); } catch (InterruptedException e) { logger.error("add wait error {}",e); } } } finally { lock.unlock(); } } }
ConsumerThread+ProducerThread+Client
//ConsumerThread import com.imooc.demo.thread.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @Title: ConsumerThread * @Description: 消费者 * @date 2019/1/1810:07 */ public class ConsumerThread extends Thread{ private static final Logger logger = LoggerFactory.getLogger(ConsumerThread.class); private Resource resource; public ConsumerThread(Resource resource){ this.resource = resource; } public void run(){ while (true){ try { Thread.sleep(3000); } catch (InterruptedException e) { logger.error("ConsumerThread error {}",e); } resource.remove(); } } } //ProducerThread import com.imooc.demo.thread.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @Title: ProducerThread * @Description: 生产者 * @date 2019/1/1810:08 */ public class ProducerThread extends Thread{ private static final Logger logger = LoggerFactory.getLogger(ProducerThread.class); private Resource resource; public ProducerThread(Resource resource){ this.resource = resource; } public void run(){ while (true){ try { Thread.sleep(5000); } catch (InterruptedException e) { logger.error("ProducerThread error {}",e); } resource.add(); } } } //Client import com.imooc.demo.thread.Resource; /** * @Title: Client * @Description: 客户端 * @date 2019/1/1810:10 */ public class Client { public static void main(String[] args) { Resource resource =new Resource(); ProducerThread pt1 = new ProducerThread(resource); ConsumerThread ct1 = new ConsumerThread(resource); ConsumerThread ct2 = new ConsumerThread(resource); pt1.start(); ct1.start(); ct2.start(); } }