1、RabbitMQ
RabbitMQ 是由 LShift 提供的一个Advanced Message Quenuing Protocol(AMQP)的开源实现,由以高性能,健壮性记忆可伸缩性出名的rlang 写成,因此也继承了这些优点
很成熟,久经考验,应用广泛
文档详细,客户端丰富,几乎常用语言都有RabbitMQ的开发库
2、安装
http://www.rabbitmq.com/install-rpm.html
选择RPM 包下载,选择对应平台,本次装到CentOS6 上
由于使用了erlang 语言开发,所以需要erlang包,该下载也提供了连接
安装成功,查看安装的文件:
配置:http://www.rabbitmq.com/configure.html#config-location
环境变量:
-
- 使用系统环境变量,如果没有使用rabbitmq-env.conf 中定义环境变量,否则 使用缺省值
- RABBITMQ_NODE_IP_ADDRESS the empty string, meaning that it should bind 同allnetwork interfaces
- RABBITMQ_NODE_PORT 5672
- RABBITMQ_NODE_PORT RABBITMQ_NODE_PORT + 20000 内部节点和客户端 工具通信用
- RABBITMQ_CONFIG_FILE 配置文件 路径默认为 /etc/rabbitmq/rabbitmq
15672:http端使用
环境变量文件,可以不配置
工作特性配置文件
rabbitmq.config 配置文件
3.7支持新旧两种格式配置文件格式
1、erlang配置 文件格式,为了兼容继续采用
2、sysctl格式,如果不需要兼容,RabbitMQ 鼓励使用
这个文件也可以不配置
插件管理
列出所有可用插件
# tabbitmq-plugins list
启动WEB 管理插件
# tabbitmq-plugins enable rabbitmq_management
启动服务:
# service rabbitmq-server start
启动中,可能出现下面的错误:
就是这个文件的权限问题,修改属主,属组即可
||
||
服务启动成功:
开始登陆 WEB 界面 ip:port
使用guest/guest 只能本地登录,远程登录会报错
rabbitmqctl
用户管理:
添加用户:
删除用户:
更改密码:
设置权限Tags,其实就是分配组:
设置rab 用户
tag 的意义如下:
administrator 可以管理用户,权限,虚拟主机
基本信息:
虚拟主机:
/ 为缺省虚拟机
缺省虚拟缀,默认只能是guest 用户在本机连接,上图新建的用户rab 默认无法访问任何虚拟主机
Python 库
Pika 是纯Python实现的额支持AMQP协议的库
$ pip install pika
RabbitMQ 工作原理及应用
https://www.rabbitmq.com/getstarted.html
上图,列出了RabbitMQ 的使用模式,学习上面的模式,对理解所有小写队列都很重要。
名词解释
名词 | 说明 |
Server | 服务器,接受客户端连接,实现消息队列即路由功能的继承(服务),也称为消息代理,注意,客户端包括生产者好消费者 |
Connection | 网络物理连接 |
Channel | 一个连接允许多个客户端连接 |
Exchange |
交换器,接受生产者发来的消息,决定如何路由 给 服务器中的队列,常用的类型:direct(point-to -point) topic(publish-subscribe) , fanout(multicast) {fanout:扇出,subcribe:订阅} |
Message | 消息 |
Message Queue | 消息队列,数据的存储载体 |
Bind | 绑定,建立消息队列和交换器之间的关系,也就是说就交换器拿到数据,把什么样的数据传送给哪个队列 |
Virtual Host | 虚拟主机,一批交换机,消息队列和相关对象的集合,为了多用户互不干扰,使用虚拟主机分组交换机,消息队列 |
Topic | 主题,话题 |
Broker | 可等价位Server |
1、队列:
这种模式就是简单的生产者消费者模式,消息队列就是一个FIFO 的队列
生产者 send.py 消费者 receive.py
官方例子:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
参照官网例子,写一个程序:
1 # send.py 2 import pika 3 4 # 匹配连接参数 5 params = pika.ConnectionParameters('192.168.112.111') 6 # 建立连接 7 connection = pika.BlockingConnection(params) 8 9 10 with connection: 11 # 建立通道 12 channel = connection.channel() 13 # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped 14 channel.queue_declare(queue='hello') 15 16 channel.basic_publish( 17 exchange= "" ,# 使用缺省exchange 18 routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致 19 body= 'hello world' # 消息 20 ) 21 print('===== end ========')
结果:
访问被拒绝,还是权限问题,原因是guest 用户只能访问 localhost 上的 / 缺省虚拟主机
解决办法:
缺省虚拟主机,默认只能在本机访问,不要修改为远程访问, 是安全的考虑
因此,在Admin中Virtual hosts 中,新建一个虚拟主机test
注意:新建的test 虚拟主机 的User 是谁,本次是 rab 用户
在ConnectionParameters 中并没有用户名,密码填写的参数,它使用参数credentials 传入,这需要构建一个pika.credentials.Crendentuals对象
测试:(修改后)
1 # send.py 2 import pika 3 4 5 credential = pika.PlainCredentials('rab', '123456') 6 # 匹配连接参数 7 params = pika.ConnectionParameters( 8 '192.168.112.111', 5672, # 地址,端口 9 'test', # 虚拟主机 10 credential # 用户名,密码 11 ) 12 # 建立连接 13 connection = pika.BlockingConnection(params) 14 15 16 with connection: 17 # 建立通道 18 channel = connection.channel() 19 # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped 20 channel.queue_declare(queue='hello') 21 22 channel.basic_publish( 23 exchange= "" ,# 使用缺省exchange 24 routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致 25 body= 'hello world' # 消息 26 ) 27 print('===== end ========')
结果:
===== end ========
URLParameters, 也可以使用URL 创建参数 (直接替换就可以)
1 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
queue_declare 声明一个queue, 有必要的话,创建它
basc_publish exchange 为空就使用缺省exchange ,如果找不到指定的exchange 就抛异常
使用缺省exchange, 就必须指定routing_key 使用它找到queue
生产者代码做一些改动,使他能连接send message。
1 # send.py 2 import pika 3 4 5 credential = pika.PlainCredentials('rab', '123456') 6 # 匹配连接参数 7 # params = pika.ConnectionParameters( 8 # '192.168.112.111', 5672, # 地址,端口 9 # 'test', # 虚拟主机 10 # credential # 用户名,密码 11 # ) 12 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 13 # 建立连接 14 connection = pika.BlockingConnection(params2) 15 16 17 with connection: 18 # 建立通道 19 channel = connection.channel() 20 # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped 21 channel.queue_declare(queue='hello') 22 23 for _ in range(40): 24 channel.basic_publish( 25 exchange= "" ,# 使用缺省exchange 26 routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致 27 body= 'hello world' # 消息 28 ) 29 print('===== end ========')
结果:(加上之前的 一共42条了)
receive.py 消费者代码
单个消费者
1 # receive.py 2 import pika 3 4 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 5 6 connection = pika.BlockingConnection(params2) 7 channel = connection.channel() 8 9 with connection: 10 msg = channel.basic_get('hello',True) # 获取一个消息, True, 获取不到,不会阻塞 11 print(msg) 12 method, props, body = msg 13 print(method) 14 print(props) 15 if body: 16 print(body) 17 else: 18 print('empty')
结果:
1 (<Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=119', 'redelivered=False', 'routing_key=hello'])>, <BasicProperties>, b'hello world') 2 <Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=119', 'redelivered=False', 'routing_key=hello'])> 3 <BasicProperties> 4 b'hello world'
如上:每执行一次,消费一条
批量消费消息:
1 # receive.py 2 import pika 3 4 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 5 connection = pika.BlockingConnection(params2) 6 channel = connection.channel() 7 8 def callback(channel, method, properties, body): 9 print(body) 10 11 with connection: 12 channel.basic_consume( 13 callback, # 消费回调函数 14 queue='hello', # 队列名 15 no_ack=True # 不回应,不阻塞 16 ) 17 channel.start_consuming()
将hello 队列中的消息,都消费完,并一直处于消费状态
而这个Ack是TCP协议中的Ack此Ack的回复不关心消费者是否对接收到的数据进行了处理,当然也不关心处理数据所需要的耗时
2、工作队列:
继续使用队列模式的生产者消费者代码,启动2 个消费者
观察结果,可以看到,2个消费者是交替拿到不同的消息
这种工作 模式是一种竞争工作方式,对某一个消息来说,只能有一个消费者拿走它
从结果知道,使用的是轮询方式拿走数据的
注意:虽然上面的图中没有画出exchange, 用到缺省的exchange
测试:代码
1 # send.py 2 import pika 3 4 5 # credential = pika.PlainCredentials('rab', '123456') 6 # 匹配连接参数 7 # params = pika.ConnectionParameters( 8 # '192.168.112.111', 5672, # 地址,端口 9 # 'test', # 虚拟主机 10 # credential # 用户名,密码 11 # ) 12 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 13 # 建立连接 14 connection = pika.BlockingConnection(params2) 15 16 17 with connection: 18 # 建立通道 19 channel = connection.channel() 20 # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped 21 channel.queue_declare(queue='hello') 22 23 for i in range(40): 24 channel.basic_publish( 25 exchange= "" ,# 使用缺省exchange 26 routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致 27 body= '{} hello world'.format(i) # 消息 28 ) 29 print('===== end ========') 30 31 32 33 34 35 36 # receive.py 37 import pika 38 import time 39 40 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 41 connection = pika.BlockingConnection(params2) 42 channel = connection.channel() 43 44 def callback(channel, method, properties, body): 45 print(body) 46 47 time.sleep(1) 48 with connection: 49 channel.basic_consume( 50 callback, # 消费回调函数 51 queue='hello', # 队列名 52 no_ack=True # 不回应,不阻塞 53 ) 54 channel.start_consuming()
两个消费者结果:
三个消费者:
3、发布,订阅模式
Publish / Subscribe 发布和订阅
订阅者和消费者之间还是有一个 exchange
也就是,每个人 拿到的数据是一样的。
当前模式的exchange 的type是fanout, 就是一对多,即广播模式
注意:同一个queueu的消息只能被消费一次,所以这里使用了多个queue,相当于为了保证不同的消费者拿到同样的数据,每一个消费者都应该有自己的queueu
# 生成一个交换机 channel.exchange_declare( exchange='logs', # 新交换机 exchange_type='fanout' # 交换机的模式:广播 )
生产者使用广播模式,在test虚拟主机下构建了一个logs 交换机
至于queue, 可以由生产者创建,也可以由 消费者创建
本次采用使用消费者端创建爱你,生产者 把数据发往交换机logs, 采用了fanout ,然后将数据通过交换机发往已经绑定到此交换机的所有queue。
绑定 Bingding,建立exchange 和queue之间的联系
1 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 2 connection = pika.BlockingConnection(params2) 3 channel = connection.channel() 4 5 # 创建随机名称的queueu 6 # result = channel.queue_declare() # s生成一个随机名称的queue 7 # result = channel.queue_declare(exclusive=True) # 生成一个随机名称的queue,并在断开连接时删除queue 8 9 # 生成queue 10 q1 = channel.queue_declare(exclusive=True) 11 q2 = channel.queue_declare(exclusive=True) 12 q1name = q1.method.queue # 可以通过result.method.queue 查看随机名称 13 q2name = q2.method.queue # 可以通过result.method.queue 查看随机名称 14 15 print(q1name, q2name) 16 17 # 绑定 18 channel.queue_bind(exchange='blogs', queue=q1name) 19 channel.queue_bind(exchange='blogs', queue=q2name)
生产者代码:
注意观察交换机和队列
1 # send.py 2 import pika 3 import time 4 5 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 6 connection = pika.BlockingConnection(params2) 7 channel = connection.channel() 8 9 with connection: 10 # 建立通道 11 channel = connection.channel() 12 # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped 13 channel.exchange_declare( 14 exchange = 'logs', #新交换机 15 exchange_type = 'fanout' 16 ) 17 18 for i in range(100): 19 pub = channel.basic_publish( 20 exchange= "logs" ,# 指定exchange 21 routing_key='', # 广播模式不用指定 22 body= '{:02} hello world'.format(i) # 消息 23 ) 24 print(pub, '============') 25 26 print('==== end =====')
如果先开启生产者,没有消费者,直接dropped 掉,但是生成了exchange
消费者代码
构建queue 并绑定到test虚拟主机的logs交换机上
1 # receive.py 2 import pika 3 import time 4 5 import pika 6 import time 7 8 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 9 connection = pika.BlockingConnection(params2) 10 channel = connection.channel() 11 channel.exchange_declare('logs', 'fanout') 12 13 # 生成一个随机名的 queue 14 q1 = channel.queue_declare(exclusive=True) 15 q2 = channel.queue_declare(exclusive=True) 16 print(1, q1, type(q1)) 17 print(2, q1.method, type(q1.method)) 18 q1name = q1.method.queue # 可以通过result.method.queue 查看随机名称 19 q2name = q2.method.queue # 可以通过result.method.queue 查看随机名称 20 print(3, q1name) 21 22 # 绑定 23 channel.queue_bind(queue=q1name, exchange='logs') 24 channel.queue_bind(queue=q2name, exchange='logs') 25 26 def callback(channel, method, properties, body): 27 print("{} {}".format(channel, method)) 28 print(body) 29 print('=======================================') 30 31 32 with connection: 33 channel.basic_consume( 34 callback, # 消费回调函数 35 queue='q1name', # 队列名 36 no_ack=True # 不回应,不阻塞 37 ) 38 channel.basic_consume( 39 callback, # 消费回调函数 40 queue='q2name', # 队列名 41 no_ack=True # 不回应,不阻塞 42 ) 43 44 channel.start_consuming()
看到首先启动了 消费者,创建了exchange,并绑定了queue
打印:(部分打印)
1 1 <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=amq.gen-DfOvsPNZoEK6rWZaYbj8zw'])>"])> <class 'pika.frame.Method'> 2 2 <Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=amq.gen-DfOvsPNZoEK6rWZaYbj8zw'])> <class 'pika.spec.Queue.DeclareOk'> 3 3 amq.gen-DfOvsPNZoEK6rWZaYbj8zw 4 <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('192.168.112.1', 58780)->('192.168.112.111', 5672) params=<URLParameters host=192.168.112.111 port=5672 virtual_host=test ssl=False>>>> 5 <Basic.Deliver(['consumer_tag=ctag1.4cb1fdc9f859486cac3f1cfa01e476f8', 'delivery_tag=1', 'exchange=logs', 'redelivered=False', 'routing_key='])> 6 b'00 hello world'
q1 = channel.queue_declare(exclusive=True)
q2 = channel.queue_declare(exclusive=True)
如果先开启生产者,后开启消费者,部分数据会丢失
部分数据丢失,是因为,exchange 收到了数据,没有queue接受,所以,exchange 丢弃了这些数据。
4、路由Routing
路由其实就是生产者的数据经过exchange 的时候,通过匹配规则,决定数据的去向
消费者
生产者代码
交换机类型为 direct ,指定路由的key
1 # send.py 2 import pika 3 import time 4 import random 5 6 exchange = 'color' 7 colors = ('orange', 'black', 'green') 8 9 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 10 connection = pika.BlockingConnection(params2) 11 channel = connection.channel() 12 13 with connection: 14 # 建立通道 15 channel = connection.channel() 16 # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped 17 channel.exchange_declare( 18 exchange = exchange, #新交换机 19 exchange_type = 'direct' 20 ) 21 22 for i in range(40): 23 rk = colors[random.randint(0,2)] 24 msg = "routingKey {} --- data {:02}".format(rk, i) 25 pub = channel.basic_publish( 26 exchange= exchange ,# 指定exchange 27 routing_key=rk, # 广播模式不用指定 28 body= msg # 消息 29 ) 30 print(rk, msg, '============') 31 time.sleep(0.5) 32 33 print('==== end =====')
消费者代码:
1 # receive.py 2 import pika 3 import time 4 5 exchange = 'color' 6 colors = ('orange', 'black', 'green') 7 8 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 9 connection = pika.BlockingConnection(params2) 10 channel = connection.channel() 11 channel.exchange_declare(exchange, 'direct') 12 13 # 生成一个随机名的 queue 14 q1 = channel.queue_declare(exclusive=True) 15 q2 = channel.queue_declare(exclusive=True) 16 17 name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称 18 name2 = q2.method.queue # 可以通过result.method.queue 查看随机名称 19 20 21 # 绑定(一定指定 routing_key) 22 channel.queue_bind(queue=name1, exchange=exchange, routing_key=colors[0]) 23 channel.queue_bind(name2, exchange,colors[1]) 24 channel.queue_bind(queue=name2, exchange=exchange,routing_key=colors[0]) 25 26 def callback(channel, method, properties, body): 27 print("{} {}".format(channel, method)) 28 print(body) 29 print('=======================================') 30 31 32 with connection: 33 channel.basic_consume( 34 callback, # 消费回调函数 35 queue=name2, # 队列名 36 no_ack=True # 不回应,不阻塞 37 ) 38 channel.basic_consume( 39 callback, # 消费回调函数 40 queue=name1, # 队列名 41 no_ack=True # 不回应,不阻塞 42 ) 43 channel.start_consuming()
如果 routing_key 设置都是一样的:
也就是,将 消费者的 routing_key 比如都设置为black, 则 生产者,只有生产 orange的,才会被消费
其次,最重要的是, 变成了广播, 类似fanout,都是1对多,但是不同
因为 fanout时,exchange 不做数据过滤, 1个消息,所有绑定的queue都能拿到一个副本
direct时,要按照 routing_key 分配数据,上图的black 有2 个 queue设置了,就会把1一个消息分发给2个queue,其他没有black 的,该怎么消费,就怎么消费
5、Topic 话题
Topic 就是更加高级的路由,支持模式匹配而已
Topic 的routing_key 必须使用点 号分割的单词组成,最多支持255个字节
支持使用通配符
-
- * 表示严格的一个单词
- #表示0个 或多个单词
如果queue绑定的routing_key 只是一个# ,这个queue 其实可以接受所有的消息
如果没有使用任何 通配符,效果类似direct
生产者代码:
1 # send.py 2 import pika 3 import time 4 import random 5 6 exchange = 'products' 7 colors = ('red', 'blue', 'green') 8 9 topics = ('phone.*', '*.red') # 2中话题 10 product_type = ('phone','pc','tv') # 3中产品 11 12 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 13 connection = pika.BlockingConnection(params2) 14 channel = connection.channel() 15 16 with connection: 17 # 建立通道 18 channel = connection.channel() 19 # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped 20 channel.exchange_declare( 21 exchange = exchange, #新交换机 22 exchange_type = 'topic' 23 ) 24 25 for i in range(40): 26 rk = '{}.{}'.format( 27 product_type[random.randint(0,2)], 28 colors[random.randint(0,2)] 29 ) 30 msg = "routingKey {} --- data {:02}".format(rk, i) 31 pub = channel.basic_publish( 32 exchange= exchange ,# 指定exchange 33 routing_key=rk, # 广播模式不用指定 34 body= msg # 消息 35 ) 36 print(rk, msg, '============') 37 time.sleep(0.5) 38 39 print('==== end =====')
消费者代码:
1 # receive.py 2 import pika 3 import time 4 5 exchange = 'products' 6 colors = ('red', 'blue', 'green') 7 8 topics = ('phone.*', '*.red') # 2中话题 9 product_type = ('phone','pc','tv') # 3中产品 10 11 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test') 12 connection = pika.BlockingConnection(params2) 13 channel = connection.channel() 14 channel.exchange_declare(exchange, 'topic') 15 16 # 生成一个随机名的 queue 17 q1 = channel.queue_declare(exclusive=True) 18 q2 = channel.queue_declare(exclusive=True) 19 20 name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称 21 name2 = q2.method.queue # 可以通过result.method.queue 查看随机名称 22 23 24 # 绑定(一定指定 routing_key) 25 #q1 只收集phone开头的routing-key的消息 26 channel.queue_bind(queue=name1, exchange=exchange, routing_key=topics[0]) 27 # q2 只收集red结尾的 28 channel.queue_bind(name2, exchange,topics[1]) 29 30 def callback(channel, method, properties, body): 31 print("{} {}".format(channel, method)) 32 print(body) 33 print('=======================================') 34 35 36 with connection: 37 channel.basic_consume( 38 callback, # 消费回调函数 39 queue=name2, # 队列名 40 no_ack=True # 不回应,不阻塞 41 ) 42 channel.basic_consume( 43 callback, # 消费回调函数 44 queue=name1, # 队列名 45 no_ack=True # 不回应,不阻塞 46 ) 47 channel.start_consuming()
观察者消费者拿到的数据,注意观察phone.red 的数据出现的次数
由此,可以知道,交换机在路由消息的时候, 只要和queue的routing_key 匹配,就把消息发给该queue
RPC远程过程调用
很少使用,有更好的RPC 框架
消息队列的作用:
1、系统间解耦
2、解决生产者,消费者速度匹配
由于稍微上规模的项目都会分层,分模块开发,模块间或系统间,尽量不要直接耦合,需要开放公共接口提供给别的模块使用,而调用可能触发并发问题,为了缓冲和解耦,往往使用中间技术