zoukankan      html  css  js  c++  java
  • Python Rabbit 广播模式

    Exchange
      在RabbitMQ下进行广播模式需要用到,exchange这个参数,它会把发送的消息推送到queues队列中,exchange必须要知道,它接下来收到的消息要分给谁,是要发给一个queue还是发给多个queue,还是要删除,这些动作都取决于exchange的传入参数。
      Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息。
      Exchange:在RabbitMQ中相当于中间件负责转发消息。
     
    注:如上图生产端到消费端,是通过exchange转发到队列内的,消费端在队列中取的数据,并不是直接在exchange到消费端。
     
     
    • fanout: 所有bind到此exchange的queue都可以接收消息
    •               订阅发布:fanout 广播消息只能发给以存活的消费端,实时发送,并不能存储数据。

     

    • direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
    •         direct广播 可指定级别接收端进行广播。

    • topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
    •           可同过发出去的多个消息设置 多个级别,消息结尾加入级别。

    表达式符号说明:#代表一个或多个字符,*代表任何字符
    #.a会匹配a.a,aa.a,aaa.a等
    *.a会匹配a.a,b.a,c.a等

    注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout

    注:headers: 通过headers 来决定把消息发给哪些queue
     
     

     
    RabbitMQ fanout广播模式(实例)
     
    send 生产端
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    # exchange=“自定义名字”
    # type = 'fanout' 定义exchange发送类型,广播类型
    # exchange_type type报错就使用这个
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    # 下面这条命令是 可通过命令行输入定义的消息 or 如果没输入就是后面这段话。
    #message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    
    # 发送的内容
    message = "info: Hello World!"
    
    # routing_key 传入queue 由于是广播,不填
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    
    # 注:由于是广播类型所以不需要写queue。
    recv 消费端
    #_*_coding:utf-8_*_
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    # exclusive=True 唯一的
    # 不指定queue名字,rabbit会随机分配一个名字,
    # exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    result = channel.queue_declare(exclusive=True)
    
    # 随机取queue名字。
    queue_name = result.method.queue
    print("random queuename",queue_name)
    
    # channel.queue_bind 绑定exchange转发器
    # exchange=logs 由于rabbitMQ下不知一个exchange需要绑定。
    # queue_name 对列名
    channel.queue_bind(exchange='logs',
                       queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()

    RabbitMQ direct广播模式(实例)

    send 生产端
    终端:python 脚本名 允许传入级别  发送消息
    例:python direct_p.py info Hello
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    # exchange=“自定义名字”
    # type = 'direct' 定义exchange发送类型,广播类型
    # exchange_type type报错就使用这个
    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct')
    
    # 级别:默认取执行脚本传入参数,如果取不到执行info
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    
    # 下面这条命令是 可通过命令行输入定义的消息 or 如果没输入就是后面这段话。
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    
    # routing_key=severity 消息发送到指定级别
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()
    recv 消费端
    终端:python 脚本文件 启动级别
    例:python direct_c.py info
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    # 获取执行脚本执行参数
    severities = sys.argv[1:]
    
    # 如果没有参数就会报错,提示加入参数,并退出程序
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    
    print(severities)
    # 循环severities这个列表进行绑定 # routing_key=severity 接收端就是severity for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

     RabbitMQ topic广播模式

    send 生产端
    终端:python 执行脚本 消息.级别名称 消息.自定以级别
    例:python topic_p Hello.info Hi.mysql
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    # 级别:默认取执行脚本传入参数,如果取不到执行info
    # 发送消息结尾需要加入级别。
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    
    # 下面这条命令是 可通过命令行输入定义的消息 or 如果没输入就是后面这段话。
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    
    # routing_key=severity 消息发送到指定级别
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()
    recv 消费端
    终端:python 执行脚本 *.级别 *.定义级别
    例:python topic_c *.info *.mysql 
    注:“#”代表可收所有。
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    # 获取执行脚本执行参数
    binding_keys = sys.argv[1:]
    
    # 如果没有参数就会报错,提示加入参数,并退出程序
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
        sys.exit(1)
    
    # 循环severities这个列表进行绑定
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
  • 相关阅读:
    搞IT的不如去养鸡养猪了
    matlabemacs运行matlab程序出错.不能调用matlab命令行模式
    微软利用全球天文望远镜数据建立宇宙地图
    程序人生2005年(30)
    IT领袖峰会精彩观点:4G时代不存在信令问题
    初识MySql数据库
    Android屏幕尺寸适配注意事项
    Java多线程详解(一)
    ios键盘高度
    ASP.NET MVC——Action的执行
  • 原文地址:https://www.cnblogs.com/xiangsikai/p/8304100.html
Copyright © 2011-2022 走看看