默认情况下rabbitmq 是根据消费者多少依次投递,投递后就删除消息.
消息不会重复投递给不同的消费者.
消费者如果遇到长时间的任务,会执行完一个消息之后再执行下一个消息,
消费者持久化:
如果一个消费者断网或者宕机.这个消息就会丢失.如果想在一个消费者宕机的情况下吧消息投递给另一个
消费者需要使用:ack确认
C#代码:
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
使用这种方式.如果unack的消息太多,没有消费者处理,会吃掉很多内存,
可以使用rabbitmqctl messages_unacknowledged 查看unack消息.
服务端持久化:
如果一个rabbitmq Server宕机,默认里面的消息全部丢失,如果想持久化使用代码:
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);
这里需要注意的是.需要新建一个队列来进行持久化,不能在以前已经建好的对象上修改属性.不然不会起作用
公平分派:
如果一个消费者负载很大,不能处理更多消息.另一个消费者负载小.可以使用代码,控制消费者可以接受消息数目
channel.BasicQos(0, 1, false);
上述代码告诉rabbitmq.不能再接受新的消息
全部代码如下:
生产者:
public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }
消费者:
public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }