zoukankan      html  css  js  c++  java
  • RabbitMQ 全套

    本博客代码运行环境

         ErLang:      ErLang_X64_22 version
         RabbitMQ:    RabbitMQ_Server_3.7.15 version
         python :     Python 3.7.1rc1   version
         pip :       pip 19.1.1  version
         pika :       pika 1.0.1 version
    

    什么是MQ

    消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
    其主要用途:不同进程Process/线程Thread之间通信。
    为什么会产生消息队列?有几个原因:
    
    不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
    
    不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
    
    关于消息队列的详细介绍请参阅:
    《Java帝国之消息队列》
    《一个故事告诉你什么是消息队列》
    《到底什么时候该使用MQ》
    
    MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。

    RabbitMQ

    RabbitMQ简介

        
        1、MQ 为 Message Queue, 消息队列是应用程序和应用程序之间的通信方法,
        2、RabbitMQ 是一个开源的, 在AMQP 基础上完整的,可复用的企业消息系统,消息中间件 , 消息队列
        3、支持主流的 OS, Linux, Windows, MacOX 等,
        4、多种开发语言支持, java、Python、Ruby、.NET、PHP、C/C++、node.js 等
        5、是专业级别的, 甩 python 的队列好几条街
        6、开发语言: Erlang----面向并发的编程语言----- 爱立信公司, 可以做到 热插拔, 局部加载, 不需要重启整个服务
    
     
       AMQP: 消息队列的一个协议。
    

    搭建RabbitMQ环境:windows下安装

    第一步 提供 Erlang 编程语言环境-----》安装 Erlang

    官网: http://www.erlang.org/download ,

    安装RabbitMQ

    官网: https://www.rabbitmq.com/install-windows.html

    安装完成: 打开 cmd 命令行工具, cd 到 RabbitMQ 的安装目录下的 sbin/ 子目录 中。 如图:

    1、启动管理工具插件: rabbitmq-plugins enable rabbitmq_management

    2、启动 RabbitMQ 服务:net start rabbitmq

    3、浏览器输入地址: http://127.0.0.1:15672/

        4、使用 默认账号管理: guest/ guest  , 能够登陆 ,说明安装成功

        4.1、 添加 admin 用户:

         4.2、用户角色: 

       1、超级管理员(administrator)
          可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
       2、监控者(monitoring)
          可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
       3、策略制定者(policymaker)
         可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
       4、普通管理者(management)
          仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
       5、其他
          无法登陆管理控制台,通常就是普通的生产者和消费者。
    

         4.3、创建Virtual Hosts

    virtual hosts

            选中Admin用户,设置权限:

    permissions

         看到权限已加

    look

    4.4、 管理界面中的功能: 

    vir

    shuo

       4.5、 管理工具中 查看队列消息:

        点击上面的队列名称,查询具体的队列中的信息:

    使用 rabbitMQ 命令 添加用户并设置权限的步骤:

       1. 创建用户:      rabbitmqctl add_user name password
       2. 设置用户角色:   rabbitmqctl set_user_tags name administrator
       3. 远程连接需设置用户权限, 代表允许从外面访问: 
          rabbitmqctl set_permissions -p / name ".*" ".*" ".*"
        解析:          set_permissions [-p vhost] {user} {conf} {write} {read}
    

       RabbitMQ 常用命令: 

     1 1、rabbitmq 管理器插件的启动和关闭:
     2     **启动监控管理器:rabbitmq-plugins enable rabbitmq_management
     3     关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
     4 2、服务的启动与关闭:
     5      **启动rabbitmq:rabbitmq-service start
     6      关闭rabbitmq:rabbitmq-service stop
     7      **使用 windows 命令: net start rabbitmq
     8                                  net stop rabbitmq
     9 3、rabbitmq服务器的启动和关闭:
    10     前台启动: rabbitmq-server start
    11     后台启动: rabbitmq-server -detached
    12     前台停止:rabbitmqctl stop
    13     查看 RabbitMQ 的状态: rabbitmqctl status
    14 4、rabbitmq 应用管理: 
    15     关闭应用:rabbitmqctl stop_app
    16     启动应用:rabbitmqctl start_app
    17 5、用户管理: 
    18     **添加用户: rabbitmqctl add_user username password
    19     列出所有用户: rabbitmqctl list_users
    20     删除用户: rabbitmqctl delete_user username 
    21     **修改用户密码: rabbitmqctl change_password username 
    22  newpassword
    23 6、角色管理: 
    24     **分配角色:rabbitmqctl set_user_tags username administrator
    25 
    26      角色说明
    27              none 最小权限角色
    28              management 管理员角色
    29              policymaker 决策者
    30              monitoring 监控
    31              administrator 超级管理员
    32 7. 权限管理: 
    33    清除用户权限: rabbitmqctl clear_permissions -p vhostpath user
    34    **设置用户权限: rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
    35 8、虚拟主机管理:
    36     列出所有虚拟主机: rabbitmqctl list_vhosts
    37     创建虚拟主机:      rabbitmqctl list_vhost vhostpath
    38     删除虚拟主机:     rabbitmqctl delete_vhost vhostpath
    39     列出虚拟主机所有权限:rabbitmqctl list_permissions -p vhostpath
    40 9、队列管理:
    41    **查看所有的队列:rabbitmqctl list_queues
    42    清除所有的队列:rabbitmqctl reset
    43    查看所有绑定:    rabbitmqctl list_bindings
    44    查看所有通道:    rabbitmqctl list_channels
    45    查看所有连接:    rabbitmqctl list_connections
    46    列出所有消费者: rabbitmqctl list_consumers
    47    **列出所有交换机: rabbitmqctl list_exchanges

    Python 操作 RabbitMQ 之深入浅出

            此博客代码托管地址:  https://github.com/SuoSuo-Rocky/RabbitMQ-FullStack

    安装 rabbitMQ module

     
     pip install pika 
     or 
     easy_install pika
     or 
     源码 : https://pypi.python.org/pypi/pika 
    

    实现最简单的队列通信

    send端

     1 #  发送端, 消费者
     2 import pika
     3 
     4 credentials = pika.PlainCredentials('shiwei', 'shiwei666666')
     5 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
     6 
     7 # 在连接之上创建一个 rabbit 协议的通道
     8 channel = connection.channel()
     9 
    10 # 在通道中 声明 一个 queue
    11 channel.queue_declare(queue='hello')
    12 
    13 # 一个消息永远不能直接发送到队列,它总是需要经过一个交换
    14 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    15 channel.basic_publish(exchange='',  # 交换机
    16                       routing_key='hello',   # 路由键,写明将消息发往哪个队列,本例是将消息发往队列hello
    17                       body='Hello World!')   # 生产者要发送的消息 内容
    18 print(" [x] Sent 'Hello World!'")
    19 connection.close()  # 当生产者发送完消息后,可选择关闭连接

    receive端

    import pika
    import time
    credentials = pika.PlainCredentials('shiwei', 'shiwei666666')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
    channel = connection.channel()
    
    # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    # We could avoid that if we were sure that the queue already exists. For example if send.py program
    # was run before. But we're not yet sure which program to run first. In such cases it's a good
    # practice to repeat declaring the queue in both programs.
    # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
    channel.queue_declare(queue='hello')
    
    # 定义一个回调函数,用来接收生产者发送的消息
    def callback(ch, method, properties, body):
        print("received msg...start processing....",body)
        time.sleep(5)
        print(" [x] msg process done....",body)
    
    channel.basic_consume(on_message_callback=callback, # 定义一个回调函数,用来接收生产者发送的消息
                          auto_ack=True,
                          queue='hello',            # 指定取消息的队列名
                          )
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()      #开始循环取消息
    

    实现功能

       (1)rabbitmq循环调度公平分发,将消息循环发送给不同的消费者,如:消息1,3,5发送给消费者1;消息2,4,6发送给消费者2。
       (2)消息确认机制,为了确保一个消息不会丢失,RabbitMQ支持消息的确认 , 一个 ack(acknowlegement) 是从消费者端发送一个确认去告诉RabbitMQ 消息已经接收了、处理了,RabbitMQ可以释放并删除掉了。如果一个消费者死掉了(channel关闭、connection关闭、或者TCP连接断开了)而没有发送ack,RabbitMQ 就会认为这个消息没有被消费者处理,并会重新发送到生产者的队列里,如果同时有另外一个消费者在线,rabbitmq将会将消息很快转发到另外一个消费者中。 那样的话你就能确保虽然一个消费者死掉,但消息不会丢失。
         这个是没有超时的,当消费方(consumer)死掉后RabbitMQ会重新转发消息,即使处理这个消息需要很长很长时间也没有问题。消息的 acknowlegments 默认是打开的,在前面的例子中关闭了: auto_ack = True . 现在删除这个标识 然后 发送一个 acknowledgment。
    

    消息持久化

     消息持久化,将消息写入硬盘中。
           注意: (1)、RabbitMQ不允许你重新定义一个已经存在、但属性不同的queue, 否则报错
                (2)、标记消息为持久化并不能完全保证消息不会丢失,尽管已经告诉RabbitMQ将消息保存到磁盘,但 
           RabbitMQ接收到的消息在还没有保存的时候,仍然有一个短暂的时间窗口。RabbitMQ不会对每个消息都执行同步 - 
          -- 可能只是保存到缓存cache还没有写入到磁盘中。因此这个持久化保证并不是很强,但这比我们简单的任务queue要 
          好很多,如果想要很强的持久化保证,可以使用 publisher confirms。
         公平调度: 在一个消费者未处理完一个消息之前不要分发新的消息给它,而是将这个新消息分发给另一个不是很忙的消费 
                   者进行处理。为了解决这个问题我们可以在消费者代码中使用 channel.basic.qos (prefetch_count = 1 ),将消费者设置为公平调度
    

    生产者

      

    import pika
    import sys
    
    username = "shiwei"
    pwd = 'shiwei666666'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    # 创建连接
    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    chann = conn.channel()
    
    # 源码:
    """
    #     def queue_declare(self,            # channel.queueDeclare 用来创建队列,有5个参数:
    #                       queue,           # String queue, 队列名;
    #                       passive=False,   # 
    #                       durable=False,   # boolean durable, 该队列是否需要持久化
    #                       exclusive=False, # boolean exclusive,该队列是否为该通道独占的(其他通道是否可以消费该队列)
    #                       auto_delete=False, # boolean autoDelete,该队列不再使用的时候,是否让RabbitMQ服务器自动删除掉;
    #                       arguments=None)
    
    """
    chann.queue_declare(queue='test_tags', # 声明 队列, 不可与 已存在的 队列重名 , 否则 报错
                        durable=True,      # 设置队列 持久化 , 报 : ChannelClosedByBroker: 406 , 错误, passive:是屈服的意思,将passive设为True,问题解决。
                        # passive= True,
                        )
    message = "My name is shiwei"
    
    chann.basic_publish(exchange='',
                        routing_key='test_tags',                              # 表明 要将 消息 发送到哪个队列
                        body = message,
                        properties = pika.BasicProperties(delivery_mode = 2)  # 设置消息持久化, 将消息的属性设置为 2 ,表示消息持久化
                        )
    
    print('[Publisher] Send %s' % message)
    conn.close()
    

    消费者

    import pika
    import time
    
    username = "shiwei"
    pwd = 'shiwei666666'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    # 创建连接
    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    
    chann = conn.channel()
    
    chann.queue_declare(queue='test_tags',  # 声明 队列, 不可与 已存在的 队列重名 , 否则 报错
                        durable=True,       # 设置队列 持久化 ,
                        # passive=True,     # 是否检查当前队列 是否存在 , True 表示 当前声明队列 为 存在 的,
                        )
    # 定义 接受消息 的 回调函数
    def callback(ch,method, properties, body):
        print(" [消费者] Received %r" % body)
        time.sleep(3)
        print(" [消费者] Done")
        # 手动 确认  在接收到 消息后 给 rabbitmq 发送一个 确认 ACK, 返回 消息标识符
        ch.basic_ack(delivery_tag=method.delivery_tag)
    """
        def basic_consume(self,
                          queue,
                          on_message_callback,
                          auto_ack=False,
                          exclusive=False,
                          consumer_tag=None,
                          arguments=None):
    """
    chann.basic.qos (prefetch_count = 1 )
    # 注意 源码中的 位置参数的位置
    chann.basic_consume(queue='test_tags',
                        on_message_callback = callback,
                        # 是否 需要 自动 确认, 若为 False, 则需要在 消息回调函数中手动确认,
                        auto_ack = False,  # 默认是  False
                        )
    
    chann.start_consuming()  # 开始 循环 接受消息
    

    发布-订阅_广播:一对多-------交换机

     
    exchange:交换机。生产者不是将消息发送给队列,而是将消息发送给交换机,由交换机决定将消息发送给哪个队列。所以 
         exchange必须准确知道消息是要送到哪个队列,还是要被丢弃。因此要在exchange中给exchange定义规则,所有的规 
          则都是在exchange的类型中定义的。
    exchange有4个类型:
         fanout : 所有bind到此exchange的queue都可以接收消息
         direct :通过routingKey和exchange决定的那个唯一的queue可以接收消息
         topic  :所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
              表达式符号说明:#代表一个或多个字符,*代表任何字符
                  例:#.a会匹配a.a,aa.a,aaa.a等
                     *.a会匹配a.a,b.a,c.a等
                     注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 
        headers :通过headers 来决定把消息发给哪些queue
       之前,我们并没有讲过exchange,但是我们仍然可以将消息发送到队列中。这是因为我们用的是默认exchange.也就是 
            说之前写的:exchange='',空字符串表示默认的exchange。
    

    exchange_type=fanout

    广播类型,生产者将消息发送给所有消费者,如果某个消费者没有收到当前消息,就再也收不到了(消费者就像收音机)
               生产者:(可以用作日志收集系统)
               开启多个消费者后,会同时从生产者接收相同的消息

    fanout

    
    

    消息publisher

    import pika
    
    username = "shiwei"
    pwd = 'shiwei666666'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    # 创建连接
    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    
    #在连接上创建一个频道
    channel = conn.channel()
    
    #创建一个fanout(广播)类型的交换机exchange,名字为logs。
    channel.exchange_declare(exchange="logs", exchange_type="fanout")
    message =  "info: Hello World!"
    channel.basic_publish(exchange='logs',# 指定交换机exchange为logs,这里只需要指定将消息发给交换机logs就可以了,不需要指定队列,因为生产者消息是发送给交换机的。
                          routing_key='', # 在fanout类型中,绑定关键字routing_key必须忽略,写空即可
                          body=message)
    print(" [x] Sent %r" % message)
    conn.close()
    

    消息subscriber

    import pika
    import sys
    
    username = "shiwei"
    pwd = 'shiwei666666'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    # 创建连接
    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    
    #在连接上创建一个频道
    channel = conn.channel()
    
    channel.exchange_declare(exchange="logs", exchange_type="fanout")  # 参数名改变了,以前版本是 type
    
    result = channel.queue_declare(exclusive=True,  # 创建随机队列,exclusive=True(唯一性)当消费者与rabbitmq断开连接时,这个队列将自动删除。
                                   queue='',)
    
    queue_name = result.method.queue # 分配随机队列的名字。
    channel.queue_bind(exchange='logs',# 将交换机、队列绑定在一起,
                       queue=queue_name,)
    
    def callback(ch, method, properties, body):   #  定义回调函数,接收消息
        print(" [消费者] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(queue=queue_name,
                          on_message_callback = callback,
                          auto_ack=True) # 消费者接收消息后,不给rabbimq回执确认。
    
    channel.start_consuming() # 循环等待消息接收。
    

    exchange_type=direct

       RabbitMQ还支持根据关键字发送,无需声明队列,即:发布时给队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 
            关键字 判定应该将数据发送至指定队列。
            例如: 根据 日志级别,info, warning, error, success,本例即是。 
       注意: *****本例 需从命令行启动,给定参数-------》队列的绑定关键字
     

    direct

    消息publisher

    import pika
    import sys
    
    username = "shiwei"
    pwd = 'shiwei666666'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    # 创建连接
    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    
    #在连接上创建一个频道
    channel = conn.channel()
    
    #创建一个交换机并声明 direct 的类型为:关键字类型,表示该交换机会根据消息中不同的关键字将消息发送给不同的队列
    channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
    
    severity = sys.argv[1] if len(sys.argv) > 1 else "info"
    
    message = ' '.join(sys.argv[2:]) or "Hello World!"
    
    channel.basic_publish(exchange='direct_logs', # 指明用于发布消息的交换机、关键字
                          routing_key=severity,   # 绑定关键字,即将message与关键字info绑定,明确将消息发送到哪个关键字的队列中。
                          body=message)
    print(" [x] Sent %r" % message)
    conn.close()

    消息subscriber

    import pika
    import sys
    
    username = "shiwei"
    pwd = 'shiwei666666'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    # 创建连接
    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    
    # 在连接上创建一个频道
    channel = conn.channel()
    
    channel.exchange_declare(exchange="direct_logs", exchange_type="direct")  # 参数 名改变了, 以前是 type
    
    result = channel.queue_declare(exclusive=True,  # 创建随机队列,当消费者与rabbitmq断开连接时,这个队列将自动删除。
                                   queue='',)
    queue_name = result.method.queue              # 分配随机队列的名字。
    
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    
    for severity in severities:                   # 循环 队列, 使其与交换机绑定在一起,
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity,)
    
    def callback(ch, method, properties, body):   # 定义回调函数,接收消息
        print(" [消费者] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(queue=queue_name,
                          on_message_callback = callback,
                          auto_ack=True,)         # 消费者接收消息后,不给rabbimq回执确认。
    
    channel.start_consuming()                     # 循环等待消息接收。
    

      

    exchange_type=topic---> 模糊匹配类型。比较常用

     
      发送到一个 topics交换机的消息,它的 routing_key不能是任意的 -- 它的routing_key必须是一个用小数点分割的 
          单词列表。 这个字符可以是任何单词,但是通常是一些指定意义的字符。比如: 
          “stock.usd.nyse","nyse.vmw","quick.orange.rabbit".  这里可以是你想要路由键的任意字符。最高限制 
           为255字节。
      生产者与消费者的routing_key必须在同一个表单中。 Topic交换的背后的逻辑类似直接交换(direct) -- 包含特定关 
          键字的消息将会分发到所有匹配的关键字队列中。然后有两个重要的特殊情况:
          绑定键值:
            > * (星)  可代替一个单词
            > # (井) 可代替0个或多个单词 
           注意: *****本例 需从命令行启动,给定参数-------》队列的绑定关键字
    

    topic

    消息publisher

    import pika
    import sys
    
    username = "shiwei"
    pwd = 'shiwei666666'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    # 创建连接
    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    
    channel = conn.channel()
    
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    conn.close()

    消息subscriber

    import pika
    import sys
    
    username = "shiwei"
    pwd = 'shiwei666666'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    # 创建连接
    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    
    channel = conn.channel()
    
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    result = channel.queue_declare(exclusive=True,
                                   queue="",)
    queue_name = result.method.queue
    
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
        sys.exit(1)
    
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    channel.basic_consume(on_message_callback = callback,
                          queue=queue_name,
                          auto_ack=True)
    
    channel.start_consuming()
    
    To receive all the logs run:
       python receive_logs_topic.py "#"
    To receive all logs from the facility "kern":
       python receive_logs_topic.py "kern.*"
    Or if you want to hear only about "critical" logs:
       python receive_logs_topic.py "*.critical"
    You can create multiple bindings:
       python receive_logs_topic.py "kern.*" "*.critical"
    And to emit a log with a routing key "kern.critical" type:
       python emit_log_topic.py "kern.critical" "A critical kernel error"
    

    exchange_type=topic----例 二 :

    生产者

     1 import pika
     2 import sys
     3 
     4 username = "shiwei"
     5 pwd = 'shiwei666666'
     6 user_pwd = pika.PlainCredentials(username, pwd)
     7 
     8 # 创建连接
     9 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    10 
    11 channel = conn.channel()
    12 channel.exchange_declare(exchange='topic_logs',
    13                          exchange_type='topic')  # 创建模糊匹配类型的exchange。。
    14 
    15 routing_key = '[warn].kern' # 这里关键字必须为点号隔开的单词,以便于消费者进行匹配。引申:这里可以做一个判断,判断产生的日志是什么级别,然后产生对应的routing_key,使程序可以发送多种级别的日志
    16 message =  'Hello World!'
    17 channel.basic_publish(exchange='topic_logs',#将交换机、关键字、消息进行绑定
    18                       routing_key=routing_key,  # 绑定关键字,将队列变成[warn]日志的专属队列
    19                       body=message)
    20 print(" [x] Sent %r:%r" % (routing_key, message))
    21 conn.close()

    消费者

     1 import pika
     2 import sys
     3 
     4 username = "shiwei"
     5 pwd = 'shiwei666666'
     6 user_pwd = pika.PlainCredentials(username, pwd)
     7 
     8 # 创建连接
     9 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    10 
    11 channel = conn.channel()
    12 
    13 channel.exchange_declare(exchange='topic_logs',
    14                          exchange_type='topic')  # 声明exchange的类型为模糊匹配。
    15 
    16 result = channel.queue_declare(exclusive=True,
    17                                queue="",)        # 创建随机一个队列当消费者退出的时候,该队列被删除。
    18 queue_name = result.method.queue                 # 创建一个随机队列名字。
    19 
    20 # 绑定键。‘#’匹配所有字符,‘*’匹配一个单词。这里列表中可以为一个或多个条件,能通过列表中字符匹配到的消息,消费者都可以取到
    21 binding_keys = ['[warn].*', 'info.*']
    22 if not binding_keys:
    23     sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
    24     sys.exit(1)
    25 
    26 # 通过循环绑定多个“交换机-队列-关键字”,只要消费者在rabbitmq中能匹配到与关键字相应的队列,就从那个队列里取消息
    27 for binding_key in binding_keys:
    28     channel.queue_bind(exchange='topic_logs',
    29                        queue=queue_name,
    30                        routing_key=binding_key)
    31 
    32 print(' [*] Waiting for logs. To exit press CTRL+C')
    33 
    34 
    35 def callback(ch, method, properties, body):
    36     print(" [x] %r:%r" % (method.routing_key, body))
    37 
    38 
    39 channel.basic_consume(on_message_callback=callback,
    40                       queue=queue_name,
    41                       auto_ack=True)       # 不给rabbitmq发送确认
    42 
    43 channel.start_consuming()                  # 循环接收消息

    远程过程调用(RPC)Remote procedure call

    消息属性
    AMQP协议在一个消息中预先定义了一个包含14个属性的集合。大部分属性很少用到,以下几种除外:
      > delivery_mode: 标记一个消息为持久的(值为2)或者 瞬时的(其它值), 你需要记住这个属性(在第二课时用到过)
      > content_type : 用来描述 MIME 类型的编码 ,比如我们经常使用的 JSON 编码,设置这个属性就非常好实现: application/json
      > reply_to:reply_to没有特别的意义,只是一个普通的变量名,只是它通常用来命名一个 callback 队列
      > correlation_id : 用来关联RPC的请求与应答。关联id的作用:当在一个队列中接收了一个返回,我们并不清楚这个结果时属于哪个请求的,这样当correlation_id属性使用后,我们为每个请求设置一个唯一值,这个值就是关联id。这样,请求会有一个关联id,该请求的返回结果也有一个相同的关联id。然后当我们从callback队列中接收到一个消息后,我们查看一下这个关联,基于这个我们就能将请求和返回进行匹配。如果我们看到一个未知的correlation_id值,我们可以直接丢弃这个消息 -- 它是不属于我们的请求。
    RPC执行过程:
        >  当客户端启动后,它创建一个匿名的唯一的回调队列
        > 对一个RPC请求, 客户端发送一个消息包含两个属性: reply_to (用来设置回调队列)和 correlation_id(用来为每个请求设置一个唯一标识)
        > 请求发送到 rpc_queue队列
        > RPC worker( 服务端) 在那个队列中等待请求,当一个请求出现后,服务端就执行一个job并将结果消息发送给客户端,使用reply_to字段中的队列
        > 客户端在callback 队列中等待数据, 当一个消息出现后,检查这个correlation_id属性,如果和请求中的值匹配将返回给应用

    rpc

    RPC Running Detail:
        >  当客户端启动后,它创建一个匿名的唯一的回调队列
        > 对一个RPC请求, 客户端发送一个消息包含两个属性: reply_to (用来设置回调队列)和 correlation_id(用来为每个请求设置一个唯一标识)
        > 请求发送到 rpc_queue队列
        > RPC worker( 服务端) 在那个队列中等待请求,当一个请求出现后,服务端就执行一个job并将结果消息发送给客户端,使用reply_to字段中的队列
        > 客户端在callback 队列中等待数据, 当一个消息出现后,检查这个correlation_id属性,如果和请求中的值匹配将返回给应用
    

    RPC Client

    import pika
    import uuid
    
    class FibonacciRpcClient(object):
        def __init__(self):
            username = "shiwei"
            pwd = 'shiwei666666'
            user_pwd = pika.PlainCredentials(username, pwd)
    
            # 创建连接
            self.conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    
            self.channel = self.conn.channel()
    
            result = self.channel.queue_declare(exclusive=True, queue= '')  # 随机生成 一个 queue , 用与 Server 发送消息
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(on_message_callback = self.on_response, auto_ack = True,  # 准备 发送 消息
                                       queue=self.callback_queue)
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                                    reply_to=self.callback_queue,
                                                    correlation_id=self.corr_id,),
                                       body=str(n))
            while self.response is None:
                self.conn.process_data_events()
            return int(self.response)
    
    fibonacci_rpc = FibonacciRpcClient()
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(7)
    print(" [.] Got %r" % response)
    Client Running Detail:
       > (11) 我们建立一个连接,通道并定义一个专门的’callback‘队列用来接收回复
       > (19) 我们订阅了“callback”队列,因此我们能够接收 RPC 的返回结果
       > (21) ’on_response'  在每个返回中执行的回调是一个简单的job, 对每个返回消息将检查correlation_id是否是我们需要查找的那个ID,如果是,将保存结果到 self.response 并终端consuming循环
       > (25) 下一步,我们定义我们的main方法 - 执行实际的RPC请求
       > (27) 在这方法中,首先我们生产一个唯一的 correlatin_id 号并保存 -- 'on_response"回调函数将用着号码来匹配发送和接收的消息值
       > (28) 下一步,发布请求信息,使用两个属性: reply_to 和 correlation_id
       > (34) 这一步我们可以坐等结果的返回
       > (36) 最后我们返回结果给用户
    

    RPC Server

    import pika
    
    username = "shiwei"
    pwd = 'shiwei666666'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    # 创建连接
    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    
    channel = conn.channel()
    
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)
    
    def on_request(ch, method, props, body):
        n = int(body)
        print(" [.] fib(%s)" % n)
        response = fib(n)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id= props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='rpc_queue', on_message_callback = on_request)
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    
    Server Running Detail:
        >  当客户端启动后,它创建一个匿名的唯一的回调队列
        >  对一个RPC请求, 客户端发送一个消息包含两个属性: reply_to (用来设置回调队列)和 correlation_id(用来为每个请求设置一个唯一标识)
        >  请求发送到 rpc_queue队列
        >  RPC worker( 服务端) 在那个队列中等待请求,当一个请求出现后,服务端就执行一个job并将结果消息发送给客户端,使用reply_to字段中的队列
        >  客户端在callback 队列中等待数据, 当一个消息出现后,检查这个correlation_id属性,如果和请求中的值匹配将返回给应用

    RPC Demo02

    处理方法描述:发送端在发送信息前,产生一个接收消息的临时队列,该队列用来接收返回的结果。其实在这里接收端、发送端 
        的概念已经比较模糊了,因为发送端也同样要接收消息,接收端同样也要发送消息,所以这里笔者使用另外的示例来演示这一过程。
    示例内容:假设有一个控制中心和一个计算节点,控制中心会将一个自然数N发送给计算节点,计算节点将N值加1后,返回给控 
        制中心。这里用center.py模拟控制中心,compute.py模拟计算节点。
    

    Client

     1 import pika
     2 
     3 class Center(object):
     4     def __init__(self):
     5         username = "shiwei"
     6         pwd = 'shiwei666666'
     7         user_pwd = pika.PlainCredentials(username, pwd)
     8 
     9         # 创建连接
    10         self.conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
    11 
    12         self.channel = self.conn.channel()
    13         # 定义接收返回消息的队列
    14         result = self.channel.queue_declare(exclusive=True,queue="",)
    15         self.callback_queue = result.method.queue
    16 
    17         self.channel.basic_consume(on_message_callback=self.on_response,
    18                                    auto_ack=True,
    19                                    queue=self.callback_queue)
    20 
    21     # 定义接收到返回消息的处理方法
    22     def on_response(self, ch, method, props, body):
    23         self.response = body
    24 
    25     def request(self, n):
    26         self.response = None
    27         # 发送计算请求,并声明返回队列
    28         self.channel.basic_publish(exchange='',
    29                                    routing_key='compute_queue',
    30                                    properties=pika.BasicProperties(
    31                                        reply_to=self.callback_queue,
    32                                    ),
    33                                    body=str(n))
    34         # 接收返回的数据
    35         while self.response is None:
    36             self.conn.process_data_events()
    37         return int(self.response)
    38 
    39 center = Center()
    40 
    41 print(" [x] Requesting increase(30)")
    42 response = center.request(30)
    43 print(" [.] Got %r" % (response,))
    Client

    Server

     1 import pika
     2 
     3 username = "shiwei"
     4 pwd = 'shiwei666666'
     5 user_pwd = pika.PlainCredentials(username, pwd)
     6 # 创建连接
     7 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
     8 channel = conn.channel()
     9 
    10 print(' [*] Waiting for n')
    11 channel.queue_declare(queue='compute_queue')
    12 
    13 # 将n值加1
    14 def increase(n):
    15     return n + 1
    16 
    17 # 定义接收到消息的处理方法
    18 def request(ch, method, properties, body):
    19     print(" [.] increase(%s)" % (body,))
    20 
    21     response = increase(int(body))
    22 
    23     # 将计算结果发送回控制中心
    24     ch.basic_publish(exchange='',
    25                      routing_key=properties.reply_to,
    26                      body=str(response))
    27     ch.basic_ack(delivery_tag=method.delivery_tag)
    28 
    29 channel.basic_qos(prefetch_count=1)
    30 channel.basic_consume(on_message_callback=request,
    31                       queue='compute_queue')
    32 
    33 channel.start_consuming()
    Server

    参考文章:http://www.rabbitmq.com/tutorials/tutorial-six-python.html

     

  • 相关阅读:
    交互式监控工具glances
    性能测试工具Locust
    CentOS 7 安装 PostgreSQL 教程
    Vue表单
    Vue事件处理
    Vue列表渲染
    Vue条件渲染
    Vue中class与style绑定
    GIT命令操作
    Git简介
  • 原文地址:https://www.cnblogs.com/shiwei1930/p/11116250.html
Copyright © 2011-2022 走看看