zoukankan      html  css  js  c++  java
  • python3之rabbitMQ

    1、RabbitMQ介绍

      RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

    消息队列技术是分布式应用间交换信息的一种技术;消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走;通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。

    MQ主要作用是接受和转发消息。你可以想想在生活中的一种场景:当你把信件的投进邮筒,邮递员肯定最终会将信件送给收件人。我们可以把MQ比作 邮局和邮递员

    MQ和邮局的主要区别是,它不处理消息,但是,它会接受数据、存储消息数据、转发消息。

    2、安装RabbitMQ

    linux上安装:

    安装配置epel源

       $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 
    安装erlang
       $ yum -y install erlang
    安装RabbitMQ
       $ yum -y install rabbitmq-server

    windows上安装:

    (1)首先,您需要安装支持的 Windows 版Erlang运行Erlang Windows安装程序Erlang将出现在开始菜单中,设置erlang的环境变量(C:erl9.2in;),测试erlang是否安装正确:cmd-输入:erl,能看到eshell版本号,说明安装成功!

    (2)下载rabbitMQ:(安装下一步完成即可)

    https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.3/rabbitmq-server-windows-3.7.3.zip

    (3)安装RabbitMQ-Plugins,这个相当于是一个管理界面,方便我们在浏览器界面查看RabbitMQ各个消息队列以及交换的工作情况,安装方法是:打开命令行cd进入rabbitmq的sbin目录(我的目录是:C: software rabbitmq rabbitmq_server-3.6.5 sbin),输入:rabbitmq-plugins enable rabbitmq_management命令,稍等会会发现出现plugins安装成功的提示,默认是安装6个插件。

    插件安装完之后,在浏览器输入的http://本地主机:15672进行验证

    如果不能打开页面解决方法:首先在命令行输入:rabbitmq-service stop,接着输入rabbitmq-service remove,再接着输入rabbitmq-service install,接着输入rabbitmq-service start,最后重新输入rabbitmq-plugins enable rabbitmq_management试试,我是这样解决的。

    创建用户名,密码,绑定角色:

    RabbitMQ报错解决方法:

    C:RabbitMQ Server
    abbitmq_server-3.7.3sbin>rabbitmqctl.bat status
    Status of node rabbit@DESKTOP-6JT7D2H ...
    Error: unable to perform an operation on node 'rabbit@DESKTOP-6JT7D2H'. Please see diagnostics information and suggestions below.
    
    Most common reasons for this are:
    
     * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)
     * CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server)
     * Target node is not running
    
    In addition to the diagnostics info below:
    
     * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more
     * Consult server logs on node rabbit@DESKTOP-6JT7D2H
    
    DIAGNOSTICS
    ===========
    
    attempted to contact: ['rabbit@DESKTOP-6JT7D2H']
    
    rabbit@DESKTOP-6JT7D2H:
      * connected to epmd (port 4369) on DESKTOP-6JT7D2H
      * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic
      * TCP connection succeeded but Erlang distribution failed
    
      * Authentication failed (rejected by the remote node), please check the Erlang cookie
    
    
    Current node details:
     * node name: 'rabbitmqcli57@DESKTOP-6JT7D2H'
     * effective user's home directory: C:UsersAdministrator.DESKTOP-6JT7D2H
     * Erlang cookie hash: RmzKErjVZUcsMU8wSgBGbA==
    
    解决方法:
    将C:Users	racyclock.erlang.cookie 文件拷贝到C:WindowsSystem32configsystemprofile替换掉.erlang.cookie文件
    重启rabbitMQ服务:net stop RabbitMQ && net start RabbitMQ

    查看用户及用户角色:rabbitmqctl.bat list_users

    C:RabbitMQ Server
    abbitmq_server-3.7.3sbin>rabbitmqctl.bat list_users
    Listing users ...
    guest   [administrator]

    新增用户:rabbitmqctl.bat add_user username password

    C:RabbitMQ Server
    abbitmq_server-3.7.3sbin>rabbitmqctl.bat add_user admin 123456
    Adding user "admin" ...
    
    C:RabbitMQ Server
    abbitmq_server-3.7.3sbin>rabbitmqctl.bat list_users
    Listing users ...
    admin   []
    guest   [administrator]

    用户角色:

    rabbitmq用户角色可分为五类:超级管理员, 监控者, 策略制定者, 普通管理者以及其他。

    (1) 超级管理员(administrator)

    可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。

    (2) 监控者(monitoring)

    可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) 

    (3) 策略制定者(policymaker)

    可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。

    (4) 普通管理者(management)

    仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。

    (5) 其他的

    无法登陆管理控制台,通常就是普通的生产者和消费者。

    授权用户为超级管理员:rabbitmqctl.bat set_user_tags admin administrator

    C:RabbitMQ Server
    abbitmq_server-3.7.3sbin>rabbitmqctl.bat set_user_tags admin administrator
    Setting tags for user "admin" to [administrator] ...
    
    C:RabbitMQ Server
    abbitmq_server-3.7.3sbin>rabbitmqctl.bat list_users
    Listing users ...
    admin   [administrator]
    guest   [administrator]

    用户可以设置多个角色:rabbitmqctl.bat set_user_tags username tag1 tag2 ...

    修改用户密码:rabbitmqctl change_password userName newPassword

    C:RabbitMQ Server
    abbitmq_server-3.7.3sbin>rabbitmqctl.bat change_password admin 888888
    Changing password for user "admin" ...

    删掉用户:rabbitmqctl.bat delete_user username

    C:RabbitMQ Server
    abbitmq_server-3.7.3sbin>rabbitmqctl.bat delete_user guest
    Deleting user "guest" ...
    
    C:RabbitMQ Server
    abbitmq_server-3.7.3sbin>rabbitmqctl.bat list_users
    Listing users ...
    admin   [administrator]

    权限相关命令为:

    (1) 设置用户权限

    rabbitmqctl  set_permissions  -p  VHostPath  User  ConfP  WriteP  ReadP

    (2) 查看(指定hostpath)所有用户的权限信息

    rabbitmqctl  list_permissions  [-p  VHostPath]

    (3) 查看指定用户的权限信息

    rabbitmqctl  list_user_permissions  User

    (4)  清除用户的权限信息

    rabbitmqctl  clear_permissions  [-p VHostPath]  User

    3、python3使用pika python客户端

    发出消息(生产者):

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/2/22 15:51
    # @Author  : Py.qi
    # @File    : rabbitMQ_send_1.py
    # @Software: PyCharm
    
    import pika,sys
    print('send....start....')
    while True:
        inputso=input('soinsideto:')
        if inputso == 'quit':
            break
    #与RabbitMQ服务器建立链接
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    #建立隧道
        channel = connection.channel()
    #创建队列名称zhang
        channel.queue_declare(queue='zhang')
    #发送信息:exchange指定交换,routing_key指定队列名,body指定消息内容    
      channel.basic_publish(exchange='',routing_key='zhang',body=inputso)
    #关闭链接 connection.close()

    接收者(消费者):

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/2/22 15:57
    # @Author  : Py.qi
    # @File    : rabbitMQ_rescv_1.py
    # @Software: PyCharm
    
    import pika
    #创建链接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    #建立隧道
    channel = connection.channel()
    #创建队列名,此处也可省略,在找不到队列时创建
    channel.queue_declare(queue='zhang')
    #使用回调函数callback来接收消息并打印消息
    def callback(ch,method,properties,body):
        print('recived:',body)
    #指定队列接收消息,callback接收消息,queue指定队列,no_ack不给发送者发送确认消息
    channel.basic_consume(callback,queue='zhang',no_ack=True)
    print('waiting for message,to exit press ctrl+c')
    #持续接收消息,阻塞
    channel.start_consuming()

     默认情况下,RabbitMQ会按顺序将每条消息发送到下一个使用者,每个消费者按顺序获得同样数量的消息,这种分配方式称为循环法。

    (1)消息持久化:

    当RabbitMQ退出或者崩溃时,它会忘记队列和消息,需要做两件事来确保消息不会丢失:我们需要将队列和消息标记为持久化。

    在创建队列时指定durable为True来标记队列持久化:

    channel.queue_declare(queue='hehe',durable=True)

    将消息标记为持久化,通过提供值为2的delivery_mode属性

    channel.basic_publish(exchange='',
                              routing_key='hehe',
                              body=inputso,
                              properties=pika.BasicProperties(
                                  delivery_mode=2,  # make message persistent
                              ))

    (2)消息公平分发

    如果RabbitMQ只管按顺序把消息发送到每个消费者上,不考虑消费者的负载,很可能出现一个机器配置不高的消费者那里堆积很多消息处理不完,同时配置高的机器却很轻松。为了解决这个问题,可以在每个消费者端,配置perfetch=1,意识就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再发新消息了。

    为了解决这个问题,我们可以使用basic.qos方法和 prefetch_count = 1设置。这告诉RabbitMQ一次不要向工作人员发送多于一条消息。或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。相反,它会将其分派给不是仍然忙碌的下一个工作人员。

    channel.basic_qos(prefetch_count = 1)

    完整代码:

    生产者:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/2/22 15:51
    # @Author  : Py.qi
    # @File    : rabbitMQ_send_1.py
    # @Software: PyCharm
    
    import pika
    print('send....start....')
    while True:
        inputso=input('soinsideto:')
        if inputso == 'quit':
            break
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
        channel = connection.channel()
        channel.queue_declare(queue='hehe',durable=True)
        channel.basic_publish(exchange='',
                              routing_key='hehe',
                              body=inputso,
                              properties=pika.BasicProperties(
                                  delivery_mode=2,  # make message persistent
                              ))
        connection.close()

    消费者:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/2/22 15:57
    # @Author  : Py.qi
    # @File    : rabbitMQ_rescv_1.py
    # @Software: PyCharm
    
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
    channel = connection.channel()
    channel.queue_declare(queue='hehe',durable=True)
    def callback(ch,method,properties,body):
        print('recived:',body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    channel.basic_qos(prefetch_count=1) #确认发送消息个数
    channel.basic_consume(callback,queue='hehe',no_ack=True)
    print('waiting for message,to exit press ctrl+c')
    channel.start_consuming()

    (3)消息发布与订阅

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

    发布消息类似于广播效果,需要用到exchange,在定义exchange时指定类型来决定哪些queue符合条件,可以接收消息:

    有几种可用的交换类型:direct, topic, headers 和fanout。我们将关注最后一个fanout。我们创建该类型的交换,并将其称为logs

    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')

    fanout:所有bind到此exchange的queue都可以接收到消息

    direct:通过routingKey和exchange决定的哪个唯一的queue可以接收消息

    topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。

      表达式符号:#代表一个或多个字符,*代表任何字符

    fanout交换,它只是将收到的所有消息广播到它所知道的所有队列中。

    要列出服务器上的交换,可以使用命令rabbitmqctl:

    sudo rabbitmqctl list_exchanges

    广播消息,fanout实例:

    生产者:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/2/24 17:05
    # @Author  : Py.qi
    # @File    : rabbit_send_fanout.py
    # @Software: PyCharm
    import pika
    import sys
    connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
    channel=connection.channel()
    #指定交换类型
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    #message = ' '.join(sys.argv[1:]) or 'info:hello world!'
    for i in range(10):
        channel.basic_publish(exchange='logs',routing_key='',body=str(i))
        print('[x]sent %r'%i)
    connection.close()

    消费者:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/2/24 17:12
    # @Author  : Py.qi
    # @File    : rabbit_recv_fanout.py
    # @Software: PyCharm
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='192.168.146.129'))
    channel = connection.channel()
    #指定交换类型
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    #让服务器随机生成队列名,一旦消费者链接关闭,队列将被删除
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    #将队列绑定到交换上
    channel.queue_bind(exchange='logs',
                       queue=queue_name)
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    channel.start_consuming()

    接收消息时指定队列, exchange type = direct

    RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据关键字判断应该将数据发送至指定队列。

    生产者:发布消息

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/2/26 9:25
    # @Author  : Py.qi
    # @File    : rabbit_send_direct.py
    # @Software: PyCharm
    
    import pika
    import sys
    
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
    channel = conn.channel()
    #绑定exchange类型为指定接收
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
    #判断参数个数
    severity=sys.argv[1] if len(sys.argv) > 1 else 'info'
    #发送消息
    message = ' '.join(sys.argv[2:]) or 'hello world'
    #消息发送到exchange交换队列
    channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
    print('[x]sent %r:%r'%(severity,message))
    conn.close()

    消费者:接收消息

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/2/26 9:54
    # @Author  : Py.qi
    # @File    : rabbit_recver_direct.py
    # @Software: PyCharm
    
    import pika,sys
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
    channel = conn.channel()
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
    #随机队列名,结束即删除
    result = channel.queue_declare(exclusive=True)
    queue_name=result.method.queue
    #接收队列消息关键参数
    severities=sys.argv[1:]
    if not severities:
        sys.stderr.write('Usage:%s[info][warning][error]
    '%sys.argv[0])
        sys.exit(1)
    #循环将消息发送到有关键字队列中
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    print('[*]waiting for logs,To exit press CTRL+C')
    def callback(ch,method,properites,body):
        print('[x]%r:%r'%(method.routing_key,body))
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    channel.start_consuming()

    模糊匹配:

    exchange type = topic

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    #:表示可以匹配0个或多个单词

    *:表示只能匹配一个单词

    生产者:发送消息

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/2/26 10:42
    # @Author  : Py.qi
    # @File    : rabbit_send_topic.py
    # @Software: PyCharm
    
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='192.168.146.129'))
    channel = connection.channel()
    #指定exchange为模糊匹配topic
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()

    消费者:接收消息

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/2/26 10:43
    # @Author  : Py.qi
    # @File    : rabbit_recve_topic.py
    # @Software: PyCharm
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='192.168.146.129'))
    channel = connection.channel()
    #exchange模式topic
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
        sys.exit(1)
    
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()

  • 相关阅读:
    Springboot构建问题集
    常用算法解析技巧总结
    Linux、Docker安装Nginx
    MySQL查询语句报错 sql_mode=only_full_group_by 问题
    MySQL按周统计 WEEK 实例
    IDEA注册码分享
    Mock测试接口
    Maven常用命令
    js中的for循环,循环次数会多出一次。当循环到最后一个的时候,循环还会继续,并且此时i就变成remove?
    vue .sync的理解
  • 原文地址:https://www.cnblogs.com/zhangxinqi/p/8460768.html
Copyright © 2011-2022 走看看