zoukankan      html  css  js  c++  java
  • RabbitMQ学习之Topics(5)

     上一节,我们讲了direct exchange,这节我们讲下topic exchange

    Topic exchange

    发送到topic exchange的messages不可以有一个随意的routing_key, 它必须是使用.分隔的一些词的集合。例如: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit" .

    这些binding key 必须有同样的样式(in the same form). topic exchange背后的逻辑与direct exchange相似:带有特定的routing keymessage会发送到所有有匹配的binding key 的所有queues

    然而,对于binding keys有两个重要的特例:

    • *star) 能替代一个词

    • #hash) 能替代0个或者多个词

    下面是一个简单的例子

    这个例子中,我们将会发送描述动物的message. 这些message将会带有一个routing key . 这个routing key 由三个词组成. 第一个词描述速度(speed),第二个词描述肤色(colour),第三个词描述物种。 <speed>.<colour>.<species>

    我们创建了3条bindings. Q1 绑定”*.orange.*”并且Q2绑定“*.*.rabbit”和“lazy.#”

    这些bindings总结下就是:

    • Q1对所有的orange的动物感兴趣
    • Q2想要了解所有的rabbit,和所有的lazy的动物
    Topic exchange

    Topic exchange是很强大的,并且可以表现的像其他exchange.

    当一个queue绑定带“#”的binding key时,它可以接受所有的messages而不用管routing key.就像fanout exchange.

    当特殊字符* 和#没有在bindings中使用时,topic exchange表现的像direct exchange. 

    代码

    我们将使用topic exchange.对于routing key ,我们将使用这两个词:<facility>.<severity>.

    EmitLogTopic.cs

    using System;using System.Linq;using RabbitMQ.Client;using System.Text;
    class EmitLogTopic
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "topic_logs",  //topic类型的exchange
                                        type: "topic");
    
                var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
                var message = (args.Length > 1)
                              ? string.Join(" ", args.Skip( 1 ).ToArray())
                              : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "topic_logs",  //发送特定routing key的message
                                     routingKey: routingKey,
                                     basicProperties: null,
                                     body: body);
                Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
            }
        }
    }

    使用示例:

    cd EmitLogTopic
    dotnet run "kern.critical" "A critical kernel error"

    ReceiveLogsTopic.cs

    using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;
    class ReceiveLogsTopic
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); //声明topic exchange
                var queueName = channel.QueueDeclare().QueueName; //声明随机生成的queue name
    
                if(args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [binding_key...]",
                                            Environment.GetCommandLineArgs()[0]);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }
    
                foreach(var bindingKey in args)
                {
                    channel.QueueBind(queue: queueName,  //绑定bindingkey
                                      exchange: "topic_logs",
                                      routingKey: bindingKey);
                }
    
                Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey; //message的routingkey
                    Console.WriteLine(" [x] Received '{0}':'{1}'",
                                      routingKey,
                                      message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }

    使用示例:

    接收所有的logs

    cd ReceiveLogsTopic
    dotnet run "#"

    接收所有facility:kern的logs

    cd ReceiveLogsTopic
    dotnet run "kern.*"

    仅仅critical的logs

    cd ReceiveLogsTopic
    dotnet run "*.critical"

    创建多个bindings

    cd ReceiveLogsTopic
    dotnet run "kern.*" "*.critical"

    参考网址:

    https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html

  • 相关阅读:
    Nginx目录结构与配置文件详解
    最新Sql语句来啦
    一个小工具帮你搞定实时监控Nginx服务器
    javascript学习笔记基于对象续
    Buffer解析
    使用用户自定义控件实现asp.net的的权限管理
    asp.net java 的十大关键技术
    程序员的基本素质
    无法编译的异常解决
    程序员的健康作息时间 该看看啊
  • 原文地址:https://www.cnblogs.com/Vincent-yuan/p/10941239.html
Copyright © 2011-2022 走看看