zoukankan      html  css  js  c++  java
  • RabbitMQ(五) -- topics

    RabbitMQ(五) -- topics

    `rabbitmq`中的`topic exchange`将路由键和某模式进行匹配,从而类似于正则匹配的方式去接收喜欢的信息。

    topic exchange

    如果想使用`topic`模式,那么可以随意设置`routing_key`。相反,需要按照一定的要求设定该值。
    `routing_key`在topic模式中应该选择一组拥有特定属性的单词作为该值。

    • * (star) can substitute for exactly one word.
    • # (hash) can substitute for zero or more words.

    例如,如果生产者的`routing_key`设置为`test1.test2.test3`,那么消费着中绑定消息队列的`routing_key`必须可以匹配生产者的`routing_key`。

    #生产者
    routing_key = 'test1.test2.test3'
    channel.basic_publish(exchange='topic_test', routing_key=routing_key, body=message)
    
    #消费者
    routing_key = 'test1.*' #可以
    routing_key = '*.test2.*' #可以
    routing_key = 'test3' #不可以
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

    例子

    生产者如下,会依次设置`routing_key`为A和B,那么需要设置两个消费者的`routing_key`来分别读取消息。

    #!/usr/bin/env python
    # coding=utf-8
    import pika
    import sys
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_test',
    type='topic')
    
    message = "test "
    for i in range(20):
        for item in ['A', 'B']:
            routing_key = item
            channel.basic_publish(exchange='topic_test',routing_key=routing_key, body=message+item)
            print " [x] Sent %r:%r" % (routing_key, message)
            time.sleep(2)
    connection.close()    

    消费者如下,启动命令分别为:

    python receive.py A
    python receive.py B

    消费者如下:

    #!/usr/bin/env python
    # coding=utf-8
    
    import pika
    import sys
    
    def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_test',
    type='topic')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    binding_key = sys.argv[1]
    print "Usage: %s [binding_key]..." % (sys.argv[1])
    
    channel.queue_bind(exchange='topic_test', queue=queue_name, routing_key=binding_key)
    print ' [*] Waiting for logs. To exit press CTRL+C'
    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    
    channel.start_consuming()
  • 相关阅读:
    三分法
    string常用函数的整理
    一句话 讲解 kmp的 next 数组 看不懂的 直接来掐死我吧
    http://www.codeforces.com/contest/703/problem/D D. Mishka and Interesting sum (莫队的TLE)
    Codeforces Round #365 (Div. 2) C
    数论
    默慈金数
    转载:HTTP 请求头中的 X-Forwarded-For
    Glusterfs volume 的三种挂载方式
    GlusterFS 配置及使用
  • 原文地址:https://www.cnblogs.com/coder2012/p/4342963.html
Copyright © 2011-2022 走看看