1、RabbitMQ介绍
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
消息队列技术是分布式应用间交换信息的一种技术;消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走;通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。
MQ主要作用是接受和转发消息。你可以想想在生活中的一种场景:当你把信件的投进邮筒,邮递员肯定最终会将信件送给收件人。我们可以把MQ比作 邮局和邮递员。
MQ和邮局的主要区别是,它不处理消息,但是,它会接受数据、存储消息数据、转发消息。
2、安装RabbitMQ
linux上安装:
安装配置epel源
$ rpm
-
ivh http:
/
/
dl.fedoraproject.org
/
pub
/
epel
/
6
/
i386
/
epel
-
release
-
6
-
8.noarch
.rpm
安装erlang
$ yum
-
y install erlang
安装RabbitMQ
$ yum
-
y install rabbitmq
-
server
windows上安装:
(1)首先,您需要安装支持的 Windows 版Erlang。运行Erlang Windows安装程序。Erlang将出现在开始菜单中,设置erlang的环境变量(C:erl9.2in;),测试erlang是否安装正确:cmd-输入:erl,能看到eshell版本号,说明安装成功!
(2)下载rabbitMQ:(安装下一步完成即可)
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.3/rabbitmq-server-windows-3.7.3.zip
(3)安装RabbitMQ-Plugins,这个相当于是一个管理界面,方便我们在浏览器界面查看RabbitMQ各个消息队列以及交换的工作情况,安装方法是:打开命令行cd进入rabbitmq的sbin目录(我的目录是:C: software rabbitmq rabbitmq_server-3.6.5 sbin),输入:rabbitmq-plugins enable rabbitmq_management命令,稍等会会发现出现plugins安装成功的提示,默认是安装6个插件。
插件安装完之后,在浏览器输入的http://本地主机:15672进行验证
如果不能打开页面解决方法:首先在命令行输入:rabbitmq-service stop,接着输入rabbitmq-service remove,再接着输入rabbitmq-service install,接着输入rabbitmq-service start,最后重新输入rabbitmq-plugins enable rabbitmq_management试试,我是这样解决的。
创建用户名,密码,绑定角色:
RabbitMQ报错解决方法:
C:RabbitMQ Server abbitmq_server-3.7.3sbin>rabbitmqctl.bat status Status of node rabbit@DESKTOP-6JT7D2H ... Error: unable to perform an operation on node 'rabbit@DESKTOP-6JT7D2H'. Please see diagnostics information and suggestions below. Most common reasons for this are: * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues) * CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server) * Target node is not running In addition to the diagnostics info below: * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more * Consult server logs on node rabbit@DESKTOP-6JT7D2H DIAGNOSTICS =========== attempted to contact: ['rabbit@DESKTOP-6JT7D2H'] rabbit@DESKTOP-6JT7D2H: * connected to epmd (port 4369) on DESKTOP-6JT7D2H * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic * TCP connection succeeded but Erlang distribution failed * Authentication failed (rejected by the remote node), please check the Erlang cookie Current node details: * node name: 'rabbitmqcli57@DESKTOP-6JT7D2H' * effective user's home directory: C:UsersAdministrator.DESKTOP-6JT7D2H * Erlang cookie hash: RmzKErjVZUcsMU8wSgBGbA== 解决方法: 将C:Users racyclock.erlang.cookie 文件拷贝到C:WindowsSystem32configsystemprofile替换掉.erlang.cookie文件 重启rabbitMQ服务:net stop RabbitMQ && net start RabbitMQ
查看用户及用户角色:rabbitmqctl.bat list_users
C:RabbitMQ Server abbitmq_server-3.7.3sbin>rabbitmqctl.bat list_users Listing users ... guest [administrator]
新增用户:rabbitmqctl.bat add_user username password
C:RabbitMQ Server abbitmq_server-3.7.3sbin>rabbitmqctl.bat add_user admin 123456 Adding user "admin" ... C:RabbitMQ Server abbitmq_server-3.7.3sbin>rabbitmqctl.bat list_users Listing users ... admin [] guest [administrator]
用户角色:
rabbitmq用户角色可分为五类:超级管理员, 监控者, 策略制定者, 普通管理者以及其他。
(1) 超级管理员(administrator)
可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。
(2) 监控者(monitoring)
可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
(3) 策略制定者(policymaker)
可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。
(4) 普通管理者(management)
仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
(5) 其他的
无法登陆管理控制台,通常就是普通的生产者和消费者。
授权用户为超级管理员:rabbitmqctl.bat set_user_tags admin administrator
C:RabbitMQ Server abbitmq_server-3.7.3sbin>rabbitmqctl.bat set_user_tags admin administrator Setting tags for user "admin" to [administrator] ... C:RabbitMQ Server abbitmq_server-3.7.3sbin>rabbitmqctl.bat list_users Listing users ... admin [administrator] guest [administrator]
用户可以设置多个角色:rabbitmqctl.bat set_user_tags username tag1 tag2 ...
修改用户密码:rabbitmqctl change_password userName newPassword
C:RabbitMQ Server abbitmq_server-3.7.3sbin>rabbitmqctl.bat change_password admin 888888 Changing password for user "admin" ...
删掉用户:rabbitmqctl.bat delete_user username
C:RabbitMQ Server abbitmq_server-3.7.3sbin>rabbitmqctl.bat delete_user guest Deleting user "guest" ... C:RabbitMQ Server abbitmq_server-3.7.3sbin>rabbitmqctl.bat list_users Listing users ... admin [administrator]
权限相关命令为:
(1) 设置用户权限
rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP
(2) 查看(指定hostpath)所有用户的权限信息
rabbitmqctl list_permissions [-p VHostPath]
(3) 查看指定用户的权限信息
rabbitmqctl list_user_permissions User
(4) 清除用户的权限信息
rabbitmqctl clear_permissions [-p VHostPath] User
3、python3使用pika python客户端
发出消息(生产者):
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/22 15:51 # @Author : Py.qi # @File : rabbitMQ_send_1.py # @Software: PyCharm import pika,sys print('send....start....') while True: inputso=input('soinsideto:') if inputso == 'quit': break #与RabbitMQ服务器建立链接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #建立隧道 channel = connection.channel() #创建队列名称zhang channel.queue_declare(queue='zhang') #发送信息:exchange指定交换,routing_key指定队列名,body指定消息内容
channel.basic_publish(exchange='',routing_key='zhang',body=inputso) #关闭链接 connection.close()
接收者(消费者):
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/22 15:57 # @Author : Py.qi # @File : rabbitMQ_rescv_1.py # @Software: PyCharm import pika #创建链接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #建立隧道 channel = connection.channel() #创建队列名,此处也可省略,在找不到队列时创建 channel.queue_declare(queue='zhang') #使用回调函数callback来接收消息并打印消息 def callback(ch,method,properties,body): print('recived:',body) #指定队列接收消息,callback接收消息,queue指定队列,no_ack不给发送者发送确认消息 channel.basic_consume(callback,queue='zhang',no_ack=True) print('waiting for message,to exit press ctrl+c') #持续接收消息,阻塞 channel.start_consuming()
默认情况下,RabbitMQ会按顺序将每条消息发送到下一个使用者,每个消费者按顺序获得同样数量的消息,这种分配方式称为循环法。
(1)消息持久化:
当RabbitMQ退出或者崩溃时,它会忘记队列和消息,需要做两件事来确保消息不会丢失:我们需要将队列和消息标记为持久化。
在创建队列时指定durable为True来标记队列持久化:
channel.queue_declare(queue='hehe',durable=True)
将消息标记为持久化,通过提供值为2的delivery_mode属性
channel.basic_publish(exchange='', routing_key='hehe', body=inputso, properties=pika.BasicProperties( delivery_mode=2, # make message persistent ))
(2)消息公平分发
如果RabbitMQ只管按顺序把消息发送到每个消费者上,不考虑消费者的负载,很可能出现一个机器配置不高的消费者那里堆积很多消息处理不完,同时配置高的机器却很轻松。为了解决这个问题,可以在每个消费者端,配置perfetch=1,意识就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再发新消息了。
为了解决这个问题,我们可以使用basic.qos方法和 prefetch_count = 1设置。这告诉RabbitMQ一次不要向工作人员发送多于一条消息。或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。相反,它会将其分派给不是仍然忙碌的下一个工作人员。
channel.basic_qos(prefetch_count = 1)
完整代码:
生产者:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/22 15:51 # @Author : Py.qi # @File : rabbitMQ_send_1.py # @Software: PyCharm import pika print('send....start....') while True: inputso=input('soinsideto:') if inputso == 'quit': break connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129')) channel = connection.channel() channel.queue_declare(queue='hehe',durable=True) channel.basic_publish(exchange='', routing_key='hehe', body=inputso, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) connection.close()
消费者:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/22 15:57 # @Author : Py.qi # @File : rabbitMQ_rescv_1.py # @Software: PyCharm import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129')) channel = connection.channel() channel.queue_declare(queue='hehe',durable=True) def callback(ch,method,properties,body): print('recived:',body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) #确认发送消息个数 channel.basic_consume(callback,queue='hehe',no_ack=True) print('waiting for message,to exit press ctrl+c') channel.start_consuming()
(3)消息发布与订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
发布消息类似于广播效果,需要用到exchange,在定义exchange时指定类型来决定哪些queue符合条件,可以接收消息:
有几种可用的交换类型:direct, topic, headers 和fanout。我们将关注最后一个fanout。我们创建该类型的交换,并将其称为logs:
channel.exchange_declare(exchange='logs', exchange_type='fanout')
fanout:所有bind到此exchange的queue都可以接收到消息
direct:通过routingKey和exchange决定的哪个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。
表达式符号:#代表一个或多个字符,*代表任何字符
fanout交换,它只是将收到的所有消息广播到它所知道的所有队列中。
要列出服务器上的交换,可以使用命令rabbitmqctl:
sudo rabbitmqctl list_exchanges
广播消息,fanout实例:
生产者:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/24 17:05 # @Author : Py.qi # @File : rabbit_send_fanout.py # @Software: PyCharm import pika import sys connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129')) channel=connection.channel() #指定交换类型 channel.exchange_declare(exchange='logs',exchange_type='fanout') #message = ' '.join(sys.argv[1:]) or 'info:hello world!' for i in range(10): channel.basic_publish(exchange='logs',routing_key='',body=str(i)) print('[x]sent %r'%i) connection.close()
消费者:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/24 17:12 # @Author : Py.qi # @File : rabbit_recv_fanout.py # @Software: PyCharm import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.146.129')) channel = connection.channel() #指定交换类型 channel.exchange_declare(exchange='logs', exchange_type='fanout') #让服务器随机生成队列名,一旦消费者链接关闭,队列将被删除 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #将队列绑定到交换上 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
接收消息时指定队列, exchange type = direct
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据关键字判断应该将数据发送至指定队列。
生产者:发布消息
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/26 9:25 # @Author : Py.qi # @File : rabbit_send_direct.py # @Software: PyCharm import pika import sys conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129')) channel = conn.channel() #绑定exchange类型为指定接收 channel.exchange_declare(exchange='direct_logs',exchange_type='direct') #判断参数个数 severity=sys.argv[1] if len(sys.argv) > 1 else 'info' #发送消息 message = ' '.join(sys.argv[2:]) or 'hello world' #消息发送到exchange交换队列 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message) print('[x]sent %r:%r'%(severity,message)) conn.close()
消费者:接收消息
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/26 9:54 # @Author : Py.qi # @File : rabbit_recver_direct.py # @Software: PyCharm import pika,sys conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129')) channel = conn.channel() channel.exchange_declare(exchange='direct_logs',exchange_type='direct') #随机队列名,结束即删除 result = channel.queue_declare(exclusive=True) queue_name=result.method.queue #接收队列消息关键参数 severities=sys.argv[1:] if not severities: sys.stderr.write('Usage:%s[info][warning][error] '%sys.argv[0]) sys.exit(1) #循环将消息发送到有关键字队列中 for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print('[*]waiting for logs,To exit press CTRL+C') def callback(ch,method,properites,body): print('[x]%r:%r'%(method.routing_key,body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
模糊匹配:
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
#:表示可以匹配0个或多个单词
*:表示只能匹配一个单词
生产者:发送消息
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/26 10:42 # @Author : Py.qi # @File : rabbit_send_topic.py # @Software: PyCharm import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.146.129')) channel = connection.channel() #指定exchange为模糊匹配topic channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
消费者:接收消息
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/26 10:43 # @Author : Py.qi # @File : rabbit_recve_topic.py # @Software: PyCharm import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.146.129')) channel = connection.channel() #exchange模式topic channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]... " % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()