一. Direct-Exchange模式
1. 含义
交换机类型设置为:ExchangeType.Direct
交换机和队列通过routingKey(路由key)进行绑定,发消息的时候每条消息也要指定routingKey(路由key),然后交换机根据该路由key进行匹配,该key绑定了几个Queue,那么该条消息就同时发送到几个队列中。
2. 使用场景
通过消息队列来写日志;
Info debug error warn :记录下来
error: 除了记录下来,还需要特殊处理,可能需要发送一个信息,发送一个邮件;
解决方案:通过路由key匹配不同的队列
队列1:专门用来记录日志
队列2:专门用来发邮件,发信息
3. 代码分享
生产者
/// <summary> /// DirectExchange路由 /// </summary> public class DirectExchange { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //声明两个队列 channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //1个路由 channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //4种路由key统一绑定DirectExchangeLogAllQueue队列, string[] logtypes = new string[] { "debug", "info", "warn", "error" }; foreach (string logtype in logtypes) { channel.QueueBind(queue: "DirectExchangeLogAllQueue", exchange: "DirectExChange", routingKey: logtype); } //路由key“error”再次绑定DirectExchangeErrorQueue队列 channel.QueueBind(queue: "DirectExchangeErrorQueue", exchange: "DirectExChange", routingKey: "error"); List<LogMsgModel> logList = new List<LogMsgModel>(); for (int i = 1; i <=20; i++) { if (i % 4 == 0) { logList.Add(new LogMsgModel() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}条信息") }); } if (i % 4 == 1) { logList.Add(new LogMsgModel() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}条信息") }); } if (i % 4 == 2) { logList.Add(new LogMsgModel() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}条信息") }); } if (i % 4 == 3) { logList.Add(new LogMsgModel() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条信息") }); } } Console.WriteLine("生产者发送20条日志信息"); //发送日志信息 foreach (var log in logList) { channel.BasicPublish(exchange: "DirectExChange", routingKey: log.LogType, basicProperties: null, body: log.Msg); Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)} 已发送~~"); } } } } public class LogMsgModel { public string LogType { get; set; } public byte[] Msg { get; set; } } }
DirectExchange.Show();
消费者
1 public class DirectExchange 2 { 3 /// <summary> 4 /// 队列1--用于各种类型日志信息 5 /// </summary> 6 public static void Show1() 7 { 8 Console.ForegroundColor = ConsoleColor.Green; 9 10 var factory = new ConnectionFactory(); 11 factory.HostName = "localhost";//RabbitMQ服务在本地运行 12 factory.UserName = "guest";//用户名 13 factory.Password = "guest";//密码 14 using (var connection = factory.CreateConnection()) 15 { 16 using (IModel channel = connection.CreateModel()) 17 { 18 //channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); 19 //channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); 20 //string[] logtypes = new string[] { "debug", "info", "warn", "error" }; 21 //foreach (string logtype in logtypes) 22 //{ 23 // channel.QueueBind(queue: "DirectExchangeLogAllQueue", 24 // exchange: "DirectExChange", 25 // routingKey: logtype); 26 //} 27 //消费队列中的所有消息; 28 var consumer = new EventingBasicConsumer(channel); 29 consumer.Received += (model, ea) => 30 { 31 var body = ea.Body; 32 var message = Encoding.UTF8.GetString(body.ToArray()); 33 Console.WriteLine($"【{message}】,写入文本~~"); 34 }; 35 //处理消息 36 channel.BasicConsume(queue: "DirectExchangeLogAllQueue", autoAck: true, consumer: consumer); 37 Console.ReadLine(); 38 } 39 } 40 } 41 42 43 /// <summary> 44 /// 队列2--用于error类型日志进行单独处理 45 /// </summary> 46 public static void Show2() 47 { 48 Console.ForegroundColor = ConsoleColor.Green; 49 50 var factory = new ConnectionFactory(); 51 factory.HostName = "localhost";//RabbitMQ服务在本地运行 52 factory.UserName = "guest";//用户名 53 factory.Password = "guest";//密码 54 using (var connection = factory.CreateConnection()) 55 { 56 using (IModel channel = connection.CreateModel()) 57 { 58 //channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); 59 //channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); 60 //路由key“error”再次绑定DirectExchangeErrorQueue队列 61 //channel.QueueBind(queue: "DirectExchangeErrorQueue", 62 // exchange: "DirectExChange", 63 // routingKey: "error"); 64 65 66 //消费队列中的所有消息; 67 var consumer = new EventingBasicConsumer(channel); 68 consumer.Received += (model, ea) => 69 { 70 var body = ea.Body; 71 var message = Encoding.UTF8.GetString(body.ToArray()); 72 Console.WriteLine($"【{message}】,发送邮件~~"); 73 }; 74 //处理消息 75 channel.BasicConsume(queue: "DirectExchangeErrorQueue", autoAck: true, consumer: consumer); 76 Console.ReadLine(); 77 } 78 } 79 } 80 81 }
{ Thread.Sleep(2000); Task.Run(() => { DirectExchange.Show1(); }); Task.Run(() => { DirectExchange.Show2(); }); }
运行结果
二. Fanout-Exchange模式
1.含义
交换机类型设置为:ExchangeType.Fanout
这种模式忽略routingKey,消息从客户端发出,只要queue与exchange有绑定,那么不管你的Routingkey是什么,都会将消息分发给所有与该exchang绑定的队列中。
2. 使用场景
典型的发布订阅模式,也可以叫做观察者模式. 比如博主有很多粉丝,博主每发一条消息,所有关注的粉丝都能收到推送(每个粉丝对应一个队列)
3. 代码分享
生产者
public class FanoutExchange { /// <summary> /// 博主发博客 /// </summary> public static void Show() { Console.ForegroundColor = ConsoleColor.Red; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: "fansQueue1", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "fansQueue2", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); channel.QueueBind(queue: "fansQueue1", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null); channel.QueueBind(queue: "fansQueue2", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null); int i = 1; while (true) { var message = $"博客{i}"; var body = Encoding.UTF8.GetBytes(message); //基本发布 channel.BasicPublish(exchange: "FanoutExchange", routingKey: string.Empty, //这里忽略路由key设置什么,都会向所有的队列发送 basicProperties: null, body: body); Console.WriteLine($"{message}已发送到队列"); i++; Thread.Sleep(1000); } } } } }
FanoutExchange.Show();
消费者
public class FanoutExchange { /// <summary> /// 粉丝1的队列 /// </summary> public static void Show1() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "fansQueue1", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "fansQueue1", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null); //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"粉丝1收到推送:{message}"); }; Console.WriteLine("粉丝1在浏览博客中.........."); //处理消息 channel.BasicConsume(queue: "fansQueue1", autoAck: true, consumer: consumer); Console.ReadLine(); } } } /// <summary> /// 粉丝2的队列 /// </summary> public static void Show2() { Console.ForegroundColor = ConsoleColor.Yellow; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "fansQueue2", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "fansQueue2", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null); //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"粉丝2收到推送:{message}"); }; Console.WriteLine("粉丝2在浏览博客中.........."); //处理消息 channel.BasicConsume(queue: "fansQueue2", autoAck: true, consumer: consumer); Console.ReadLine(); } } } }
{ Thread.Sleep(2000); Task.Run(() => { FanoutExchange.Show1(); }); Task.Run(() => { FanoutExchange.Show2(); }); }
运行结果
三. Topic-Exchange模式
1. 含义
交换机类型设置为:ExchangeType.Topic
这种模式可以定制key,相当于在DirectExchange的基础上增加了对key的模糊搜索,规则如下,主要是两个关键符号
*,代表任意的一个词。例如topic.zlh.*,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
#,代表任意多个词。例如topic.#,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
2. 使用场景
分组
3. 代码分享
生产者
public class TopicExchange { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "newsQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null); //多个词匹配 channel.QueueBind(queue: "newsQueue", exchange: "TopicExchange", routingKey: "*.news", arguments: null); //1个词匹配 { string message = "来自中国的新闻消息1"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.news", basicProperties: null, body: body); //同时匹配ChineQueue和newQueue Console.WriteLine($"消息【{message}】已发送到队列"); } { string message = "来自中国的天气消息1"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.weather.news", basicProperties: null, body: body); //仅匹配ChinaQueue Console.WriteLine($"消息【{message}】已发送到队列"); } { string message = "来自中国的新闻消息2"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "TopicExchange", routingKey: "msg.news", basicProperties: null, body: body); //仅匹配newsQueue Console.WriteLine($"消息【{message}】已发送到队列"); } { string message = "来自美国的天气消息2"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "TopicExchange", routingKey: "usa.weather", basicProperties: null, body: body); //谁都不匹配 Console.WriteLine($"消息【{message}】已发送到队列"); } } } } }
TopicExchange.Show();
消费者
public class TopicExchange { /// <summary> /// 读取ChinaQueue队列消息 /// </summary> public static void Show1() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); //channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null); //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"ChinaQueue队列中消费成功:{message}"); }; //处理消息 channel.BasicConsume(queue: "ChinaQueue", autoAck: true, consumer: consumer); } } } /// <summary> /// 读取newsQueue队列消息 /// </summary> public static void Show2() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"newsQueue队列中消费成功:{message}"); }; //处理消息 channel.BasicConsume(queue: "newsQueue", autoAck: true, consumer: consumer); } } } }
{ Thread.Sleep(2000); Task.Run(() => { TopicExchange.Show1(); }); Task.Run(() => { TopicExchange.Show2(); }); }
运行结果
四. Header-Exchange模式
1. 含义
交换机类型设置为:ExchangeType.Headers
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。
A. 如果设置为any,只要匹配到了headers表中的任何一对键值即可
B. 如果设置为all,则代表需要全部匹配
2.使用场景
如下案例,All队列中存储了1条消息test1,Any队列中存储了2条消息,test3和test4
3. 代码分享
生产者
public class HeaderExchange { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "HeaderExchange", type: ExchangeType.Headers, durable: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "HeaderExchangeAllqueue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "HeaderExchangeAnyqueue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: "HeaderExchangeAllqueue", exchange: "HeaderExchange", routingKey: string.Empty, arguments: new Dictionary<string, object> { { "x-match","all"}, { "teacher","ypf"}, { "pass","123"}}); Console.WriteLine("生产者准备就绪...."); { string message = "x-match=all,teacher和pass都相同时发送的消息,test1"; var props = channel.CreateBasicProperties(); props.Headers = new Dictionary<string, object>() {{ "teacher","ypf"}, { "pass","123"}}; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty, basicProperties: props, body: body); Console.WriteLine($"消息【{message}】已发送"); //存入HeaderExchangeAllqueue队列成功 } { string message = "x-match=all,teacher和pass有一个不相同时发送的消息,test2"; var props = channel.CreateBasicProperties(); props.Headers = new Dictionary<string, object>() { { "teacher","ypf"}, { "pass","456"} }; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty, basicProperties: props, body: body); Console.WriteLine($"消息【{message}】已发送"); //存入HeaderExchangeAllqueue队列失败 } Console.WriteLine("*****************************888888888*********************************"); { channel.QueueBind(queue: "HeaderExchangeAnyqueue", exchange: "HeaderExchange", routingKey: string.Empty, arguments: new Dictionary<string, object> { { "x-match","any"}, { "teacher","lmr"}, { "pass","123456"},}); string msg = "x-match=any,teacher和pass完全相同时发送的消息,test3"; var props = channel.CreateBasicProperties(); props.Headers = new Dictionary<string, object>() { { "teacher","lmr"}, { "pass","123456"} }; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty, basicProperties: props, body: body); Console.WriteLine($"消息【{msg}】已发送"); //存入HeaderExchangeAnyqueue队列成功 } { string msg = "x-match=any,teacher和pass有一个不相同时发送的消息,test4"; var props = channel.CreateBasicProperties(); props.Headers = new Dictionary<string, object>() { { "teacher","lmr"}, { "pass","456"} }; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty, basicProperties: props, body: body); Console.WriteLine($"消息【{msg}】已发送"); //存入HeaderExchangeAnyqueue队列成功 } } } Console.ReadKey(); } }
HeaderExchange.Show();
消费者
public class HeaderExchange { /// <summary> /// 读取HeaderExchangeAllqueue队列消息 /// </summary> public static void Show1() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"HeaderExchangeAllqueue队列中消费成功:{message}"); }; //处理消息 channel.BasicConsume(queue: "HeaderExchangeAllqueue", autoAck: true, consumer: consumer); } } } /// <summary> /// 读取HeaderExchangeAnyqueue队列消息 /// </summary> public static void Show2() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"HeaderExchangeAnyqueue队列中消费成功:{message}"); }; //处理消息 channel.BasicConsume(queue: "HeaderExchangeAnyqueue", autoAck: true, consumer: consumer); } } } }
{ Thread.Sleep(2000); Task.Run(() => { HeaderExchange.Show1(); }); Task.Run(() => { HeaderExchange.Show2(); }); }
运行结果
五. 剖析持久化机制
1. 触发条件
需要设置交换机、队列、消息均为持久化。
2. 现象
(1). 下面路径存放的是virtual,有几个virtual,就有几个文件夹
C:UsersDELLAppDataRoamingRabbitMQdb abbit@DESKTOP-DR3FU9S-mnesiamsg_storesvhosts
(2). 下面的msg_store_persistent文件夹存放的是持久化数据,
关闭RabbitMQ服务,会有很多文件,打开后,又没了,说明启动服务,磁盘上的数据又被加载到了硬盘
!
- 作 者 : Yaopengfei(姚鹏飞)
- 博客地址 : http://www.cnblogs.com/yaopengfei/
- 声 明1 : 如有错误,欢迎讨论,请勿谩骂^_^。
- 声 明2 : 原创博客请在转载时保留原文链接或在文章开头加上本人博客地址,否则保留追究法律责任的权利。