zoukankan      html  css  js  c++  java
  • RabbitMQ的主题(Topic)模式(五)

    一、定义

    我们先来看一张图:

    这张图有几个不同的routingkey,每个routingkey都是由两段组成的;
    下面我们来编写代码来实现相关功能;

    二、生产者生产消息

    生产者发送四条消息,routingkey不相同,代码如下:

        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("msg send start!");
                TopicPublisher();
                Console.WriteLine("msg send end!");
            }
            static string EXCHANGE_NAME = "routing_exchange_topic";
            static void TopicPublisher()
            {
                var conn = RabbitMQHelper.GetConnection();
                var channel = conn.CreateModel();
                channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
                var routingKeyList = new List<string>() { "usa.news", "usa.weather", "europe.news", "europe.weather" };
                for (var i = 0; i < routingKeyList.Count; i++)
                {
                    var msg = $"hello topic {routingKeyList[i]}!";
    
                    channel.BasicPublish(
                        exchange: EXCHANGE_NAME,
                        routingKey: routingKeyList[i],
                        basicProperties: null,
                        body: Encoding.UTF8.GetBytes(msg));
                    Console.WriteLine($"消息发送:" + msg);
                }
                Console.WriteLine($"消息发送完成!");
                channel.Close();
                conn.Close();
            }
    
        }
    

    结果运行如图:

    三、消费者

    3.1 消费者1,绑定的routingkey为europe.#,代码如下:

            static string EXCHANGE_NAME = "routing_exchange_topic";
            static string QUEUE_NAME = "routing_queue_topic1";
            static void TopicConsumer1()
            {
                var conn = RabbitMQHelper.GetConnection();
                var channel = conn.CreateModel();
                channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
                channel.QueueDeclare(queue: QUEUE_NAME);
                // 绑定routingKey
                string routingKey = "europe.#";
                channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine($"Topic Consumer1 收到消息: {message},时间:{DateTime.Now}");
                };
                channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
                Console.ReadKey();
                channel.Close();
                conn.Close();
            }
        }
    

    3.2 消费者2,绑定的routingkey为#.weather,代码如下:

            static string EXCHANGE_NAME = "routing_exchange_topic";
            static string QUEUE_NAME = "routing_queue_topic2";
            static void TopicConsumer2()
            {
                var conn = RabbitMQHelper.GetConnection();
                var channel = conn.CreateModel();
                channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
                channel.QueueDeclare(queue: QUEUE_NAME);
                string routingKey = "#.weather";
                // 可以绑定多个routingKey         
                channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine($"Topic Consumer2 收到消息: {message},时间:{DateTime.Now}");
                };
                channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
                Console.ReadKey();
                channel.Close();
                conn.Close();
            }
    

    3.3 查看运行结果

    (1)队列绑定到交换机的结果如图:

    (2)消费者1的接收到的消息:

    (3)消费者2的接收到的消息:

    通过结果我们可以看到,消费者1和消费者2分别接收到了不同key的消息;

    四、小结

    1、topic模式,我们可以通过模糊匹配的,来接收自己想要的消息;使用Topic模式生产者在声明队列时需要制定消息到达队列方式为topic;
    2、Consumer消费者,匹配消息时:#匹配一个或者多个关键字,*匹配一个关键字
    3、参考文档:https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html

  • 相关阅读:
    eclipse下c/cpp " undefined reference to " or "launch failed binary not found"问题
    blockdev 设置文件预读大小
    宝宝语录
    CentOS修改主机名(hostname)
    subprocess报No such file or directory
    用ldap方式访问AD域的的错误解释
    英特尔的VTd技术是什么?
    This virtual machine requires the VMware keyboard support driver which is not installed
    Linux内核的文件预读详细详解
    UNP总结 Chapter 26~29 线程、IP选项、原始套接字、数据链路访问
  • 原文地址:https://www.cnblogs.com/zqllove/p/12858700.html
Copyright © 2011-2022 走看看