zoukankan      html  css  js  c++  java
  • rabbitmq 之 exchanges

    exchanges(交换机)  (重点)

     上图较之前增加了 X(exchange 交换机)

    之前helloworld中我们的exchange的参数设置是空 也就是默认。 

    实际本质上,producer并不是直接把消息发送给队列的,他并不知道最终会将消息放到哪个队列中去。

    而是通过 中间的交换机,producer将消息发送给交换机,交换机是真正将这些消息分配给相应的队列中去的。

    不BB,直接上代码

    emit_log.py

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    #声明交换机 channel.exchange_declare(exchange='logs', #定义交换机的名字 logs exchange_type='fanout') #设定交换机的种类 fanout message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', #指定交换机logs routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()

    receive_logs.py

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    #临时队列 result = channel.queue_declare(exclusive=True) #rabbitmq 自动生成队列 exclusive=True参数是当consumer断开连接后,自动销毁此队列 queue_name = result.method.queue #取自动生成队列的名字 channel.queue_bind(exchange='logs', #将交换机 和 队列做绑定 这样producer只要向该exchange上发送消息,exchange会自动分配给对应绑定的queue queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

    匿名交换机

    之前 我们对交换机一无所知,但仍然能够发送消息到队列中。因为我么使用了命名为空字符串("")默认交换机

    消息会根据指定的routing_key 分发到指定的队列。

    交换机类型

    下面列出几个交换机类型:

    1. 直连交换机(direct)
    2. 主题交换机(topic)
    3. 头交换机(headers)
    4. 扇形交换机(fanout)

    1.扇形交换机(fanout)

    扇型交换机(fanout)很简单,你可能从名字上就能猜测出来,它把消息发送给它所知道的所有队列。

    上面的代码就是关于扇形交换机实现的例子。

    绑定(Bindings)

    前面的例子中,我们已经创建过绑定(bindings)

    channel.queue_bind(exchange=exchange_name,
                       queue=queue_name)

    bindings 是指 exchange 和 queue 的关系。

    这里 我们说一个额外的参数  routing_key。 为了避免与basic_publish的参数混淆,我们把他叫做绑定键(binding key)。

    下面是创建一个带绑定键的 bindings

    channel.queue_bind(exchange=exchange_name,
                       queue=queue_name,
                       routing_key='black')

    绑定键存在的意义取决于 exchanges 的类型。 我们之前使用的 fanout exchanges 会忽略这个值。

    2.直连交换机 (Direct exchange)

    之前我们使用的扇形交换机(fanout exchange) 没有足够的灵活性 ---- 他能做的仅仅是广播。

    而直连交换机(direct exchange) 会将  绑定键(binding key) 和 路由键(routing key) 进行精确匹配,从而确定消息该分发到哪个队列。

    下图能很好描述该场景:

     多个绑定

    多个队列使用相同的绑定键是合法的。  这样就可以做出一个与扇形交换机一样的广播行为。 非常的灵活。

     示例代码

     emit_log_direct.py 

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',      #声明exchange  名字是:direct_logs  类型是:直连交换机(direct)
                             type='direct')
    
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',            #向direct_logs exchange上发送消息
                          routing_key=severity,              #是绑定键(这里不再是以前理解的队列了)
                          body=message)  
    print " [x] Sent %r:%r" % (severity, message)
    connection.close()

    receive_logs_direct.py

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',     #声明exchange
                             type='direct')
    
    result = channel.queue_declare(exclusive=True)      #创建临时队列,consumer断开,自动销毁
    queue_name = result.method.queue                    #获取临时队列的名字
    
    severities = sys.argv[1:]
    if not severities:
        print >> sys.stderr, "Usage: %s [info] [warning] [error]" % 
                             (sys.argv[0],)
        sys.exit(1)
    
    for severity in severities:                      #将exchange  queue 和 binding_key 做绑定  
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    
    print ' [*] Waiting for logs. To exit press CTRL+C'
    
    def callback(ch, method, properties, body):
        print " [x] %r:%r" % (method.routing_key, body,)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()

    3.主题交换机(topic exchange)

    发送到topic exchange 的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由 . 分隔开的词语列表。

    绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似。但是它的绑定键和路由键有两个特殊应用方式:

    • * (星号) 用来表示一个单词
    • # (井号) 用来表示任意数量(零个或多个) 单词

    下边用图说明:

  • 相关阅读:
    C语言中typedef可以出现在struct定义之前
    C语言编程规划——怎样用C语言设计一个程序
    如何去掉页眉页脚上的横线 如何去掉页眉上的横线 如何去掉页脚上的横线
    在Visual Studio 2005中使用WinPcap编写程序的配置方法
    使用WinPcap编程(1)——获取网络设备信息
    如何思考多维数组
    Visual Studio 2005编写C程序出错:error C2143,)前缺少*
    在Linux下运行peersim1.0.5
    Login failed for user 'IIS APPPOOL\DefaultAppPool'
    win2008 r2 安装sqlserver2005 sp3报错
  • 原文地址:https://www.cnblogs.com/s686zhou/p/12896179.html
Copyright © 2011-2022 走看看