一.简介
EasyNetQ是一个容易使用,针对RabbitMQ的.Net Api,它提供了一个尽可能简洁的和适用于RabbitMQ的.Net类库。在EasyNetQ中,消息必须使用.Net class来定义。想发送不同的消息类型需要用不同的class来表示,这个类必须是public的,且带一个默认构造函数和可读属性。
它通过消息的类型来路由。当你发布一个消息,EasyNetQ会检查消息类型,然后给它一个基于类型名称、命名空间和程序集的路由键。在消费者端,消费者去订阅这个类型。订阅之后,消费者就可以得到这个类型的消息。
默认情况下,EasyNetQ使用Newtonsoft.Json来序列化.Net 类为Json,这样的好处是消息对于人类的可读性好。因此可以通过类似RabbitMQ管理端应用来调试消息问题。
EasyNetQ是一个在RabbitMQ.Client类库上提供服务的组件集合。做了序列化、错误处理、线程管理、连接管理等功能。通过一个Mini-Ioc容器组织在一起,你可以很容易地用自己的方法去替换这些组件。下面的是官方给的结构图,能够很好地解释这个组件的结构。
二.EasyNetQ的使用
自行安装RabbitMQ环境。在.Net项目中安装EasyNetQ包。
定义一个消息类。
public class TextMessage { public string Text { get; set; } }
1.创建连接
使用EasyNetQ连接RabbitMQ,是在应用程序启动时创建一个IBus对象,并且在应用关闭的时候释放该对象。
RabbitMQ的连接是基于IBus接口,当IBus接口的方法被调用,连接才会开启。创建一个IBus对象方法如下:
var connStr = "host=127.0.0.1;virtualHost=EDCVHOST;username=admin;password=123456"; var bus = RabbitHutch.CreateBus(connStr)
与RabbitMQ服务器的延迟连接由IBus接口表示,创建连接的连接字符串格式为key=value的键值对组成,每一个用分号(;)分割。
host:例子host=localhost或者host=www.xx.com,如果用到集群配置,那么可以用逗号将服务地址分割开来,如host=a.com,b.com,c.com。
virtualHost:虚拟主机,默认为“/”。
username:用户登录名。
password:用户登录密码。
requestedHeartbeat:心跳设置,默认是10秒。
prefetchcount:默认是50秒。
publisherConfirms:默认是false。
persistentMessages:消息持久化。默认是true。
product:产品名。
platform:平台。
timeout:默认为10秒。
2.关闭连接
bus.Dispose();
要关闭连接,只需要简单地处理总线,这将关闭EasyNetQ使用的连接、渠道、消费者和其它所有资源。
3.发布消息和订阅消息
EasyNetQ支持最简单的消息模式是发布和订阅。发布消息后,任意消费者可以订阅该消息,也可以多个消费者订阅,不需要额外配置。创建一个IBus对象,然后创建一个可序列化的.Net class,调用publish方法即可。
bus.PubSub.Publish(new TextMessage { Text = "hello world"});
EasyNetQ提供了消息订阅,当调用Subscribe方法的时候,EasyNetQ会创建一个用于接受消息的队列,不过与消息发布不同的是,消息订阅增加了一个参数,subscribe_id。
bus.PubSub.Subscribe<TextMessage>("my_test_subscriptionid", HandleTextMessage); public static void HandleTextMessage(TextMessage textMessage) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("Got message: {0}", textMessage.Text); Console.ResetColor(); }
第一个参数是订阅id,另外一个是delegate参数,用于处理接收到的消息。这里要注意的是,subscribe_id参数很重要,假如开发者用同一个subscribe_id订阅了同一种消息类型两次或者多次,RabbitMQ会以轮询的方式给每个订阅的队列发送消息。接收到之后,其它队列就接收不到该消息。如果是用不同的subscribe_id订阅同一种消息类型,那么生成的每一个队列都会收到该消息。
需要注意的是,在处理收到的消息时,不要占用太多时间,否则会影响消息的处理效率,所以,遇到占用长时间的处理方法,最好异步处理。
4.Request和Response
发送请求,调用IBus中Rpc的Request方法。
using (var bus = RabbitHutch.CreateBus(connStr)) { var input = ""; Console.WriteLine("Please enter a message. 'Quit' to quit."); while ((input = Console.ReadLine()) != "Quit") { var myRequest = new MyRequest { Text = input }; var response = bus.Rpc.Request<MyRequest, MyResponse>(myRequest); Console.WriteLine(response.Text); } }
这里我们创建了一个MyRequest的类型的请求,调用了Request方法,用这个消息作为参数。当response得到响应,就将response的内容输出。
写一个响应请求的服务。
using (var bus = RabbitHutch.CreateBus(connStr)) { bus.Rpc.Respond<MyRequest, MyResponse>(request => new MyResponse { Text = "Responding to " + request.Text }); Console.WriteLine("Listening for messages. Hit <return> to quit."); Console.ReadLine(); }
Response带有参数,一个delegate参数,它能接收请求并返回响应。
5.Send和Recevie
Send/Receive模式是特别针对通过命名队列来设计的。它不用假定什么样的消息类型才能够发送到这个队列。这意味着可以发送不同类型的消息到相同的队列中。
使用IBus对象中SendReceive的Send方法,指定队列名和消息自身。
using (var bus = RabbitHutch.CreateBus(connStr)) { var input = ""; Console.WriteLine("Please enter a message. 'Quit' to quit."); while ((input = Console.ReadLine()) != "Quit") { //发送类型为TextMessage的消息 bus.SendReceive.Send("my.queue", new TextMessage { Text = input }); //发送类型为TextMessage2的消息 bus.SendReceive.Send("my.queue", new TextMessage2 { Text = input + "233" }); } }
消息接收,使用IBus对象中SendReceive的Receive方法。在同一个队列中,可以接收一种类型的消息,也可以接收多种类型的消息。
using (var bus = RabbitHutch.CreateBus(connStr)) { //单一接收类型为TextMessage的send bus.SendReceive.Receive<TextMessage>("my.queue", message => Console.WriteLine("Message:{0}", message.Text)); //同时接收类型为TextMessage、TextMessage2的send bus.SendReceive.Receive("my.queue", x => x .Add<TextMessage>(message => Console.WriteLine("Message:{0}", message.Text)) .Add<TextMessage2>(message => Console.WriteLine("Message2:{0}", message.Text)));
Console.WriteLine("Listening for messages. Hit <return> to quit.");
Console.ReadLine(); }
这个例子重复接收类型为TextMessage的消息,消费过了就没得再消费了。
三.总结
1.EasyNetQ的消息模式有PublisherSubscribe,Request/Response,Send/Receive。
2.PublisherSubscribe是发布和订阅,生产者发布了某类型消息,消费者订阅了就会实时监听这个类型的消息,然后获取。
3.Request/Response是请求和返回。生产者和消费者同时定义请求类型和返回类型。当生产者发送了一条请求类型的消息,消费者就会获取这条请求消息并发送一条返回消息,生产者就得到这条返回消息。感觉这就像是发布和订阅,只不过是类似相互发布和订阅。
4.Send/Receive是发送和接收。它与另外两种模式不同的是,它不是以消息类型为消息生产和消费的依据,而是以队列名为依据,这就可以让它在同一队列中发送和接收不同类型的消息。