RabbitMQ交换机有四种类型:Direct,Fanout,Topic,Header
先简单介绍Direct交换机的代码实现
先创建连接
public class MQHelper { public IConnection GetConnection() { var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.UserName = "guest"; factory.Password = "guest"; return factory.CreateConnection(); } }
直流交换机代码:
public class DirectExchange { public void DirectPublish() { MQHelper mh = new MQHelper(); using (var conn = mh.GetConnection()) { using(IModel channel = conn.CreateModel()) { //声明队列 channel.QueueDeclare(queue: "DirectLogAll", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "DirectLogError", durable: true, exclusive: false, autoDelete: false, arguments: null); //声明交换机 channel.ExchangeDeclare(exchange: "DirectExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //日志类型 string[] logtypes = new string[] { "debug", "info", "warn", "error" }; foreach(string log in logtypes) { //绑定交换机和队列(所有日志类型队列) channel.QueueBind(queue: "DirectLogAll", exchange: "DirectExchange", routingKey: log); } //错误日志队列 channel.QueueBind(queue: "DirectLogError", exchange: "DirectExchange", routingKey: "error"); List<LogModel> list = new List<LogModel>(); //100条消息 for(int i = 1; i <= 100; i++) { if (i % 4 == 0) { list.Add(new LogModel() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}条消息") }); }else if(i % 4 == 1) { list.Add(new LogModel() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}条消息") }); } else if (i % 4 == 2) { list.Add(new LogModel() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}条消息") }); } else { list.Add(new LogModel() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条消息") }); } } //发送 foreach(var log in list) { channel.BasicPublish(exchange: "DirectExchange", routingKey: log.LogType, basicProperties: null, body: log.Msg); //记录 Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)} 已发送"); } } } } }
这是模拟发送100条日志信息到RabbitMQ,四种类型各25条,根据路由键routingKey分别传到两条队列。其中日志消息
public class LogModel { public string LogType { get; set; } //日志类型 public byte[] Msg { get; set; } //消息正文 }
最终在RabbitMQ的本地localhost:15672上可以看到
这是发送消息到了RabbitMQ上待消费。
添加Direct消费模块
public class DirectExchangeConsumer { public void DirectConsume() { var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.UserName = "guest"; factory.Password = "guest"; using(var conn= factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //声明队列 channel.QueueDeclare(queue: "DirectLogAll", durable: true, exclusive: false, autoDelete: false, arguments: null); // channel.QueueDeclare(queue: "DirectLogError", durable: true, exclusive: false, autoDelete: false, arguments: null); //声明交换机 channel.ExchangeDeclare(exchange: "DirectExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //日志类型 string[] logtypes = new string[] { "debug", "info", "warn", "error" }; foreach (string log in logtypes) { //绑定交换机和队列(所有日志类型队列) channel.QueueBind(queue: "DirectLogAll", exchange: "DirectExchange", routingKey: log); } //错误日志队列 // channel.QueueBind(queue: "DirectLogError", exchange: "DirectExchange", routingKey: "error"); //消费消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"[{message}],写入文本"); }; //处理消息 channel.BasicConsume(queue: "DirectLogAll", autoAck: true, consumer: consumer); } } } }
虽然这里重复声明了交换机和队列,但由于连接一样,所以不会重复存在。
调用方法之后可以看到消息全部都被消费