zoukankan      html  css  js  c++  java
  • rabbitmq的简单介绍二

    上一篇博客我们没有介绍完rabbitmq,今天我们接着上一篇的博客继续介绍rabbitmq

    这边的博客的内容如下

    1、组播,对指定的队列设置关键词,通过关键词来控制消息的分发

    2、更加细致的组播

    先来介绍组播

    其实组播和广播只有3个区别

    a、组播的exchange的类型是“direct”

    b、组播的生产端不需要绑定queue,只需要申明exchange,然后在发布消息的时指定route_key、exchange、message即可

    c、消费者者需要绑定队列到exchange上,且也需要指定route_key,只有绑定到和生产者端相同的exchange的队列【队列名称可以不一样】,且生产端的route_key一致就可以收到消息

    先看生产端的核心代码

    import pika
    
    test_connectio = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    
    test_channel = test_connectio.channel()
    
    
    test_channel.exchange_declare(exchange="logs",
                                  exchange_type="direct")
    
    #在队列中什么一个exchange,类型为“direct”
    
    
    test_channel.queue_declare(queue="zhang1",durable=True)
    test_channel.queue_declare(queue="zhang2",durable=True)
    test_channel.queue_declare(queue="zhang3",durable=True)
    
    test_channel.basic_publish(exchange="logs",
                               routing_key="error",
                               body="this is 组播")
    
    
    #发布消息,将消息发往exchange名称为“logs”中,且指定的routing_key为error的队列中
    
    test_channel.close()
    

     

    在看消费者的核心代码

    import pika
    
    test_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    
    test_channel = test_connection.channel()
    
    
    test_channel.exchange_declare(exchange="logs",
                                  exchange_type="direct")
    
    #在管道中申明一个exhcange
    
    
    test_channel.queue_declare(queue="zhang2",durable=True)
    #在管道中申明一个队列
    
    
    
    test_channel.queue_bind(exchange="logs",
                            queue="zhang2",
                            routing_key="info")
    #将队列绑定到exchange上,并指明我这个消费者的routing_key,只有和生产端相同
    #routing_key的消费者才可以收到生产者的消息,这里可以绑定多个routing_key
    
    
    def callback(ch,method,properties,body):
        print("server:",body)
    
    
    
    test_channel.basic_consume(callback,
                          queue="zhang2",
                          no_ack=True)
    
    test_channel.start_consuming()
    

     

    二、这里我们看下更细致的消息控制

    更细致的消息控制是什么意思呢?比如我们实现这样一个场景;消费者可以接受*.error的日志,还可以接受apache.info的日志

    更细致的消息控制和组播有2个不同

    1、更细致的消息控制用到的exchange的类型为topic

    2、routing_key的用法用法不一样

    下面我们看下生产者的核心代码

     import pika
    import sys
    
    test_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    
    test_channel = test_connection.channel()
    
    test_channel.exchange_declare(exchange="topic_logs",
                                  exchange_type="topic")
    
    route_key = sys.argv[1] if len(sys.argv) > 1 else "anonymous.info"
    
    message = "".join(sys.argv[2:]) or "hello world"
    
    test_channel.basic_publish(exchange="topic_logs",
                               routing_key=route_key,
                               body=message)
    
    print("[x] send %r %r" %(route_key,message))
    
    test_connection.close()
    

      

    在看消费者的核心代码

    import pika
    import sys
    test_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    
    test_channel = test_connection.channel()
    
    test_channel.exchange_declare(exchange="topic_logs",
                                  exchange_type="topic")
    
    result = test_channel.queue_declare(exclusive=True)
    
    queue_name = result.method.queue
    binding_keys = sys.argv[1:]
    
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]....
    " %sys.argv[0])
        sys.exit(0)
    
    for binding_key in binding_keys:
        test_channel.queue_bind(exchange="topic_logs",
                                queue=queue_name,
                                routing_key=binding_key)
    
    print("waiting for message")
    
    def callback(ch,method,properties,body):
        print("[x] %s" %(body))
    
    test_channel.basic_consume(callback,
                               queue=queue_name,
                               no_ack=True)
    
    test_channel.start_consuming()
    

    我们可以做如下测试

    在生产者端执行下面的命令,将消息发往mysql.error,消息的内容是xxxxxxxx

    python rabbitmq_send.py mysql.error xxxxxxxxx
    

    在消费者端执行下面的命令,该消费者接受所有的*.info的消息,和mysql.info的消息

    python rabbitmq_receive.py *.info mysql.error
    

      

  • 相关阅读:
    vue检查用户名是否重复
    后端注册接口完善
    django添加检查用户名和手机号数量接口
    Vue联调,图片及短信验证码
    swift webView 提出这样的要求你能忍吗?
    iOS 如何给Xcode7项目添加“.pch”文件
    swift 定制自己的Button样式
    Swift 为你的webView定制标题
    swift 如何获取webView的内容高度
    如何在MAC上使用SVN,简单几行命令搞定
  • 原文地址:https://www.cnblogs.com/bainianminguo/p/7530607.html
Copyright © 2011-2022 走看看