个人学习笔记分享,当前能力有限,请勿贬低,菜鸟互学,大佬绕道
如有勘误,欢迎指出和讨论,本文后期也会进行修正和补充
前言
设想一个场景:
你和你女朋友(假设有)打算出门,你问你女朋友打扮好了没,她说还没。
于是过了五分钟,你再去问,她说还没。
再过五分钟,你又问,她说再等会儿
再过五分钟,你又问,她说烦不烦一直问,多等会能咋了??。。。。
2000 years later,她准备好了,你已经准备打开网抑云了
其实解决方案很简单,你跟她说准备好了喊你,而你做好随时被喊的准备即可
设想如下模型:
- 消费者A向生产者B取货。如果B手上没有足够多的货,那么A就得一直等待,直到B有货交付给A;如果B生产的货物已经达到库存上限,那么B需要等待A取货后库存减少,再继续生产。
- 扩展场景3,A监听仓库,如果仓库库存增加,就取货,其余时间摸鱼,不关心生产相关的事情;B只负责生产,不关心消费相关事情。
有发现1和2的区别吗?
在1中,消费者和生产者需要一直询问仓库库存数量,即双方保持沟通,显然太麻烦了。
在2中,生产者一直生产给仓库即可,其余事情不关心;消费者监听仓库,当仓库数量变动时通知消费者,其余事情不关心。生产者和消费者互不沟通。
(什么?你问库存满了咋办?大部分情况下会保证仓库足够大,实在满了。。。通常是把旧的直接丢弃掉。。。啊这。。)
这样就实现了一个极其关键的思想-解耦
实际上,1被成为生产者/消费者模式,2被称为订阅/发布模式(又称观察者模式)
后者可以算是对前者的优化方案。前者可以满足1-1,N-1,1-N,N-N,而后者通常是1-N,或者说N-1-N
本文仅基于redis和java实现,重在整理思路,有兴趣可以自己查其他方面
1.生产者/消费者模式
1.1.场景预设
消费者A从生产者B中读取数据,若有满足要求数据则返回,若无则等待直到满足为止
生产者暂不考虑库存溢出的情况
1.2.理论基础
-
消费者A向仓库取货
- 库存充足时,A取货成功,结束此操作;
- 库存不足时,则A等待。直到有库存,A再取货,若成功,则结束,失败,则继续等待
有没有疑惑,为什么还会失败?很简单,因为可能不止一个消费者,如果没抢到,就只能等下一轮了;也有可能进货了依然不够,那还得等到够了为止,可没有买一半这种说法
-
生产者B向仓库出货
- 仓库空间足够,则B出货成功,结束此操作;
- 仓库空间不足,则B等待。直到库存变动,A再出货,若成功,则结束,失败,则继续等待
同理生产者也可能多个,也可能库存变动后,空间依然不足,那么就得继续等了
1.3.技术基础
-
redis可以存储队列,可以对队首或队尾进行push或者pop操作
命令格式如下:
lpop/rpop/lpush/rpush key
模拟操作如下 -
redis可以进行堵塞读取,即同时检测多个键,若其中一个有元素,则读取,否则等待有数据为止
命令格式如下:
blpop/brpop key1 [key ...] timeout
,模拟操作如下-
双开redis命令行A和B,A中从list1和list2中读取数据,无数据,则开始等待
-
B中向list2添加一个数据,则A中读取成功,读取出刚存入进去的数据
-
A中继续读取一个数据,若list1和list2中有数据则会读取成功,否则将进入等待,即第一步
-
1.4.Java实现
-
配置redis,此处不再赘述,请自行询问度娘,或者参考我所整理的redis集成方法(没写几篇就不给传送门了)
-
配置redisPool,代码如下
redis.properties
redis.url=localhost redis.port=6379 redis.maxIdle=30 redis.minIdle=10 redis.maxTotal=100 redis.maxWait=10000
JedisPoolUtils (自行添加jedis依赖)
package com.yezi_tool.basic_project.commons.utils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import java.io.IOException; import java.io.InputStream; import java.util.Properties; /** * @title Jedis线程池工具类 * @description 用于控制Jedis线程 * @author Echo_Ye * @date 2020/8/9 17:28 * @email echo_yezi@qq.com */ public class JedisPoolUtils { private static JedisPool pool = null; static { //加载配置文件 InputStream in = JedisPoolUtils.class.getClassLoader().getResourceAsStream("redis.properties"); Properties pro = new Properties(); try { pro.load(in); } catch (IOException e) { e.printStackTrace(); } //获得池子对象 JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大连接个数 poolConfig.setMaxWaitMillis(Integer.parseInt(pro.get("redis.maxWait").toString()));//最大闲置个数 poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));//最小闲置个数 poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大连接数 pool = new JedisPool(poolConfig, pro.getProperty("redis.url"), Integer.parseInt(pro.get("redis.port").toString())); } //获得jedis资源的方法 public static Jedis getJedis() { return pool.getResource(); } public static void main(String[] args) { Jedis jedis = getJedis(); System.out.println(jedis); } }
-
消息生产者(开启3个线程生产,每3秒生产一个)
package com.yezi_tool.basic_project.test.redisTest; import com.yezi_tool.basic_project.commons.utils.JedisPoolUtils; import redis.clients.jedis.Jedis; /** * @author Echo_Ye * @title 生产者模拟器 * @description redis模拟生产者 * @date 2020/8/9 16:26 * @email echo_yezi@qq.com */ public class MessageProducer extends Thread { //消息key,用于区分 public static final String MESSAGE_KEY = "messageQueue"; //消息序号 private volatile Integer count = 0; private final int maxCount = 10; //睡眠时间为3000毫秒 private static final long sleepTime = 3000; /** * 发送消息,为确保count不会错乱,加上同步锁 */ public synchronized void putMessage() { //消息体 String message = "message" + count++; //存入消息 Jedis jedis = JedisPoolUtils.getJedis(); Long size = jedis.lpush(MESSAGE_KEY, message); System.out.println(Thread.currentThread().getName() + " put message:" + message + " nowSize:" + size); } /** * 线程执行内容 */ @Override public void run() { while (count < maxCount) { try { putMessage(); //沉睡 Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 开启线程 */ public static void runMessageProducer() { MessageProducer messageProducer = new MessageProducer(); Thread producer1 = new Thread(messageProducer, "producer1"); Thread producer2 = new Thread(messageProducer, "producer2"); Thread producer3 = new Thread(messageProducer, "producer3"); producer1.start(); producer2.start(); producer3.start(); } public static void main(String args[]) { runMessageProducer(); } }
执行结果如下
-
消息消费者(开启3个线程消费,每2秒消费一个)
package com.yezi_tool.basic_project.test.redisTest; import com.yezi_tool.basic_project.commons.utils.JedisPoolUtils; import redis.clients.jedis.Jedis; /** * @author Echo_Ye * @title 消费者模拟器 * @description redis模拟消费者 * @date 2020/8/9 17:34 * @email echo_yezi@qq.com */ public class MessageConsumer implements Runnable { //消息key,用于区分 public static final String MESSAGE_KEY = "messageQueue"; //睡眠时间为2000毫秒 private static final long sleepTime = 2000; /** * 消费消息 */ public void consumerMessage() { Jedis jedis = JedisPoolUtils.getJedis(); String message = jedis.rpop(MESSAGE_KEY); System.out.println(Thread.currentThread().getName() + " consumer message:" + message); } /** * 线程执行内容 */ @Override public void run() { while (true) { try { consumerMessage(); //沉睡 Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 开启线程 */ public static void runMessageConsumer() { MessageConsumer messageConsumer = new MessageConsumer(); Thread consumer1 = new Thread(messageConsumer, "consumer1"); Thread consumer2 = new Thread(messageConsumer, "consumer2"); Thread consumer3 = new Thread(messageConsumer, "consumer3"); consumer1.start(); consumer2.start(); consumer3.start(); } public static void main(String[] args) { runMessageConsumer(); } }
执行结果如下,当取不到数据时返回的是null
-
修改消费者,改为堵塞读取,即修改第24行为
List<String> message = jedis.brpop(0, MESSAGE_KEY);
执行结果如下,取不到数据时开始等待
-
此时重新启动生产者,执行结果如下
可以看到消费者继续消费了,直到库存再次为空,便继续等待
-
同时启动生产者和消费者,已知消费者速度比生产者快,故让生产者先启动5s
package com.yezi_tool.basic_project.test.redisTest; /** * @author Echo_Ye * @title 消费者/生产者模拟器 * @description 同时启动生产者和消费者,查看运行结果 * @date 2020/8/9 18:11 * @email echo_yezi@qq.com */ public class RedisTest { public static void main(String[] args) { //启动生产者 MessageProducer.runMessageProducer(); try { //沉睡5秒后启动 Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } //启动消费者 MessageConsumer.runMessageConsumer(); } }
执行结果如下
一切都在计划之中,对吧?那么继续看观察者模式
2.订阅/发布模式
2.1.场景预设
订阅者A告知客户端自己订阅了,生产者B发送消息后需要发送给订阅者。
2.2.理论基础
- 订阅者A告知仓库自己订阅,当仓库有变动时,仓库将变动告知订阅者。不关心收到消息前的业务。
- 发布者B向仓库发布信息。不关心后续业务
- AB互不知晓对方,由仓库负责消息的接受和发布
2.3.技术基础
-
订阅消息:可以订阅一个或多个通道,收集来自这些通道的信息,命令格式为为
subscribe channelA channelB
模拟操作如下
-
发布消息:可以向一个通道发布消息,命令格式为
publish channel message
,模拟操作如下发布三条消息 -
此时在订阅端收到三条消息
-
其他命令,暂不做演示,有兴趣可以自己玩玩
unsubscribe [channel1 [channel1 ...]]
:取消订阅,即不再收到来自目标通道的消息,客户端不能取消,只能直接退出。。。psubscribe pattern [pattern ...]
:订阅一个或多个符合给定模式的频道,举例如下psubscribe t*st
可以匹配到test
、tast
、teast
等等psubscribe t[ea]st
可以匹配到test
、tast
psubscribe t?st
可以匹配到tast
、tbst
、tcst
等等
psubscribe pattern [pattern ...]
:取消订阅一个或多个符合给定模式的频道,跟上面那货差不多,订阅全部取消后会退出订阅状态
2.4.补充
- 除开publish,其余所有命令的返回值均包含三个
- 消息类型,即订阅、取消订阅等等
- 相关的通道名
- 消息内容,或者剩余订阅数量
- 使用psubscribe可以重复订阅一个频道多次:此时该通道(订阅N次)收到消息,则订阅者也会收到多条消息(N条)
- 使用subscribe + psubscribe可以重复订阅一个频道多次:此时该通道(订阅N次)收到消息,则订阅者也会收到多条消息(N条),但消息类型会被区分为
message
和pmessage
punsubscribe
无参数时会取消所有订阅- subscribe 与psubscribe互不干扰,因而允许出现同时订阅某通道的情况
2.4.Java实现
2.4.1.常规订阅
-
消息生产者(发布者),将
push
操作改为publish
操作,并在达到上限时发布终止命令package com.yezi_tool.basic_project.test.publishTest; import com.yezi_tool.basic_project.commons.utils.JedisPoolUtils; import redis.clients.jedis.Jedis; /** * @author Echo_Ye * @title 发布者模拟器 * @description 模拟发布者 * @date 2020/8/10 10:28 * @email echo_yezi@qq.com */ public class NewMessageProducer extends Thread { //消息key,用于区分 public static final String MESSAGE_KEY = "messageQueue"; //消息序号 private volatile Integer count = 0; private final int maxCount = 10; //睡眠时间为3000毫秒 private static final long sleepTime = 3000; //结束程序的消息 public static final String EXIT_COMMAND = "exit"; /** * 发送消息,为确保count不会错乱,加上同步锁 */ public synchronized void putMessage() { //消息体 String message = "message" + count++; //达到上限时改为终止命令 if (count == maxCount) { message = EXIT_COMMAND; } //存入消息 Jedis jedis = JedisPoolUtils.getJedis(); Long size = jedis.publish(MESSAGE_KEY, message); System.out.println(Thread.currentThread().getName() + " publish message:" + message + " receiverNum:" + size); } /** * 线程执行内容 */ @Override public void run() { while (count < maxCount) { try { putMessage(); //沉睡 Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 开启线程 */ public static void runMessageProducer() { NewMessageProducer messageProducer = new NewMessageProducer(); Thread publisher1 = new Thread(messageProducer, "publisher1"); Thread publisher2 = new Thread(messageProducer, "publisher2"); Thread publisher3 = new Thread(messageProducer, "publisher3"); publisher1.start(); publisher2.start(); publisher3.start(); } public static void main(String args[]) { runMessageProducer(); } }
-
消息消费者(订阅者),接收到终止命令时退出
package com.yezi_tool.basic_project.test.publishTest; import com.yezi_tool.basic_project.commons.utils.JedisPoolUtils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; /** * @title 订阅者模拟器 * @description 订阅者模拟器 * @author Echo_Ye * @date 2020/8/10 10:43 * @email echo_yezi@qq.com */ public class NewMessageConsumer implements Runnable { //消息key,用于区分 public static final String MESSAGE_KEY = "messageQueue"; //处理接收消息 private MyJedisPubSub myJedisPubSub = new MyJedisPubSub(); /** * 消费消息 */ public void consumerMessage() { Jedis jedis = JedisPoolUtils.getJedis(); jedis.subscribe(myJedisPubSub, MESSAGE_KEY); } /** * 线程执行内容 */ @Override public void run() { while (true) { consumerMessage(); } } /** * 开启线程 */ public static void runMessageConsumer() { NewMessageConsumer messageConsumer = new NewMessageConsumer(); Thread subscriber1 = new Thread(messageConsumer, "subscriber1"); Thread subscriber2 = new Thread(messageConsumer, "subscriber2"); Thread subscriber3 = new Thread(messageConsumer, "subscriber3"); subscriber1.start(); subscriber2.start(); subscriber3.start(); } public static void main(String[] args) { runMessageConsumer(); } /** * 继承JedisPubSub,重写接收消息的方法 */ class MyJedisPubSub extends JedisPubSub { /** * JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现 * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法 * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[] **/ @Override public void onMessage(String channel, String message) { System.out.println(Thread.currentThread().getName() + " channel:" + channel + " message:" + message); //接收到exit消息后退出 if (NewMessageProducer.EXIT_COMMAND.equals(message)) { System.exit(0); } } } }
-
同时启动订阅者和发布者,让发布者先启动5s
package com.yezi_tool.basic_project.test.publishTest; /** * @author Echo_Ye * @title 订阅/发布模拟器 * @description 同时启动订阅者和发布者,查看运行结果 * @date 2020/8/10 10:43 * @email echo_yezi@qq.com */ public class NewRedisTest { public static void main(String[] args) { //启动生产者 NewMessageProducer.runMessageProducer(); try { //沉睡5秒后启动 Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } //启动消费者 NewMessageConsumer.runMessageConsumer(); } }
执行结果如下
2.4.2.按规则订阅和发布
-
修改发布者,发布命令修改为
public synchronized void putMessage() { //消息体 String message = "message" + count++; //达到上限时改为终止命令 if (count == maxCount) { message = EXIT_COMMAND; } //存入消息 Jedis jedis = JedisPoolUtils.getJedis(); // Long size = jedis.publish(MESSAGE_KEY, message); Long size = jedis.publish(MESSAGE_KEY + "_" + Thread.currentThread().getName(), message); System.out.println(Thread.currentThread().getName() + " publish message:" + message + " receiverNum:" + size); }
-
修改订阅者,订阅命令修改为
public void consumerMessage() { Jedis jedis = JedisPoolUtils.getJedis(); // jedis.subscribe(myJedisPubSub, MESSAGE_KEY); jedis.psubscribe(myJedisPubSub, MESSAGE_KEY + "*"); }
-
修改订阅者消息处理器
/** * 继承JedisPubSub,重写接收消息的方法 */ class MyJedisPubSub extends JedisPubSub { @Override public void onPMessage(String pattern, String channel, String message) { System.out.println(Thread.currentThread().getName() + " channel:" + channel + " message:" + message + " pattern:" + pattern); //接收到exit消息后退出 if (NewMessageProducer.EXIT_COMMAND.equals(message)) { System.exit(0); } } }
-
执行结果如下
可以看到我订阅了messageQueue*
,则收到了来自messageQueue_publisher1
、messageQueue_publisher2
、messageQueue_publisher3
的消息
3.补充
-
堵塞读取(brpop)后线程会处于堵塞状态,直至读取到数据或者超时,才会停止线程
-
订阅后线程也会处于堵塞状态,直到取消所有订阅,或者关闭该线程
-
psubscribe
订阅的通道,仅能通过punsubscribe
取消订阅,且遵循严格字符串匹配规则,不会将其中通配符展开如,
punsubscribe *
不会退订channel.*
,而必须使用punsubscribe channel.*
-
请管理好线程,线程的不当管理会带来极大性能消耗,如果你也想cpu烤肉可以当我没说
4.传送门
没啥好传送的,网上的资料都很零碎,建议多查查,从各个方面思考
BB两句
我一直以为订阅/发布和生产/消费是同一个模式。。。后来越想越不对劲。。。
作者:Echo_Ye
WX:Echo_YeZ
EMAIL :echo_yezi@qq.com
个人站点:在搭了在搭了。。。(右键 - 新建文件夹)