把 Redis 当作消息队列
List
List 的数据结构是链表,在头部和尾部操作元素,时间复杂度都是 O(1),所以它很适合用来当作消息队列。
消息队列要符合先进先出原则,生产者从左边开始塞,消费者从右边开始消费。
127.0.0.1:6379> lpush queue msg1
(integer) 1
127.0.0.1:6379> rpop queue
"msg1"
消费者在代码中就是加一个 while 循环,不断地执行 rpop 这个操作。这样就会造成一个问题,就是当队列里面没有消息的时候,会一直循环,造成 CPU 空转,不仅浪费 CPU 资源,还影响 Redis 性能。为了解决这个问题,我们可以在队列中没有消息的时候 sleep 几秒。示例代码如下(这里 Redis 我使用的是单例模式):
public function actionRpop()
{
$redis = Redis::getInstance();
while (true) {
$res = $redis->rpop('queue');
if ($res) {
// handle msg
echo $res . "
";
} else {
sleep(2);
}
}
}
如此解决了 CPU 空转的问题,不过也带来了新的问题,就是处理消息最多存在 2s 的延迟。
为了解决这个问题,我们可以用 brpop 阻塞式拉取。当队列中没有消息的时候,客户端会发生阻塞直至有新的消息到来。
127.0.0.1:6379> brpop queue 0 // 0 表示不设置超时时间
这样就解决了 CPU 空转和消费延迟的问题,看起来似乎很完美。不过还是存在问题的:当不设置超时时间或者设置的超时时间太长,然后这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,并且会被强制下线。
在 PHP 中,直接使用 brpop 不设置超时时间可能会报 read error on connection to **.**.**.**:6379
错,所以在客户端需要有重连机制。示例如下:
public function actionBrpop()
{
try {
$redis = Redis::getInstance();
start:
while (true) {
$res = $redis->brpop('queue', 0);
if ($res) {
// handle msg
print_r($res) . "
";
}
}
} catch (RedisException $e) {
echo $e->getMessage() . "
";
goto start;
}
}
或者直接设置阻塞 2s 超时,不断地循环就行了。
这样一来,Redis List 当作消息队列是不是就完美无缺呢?还差得远呢!
- 首先,Redis 并不能保证持久化,就是数据不能保证不丢失!当然,这是 Redis 的设计问题,我们这里不纠结这个,否则文章写不下去了。
- 其次,从 List 中 pop 消息后,这条消息就从 List 中删除了,没法再重复消费,就没有 ack 机制。
- 然后,消息不可堆积。Redis 是基于内存的,内存的容量有限,迟早撑爆。一撑爆就宕机,就会丢失未同步到磁盘的数据。
- 还有,不支持消费者组,没法加速消费。
第一个问题 pass,第三第四个问题无解。这里我们来讨论下第二个问题,我这里有一个解决方案,就是不用 pop,而是用 lrange 和 lrem 组合。先用 lrange 命令批量地获取消息,然后消费,消费成功就用 lrem 命令删除。如此虽然提高了时间复杂度,但是解决了不支持重复消费的问题。不过也带来了新的问题。因为 lrange 只能从左到右读,所以生产者只能用 rpush。还有 lrange 没有阻塞这个说法,所以还是得 sleep 避免 CPU 空转。代码示例:
public function actionLrange()
{
$redis = Redis::getInstance();
while (true) {
// take 100 at a time
$res = $redis->lrange('queue', 0, 100);
if ($res) {
foreach ($res as $v) {
// handle msg
echo $v . "
";
// when handled success
$redis->lrem('queue', $v, 1);
}
} else {
sleep(2);
}
}
}
Stream
在 Redis 5.0 以后,新增了 Stream 数据类型,解决了上述无解的问题。
Redis Stream 基本的命令我就不在这里介绍了,官方文档里面都有。
Stream 在创建队列的时候,可以指定队列长度。当队列长度超过上限,旧的消息会被删除。这样解决了消息堆积撑爆内存的问题,不过也带来了新的问题:当消费速度跟不上生产速度,未被消费的旧消息将会丢失。
# 10000 代表队列长度,* 代表 Redis 自动生成唯一 ID,msg 为 key,msg1 为value
127.0.0.1:6379> xadd queue maxlen 10000 * msg msg1
"1621579187964-0"
Stream 支持消费组,可以由多个消费者同时消费,并且保证了一条消息只能被同一消费组的一个消费者消费,避免了重复消费。
# 创建队列 queue 的消费组 group,0 等同于 0-0,代表从头开始拉取
127.0.0.1:6379> xgroup create queue group 0
OK
现在我们可以在消费组下面添加消费者。
# count 1 代表每次拉取一条消息,block 2000 代表没有消息时阻塞 2s,> 代表拉取最新的消息
127.0.0.1:6379> xreadgroup group group consumer1 count 1 block 2000 streams queue >
1) 1) "queue"
2) 1) 1) "1621579187964-0"
2) 1) "msg"
2) "msg1"
消费组里每消费一次,都会往 pending 里面写数据,记录了所有的消费信息,比如几个消费者,每个消费者消费了几条信息,哪个消费者消费了哪条消息。
127.0.0.1:6379> xpending queue group
1) (integer) 1
2) "1621579187964-0"
3) "1621579187964-0"
4) 1) 1) "consumer1"
2) "1"
Stream 提供了 ack 机制,每一次 ack,就会删掉 pending 里面的相关记录。
127.0.0.1:6379> xack queue group 1621579187964-0
(integer) 1
PHP 消费者代码示例:
public function actionConsumer1()
{
$redis = Redis::getInstance();
$queue = 'queue';
$group = 'group';
while (true) {
$res = $redis->xreadgroup($group, 'consumer1', [$queue => '>'], 1, 2000);
if ($res) {
// handle msg
var_dump($res);
// when handled success
$redis->xack($queue, $group, array_keys($res['queue']));
}
}
}
public function actionConsumer2()
{
$redis = Redis::getInstance();
$queue = 'queue';
$group = 'group';
while (true) {
$res = $redis->xreadgroup($group, 'consumer2', [$queue => '>'], 1, 2000);
if ($res) {
// handle msg
var_dump($res);
// when handled success
$redis->xack($queue, $group, array_keys($res['queue']));
}
}
}
运行结果:
127.0.0.1:6379> xadd queue maxlen 10000 * msg msg1
"1621580563151-0"
127.0.0.1:6379> xadd queue maxlen 10000 * msg msg2
"1621580564627-0"
127.0.0.1:6379> xadd queue maxlen 10000 * msg msg3
"1621580565981-0"
127.0.0.1:6379> xadd queue maxlen 10000 * msg msg4
"1621580567406-0"
127.0.0.1:6379> xadd queue maxlen 10000 * msg msg5
"1621580569186-0"
127.0.0.1:6379> xpending queue group
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
# consumer1
$ php yii redis/consumer1
array(1) {
["queue"]=>
array(1) {
["1621580564627-0"]=>
array(1) {
["msg"]=>
string(4) "msg2"
}
}
}
array(1) {
["queue"]=>
array(1) {
["1621580565981-0"]=>
array(1) {
["msg"]=>
string(4) "msg3"
}
}
}
array(1) {
["queue"]=>
array(1) {
["1621580567406-0"]=>
array(1) {
["msg"]=>
string(4) "msg4"
}
}
}
# consumer2
$ php yii redis/consumer2
array(1) {
["queue"]=>
array(1) {
["1621580563151-0"]=>
array(1) {
["msg"]=>
string(4) "msg1"
}
}
}
array(1) {
["queue"]=>
array(1) {
["1621580569186-0"]=>
array(1) {
["msg"]=>
string(4) "msg5"
}
}
}
总结
把 Redis 当作消费队列的优点:
- 部署方便,学习成本低。
- 基于内存,速度快。
缺点:
- Redis 不能保证数据持久性,有可能会丢失数据。
- 面对消息积压,内存资源紧张。
所以,当你确保 Redis 不会宕机(日志刷盘或者主从同步的时候数据不丢失),内存够用的情况下,完全可以用来当作消息队列。亦或是业务对于数据丢失不敏感,也可以用。否则,最好还是用专业的消息队列中间件。