1、订阅发布简介
我们都知道redis支持简单的订阅发布功能,那么怎么实现的呢,其实是通过blpop
实现
blpop
常见用于阻塞拿消息,其实就是redis订阅发布,订阅那一端去通过blpop
阻塞拿消息,而发布那一段则使用lpush
或rpush
往列表里追加数据。
1. blpop 语法
# 将多个列表排列,按照从左到右去pop对应列表的元素
blpop(keys, timeout=0)
keys:redis的name的集合
timeout:超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞
2. brpoplpush语法(了解即可)
# 从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧
brpoplpush(src, dst, timeout=0)
src:取出并要移除元素的列表对应的name
dst:要插入元素的列表对应的name
timeout:当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞
2、基于blpop实现订阅/发布
1.publish.py
:发布者,发布数据到redis中的指定key
# 首先,我在一端生成任务ID作为列表名,然后将数据写入到该列表,
import redis
import time
# 连接池
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=10)
r = redis.Redis(connection_pool=pool)
jid = "20200624110134251155"
for x in range(10):
time.sleep(2)
r.rpush(jid, x)
r.rpush(jid, "job_end") # 最后发送一个关键字,让订阅者接受到该关键字后就认为任务结束,退出订阅
Note:为了看到效果,我给造数据的for循环添加了time.sleep让其每循环一次都slepp 2秒。
2.subscribe.py
:订阅者,订阅reids中指定的key(阻塞监听数据)
# 然后在另一端,根据该任务ID阻塞监听该列表有数据就获取,没有则继续阻塞,避免阻塞时间过久,需要设置timeout
import redis
# 连接池
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=10)
r = redis.Redis(connection_pool=pool)
jid = "20200624110134251155"
# 设置超时时间,阻塞读数据
timeout = 10
while True:
if timeout < 0:
print("超时退出")
break
_, msg = r.blpop(jid, timeout=10)
rev_msg = msg.decode(encoding='UTF-8')
if rev_msg == "job_end":
print("消息获取完毕,退出。。。。")
break
print(rev_msg)
3.开始订阅发布
# 执行发布者
$ python3 publish.py
# 执行订阅者
$ python3 subscribe.py
开始订阅-20200624110134251155
0
1
2
3
4
5
6
7
8
9
消息获取完毕,退出。。。。