正文
1、为什么要使用消息队列?
分析:一个用消息队列的人,不知道为啥用,这就有点尴尬。没有复习这点,很容易被问蒙,然后就开始胡扯了。
回答:这个问题,咱只答三个最主要的应用场景(不可否认还有其他的,但是只答三个主要的),即以下六个字:解耦、异步、削峰
(1)解耦
传统模式:
传统模式的缺点:
- 系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
中间件模式:
中间件模式的的优点:
- 将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
(2)异步
传统模式:
传统模式的缺点:
- 一些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式:
中间件模式的的优点:
- 将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
(3)削峰
传统模式
传统模式的缺点:
- 并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
中间件模式:
中间件模式的的优点:
- 系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
2、使用了消息队列会有什么缺点?
分析:一个使用了MQ的项目,如果连这个问题都没有考虑过,就把MQ引进去了,那就给自己的项目带来了风险。我们引入一个技术,要对这个技术的弊端有充分的认识,才能做好预防。要记住,不要给公司挖坑!
回答:回答也很容易,从以下两个个角度来答
- 系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低
- 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。
但是,我们该用还是要用的。
一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式。利用redis这两种场景的消息队列都能够实现。 定义:
- 生产者消费者模式:生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。
- 发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。
那么如此多的MQ产品,为什么要使用redis作消息队列呢?以下附上一份总结了别人的一些report或blog的表格,以及当初用来说服整个team的一句结论。
Redis is easy to use and configure since we have experience in Redis, and most importantly, its performance satisfies our requirement.
Then, how to use redis as a MQ?
首先,redis的队列实际在代码逻辑中不需要由我们自己实现,因此一个所谓的 RedisMQ 对象实际是一个 redis key以及对其操作的一些封装。
PubSub Mode:
redis 从 2.0.0 版本开始支持 pub/sub 指令。详情见 http://redis.io/topics/pubsub
实现思想很简单,Publisher调用redis的publish方法往特定的channel发送消息,Subscriber在初始化的时候要subscribe到该channel,一旦有消息就会立即接收。
比较简单的demo可参见:http://shift-alt-ctrl.iteye.com/blog/1867454 ,此链接博客中写得已较详细,本文便不再赘述
Producer/Consumer Mode:
该方法是借助redis的list结构实现的。
Producer调用redis的lpush往特定key里塞入消息,Consumer调用brpop去不断监听该key。
producer:
1 // producer code
2 String key = "demo:mq:test";
3 String msg = "hello world";
4 redisDao.lpush(key, msg);
consumer:
// consumer code String key = "demo:mq:test"; while (true) { // block invoke List<String> msgs = redisDao.brpop(BLOCK_TIMEOUT, listKey); if (msgs == null) continue; String jobMsg = msgs.get(1); processMsg(jobMsg); }
当有多个consumers的时候,它会按照brpop调用的顺序分派消息,并非随机。
BLOCK_TIMEOUT不建议设成infinity(有些redis驱动也直接不支持inifinity),我们目前设成30(单位是秒)情况良好。
在学习RPOPLPUSH命令的时候,官方文档中有提到安全队列和不安全的队列,一开始没有看懂,现在理解了做个笔记。
一般情况下,我们可以借助List来实现消息队列,比如一个客户端通过命令LPUSH(BLPUSH)把消息入队,另一个客户端通过命令RPOP(BRPOP)获取消息。这种方式实现的队列是不安全的。
为什么是不安全的呢?因为RPOP命令的特性:会移除list的队尾元素(消息),并将这个元素(消息)返回给客户端。这意味着该元素就只存在于客户端的上下文中,redis服务器中没有这个元素了,如果客户端在处理这个返回元素的过程崩溃了,那么这个元素就永远丢失了。这种情况导致:客户端虽然成功收到了消息,但是却没有处理它。
那怎么来实现一个安全的队列呢?可以使用redis的 RPOPLPUSH (或者其阻塞版本的 BRPOPLPUSH)命令。
redis的命令中,很多都提供了阻塞与非阻塞两个方式
例如 LPUSH 为非阻塞,BLPUSH 为阻塞方式
他们的区别是什么?
唯一的区别是当列表中没有元素时,BRPOP命令会一直阻塞住连接,直到有新元素加入,而RPOP会直接返回nil
实际应用的区别
需要从队列获取任务
如果用非阻塞的方式,代码会是这样
# 无限循环读取任务队列中的内容
loop
$task = RPOR queue
if $task
# 如果有任务则执行
execute($task)
else
# 如果没有就等待1秒
wait 1 second
当队列中没有任务时,每秒都会调用一次RPOP命令查看是否有新任务,可能会白白浪费很多系统资源,如果在有新任务加入队列时就通知消费者就好了,这个需求就可以使用阻塞式命令来实现
loop
# 如果队列中没有任务,BRPOP命令会一直阻塞
# 0 表示一直等待,永不过期
$task = BRPOP queue, 0
# 有返回值就继续执行
execute($task[1])
RPOPLPUSH命令格式:RPOPLPUSH source destination 。RPOPLPUSH命令原子性地返回并移除 source 列表的最后一个元素, 并把该元素放入 destination 列表的头部。使用这个命令就可以实现安全队列。
因为使用 RPOPLPUSH 获取消息时,RPOPLPUSH 会把消息返给客户端,同时把该消息放入一个备份消息列表,并且这个过程是原子的,可以保证消息的安全。当客户端成功的处理了消息后,就可以把此消息从备份列表中移除了。如果客户端因为崩溃的原因没有处理某个消息,那么就可以从备份列表destination中重新获取并处理这个消息。