一、定义
我们先来看一张图:
这张图有几个不同的routingkey,每个routingkey都是由两段组成的;
下面我们来编写代码来实现相关功能;
二、生产者生产消息
生产者发送四条消息,routingkey不相同,代码如下:
class Program
{
static void Main(string[] args)
{
Console.WriteLine("msg send start!");
TopicPublisher();
Console.WriteLine("msg send end!");
}
static string EXCHANGE_NAME = "routing_exchange_topic";
static void TopicPublisher()
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
var routingKeyList = new List<string>() { "usa.news", "usa.weather", "europe.news", "europe.weather" };
for (var i = 0; i < routingKeyList.Count; i++)
{
var msg = $"hello topic {routingKeyList[i]}!";
channel.BasicPublish(
exchange: EXCHANGE_NAME,
routingKey: routingKeyList[i],
basicProperties: null,
body: Encoding.UTF8.GetBytes(msg));
Console.WriteLine($"消息发送:" + msg);
}
Console.WriteLine($"消息发送完成!");
channel.Close();
conn.Close();
}
}
结果运行如图:
三、消费者
3.1 消费者1,绑定的routingkey为europe.#
,代码如下:
static string EXCHANGE_NAME = "routing_exchange_topic";
static string QUEUE_NAME = "routing_queue_topic1";
static void TopicConsumer1()
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
channel.QueueDeclare(queue: QUEUE_NAME);
// 绑定routingKey
string routingKey = "europe.#";
channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"Topic Consumer1 收到消息: {message},时间:{DateTime.Now}");
};
channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
Console.ReadKey();
channel.Close();
conn.Close();
}
}
3.2 消费者2,绑定的routingkey为#.weather
,代码如下:
static string EXCHANGE_NAME = "routing_exchange_topic";
static string QUEUE_NAME = "routing_queue_topic2";
static void TopicConsumer2()
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
channel.QueueDeclare(queue: QUEUE_NAME);
string routingKey = "#.weather";
// 可以绑定多个routingKey
channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"Topic Consumer2 收到消息: {message},时间:{DateTime.Now}");
};
channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
Console.ReadKey();
channel.Close();
conn.Close();
}
3.3 查看运行结果
(1)队列绑定到交换机的结果如图:
(2)消费者1的接收到的消息:
(3)消费者2的接收到的消息:
通过结果我们可以看到,消费者1和消费者2分别接收到了不同key的消息;
四、小结
1、topic模式,我们可以通过模糊匹配的,来接收自己想要的消息;使用Topic模式生产者在声明队列时需要制定消息到达队列方式为topic;
2、Consumer消费者,匹配消息时:#匹配一个或者多个关键字,*匹配一个关键字
3、参考文档:https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html