AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,所以安装RabbitMq服务器端前需要先安装Erlang。以下记录在windows 7环境下的学习笔记。
一、RabbitMQ下载安装
Erlang下载地址:https://www.erlang.org/downloads
RabbitMQ服务端下载地址:https://www.rabbitmq.com/install-windows.html
安装好Erlang和RabbitMQ之后都需要新建系统变量,计算机-->属性-->高级系统设置-->环境变量-->系统变量新建
1、新建Erlang和RabbitMQ系统变量
建系统变量时值为安装目录,我的安装目录:
Erlang:C:Program Fileserl10.4
RabbitMQ:C:Program FilesRabbitMQ Server abbitmq_server-3.7.15
新建系统变量后,还需要在已有系统变量PATH中增加Erlang和RabbitMQ的配置(注:是追加,PATH原有的值不要删除),以分号分隔。
Erlang:%ERLANG_HOME%in
RabbitMQ:%RABBITMQ_SERVER%sbin
2、启动RabbitMQ的管理界面
在cmd中运行如下命令:
"C:Program FilesRabbitMQ Server abbitmq_server-3.7.15sbin abbitmq-plugins.bat" enable rabbitmq_management
默认会生成一个账号guest,密码也是guest。
查看已有用户及用户的角色命令:
rabbitmqctl.bat list_users
RabbitMQ安装后是默认启动服务的,若要手动启动、停止服务,执行如下命令:
启动服务:
net start rabbitmq
停止服务:
net stop rabbitmq
3、登录RabbitMQ的管理界面
默认地址是本地端口15672,地址:http://127.0.0.1:15672,用guest账号登录。
二、Rabbit介绍
消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
交换机和交换机类型
Default exchange(默认交换机):一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
Direct exchange(直连交换机):根据消息携带的路由键(routing key)将消息投递给对应队列的。
Fanout exchange(扇型交换机):将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
Topic exchange(主题交换机):通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
routing_key格式是以点号“."分割的字符表。对于routing_key,有两个特殊字符(在正则表达式里叫元字符):
*代表任意一个单词
#代表0个或者多个单词
由于有"*"和"#",Topic exchange 非常强大并且可以转化为其他的exchange:
如果binding_key是"#",它会接收所有的Message,不管routing_key是什么,就像是Fanout exchange。
如果"*"和"#"都没有被使用,那么topic exchange就变成了Direct exchange。
Headers exchange(头交换机):有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
三、Hello Word
创建2个项目,1个发送端Send,1个接收端Receive,通过Nuget安装RabbitMQ.Client。
发送端代码:
var message ="Hello Word "; var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); for (int i = 0; i < 10; i++) { var body = Encoding.UTF8.GetBytes(message + i); channel.BasicPublish(exchange: "",//使用默认交换机 routingKey: "hello", basicProperties: null, //new BasicProperties() { DeliveryMode = 2 }, body: body); } }
接收端代码:
var factory = new ConnectionFactory() { HostName = "localhost" }; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//限制流量,向消费者一条条发送消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var directory = AppContext.BaseDirectory + "/Log"; if (!Directory.Exists(directory)) { Directory.CreateDirectory(directory); } using (FileStream fs = System.IO.File.Open(directory + $"/{DateTime.Now:yyyyMMdd}{ea.ConsumerTag}.txt", FileMode.Append, FileAccess.Write, FileShare.ReadWrite)) { var sr = new StreamWriter(fs); sr.WriteLine($"[{DateTime.Now:yyyyMMddHHmmssfff}]"); sr.WriteLine(ea.ConsumerTag + ":" + message); sr.Flush(); sr.Close(); Thread.Sleep(100);//可复制一份代码,将等待的时间设置成200,测试多消费者 } channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手动消息确认 }; channel.BasicConsume(queue: "hello", autoAck: false,//消息确认设置成手动 consumer: consumer);
更多的内容可以看以下文章:
https://blog.csdn.net/anzhsoft2008/column/info/rabbitmq
http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html