zoukankan      html  css  js  c++  java
  • RabbitMQ使用介绍4—Routing

    Routing

    此案例中,我们会尝试更多的特性,例如,仅仅将错误信息传递到Log文件中(节省空间),同时仍能将所有信息打印到控制台上

    Bindings有一个额外的参数routing_key,为了防止和basic_publish参数混淆,我们将它叫做binding_key.这是我们怎么去创建一个binding_key:

    channel.queue_bind(exchange=exchange_name,
                       queue=queue_name,
                       routing_key='black')
    

    这意味着binding key 取决于exchange类型,fanout exchanges 之前使用的exchanges忽略了它的价值

    Direct exchange

    之前教程的日志系统广播所有的信息给所有的消费者,我们想要扩展允许过滤信息根据严重性,例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重的错误,并且warning或者info日志消息不会浪费空间.

    我们使用的fanout exchange它并没有给我们带来太大的灵活性,它只能进行无意识的广播,

    我们将使用direct exchange代替,direct exchange路由是很简单的,消息进入绑定的binding和路由的key相匹配的队列里,为了说明这一点,请考虑以下设置:

    在此设置中,我们可以看到direct exchange X 和两个队列绑定,第一个队列绑定orange,第二个它是绑定了两个,一个是绑定了black key 另一个是绑定了green。

    在这种设置中,通过路由键橙色发布到交换机的消息将被路由到队列Q1。 路由键为黑色或绿色的消息将转到Q2。 所有其他消息将被丢弃。

    多重绑定

    用相同的binding key绑定多个队列是完全合法的,示例中我们可以在X和Q1之间增加black key 这个绑定是和fanout类似,广播到所有匹配的队列中,路由为black key 的消息将会发送给Q1,Q2

    Emitting logs

    我们将会使用这个模式的日志系统代替fanout,我们发送消息给cirect exchange,我们将会提供严重日志作为routing key,这样接收脚本将会选择想要接收的严重性日志,让我们关注emitting。

    首先我们需要创建一个exchange

    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct')
    

    然后发送消息

    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,#不能删除
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2,#make message persistent
                                                          ))
    

    To simplify things we will assume that 'severity' can be one of 'info', 'warning', 'error'.

    订阅

    接收消息和之前是一样的,有一个例外,我们将创建一个新的绑定和每种严重性相关

    result = channel.queue_declare(queue='',exclusive=True)
    queue_name = result.method.queue
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
    

    Putting it all together

    direct_send.py

    # -*- coding: utf-8 -*-
    #send端
    import pika
    import sys
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))#默认端口5672,可不写
    
    #创建通道,声明一个管道,在管道里发送消息
    channel = connection.channel()
    #在管道里声明queue
    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct')
    severity = sys.argv[1] if len(sys.argv)>1 else 'info'
    message = ''.join(sys.argv[2:]) or "Hello World"
    #一条消息永远不能直接发送到队列,它总需要经过一个交换exchange
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,#不能删除
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2,#make message persistent
                                                          ))#设置routing_key(消息队列的名称)和body(发送的内容)
    print("[x] Sent %r" % message)
    connection.close()#关闭连接,队列关闭
    

    direct_receive.py

    # -*- coding: utf-8 -*-
    
    #receiving(消费者接收者)
    import pika
    import time
    import sys
    #创建一个连接
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost'))#默认端口5672,可不写
    #创建通道,声明一个管道,在管道里发送消息
    channel = connection.channel()
    
    #把消息队列的名字为hello,把消费者和queue绑定起来,生产者和queue的也是hello
    #为什么又声明了一个hello队列
    #如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以要声明两次
    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct')
    #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    result = channel.queue_declare(queue='',exclusive=True)
    queue_name = result.method.queue
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
    #回调函数get消息体
    def callback(ch,method,properties,body):#四个参数为标准格式
        #管道内存对象,内容相关信息
        #print("打印看下是什么:",ch,method,properties) #打印看下是什么
        print(" [x] Received %r:%r" % (method.routing_key,body))
    
        ch.basic_ack(delivery_tag=method.delivery_tag)#消息确认
    
    #消费消息
    channel.basic_consume(
        queue=queue_name,#你要从那个队列里收消息
        on_message_callback=callback,#如果收到消息,就调用callback函数来处理消息
        auto_ack=True #写的话,如果接收消息,机器宕机消息就丢了
        #一般不写,宕机则生产者检测到发给其他消费者
    )
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming() #创建死循环,监听消息队列,可使用CTRL+C结束监听
    

    link

  • 相关阅读:
    log4net:ERROR XmlHierarchyConfigurator: Cannot find Property [File] to set object on [TF.Log.FileAppender]
    HTTP状态码总结
    基于.NET平台常用的框架整理
    WPF中查看PDF文件之MoonPdfLib类库
    查看操作系统报异常的地方
    VS 附加到进程 加载“附加进程”弹窗很慢
    C# for循环或者foreach往List中添加对象的时候前面的数据总被最后加入的覆盖
    方法的执行过程
    模拟IDE上的run过程
    Java动态加载
  • 原文地址:https://www.cnblogs.com/venvive/p/11768949.html
Copyright © 2011-2022 走看看