官方的使用教程(测试运行)
1、"Hello World!" -- 发送接收
We're about to tell the server to deliver us the messages from the queue. Since it will push us messages asynchronously, we provide a callback. That is what EventingBasicConsumer.Received event handler does.
接下来我们要让服务器把队列中的消息投递给我们。由于服务器是异步推送消息的,我们需要提供一个回调,这正是 EventingBasicConsumer.Received 事件处理程序的作用。
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Send_Simple
{
    /// <summary>
    /// "Hello World!" producer: declares the "hello" queue and publishes one
    /// UTF-8 encoded message to it, using an empty exchange name and the
    /// queue name as routing key.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            factory.HostName = "";
            factory.UserName = "tangqun";
            factory.Password = "123456";

            using (IConnection connection = factory.CreateConnection())
            {
                PublishGreeting(connection);

                // The channel has already been disposed here; only the connection
                // stays open until the user presses Enter.
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Opens a channel on <paramref name="connection"/>, declares the queue,
        /// publishes the greeting and disposes the channel again.
        /// </summary>
        private static void PublishGreeting(IConnection connection)
        {
            using (IModel channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

                string text = "Hello World!";
                byte[] payload = Encoding.UTF8.GetBytes(text);

                channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: payload);
                Console.WriteLine(" [x] Send {0}", text);
            }
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Receive_Simple
{
    /// <summary>
    /// "Hello World!" consumer: declares the "hello" queue and prints every
    /// message delivered from it until the user presses Enter.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            factory.HostName = "";
            factory.UserName = "tangqun";
            factory.Password = "123456";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += OnMessageReceived;

                // noAck: true — this consumer never sends explicit acknowledgements.
                channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Decodes the delivered body as UTF-8 and writes it to the console.
        /// </summary>
        private static void OnMessageReceived(object sender, BasicDeliverEventArgs delivery)
        {
            string text = Encoding.UTF8.GetString(delivery.Body);
            Console.WriteLine(" [x] Received {0}", text);
        }
    }
}
官方测试用例:
Receive_Simple.exe
Send_Simple.exe
结果:Receive 可以接收到消息
2、Work Queues -- 单发多接,只有一个能接收到消息
One of the advantages of using a Task Queue is the ability to easily parallelise work.
能够轻松地并行处理工作,是使用 Task Queue 的优势之一。
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Send_Work_queues
{
    /// <summary>
    /// Work-queue producer: publishes one task message, built from the
    /// command-line arguments, to the durable "task_queue" queue.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            factory.HostName = "";
            factory.UserName = "tangqun";
            factory.Password = "123456";

            using (IConnection connection = factory.CreateConnection())
            {
                PublishTask(connection, args);

                // The channel has already been disposed here; only the connection
                // stays open until the user presses Enter.
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Opens a channel, declares the durable queue and publishes the task
        /// message with the persistent flag set.
        /// </summary>
        private static void PublishTask(IConnection connection, string[] args)
        {
            using (IModel channel = connection.CreateModel())
            {
                // Message durability: the key parameter is `durable`.
                channel.QueueDeclare(queue: "task_queue", durable: true /* durable queue */, exclusive: false, autoDelete: false, arguments: null);

                string text = GetMessage(args);
                byte[] payload = Encoding.UTF8.GetBytes(text);

                // Per the original note: when QueueDeclare uses durable: true, the
                // message itself must also be marked persistent.
                var persistentProps = channel.CreateBasicProperties();
                persistentProps.SetPersistent(true);

                channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: persistentProps, body: payload);
                Console.WriteLine(" [x] Send {0}", text);
            }
        }

        /// <summary>
        /// Joins the command-line arguments with single spaces, or returns
        /// "Hello World!" when no arguments were given.
        /// </summary>
        private static string GetMessage(string[] args)
        {
            if (args.Length > 0)
            {
                return string.Join(" ", args);
            }

            return "Hello World!";
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Receive_Work_queues
{
    /// <summary>
    /// Work-queue consumer: takes tasks from "task_queue" one at a time,
    /// sleeps one second per '.' in the message, then acknowledges it.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            factory.HostName = "";
            factory.UserName = "tangqun";
            factory.Password = "123456";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Message durability: the key parameter is `durable`. Per the original
                // note, if the producer declares the queue with true, the consumer must
                // declare it with true as well for it to take effect.
                channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                // Fair dispatch: prefetchCount of 1.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                Console.WriteLine(" [*] Waiting for messages.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, delivery) => ProcessTask(channel, delivery);

                // noAck: false turns on message acknowledgment.
                channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Prints the task, simulates the work by sleeping, and acknowledges
        /// the delivery on <paramref name="channel"/>.
        /// </summary>
        private static void ProcessTask(IModel channel, BasicDeliverEventArgs delivery)
        {
            string text = Encoding.UTF8.GetString(delivery.Body);
            Console.WriteLine(" [x] Received {0}", text);

            // One second of simulated work per '.' character in the message.
            int dotCount = text.Count(ch => ch == '.');
            Thread.Sleep(dotCount * 1000);

            Console.WriteLine(" [x] Done");

            // Per the original note: with noAck set to false in BasicConsume, the
            // acknowledgment must be sent explicitly. Message acknowledgment is for
            // the case where a consumer dies (its channel is closed, connection is
            // closed, or TCP connection is lost).
            channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
        }
    }
}
官方测试用例:
Receive_Work_queues.exe
Receive_Work_queues.exe
Send_Work_queues.exe
结果:有且只有一个 Receive 可以接收到消息
3、Publish/Subscribe -- 单发多接,每一个都可以接收到消息
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Send_Publish_Subscribe
{
    /// <summary>
    /// Publish/subscribe producer: publishes one log message, built from the
    /// command-line arguments, to the "logs" fanout exchange.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            factory.HostName = "";
            factory.UserName = "tangqun";
            factory.Password = "123456";

            using (IConnection connection = factory.CreateConnection())
            {
                PublishLog(connection, args);

                // The channel has already been disposed here; only the connection
                // stays open until the user presses Enter.
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Opens a channel, declares the exchange and publishes the log message
        /// with an empty routing key.
        /// </summary>
        private static void PublishLog(IConnection connection, string[] args)
        {
            using (IModel channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                string text = GetMessage(args);
                byte[] payload = Encoding.UTF8.GetBytes(text);

                channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: payload);
                Console.WriteLine(" [x] Sent {0}", text);
            }
        }

        /// <summary>
        /// Joins the command-line arguments with single spaces, or returns
        /// "info: Hello World!" when no arguments were given.
        /// </summary>
        private static string GetMessage(string[] args)
        {
            if (args.Length > 0)
            {
                return string.Join(" ", args);
            }

            return "info: Hello World!";
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Receive_Publish_Subscribe
{
    /// <summary>
    /// Publish/subscribe consumer: binds a server-named queue to the "logs"
    /// fanout exchange and prints every message delivered to it.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            factory.HostName = "";
            factory.UserName = "tangqun";
            factory.Password = "123456";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                // Temporary queue: declared without arguments, the name comes back
                // from the declare call.
                string temporaryQueue = channel.QueueDeclare().QueueName;

                // Binding between the exchange and the temporary queue.
                channel.QueueBind(queue: temporaryQueue, exchange: "logs", routingKey: "");

                Console.WriteLine(" [*] Waiting for logs.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += PrintLog;
                channel.BasicConsume(queue: temporaryQueue, noAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Decodes the delivered body as UTF-8 and writes it to the console.
        /// </summary>
        private static void PrintLog(object sender, BasicDeliverEventArgs delivery)
        {
            string text = Encoding.UTF8.GetString(delivery.Body);
            Console.WriteLine(" [x] {0}", text);
        }
    }
}
官方测试用例:
Receive_Publish_Subscribe.exe
Receive_Publish_Subscribe.exe
Send_Publish_Subscribe.exe
结果:两个 Receive 都可以接收到消息
4、Routing -- 单发多接,满足路由规则的可以接收到消息
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Send_Routing
{
    /// <summary>
    /// Routing producer: publishes one message to the "direct_logs" direct
    /// exchange, using the first command-line argument as routing key
    /// (severity) and the remaining arguments as message text.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            factory.HostName = "";
            factory.UserName = "tangqun";
            factory.Password = "123456";

            using (IConnection connection = factory.CreateConnection())
            {
                PublishLog(connection, args);

                // The channel has already been disposed here; only the connection
                // stays open until the user presses Enter.
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Opens a channel, declares the exchange and publishes the message
        /// under the chosen severity.
        /// </summary>
        private static void PublishLog(IConnection connection, string[] args)
        {
            using (IModel channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

                string severity = SeverityFrom(args);
                string text = TextFrom(args);
                byte[] payload = Encoding.UTF8.GetBytes(text);

                channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: payload);
                Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, text);
            }
        }

        /// <summary>First argument, or "info" when there are no arguments.</summary>
        private static string SeverityFrom(string[] args)
        {
            if (args.Length > 0)
            {
                return args[0];
            }

            return "info";
        }

        /// <summary>
        /// Arguments after the first joined with single spaces, or
        /// "Hello World!" when fewer than two arguments were given.
        /// </summary>
        private static string TextFrom(string[] args)
        {
            if (args.Length > 1)
            {
                return string.Join(" ", args, 1, args.Length - 1);
            }

            return "Hello World!";
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Receive_Routing
{
    /// <summary>
    /// Routing consumer: binds a server-named queue to the "direct_logs"
    /// direct exchange once per severity given on the command line and
    /// prints every message delivered to it.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point. Each element of <paramref name="args"/> is used as a
        /// binding routing key. With no arguments, prints a usage line to
        /// standard error, waits for Enter and exits with code 1.
        /// </summary>
        static void Main(string[] args)
        {
            // Validate the command line before touching the broker, so a usage
            // error neither needs a reachable server nor declares the exchange
            // or a queue on it.
            if (args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

                // Declared without arguments; the queue name comes back from the call.
                var queueName = channel.QueueDeclare().QueueName;

                // One binding per requested severity.
                foreach (var severity in args)
                {
                    channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity);
                }

                Console.WriteLine(" [*] Waiting for messages.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body);
                    Console.WriteLine(" [x] Received '{0}':'{1}'", ea.RoutingKey, message);
                };
                channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
官方测试用例:
Receive_Routing.exe info
Receive_Routing.exe warning
Receive_Routing.exe error
Receive_Routing.exe info warning error
Send_Routing.exe error "Run. Run. Or it will explode."
结果:只有 Receive_Routing.exe error 和 Receive_Routing.exe info warning error 可以接收到消息
5、Topics -- 单发多接,匹配规则的可以接收到消息
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Send_Topics
{
    /// <summary>
    /// Topics producer: publishes one message to the "topic_logs" topic
    /// exchange, using the first command-line argument as routing key and
    /// the remaining arguments as message text, then exits.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            factory.HostName = "";
            factory.UserName = "tangqun";
            factory.Password = "123456";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");

                string topic = TopicFrom(args);
                string text = TextFrom(args);
                byte[] payload = Encoding.UTF8.GetBytes(text);

                channel.BasicPublish(exchange: "topic_logs", routingKey: topic, basicProperties: null, body: payload);
                Console.WriteLine(" [x] Sent '{0}':'{1}'", topic, text);
            }
        }

        /// <summary>First argument, or "anonymous.info" when there are no arguments.</summary>
        private static string TopicFrom(string[] args)
        {
            if (args.Length > 0)
            {
                return args[0];
            }

            return "anonymous.info";
        }

        /// <summary>
        /// Arguments after the first joined with single spaces, or
        /// "Hello World!" when fewer than two arguments were given.
        /// </summary>
        private static string TextFrom(string[] args)
        {
            if (args.Length > 1)
            {
                return string.Join(" ", args, 1, args.Length - 1);
            }

            return "Hello World!";
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Receive_Topics
{
    /// <summary>
    /// Topics consumer: binds a server-named queue to the "topic_logs" topic
    /// exchange once per binding key given on the command line and prints
    /// every message delivered to it.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point. Each element of <paramref name="args"/> is used as a
        /// binding key. With no arguments, prints a usage line to standard
        /// error, waits for Enter and exits with code 1.
        /// </summary>
        static void Main(string[] args)
        {
            // Validate the command line before touching the broker, so a usage
            // error neither needs a reachable server nor declares the exchange
            // or a queue on it.
            if (args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [binding_key...]", Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");

                // Declared without arguments; the queue name comes back from the call.
                var queueName = channel.QueueDeclare().QueueName;

                // One binding per requested binding key.
                foreach (var bindingKey in args)
                {
                    channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: bindingKey);
                }

                Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body);
                    Console.WriteLine(" [x] Received '{0}':'{1}'", ea.RoutingKey, message);
                };
                channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
官方测试用例:
Receive_Topics.exe "#"
Receive_Topics.exe "kern.*"
Receive_Topics.exe "*.critical"
Receive_Topics.exe "kern.*" "*.critical"
Send_Topics.exe "kern.critical" "A critical kernel error"
结果:每一个 Receive 都可以接收到一次消息
6、RPC
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Send_RPC
{
    /// <summary>
    /// RPC client: publishes requests to "rpc_queue" and reads the replies
    /// from its own server-named reply queue.
    /// </summary>
    class RPCClient
    {
        private IConnection _connection;
        private IModel _channel;
        private string _replyQueueName;
        private QueueingBasicConsumer _replyConsumer;

        /// <summary>
        /// Connects to the broker, declares the reply queue and starts
        /// consuming from it with noAck: true.
        /// </summary>
        public RPCClient()
        {
            var factory = new ConnectionFactory();
            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            factory.HostName = "";
            factory.UserName = "tangqun";
            factory.Password = "123456";

            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
            _replyQueueName = _channel.QueueDeclare().QueueName;

            _replyConsumer = new QueueingBasicConsumer(_channel);
            _channel.BasicConsume(queue: _replyQueueName, noAck: true, consumer: _replyConsumer);
        }

        /// <summary>
        /// Sends <paramref name="message"/> as a request and blocks until a reply
        /// carrying the same correlation id is dequeued.
        /// </summary>
        /// <param name="message">Request text, sent UTF-8 encoded.</param>
        /// <returns>The body of the matching reply, decoded as UTF-8.</returns>
        public string Call(string message)
        {
            string correlationId = Guid.NewGuid().ToString();

            var requestProps = _channel.CreateBasicProperties();
            requestProps.ReplyTo = _replyQueueName;
            requestProps.CorrelationId = correlationId;

            byte[] payload = Encoding.UTF8.GetBytes(message);
            _channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: requestProps, body: payload);

            // Replies whose correlation id does not match this request are discarded.
            BasicDeliverEventArgs reply;
            do
            {
                reply = (BasicDeliverEventArgs)_replyConsumer.Queue.Dequeue();
            }
            while (reply.BasicProperties.CorrelationId != correlationId);

            return Encoding.UTF8.GetString(reply.Body);
        }

        /// <summary>Closes the underlying connection.</summary>
        public void Close()
        {
            _connection.Close();
        }
    }

    /// <summary>
    /// Requests fib(30) through <see cref="RPCClient"/> and prints the answer.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var client = new RPCClient();

            Console.WriteLine(" [x] Requesting fib(30)");
            string answer = client.Call("30");
            Console.WriteLine(" [.] Got '{0}'", answer);

            client.Close();
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Receive_RPC
{
    /// <summary>
    /// RPC server: consumes requests from "rpc_queue", computes the requested
    /// Fibonacci number and publishes the result to the request's ReplyTo
    /// queue with the request's correlation id.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point. Loops forever: dequeue a request, compute, reply,
        /// acknowledge. A request that cannot be parsed or computed is
        /// answered with an empty string.
        /// </summary>
        static void Main(string[] args)
        {
            // NOTE(review): HostName is an empty string in this listing — presumably a
            // placeholder for the broker address; confirm before running.
            var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);

                // Fair dispatch: prefetchCount of 1.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue: "rpc_queue", noAck: false, consumer: consumer);
                Console.WriteLine(" [x] Awaiting RPC requests");

                while (true)
                {
                    string response = null;
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var body = ea.Body;
                    var props = ea.BasicProperties;

                    // The reply carries the request's correlation id so the caller can match it.
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine(" [.] fib({0})", message);
                        response = fib(n).ToString();
                    }
                    catch (Exception e)
                    {
                        // Parse failures, negative input and overflow all end up here;
                        // the caller receives an empty reply.
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    }
                    finally
                    {
                        // Always reply and acknowledge, whether or not the computation succeeded.
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
                }
            }
        }

        /// <summary>
        /// Naive recursive Fibonacci; deliberately slow.
        /// </summary>
        /// <param name="n">Zero-based index of the Fibonacci number; must not be negative.</param>
        /// <returns>The n-th Fibonacci number.</returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="n"/> is negative. Without this guard the recursion never
        /// reaches a base case and the process dies from a stack overflow, which the
        /// request loop's catch block cannot intercept.
        /// </exception>
        /// <exception cref="OverflowException">
        /// The result does not fit in an <see cref="int"/>; thrown instead of
        /// returning a silently wrapped value.
        /// </exception>
        private static int fib(int n)
        {
            if (n < 0)
            {
                throw new ArgumentOutOfRangeException("n", "fib is only defined for non-negative integers.");
            }

            if (n == 0 || n == 1)
            {
                return n;
            }

            return checked(fib(n - 1) + fib(n - 2));
        }
    }
}
官方测试用例:
Receive_RPC.exe
Send_RPC.exe
Send_RPC.exe
结果:两个 Send(correlationId) 都发送请求,每个 Send 只会打印对应 correlationId 请求的响应