线程通信
为什么要进行线程通信
把一个大的任务放到主线程的话,由于顺序执行,会严重影响程序执行的效率。为了提高效率和并发度,可以将任务解耦为多个线程执行,比如一个线程接收数据,一个线程处理数据,这个时候,线程间就会出现交互,也就是线程通信。
生产者消费者模型
线程通信的关键应用场景就是生产者消费者问题
- 生产者是负责生产数据的模块,生产产品后,要通知消费者消费
- 消费者是负责处理数据的模块,消费产品后,需要通知生产者生产
- Java提供了几个用于线程通信的方法
方法名 | 作用 |
---|---|
wait() | 表示线程一直等待,直到其他线程通知,与sleep不同,会释放锁 |
wait(long time out) | 指定等待的毫秒数 |
notify() | 唤醒一个处于等待状态的线程 |
notifiyAll() | 唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先调度 |
生产者消费者的两种方法实现
管程法:
消费者不能直接使用生产者的数据,需要创建一个缓冲区生产者将生产好的数据放入到缓冲区中,消费者从缓冲区拿出数据。
package MultiProcess;
import javax.swing.text.AsyncBoxView;
//管程法:利用缓冲区解决
public class TestProduceConsumer {
public static void main(String[] args) {
SyContaner container = new SyContaner();
new Producetor(container).start();
new Consumer(container).start();
}
}
//生产者
class Producetor extends Thread{
SyContaner container;
public Producetor(SyContaner container){
this.container = container;
}
//生产
@Override
public void run() {
for (int i = 0; i < 10; i++) {
container.push(new Chicken(i));
System.out.println("生产第" + i + "只鸡");
}
}
}
//消费者
class Consumer extends Thread{
SyContaner container;
public Consumer(SyContaner container){
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("消费第" + container.pop().id + "只鸡");
}
}
}
//产品
class Chicken{
int id;
public Chicken(int id){
this.id = id;
}
}
class SyContaner{
Chicken[] chickens = new Chicken[10];
int count = 0;
//生产者生产产品
public synchronized void push(Chicken chicken){
//满了,通知消费者消费产品
if(count == chickens.length){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//没有满,生产者生产产品
chickens[count] = chicken;
count++;
this.notifyAll();
}
//消费者消费产品
public synchronized Chicken pop(){
//没有产品了,通知生产者去生产
if(count == 0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//有产品,消费
count--;
Chicken chicken = chickens[count];
this.notifyAll();
return chicken;
}
}
信号灯法:
消费者可以直接使用生产者生产好的数据,不需要设置缓冲区,
只需要设置一个标志位,用来控制生产者和消费者的状态。
package MultiProcess;
public class TestProducerConsumer2 {
public static void main(String[] args) {
TV tv = new TV();
new Player(tv).start();
new Watcher(tv).start();
}
}
class Player extends Thread{
TV tv;
public Player(TV tv){
this.tv = tv;
}
@Override
public void run(){
for (int i = 0; i < 20; i++) {
if(i % 2 == 0){
this.tv.play("快乐大本营");
}else{
this.tv.play("抖音:记录美好生活");
}
}
}
}
class Watcher extends Thread{
TV tv;
public Watcher(TV tv){
this.tv = tv;
}
@Override
public void run(){
for (int i = 0; i < 20; i++) {
tv.watch();
}
}
}
//产品-->节目
class TV{
String voice;
boolean flag = true; //true代表需要生产者生产
public synchronized void play(String voice){
if(!flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("演员表演了:" + voice);
this.voice = voice;
this.flag = !this.flag; //不能继续表演了,要等别人看完,通知消费者线程
this.notifyAll();
}
public synchronized void watch(){
if(flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("观看了:" + voice);
this.flag = !this.flag;
this.notifyAll();
}
}