zoukankan      html  css  js  c++  java
  • Python之Rabbitmq的fanout模式

    欢迎关注【无量测试之道】公众号,回复【领取资源】,
    Python编程学习资源干货、
    Python+Appium框架APP的UI自动化、
    Python+Selenium框架Web的UI自动化、
    Python+Unittest框架API自动化、

    资源和代码 免费送啦~
    文章下方有公众号二维码,可直接微信扫一扫关注即可。

    1、什么是fanout模式?

    这种模式下,传递到 Exchange 的消息将会转发到所有与其绑定的 Queue 上。

    不需要指定 routing_key ,即使指定了也是无效。

    需要提前将 Exchange 和 Queue 绑定,一个 Exchange 可以绑定多个 Queue,一个Queue可以绑定多个Exchange。

    需要先启动订阅者,此模式下的队列是 Consumer 随机生成的,发布者仅仅发布消息到 Exchange ,由Exchange转发消息至Queue。

    2、代码逻辑

    producter_fanout.py文件内容如下:

    import json
    import pika
    import datetime
     
    #生成消息入口处
    def get_message():
        for i in range(10):
            message=json.dumps({'id': "10000%s" % i, "amount": 100 * i,"name":"tony","createtime":str(datetime.datetime.now())})
            producter_fanout(message)
     
     
    def producter_fanout(messages):
        # 获取与rabbitmq 服务的连接,虚拟队列需要指定参数 virtual_host,如果是默认的可以不填(默认为/),也可以自己创建一个
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=pika.PlainCredentials('guest', 'guest')))
        # 创建一个 AMQP 信道(Channel)
        channel = connection.channel()
        # 声明exchange名为tony_test的交换机,如不存在,则创建。type=fanout表示所有消息都可以送达到所有的queue中.durable = True 代表exchange持久化存储
        channel.exchange_declare(exchange='tony_test',exchange_type='fanout',durable=True)
        # 向exchange名为tony_test的交换机, routing_key 不需要配置,body是要处理的消息,delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。
        print(messages)
        channel.basic_publish(exchange ='tony_test', routing_key='', body=messages,properties=pika.BasicProperties(delivery_mode=2))
        # 关闭与rabbitmq的连接
        connection.close()
    if __name__=="__main__":
        get_message()

    consumer_fanout.py文件内容如下:

    import pika
    import random
    #接收消息,并写入文件
    def write_file(message):
        with open("msg00.txt","a+") as f:
            print(message)
            f.write(message)
     
    def consumer_fanout():
        # 获取与rabbitmq 服务的连接
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=pika.PlainCredentials('guest', 'guest')))
        # 创建一个 AMQP 信道(Channel)
        channel = connection.channel()
        # 声明exchange名为tony_test的交换机,如不存在,则创建。type=fanout表示所有消息都可以送达到所有的queue中.durable = True 代表exchange持久化存储
        channel.exchange_declare(exchange='tony_test', exchange_type='fanout', durable=True)
        # 随机创建一个队列名称
        queuename="tester"+str(random.randrange(10,1000))
        result=channel.queue_declare(queue=queuename)
        # 将exchange 与queue 进行绑定
        channel.queue_bind(exchange='tony_test', queue=queuename)
        # 定义回调处理消息的函数
        def callback(ch, method, properties, body):
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print(body.decode)
            write_file(body.decode())
        #告诉rabbitmq,用callback来接收并处理消息
        channel.basic_consume(result.method.queue,callback,False)
        # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
        channel.start_consuming()
     
    if __name__=="__main__":
        consumer_fanout()

    可以将consumer_fanout.py文件复制别名启动多个,然后再将producter_fanout.py里面的for 循环加大,可以看到Consumer在做消息的处理。

    3、Rabbitmq界面的Exchange & Queue的展示

     

    总结:fanout这种模式下,发送给Exchange的消息将会转发到所有与其绑定的Queue 上,确实是这样。有些Queue为0表示消息都被处理完成,有些没有处理是因为Queue在创建时成功但处理失败,程序重启后Queue就变了,所以消息会一直在那里存在,除非手动处理一下未处理消息的queue。

    备注:我的个人公众号已正式开通,致力于测试技术的分享,包含:大数据测试、功能测试,测试开发,API接口自动化、测试运维、UI自动化测试等,微信搜索公众号:“无量测试之道”,或扫描下方二维码:

     

     添加关注,一起共同成长吧。

  • 相关阅读:
    error: xslt-config not found. Please reinstall the libxslt >= 1.1.0 distribution
    configure: error: Cannot find OpenSSL's libraries
    PHP编译configure时常见错误
    解决PHP编译cURL的reinstall the libcurl问题
    Linux环境PHP7.0安装
    PHP 7的一些引人注目的新特性简单介绍
    (转).net webconfig使用IConfigurationSectionHandler自定section
    Asp.Net MVC 使用FileResult导出Excel数据文件
    ASP.NET MVC:通过 FileResult 向 浏览器 发送文件
    [老老实实学WCF] 第十篇 消息通信模式(下) 双工
  • 原文地址:https://www.cnblogs.com/Wu13241454771/p/13261482.html
Copyright © 2011-2022 走看看