rabbitmq-server服务端
安装erlang
yum -y install erlang
安装RabbitMQ
yum -y install rabbitmq-server
启动rabbitmq-server:
systemctl start rabbitmq-server
设置rabbitmq账号密码,以及角色权限设置
# 设置新用户dong 密码1234
sudo rabbitmqctl add_user dong 1234
设置用户为administrator角色
sudo rabbitmqctl set_user_tags dong administrator
# 设置权限,允许对所有的队列都有权限
对何种资源具有配置、写、读的权限通过正则表达式来匹配,具体命令如下:
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
sudo rabbitmqctl set_permissions -p "/" dong ".*" ".*" ".*"
#重启服务生效设置
systemctl restart rabbitmq-server
rabbitmq相关命令:
// 新建用户
rabbitmqctl add_user {用户名} {密码}
// 设置权限
rabbitmqctl set_user_tags {用户名} {权限}
// 查看用户列表
rabbitmqctl list_users
// 为用户授权
添加 Virtual Hosts :
rabbitmqctl add_vhost <vhost>
// 删除用户
rabbitmqctl delete_user Username
// 修改用户的密码
rabbitmqctl change_password Username Newpassword
// 删除 Virtual Hosts :
rabbitmqctl delete_vhost <vhost>
// 添加 Users :
rabbitmqctl add_user <username> <password>
rabbitmqctl set_user_tags <username> <tag> ...
rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read>
// 删除 Users :
delete_user <username>
// 使用户user1具有vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源
rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*'
// 查看权限
rabbitmqctl list_user_permissions user1
rabbitmqctl list_permissions -p vhost1
// 清除权限
rabbitmqctl clear_permissions [-p VHostPath] User
//清空队列步骤
rabbitmqctl reset
需要提前关闭应用rabbitmqctl stop_app ,
然后再清空队列,启动应用
rabbitmqctl start_app
此时查看队列rabbitmqctl list_queues
查看所有的exchange: rabbitmqctl list_exchanges
查看所有的queue: rabbitmqctl list_queues
查看所有的用户: rabbitmqctl list_users
查看所有的绑定(exchange和queue的绑定信息): rabbitmqctl list_bindings
查看消息确认信息:
rabbitmqctl list_queues name messages_ready messages_unacknowledged
查看RabbitMQ状态,包括版本号等信息:rabbitmqctl status
#开启web界面rabbitmq
rabbitmq-plugins enable rabbitmq_management
#访问web界面
http://ip(或者网址):15672/
现在就可以在web页面,查看管理服务端信息了!!
接下来为消息队列创建生产者消费者模型:
安装pika模块:
pip3 install pika
书写生产者python代码:
1 #!/usr/bin/env python 2 import pika 3 4 # 创建凭证,使用rabbitmq用户密码登录 5 credentials = pika.PlainCredentials("dong", "1234") 6 # 新建连接,这里localhost可以更换为服务器ip 7 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.168.129', credentials=credentials)) 8 # 创建一个链接 9 channel = connection.channel() 10 # 声明一个队列,用于接收消息。durable=True是声明一个持久化的队列不需要可以不指定 11 channel.queue_declare(queue='test', durable=True) 12 # 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),初学可以使用空字符串交换(exchange=''),它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据 13 14 while 1: 15 16 message = input('请输入消息:') 17 if message.upper() == 'Q': 18 break 19 channel.basic_publish(exchange='', 20 routing_key='test', 21 body=message, 22 properties=pika.BasicProperties( 23 delivery_mode=2, # 代表消息是持久的,非持久消息无需指定properties 24 )) 25 print("消息已发出") 26 # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接 27 connection.close()
书写消费者python代码:
1 import pika 2 3 # 建立与rabbitmq的连接 4 credentials = pika.PlainCredentials("dong", "1234") 5 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.168.129', credentials=credentials)) 6 channel = connection.channel() 7 channel.queue_declare(queue="test",durable=True) # 队列不是持久化的,不需要durable参数,默认Flase 8 9 10 def callbak(channel, method, properties, body): 11 print("接收到了消息:%r" % body.decode("utf8")) 12 13 14 # 有消息来临,立即执行callbak,没有消息则夯住,等待消息 15 channel.basic_consume(queue="test", on_message_callback=callbak, auto_ack=True,) 16 # auto_ack回复 17 # 开始消费,接收消息 18 channel.start_consuming()
这样,一个RabbitMQ的生产者消费者,消息队列模型就完成了。。如果需要队列和消息是持久化的。。只需要指定生产者和消费者的:
channel.queue_declare(queue="test",durable=True)中durable为True
以及,生产者的:
properties=pika.BasicProperties(delivery_mode=2,) #这个参数