介绍
RabbitMQ是一个消息实体服务(broker):它接收及转发消息。你可以把它想象成一个邮局:当你把你想要寄送的邮件放进邮箱里时,你能够确信邮局的派送员最终会把你的这封邮局送到这信的收件者手中。以这个类比来说,RabbitMQ就是邮箱,邮局和邮局的派送员。
RabbitMQ和这个邮局最大的区别,是RabbitMQ不是与纸张打交道,而是接受、储存和转发二进制数据块——消息。
RabbitMQ 和一般通信中使用的一些术语:
生产就只是意味着发送。一个发送消息的程序就是生产者:
队列是存在于RabbitMQ里的一个邮箱的名字。尽管消息流过RabbitMQ和你的应用程序,但他们只被存储于队列里。队列只受限于主机的内存和磁盘。它本质上是一个很大的消息缓冲区。一些生成者能够发送消息到一个队列里,同时一些消费者可以尝试从那个队列里接受数据。下图就是我们描述的一个队列:
消费有着类似于接收的意思。一个消费者就是一个主要等待接收消息的程序:
注意:生产者、消费者,以及消息实体服务不一定是寄居于同一台主机;事实上在大部分应用中,它们不是在同一台主机里。
“Hello World”
注意:这个教程中的例子,假设你已经安装了RabbitMQ,而且运行在本地的标准端口(5672)。万一你使用了不同的主机,端口或证书,那你需要调整连接设置。
(使用 .NET/C# Client)
在这部分教程中,我们将用C#来写两个程序:一个用来发送一条消息的生成者,和一个接受消息并把消息打印出来的消费者。我们将忽略.net client API 中的一些细节,作为一个开始我们只专注于这些很简单的东西。这是一个“Hello World”消息。
在下面的示意图中,“P”就是我们的生成者,"C"就是我们的消费者。中间的盒子就是一个队列,队列就是RabbitMQ 为消费者保留的一个消息缓冲区。
.Net 客户端类库
RabbitMQ 可以使用多种协议。这个教程中使用AMQP 0-9-1,这是一个消息通信中开放而多用途的协议。对于RabbitMQ,可以使用多种不同的语言写成的客户端。我们这里将使用RabbitMQ提供的.net 客户端。
这个客户端支持.Net Core,跟支持.net framework4.5.1以上一样。在这个教程中将使用RabbitMQ .net client 5.0 和.net core,所以你必须确保你已经安装了,而且在你的PATH里。
你同样可以使用.net framework来完成这个教程中的示例,但是建立项目的步骤将会不一样。
RabbitMQ .NET client 5.0 及最新版通过nuget发布。
这个教程假设你的windows是使用了powershell。在MacOS和Linux几乎任何shell都可以。
建立项目
首先我们先来确认.net core toolchain 在PATH:
dotnet --help
输入这条命令应该会产生一条帮助信息。
现在让我们生成两个项目。一个为发布者,一个是消费者:
dotnet new console --name Send
mv Send/Program.cs Send/Send.cs
dotnet new console --name Receive
mv Receive/Program.cs Receive/Receive.cs
这些命令将生成两个名为Send和Receive的目录。
然后我们添加客户端依赖项目。
cd Send
dotnet add package RabbitMQ.Client
dotnet restore
cd ../Receive
dotnet add package RabbitMQ.Client
dotnet restore
现在我们已经建立了我们需要的.net项目,我们就可以在这两个项目中写一些代码了。
发送
我们将把我们的消息发布者(发送者)取名为send.cs,消费者(接收者)取名为Receive.cs。这个发布者将连接到RabbitMQ,然后发送一条消息就退出。
在Send.cs中,我们需要引用一些命名空间:
1
2
3
|
using System; using RabbitMQ.Client; using System.Text; |
建立这个类:
1
2
3
4
5
6
7
|
class Send { public static void Main() { ... } } |
然后我们创建一个到服务器的连接:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using ( var connection = factory.CreateConnection()) { using ( var channel = connection.CreateModel()) { ... } } } } |
这个连接抽象了套接字连接,和负责协议版本协商及认证等等。这里我们连接到本地的消息服务实体——这里就是localhost。如果我们想连接到在不同机器上的消息服务实体的话,我们只要在这里指定那机器的名称或者IP。
然后是创建一个通道,大部分API处理任务都是在这里完成的。
对于发送,我们必须声明一个队列用来发送消息;然后我们发布一条消息到这个队列中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
using System; using RabbitMQ.Client; using System.Text; class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using ( var connection = factory.CreateConnection()) using ( var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello" , durable: false , exclusive: false , autoDelete: false , arguments: null ); string message = "Hello World!" ; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "" , routingKey: "hello" , basicProperties: null , body: body); Console.WriteLine( " [x] Sent {0}" , message); } Console.WriteLine( " Press [enter] to exit." ); Console.ReadLine(); } } |
声明一个队列是幂等的——它只有不存在的情况下才会创建。消息内容是一个二进制数组,所以你可以发送任何你想发送的消息。
当你运行完上面编写好的代码后,这个通道和连接就将会被释放掉。
发送不起作用?
如果这是你第一次使用RabbitMQ,而且你没有看到你发送的消息,你可能会抓耳挠腮地在想,到底是哪里出错了。这可能是消息服务实体启动时没有足够的可用磁盘空间(默认是至少需要50M的可用空间),而导致它拒绝接受消息。如果需要的话,检查消息服务实体日志来确认和减少这种限制。这个 configuration file documentation(配置文档)将告诉你如何设置disk_free_limit
接受
上面是我们的发布者。我们的消费者是从RabbitMQ中拉取消息,接受者不同于发布者只发布一条消息,我们将保持接收者一直监听消息及将其打印出来。
这个代码(Receive.cs)几乎使用了跟Send一样的命名空间:
1
2
3
4
|
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; |
创建一个接收方和创建发布者是一样的;我们先打开连接和一个通道,然后声明一个我们制定要去哪里拉取消息的队列。注意这个要匹配Send中发布消息的队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
class Receive { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using ( var connection = factory.CreateConnection()) { using ( var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello" , durable: false , exclusive: false , autoDelete: false , arguments: null ); ... } } } } |
注意,我们这儿一样声明了一个队列。因为我们可能在发布消息之前就启动了消费者。我们要确认我们在尝试从队列中拉取消息前,这个队列应该是已经存在了。
我们将告诉服务器把这个队列中的消息转发给我们。它将异步推送给我们消息,我们提供一个回调方法。这个就是EventingBasicConsumer.Received事件来处理的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; class Receive { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using ( var connection = factory.CreateConnection()) using ( var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello" , durable: false , exclusive: false , autoDelete: false , arguments: null ); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine( " [x] Received {0}" , message); }; channel.BasicConsume(queue: "hello" , autoAck: true , consumer: consumer); Console.WriteLine( " Press [enter] to exit." ); Console.ReadLine(); } } } |
把生产者和消费者组合在一起
打开这两个终端。
运行消费者:
cd Receive dotnet run
然后运行生成者:
cd Send dotnet run
消费者将取得发布者发布到RabbitMQ的消息,并将它打印出来。消费者将继续保持运行,以等待新的消息(使用Ctrl+C来停止),所以可以尝试在另一个终端运行发布者。
PS:这是第一次翻译,翻译得不好,原因之一是自己对于RabbitMQ不熟,由于目前工作中右接触到RabbitMQ,所以自己就正在补这方面的知识,自己也顺便尝试一下翻译,自己感觉翻译真的是个有点揪心的事,很多英文句子看起来很简单,但是翻译出来的中文,却总觉得怪怪的。
如果你是使用.net framwork,一样是通过nuget获取RabbitMQ.Client的引用,其它的一样,这个在博客园有很多的例子,可以自己去找一下。