zoukankan      html  css  js  c++  java
  • rabbitmq (二) 持久化

    默认情况下rabbitmq 是根据消费者多少依次投递,投递后就删除消息.

    消息不会重复投递给不同的消费者.

    消费者如果遇到长时间的任务,会执行完一个消息之后再执行下一个消息,

    消费者持久化:

    如果一个消费者断网或者宕机.这个消息就会丢失.如果想在一个消费者宕机的情况下吧消息投递给另一个

    消费者需要使用:ack确认

    C#代码:

    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    
    channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

    使用这种方式.如果unack的消息太多,没有消费者处理,会吃掉很多内存,

    可以使用rabbitmqctl messages_unacknowledged  查看unack消息.

    服务端持久化:

    如果一个rabbitmq Server宕机,默认里面的消息全部丢失,如果想持久化使用代码:

    channel.QueueDeclare(queue: "task_queue",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    channel.BasicPublish(exchange: "",
                                             routingKey: "hello",
                                             basicProperties: properties,
                                             body: body);

    这里需要注意的是.需要新建一个队列来进行持久化,不能在以前已经建好的对象上修改属性.不然不会起作用

    公平分派:

    如果一个消费者负载很大,不能处理更多消息.另一个消费者负载小.可以使用代码,控制消费者可以接受消息数目

    channel.BasicQos(0, 1, false);

    上述代码告诉rabbitmq.不能再接受新的消息

    全部代码如下:

    生产者:

    public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
    
                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);
    
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
    
                channel.BasicPublish(exchange: "",
                                     routingKey: "task_queue",
                                     basicProperties: properties,
                                     body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
    
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    
        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
        }

    消费者:

    public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
    
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
                Console.WriteLine(" [*] Waiting for messages.");
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);
    
                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);
    
                    Console.WriteLine(" [x] Done");
    
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: "task_queue",
                                     autoAck: false,
                                     consumer: consumer);
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
  • 相关阅读:
    cocos2d-x 3.0 事件分发机制
    cocos2d-x Schedule详解
    OSG设置警告等级
    OSG四元数与欧拉角之间的转换
    编译OSG_FBX插件
    RakNet发送与接收数据
    RakNet基本教程
    IE不能上网,但是其他浏览器可以
    OSG计时器与时间戳
    添加OSG各种事件处理器
  • 原文地址:https://www.cnblogs.com/weichao975/p/8075086.html
Copyright © 2011-2022 走看看