zoukankan      html  css  js  c++  java
  • RabbitMQ的主题(Topic)模式(五)

    一、定义

    我们先来看一张图:

    这张图有几个不同的routingkey,每个routingkey都是由两段组成的;
    下面我们来编写代码来实现相关功能;

    二、生产者生产消息

    生产者发送四条消息,routingkey不相同,代码如下:

        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("msg send start!");
                TopicPublisher();
                Console.WriteLine("msg send end!");
            }
            static string EXCHANGE_NAME = "routing_exchange_topic";
            static void TopicPublisher()
            {
                var conn = RabbitMQHelper.GetConnection();
                var channel = conn.CreateModel();
                channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
                var routingKeyList = new List<string>() { "usa.news", "usa.weather", "europe.news", "europe.weather" };
                for (var i = 0; i < routingKeyList.Count; i++)
                {
                    var msg = $"hello topic {routingKeyList[i]}!";
    
                    channel.BasicPublish(
                        exchange: EXCHANGE_NAME,
                        routingKey: routingKeyList[i],
                        basicProperties: null,
                        body: Encoding.UTF8.GetBytes(msg));
                    Console.WriteLine($"消息发送:" + msg);
                }
                Console.WriteLine($"消息发送完成!");
                channel.Close();
                conn.Close();
            }
    
        }
    

    结果运行如图:

    三、消费者

    3.1 消费者1,绑定的routingkey为europe.#,代码如下:

            static string EXCHANGE_NAME = "routing_exchange_topic";
            static string QUEUE_NAME = "routing_queue_topic1";
            static void TopicConsumer1()
            {
                var conn = RabbitMQHelper.GetConnection();
                var channel = conn.CreateModel();
                channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
                channel.QueueDeclare(queue: QUEUE_NAME);
                // 绑定routingKey
                string routingKey = "europe.#";
                channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine($"Topic Consumer1 收到消息: {message},时间:{DateTime.Now}");
                };
                channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
                Console.ReadKey();
                channel.Close();
                conn.Close();
            }
        }
    

    3.2 消费者2,绑定的routingkey为#.weather,代码如下:

            static string EXCHANGE_NAME = "routing_exchange_topic";
            static string QUEUE_NAME = "routing_queue_topic2";
            static void TopicConsumer2()
            {
                var conn = RabbitMQHelper.GetConnection();
                var channel = conn.CreateModel();
                channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
                channel.QueueDeclare(queue: QUEUE_NAME);
                string routingKey = "#.weather";
                // 可以绑定多个routingKey         
                channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine($"Topic Consumer2 收到消息: {message},时间:{DateTime.Now}");
                };
                channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
                Console.ReadKey();
                channel.Close();
                conn.Close();
            }
    

    3.3 查看运行结果

    (1)队列绑定到交换机的结果如图:

    (2)消费者1的接收到的消息:

    (3)消费者2的接收到的消息:

    通过结果我们可以看到,消费者1和消费者2分别接收到了不同key的消息;

    四、小结

    1、topic模式,我们可以通过模糊匹配的,来接收自己想要的消息;使用Topic模式生产者在声明队列时需要制定消息到达队列方式为topic;
    2、Consumer消费者,匹配消息时:#匹配一个或者多个关键字,*匹配一个关键字
    3、参考文档:https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html

  • 相关阅读:
    满20年程序员生涯-与大家分享最近7年的快速成长经历(上海市青浦区快递行业战斗7年奋斗史)
    格局 逐阶而上
    基础才是重中之重~BouncyCastle实现的DES3加密~java通用
    jenkins~Publish Over SSH实现分布式部署
    maven~为MANIFEST.MF文件添加内容
    maven~多个plugin相同phase的执行顺序
    java~jar防止反编译
    个人博客的简单通告
    SQL Server中datetimeset转换datetime类型问题浅析
    MySQL如何计算统计redo log大小
  • 原文地址:https://www.cnblogs.com/zqllove/p/12858700.html
Copyright © 2011-2022 走看看