一、安装
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum install socat logrotate -y
修改repo文件
vi /etc/yum.repos.d/rabbitmq.repo
##
## Zero dependency Erlang
##
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
repo_gpgcheck=1
gpgcheck=1
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
##
## RabbitMQ server
##
[rabbitmq_server]
name=rabbitmq_server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
[rabbitmq_server-source]
name=rabbitmq_server-source
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
安装:
yum install erlang rabbitmq-server
安装后启动:
rabbitmq-server
二、概念
2.1 生产者
生成消息的进程
2.2 消费者
消费消息的进程
2.3 exchange交换器
在rabbitmq中,加入了exchange概念,即交换器,生产者实际是把消息发给交换器,然后由交换器来决定怎么把消息发到队列,然后消费者从队列中获取消息。
交换器的类型有:
- fanout 扇区。会把消息发送给所有绑定了该交换器的队列
- direct 在fanout基础上支持全匹配过滤
- topic 在fanout基础上支持模糊匹配过滤
- headers
查看server注册的所有交换器:rabbitmqctl list_bindings
空字符串表示默认交换器
2.4 queue队列
一个队列里面的消息,只会发给一个消费者,如果需要多个消费者同时接收同一个消息,就需要为每个消费者定义一个队列,然后把消息,发送多份到不同的队列。
二、Python使用
安装库:
pip install pika
三、队列模式
- 多个消费者监听同一个队列
- 消息只发送一份到队列
- 一个消息只会被一个消费者消费
- 如果消费者拿了消息,没有消费成功,会给另一个消费者消费
生产者
# encoding=utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 定义rabbitmq服务的ip和端口
channel = connection.channel()
queue_name = 'hello' # 队列名
channel.queue_declare(queue=queue_name) # 创建队列,如果发送到一个不存在的队列,消息会被丢弃
channel.basic_publish(exchange='',
routing_key=queue_name,
body='Hello World!11')
print(" [x] Sent 'Hello World!'")
消费者
import pika
def callback(ch, method, properties, body):
print('callback', ch, method, properties, body)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
queue_name = 'hello'
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
四、发布订阅模式
-
一个消息会有属性:exchange和routing_key
-
消息会发布给所有订阅了该exchange和routing_key的消费者
-
这个模式需要用到fanout类型的交换器
-
生产者需要定义交换器名字(类似其他MQ里的topic)和类型,也就是
channel.exchange_declare(exchange='logs', exchange_type='fanout')
-
然后发布消息到该交换器
-
消费者需要指定自己的队列ID,可以自己定义,也可以让server随机分配一个
channel.queue_declare(queue='', exclusive=True)
queue传空就是随机分配。 -
然后把自己的队列绑定到交换器
queue_bind(exchange='logs',queue=result.method.queue)
-
然后消费者从该队列接收消息
-
生产者发送消息到交换器后,交换器会把这个消息发送到所有绑定了该交换器的队列。
生产者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 定义rabbitmq服务的ip和端口
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout') # 声明交换器,名字是logs 类型是fanout
channel.basic_publish(exchange='logs', routing_key='', body='body111')
print(" [x] Sent 'Hello World!'")
消费者
import pika
import time
def callback(ch, method, properties, body):
print('callback', ch, method, properties, body)
time.sleep(10)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
result = channel.queue_declare(queue='', exclusive=True) # exclusive=True表示当消费者关闭后,队列会相应被删除
print(result.method.queue) # 分配到的队列ID
channel.queue_bind(exchange='logs',queue=result.method.queue)
channel.basic_consume(queue=result.method.queue,
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
五、带过滤的发布订阅模式
在发布订阅模式中,可以加上过滤功能,就是某个消费者,只消费这个交换器里面的某一部分消息。过滤方法分为全匹配过滤和模糊匹配过滤
5.1全匹配过滤
这里需要使用到direct类型的交换器。全匹配是指消息的routing_key等于队列绑定的routing_key,才会发送消息到该队列
步骤:
- 生产者定义消息的routing_key值
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs', routing_key='black', body='body111')
- 消费者绑定队列时指定routing_key值,一个消费者可以绑定多个routing_key
channel.queue_bind(exchange='direct_logs', queue=result.method.queue, routing_key='black')
channel.queue_bind(exchange='direct_logs', queue=result.method.queue, routing_key='black1')
5.2模糊匹配过滤
这里需要使用到topic类型的交换器。模糊配是指消息的routing_key可以模糊匹配上队列绑定的routing_key,才会发送消息到该队列。
消息的routing_key不支持通配符
队列的routing_key支持通配符:
*
表示任意一个字符#
表示0或多个字符
实验发现*和#基本是一样的,都是0-n个任意字符。不知道为什么。
- 生产者定义交换器和消息的routing_key
channel.exchange_declare(exchange='topic_logs', exchange_type='topic') # 声明交换器,名字是logs 类型是fanout
channel.basic_publish(exchange='topic_logs', routing_key='logs1.ab', body='body111')
- 消费者定义接收的routing_key
channel.queue_bind(exchange='topic_logs', queue=result.method.queue, routing_key='logs.*')
channel.queue_bind(exchange='topic_logs', queue=result.method.queue, routing_key='logs.#')
当消息的routing_key等于
- logs.error 可以接收
- logs.a 可以接收
- logs1.error 不可以接收
六、RPC模式
- 客户端通过队列,发送任务给服务端
- 服务端执行任务后,通过另一个队列,发送结果给客户端
客户端:
#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
# 接收结果的队列
self.channel.basic_consume( queue=self.callback_queue,on_message_callback=self.on_response,auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='', routing_key='rpc_queue',
properties=pika.BasicProperties(reply_to=self.callback_queue,
correlation_id=self.corr_id, ),body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
服务端:
# encoding=utf8
import pika
import time
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='', routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
七、高可用和高并发
1. 普通集群模式
- 假如集群有3个节点,ABC
- ABC都会保存元数据,元数据包括哪个队列在哪个节点
- 队列的数据,也就是消息,只会保存在其中一个节点
- 所以
- 当生产者发送消息M,M只会被保存在其中一个节点,例如A
- 如果消费者连接到节点B,要消费M,发现B没有M,但是因为B有元数据,知道M在A节点,所以会引导消费者去连接A
- 特点:
- 能解决高并发问题,也就是可以横向拓展
- 不能解决高可用问题,因为其中一个节点挂了,数据也就丢失了
2. 镜像集群模式
- 假如集群有3个节点,ABC
- 每个节点都会保存元数据和队列的数据
- 所以:
- 当A挂了,生产者消费者可以连接到B,数据不会丢失
- 特点:
- 能解决高可用问题
- 不能解决高并发问题,因为当队列很多,并不能通过横向拓展来提升吞吐量。可以通过HA同步策略来解决
HA同步策略来解决
HA-mode | HA-params | 说明 |
---|---|---|
all | 无 | 队列会复制到所有的节点 |
exactly | count | 队列只会复制到count个节点。例如count=2,消息会保存在节点AB,C不会有。当A挂了,会把消息同步到C,使镜像数量依然等于2 |
nodes | node_name | 指定复制到哪个节点,例如node_name=AB,会把消息复制到AB两个节点。如果AB都不存在,保存在生产者当前连接的节点 |
当用上HA同步策略,可以实现:
- 高可用,队列会被复制到多个节点,保证其中一个节点挂了,队列依然正常服务
- 高并发,只要mode不是all,就可以实现横向拓展
八、实践
1. 异步服务
用于削峰,或者提升降低接口时延。例如发微博后,读优先的分发逻辑可以放到异步。
使用队列模式,因为一个消息只能被消费一次。消费失败需要重试。
2.系统解耦
例如用户注册后,告诉其他几个微服务,让它们处理新用户注册逻辑,例如金钱服务要发送新用户礼包。
使用发布订阅模式,因为有多个微服务,都需要消费该消息。
七、问题
常见问题:
- 高可用
- 可靠传输
- 生产者到MQ
- 通过confirm机制
- MQ自身
- 持久化
- 镜像复制
- MQ到消费者
- ack
- 生产者到MQ
- 顺序消费
- 消息堆积
- 积压几个消息,消费满
- 扩容,增加消费者
- MQ消息过期
- 设置消息不过期
- MQ满了
- 先把消息导出,保存到硬盘,用脚本定期消费,或者在低锋时间把消息重新发到MQ
- 积压几个消息,消费满
其他:
- 消费失败,重试问题
- 有ack机制,如果长时间没有ack,server会把任务发给另一个消费者,所以底层最好做好幂等
- 发送任务失败
- 只能重试发消息
- server宕机,启用从库
- 通过集群模式,数据有多份镜像
- 主从不同步问题
- 发送消息后,保证消费都保存到多个镜像,才返回成功
- 消息丢失
- server收到消息后,是否持久化?
- 有镜像
- 如果不持久化,server宕机怎么办?
- 通过镜像实现持久化
- server收到消息后,是否持久化?
- 怎么横行拓展,高并发?
- 集群模式