zoukankan      html  css  js  c++  java
  • rabbitmq 之 exchanges

    exchanges(交换机)  (重点)

     上图较之前增加了 X(exchange 交换机)

    之前helloworld中我们的exchange的参数设置是空 也就是默认。 

    实际本质上,producer并不是直接把消息发送给队列的,他并不知道最终会将消息放到哪个队列中去。

    而是通过 中间的交换机,producer将消息发送给交换机,交换机是真正将这些消息分配给相应的队列中去的。

    不BB,直接上代码

    emit_log.py

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    #声明交换机 channel.exchange_declare(exchange='logs', #定义交换机的名字 logs exchange_type='fanout') #设定交换机的种类 fanout message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', #指定交换机logs routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()

    receive_logs.py

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    #临时队列 result = channel.queue_declare(exclusive=True) #rabbitmq 自动生成队列 exclusive=True参数是当consumer断开连接后,自动销毁此队列 queue_name = result.method.queue #取自动生成队列的名字 channel.queue_bind(exchange='logs', #将交换机 和 队列做绑定 这样producer只要向该exchange上发送消息,exchange会自动分配给对应绑定的queue queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

    匿名交换机

    之前 我们对交换机一无所知,但仍然能够发送消息到队列中。因为我么使用了命名为空字符串("")默认交换机

    消息会根据指定的routing_key 分发到指定的队列。

    交换机类型

    下面列出几个交换机类型:

    1. 直连交换机(direct)
    2. 主题交换机(topic)
    3. 头交换机(headers)
    4. 扇形交换机(fanout)

    1.扇形交换机(fanout)

    扇型交换机(fanout)很简单,你可能从名字上就能猜测出来,它把消息发送给它所知道的所有队列。

    上面的代码就是关于扇形交换机实现的例子。

    绑定(Bindings)

    前面的例子中,我们已经创建过绑定(bindings)

    channel.queue_bind(exchange=exchange_name,
                       queue=queue_name)

    bindings 是指 exchange 和 queue 的关系。

    这里 我们说一个额外的参数  routing_key。 为了避免与basic_publish的参数混淆,我们把他叫做绑定键(binding key)。

    下面是创建一个带绑定键的 bindings

    channel.queue_bind(exchange=exchange_name,
                       queue=queue_name,
                       routing_key='black')

    绑定键存在的意义取决于 exchanges 的类型。 我们之前使用的 fanout exchanges 会忽略这个值。

    2.直连交换机 (Direct exchange)

    之前我们使用的扇形交换机(fanout exchange) 没有足够的灵活性 ---- 他能做的仅仅是广播。

    而直连交换机(direct exchange) 会将  绑定键(binding key) 和 路由键(routing key) 进行精确匹配,从而确定消息该分发到哪个队列。

    下图能很好描述该场景:

     多个绑定

    多个队列使用相同的绑定键是合法的。  这样就可以做出一个与扇形交换机一样的广播行为。 非常的灵活。

     示例代码

     emit_log_direct.py 

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',      #声明exchange  名字是:direct_logs  类型是:直连交换机(direct)
                             type='direct')
    
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',            #向direct_logs exchange上发送消息
                          routing_key=severity,              #是绑定键(这里不再是以前理解的队列了)
                          body=message)  
    print " [x] Sent %r:%r" % (severity, message)
    connection.close()

    receive_logs_direct.py

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',     #声明exchange
                             type='direct')
    
    result = channel.queue_declare(exclusive=True)      #创建临时队列,consumer断开,自动销毁
    queue_name = result.method.queue                    #获取临时队列的名字
    
    severities = sys.argv[1:]
    if not severities:
        print >> sys.stderr, "Usage: %s [info] [warning] [error]" % 
                             (sys.argv[0],)
        sys.exit(1)
    
    for severity in severities:                      #将exchange  queue 和 binding_key 做绑定  
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    
    print ' [*] Waiting for logs. To exit press CTRL+C'
    
    def callback(ch, method, properties, body):
        print " [x] %r:%r" % (method.routing_key, body,)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()

    3.主题交换机(topic exchange)

    发送到topic exchange 的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由 . 分隔开的词语列表。

    绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似。但是它的绑定键和路由键有两个特殊应用方式:

    • * (星号) 用来表示一个单词
    • # (井号) 用来表示任意数量(零个或多个) 单词

    下边用图说明:

  • 相关阅读:
    VScode 修改中文字体
    missing KW_END at ')' near '<EOF>'
    SQL inner join, join, left join, right join, full outer join
    SQL字符替换函数translater, replace
    SQL COOKBOOK SQL经典实例代码 笔记第一章代码
    sqlcook sql经典实例 emp dept 创建语句
    dateutil 2.5.0 is the minimum required version python
    安装postgresql后找不到服务 postgresql service
    Postgres psql: 致命错误: 角色 "postgres" 不存在
    【西北师大-2108Java】第十六次作业成绩汇总
  • 原文地址:https://www.cnblogs.com/s686zhou/p/12896179.html
Copyright © 2011-2022 走看看