zoukankan      html  css  js  c++  java
  • [rabbitmq] python版本(三) 发布/订阅

    之前提到:

    • 发布者producer:发布信息的应用程序
    • 队列queue:用于消息存储的缓冲
    • 消费者consumer:接收消息的应用程序

    rabbitmq消息模型核心理念:发布者不会直接发送任何消息给队列。事实上,发布者甚至不知道消息是否已经被投递到队列。

    发布者只需要把消息发送给一个交换机exchange。交换机一边从发布者方接受消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到制定的队列还是多个队列,或者是直接忽略消息。这些规则就是通过交换机类型exchange type来定义的

    几种可选的交换机类型:

    • 直连交换机direct
    • 主题交换机topic
    • 头交换机headers
    • 扇形交换机fanout:它把消息发送给它所知道的所有队列

    交换机列表
    rabbitmqctl能够列出服务器上所有的交换器:

    $ sudo rabbitmqctl list_exchanges
    Listing exchanges ...
    logs      fanout
    amq.direct      direct
    amq.topic       topic
    amq.fanout      fanout
    amq.headers     headers
    ...done.
    

    这个列表中有一些叫做amq.*的交换器。这些都是默认创建的,不过这时候你还不需要使用他们。

    匿名的交换器
    前面的教程中我们对交换机一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串("")默认的交换机。

    回想我们之前是如何发布一则消息:

    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message)
    

    exchange参数就是交换机的名称。空字符串代表默认或者匿名交换机:消息将会根据指定的routing_key分发到指定的队列。

    临时队列

    1.连接上rabbitmq时,需要一个全新的、空的队列。可以使用的方案:

    • 手动创建一个随机的队列
    • 服务器为我们选择一个随机的队列名(推荐),调用queue_declare方法的时候,不提供queue参数就可以了
    result = channel.queue_declare()
    

    可以通过该result.method.queue获得已经生成的随机队列名。它可能是这样子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
    2.与消费者consumer断开连接的时候,这个队列应当被立即删除。exclusive标识符即可达到此目的

    result = channel.queue_declare(exclusive = True)
    

    绑定Bindings


    已经创建了一个扇型交换机(fanout)和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。交换器和队列之间的联系我们称之为绑定(binding)。

    channel.queue_bind(exchange='logs', queue=result.method.queue)
    

    现在logs交换机会把消息添加到我们得队列中

    板顶binding列表
    可以使用rabbitmqctl list_bindings列出现存的绑定

    代码整合

    发布日志消息的程序看起来和之前的没有太大区别。最重要的改变就是我们把消息发送给logs交换机而不是匿名交换机。在发送的时候我们需要提供routing_key参数,但是它的值会被扇型交换机(fanout exchange)忽略。
    emit_log.py

    #!/usr/bin/env python
    import pika
    import sys
    
    #连接
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    #声明交换机名称logs和类型fanout
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    
    #命令行参数作为输入或者默认作为信息
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    #routing_key分发到制定队列
    channel.basic_publish(exchange='logs', routing_key='', body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    

    连接成功后,声明了一个交换器,这点很重要,因为不允许发布消息到不存在的交换器
    如果没有绑定队列到交换器,消息将会丢失。但这个无所谓, 如果没有消费者监听,消息就会被忽略
    receive_logs.py

    #!/usr/bin/env python
    import pika
    
    #连接
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    #交换机为logs 交换机类型为fanout-->"群发" 发送给它知道的所有队列
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    #服务器为我们选择一个随机的队列名;exclusive设定为True表示当与consumer断开连接时,这个队列应当被立即删除
    result = channel.queue_declare(queue='', exclusive=True)
    #获取已经生成的随机队列名
    queue_name = result.method.queue
    
    #交换机与队列之间绑定
    channel.queue_bind(exchange='logs', queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    

    如果保存日志到文件,控制台重定向输出到一个log文件就好

    python receive_logs.py > logs_from_rabbit.log
    

    如果是在cmd中查看,在另外一个terminal中运行

    python receive_logs.py
    

    发送日志

    python emit_log.py
    

    使用rabbitmqctl list_bindings可确认已经创建的队列绑定。可以看到运行中的两个receive_logs.py程序

    $ sudo rabbitmqctl list_bindings -->windows去掉sudo
    Listing bindings ...
     ...
    logs    amq.gen-TJWkez28YpImbWdRKMa8sg==                []
    logs    amq.gen-x0kymA4yPzAT6BoC/YP+zw==                []
    ...done.
    

    显示结果很直观:logs交换器把数据发送给两个系统命名的队列。这就是我们所期望的。

    日积月累,水滴石穿
  • 相关阅读:
    appium---纯web app测试
    appium---元素定位工具
    appium---[ADB] Killing adb server on port 5037报错
    pytest---自定义用例识别规则
    pytest---用例执行顺序
    解决Could not find function xmlCheckVersion in library libxml2问题
    pytest---测试框架初探
    layoutSubviews何时被调用
    'addTimeInterval:' is deprecated: first deprecated in iOS 4.0
    iOS7 表格separatorInset的处理
  • 原文地址:https://www.cnblogs.com/lonelyisland/p/12752289.html
Copyright © 2011-2022 走看看