using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;

namespace SampleStack.RabbitMQ.Producer
{
    /// <summary>
    /// Sample producer: publishes the numbers 0 through 999 as UTF-8 text messages
    /// to the "work_queue" queue on the broker at "localhost", one message per second.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Program entry point. Opens a single connection and channel, declares the
        /// queue, then publishes one message per second.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };

            // Open the connection and the channel once and reuse them for every
            // message. Re-creating them per message forces a full connection
            // handshake each second, which is wasteful on both client and broker.
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // durable: true -> the queue is declared as durable.
                channel.QueueDeclare(queue: "work_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                // Persistent = true -> messages are marked as persistent.
                // The same properties object is reused for every publish.
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                for (var i = 0; i < 1000; i++)
                {
                    var message = i.ToString();
                    Console.WriteLine(message);

                    // Empty exchange name = default exchange; the routing key selects the queue.
                    channel.BasicPublish(exchange: "", routingKey: "work_queue", basicProperties: properties, body: Encoding.UTF8.GetBytes(message));

                    // Pace the producer at one message per second.
                    Thread.Sleep(1000);
                }
            }
        }
    }
}

// Install-Package RabbitMQ.Client
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace SampleStack.RabbitMQ.Consumer
{
    /// <summary>
    /// Sample worker: consumes messages from the "work_queue" queue on the broker
    /// at "localhost", prints each one, and acknowledges it manually.
    /// </summary>
    class Program
    {
        /// <summary>Name of the queue this worker consumes from.</summary>
        private const string QueueName = "work_queue";

        /// <summary>
        /// Program entry point. Declares the queue, limits prefetch to one message,
        /// starts consuming with manual acknowledgements, and runs until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // durable: true -> the queue is declared as durable.
                    channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

                    // Tell RabbitMQ not to hand this worker another message until the
                    // current one has been acknowledged, so work is dispatched fairly.
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (source, delivery) => HandleDelivery(channel, delivery);

                    // autoAck: false -> acknowledgements are sent explicitly by the handler.
                    channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

                    // Keep the process (and therefore the consumer) alive until a key is pressed.
                    Console.ReadKey();
                }
            }
        }

        /// <summary>
        /// Handles one delivered message: decodes the body as UTF-8, prints it,
        /// waits one second, then acknowledges the delivery.
        /// </summary>
        /// <param name="channel">Channel the message was delivered on; used to send the ack.</param>
        /// <param name="delivery">Delivery data carrying the message body and delivery tag.</param>
        private static void HandleDelivery(IModel channel, BasicDeliverEventArgs delivery)
        {
            var text = Encoding.UTF8.GetString(delivery.Body);
            Console.WriteLine(text);

            // One-second pause before acknowledging (presumably simulating work).
            Thread.Sleep(1000);

            // Manually send the acknowledgement for this single delivery.
            channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
        }
    }
}

// Install-Package RabbitMQ.Client