zoukankan      html  css  js  c++  java
  • Rabbitmq—基础

    1.分布式异步队列

    2.队列解读

    优点:

    以时间换空间:以更长的时间来处理堆集的业务逻辑;

    1. 异步处理;响应很快,增加服务器承载能力;
    2. 削峰,将流量高峰分解到不同的时间段来处理;
    3. 扩展性,UI和业务的解耦,就可以独立演化;
    4. 高可用,处理器发生故障以后,不会影响可用性;

    缺点:

    1. 即时性降低,降低了用户的体验---无法避免;业务上来屈服;
    2. 更复杂;
    3. 更加依附于队列了;

    3.路由方式

    Direct Exchange

    【直接交换(按RouteKey)】:
    直接按照绑定的RouteKey生产消费。

    img

    Fanout Exchange

    【分发】:
    所有绑定fanout exchange队列发送的消息,会被所有绑定fanout exchange队列的消费者消费掉。不按RouteKey处理。

    img

    Topic Exchange

    【模糊匹配】

    可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.” 只会匹配到“XiaoChen.money”。

    *: 一个单词

    #: 多个单词

    img

    4.优先级(priority)

    在RMQ中想要使用优先级特性需要的版本为3.5.0+。

    然后我们只需做两件事情:

    1. 将队列声明为优先级队列,即在创建队列的时候添加参数 x-max-priority 以指定最大的优先级,值为0-255(整数)。

    2. 为优先级消息添加优先级。

    生产者

            public static void Make(string queueName, string exchangeName, string routeKey)
            {
                //创建连接工厂
                var factory = new ConnectionFactory()
                {
                    UserName = "tangsansan",//用户名
                    Password = "123456",//密码
                    HostName = "localhost"//rabbitmq ip
                };        
    		//创建连接
            var connection = factory.CreateConnection();
            //创建通道
            var channel = connection.CreateModel();
            //定义一个队列
            //channel.QueueDeclare(queueName, false, false, false, null);
            channel.QueueDeclare(queueName, true, false, false, 
                new Dictionary<string, object>() {
                         {"x-max-priority",10 }  //指定队列要支持优先级设置
                       });
    
            //定义一个Direct类型交换机
            channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
    
            channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);
    
            Console.WriteLine("
    RabbitMQ连接成功,请输入消息,输入exit退出!");
    
            string[] questionList =
                {
                "error::This is an error message 1.",
                "info:This is an info message 1.",
                "warning:This is a warning message 1.",
                "error:This is an error message 2.",
                "info:This is an info message 2.",
                "error:This is an error message 3."
            };
    
            //设置消息优先级
            IBasicProperties props = channel.CreateBasicProperties();
            foreach (string questionMsg in questionList)
            {
                if (questionMsg.StartsWith("error"))
                {
                    props.Priority = 9;
                    channel.BasicPublish(exchange: exchangeName,
                                   routingKey: routeKey,
                                   basicProperties: props,
                                   body: Encoding.UTF8.GetBytes(questionMsg));
                }
                else
                {
                    props.Priority = 1;
                    channel.BasicPublish(exchange: exchangeName,
                                   routingKey: routeKey,
                                   basicProperties: props,
                                   body: Encoding.UTF8.GetBytes(questionMsg));
                }
    
                Console.WriteLine($"{questionMsg} 已发送..!");
            }
    
            channel.Close();
            connection.Close();
        }
    

    没使用消费者前,去看看 GetMessage...

    error全部排到前面来了...

    消费者

            public static void Consume(string queueName, string exchangeName, string routeKey)
            {
                //创建连接工厂
                var factory = new ConnectionFactory()
                {
                    UserName = "tangsansan",//用户名
                    Password = "123456",//密码
                    HostName = "localhost"//rabbitmq ip
                };        
            //创建连接
            var connection = factory.CreateConnection();
            //创建通道
            var channel = connection.CreateModel();
            //定义一个队列
            //channel.QueueDeclare(queueName, true, false, false, null);
    
            //定义一个Direct类型交换机
            //channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
    
            //channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);
    
            Console.WriteLine("
    RabbitMQ连接成功,请输入消息,输入exit退出!");
    
            var consumer = new EventingBasicConsumer(channel);
            string input;
            do
            {
                input = Console.ReadLine();
    
                consumer.Received += (model, ea) =>
                {
                    Console.WriteLine(Encoding.UTF8.GetString(ea.Body.ToArray()));
                };
    
                //消费消息
                channel.BasicConsume(queueName, true, consumer);
    
            } while (input.Trim().ToLower() != "exit");
    
            channel.Close();
            connection.Close();
        }
    

    5.Tx事务模式

    channel.TxSelect(); 开启一个事务
    channel.TxCommit(); 提交事务
    channel.TxRollback(); 事务回滚

    try
    {
        //开启事务机制
        channel.TxSelect();
        //发送消息
        //同时给多个队列发送消息;要么都成功;要么都失败;
        channel.BasicPublish(exchange: "TxQueueExchange", routingKey: "TxRouteKey01", 
                             basicProperties: null, body: body);
        channel.BasicPublish(exchange: "TxQueueExchange", routingKey: "TxRouteKey02", 
                             basicProperties: null, body: body);
        //事务提交
        channel.TxCommit();
        Console.WriteLine($"发送到Broke成功!");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"发送到Broker失败!");
        channel.TxRollback(); //事务回滚
        throw;
    }
    

    6.确认消息

    6.1 生产者

    保证生产者发送到 Broker

    channel.ConfirmSelect() 开启确认模式

    (1)WaitForConfirms()

    消息发送以后,提供一个回执方法 WaitForConfirms() 返回一个bool 值;

    (2)WaitForConfirmsOrDie()

    全部执行完,如果失败,就报错。

    6.2 消费者

    保证消费者被正常消费

    6.2.1 自动确认

    autoAck: true 自动确认;

    在自动确认模式下,消息在发送后立即被认为是发送成功。 这种模式可以提高吞吐量(只要消费者能够跟上),不过会降低投递和消费者处理的安全性。 这种模式通常被称为“发后即忘”。 与手动确认模式不同,如果消费者的TCP连接或信道在成功投递之前关闭,该消息则会丢失。

    使用自动确认模式时需要考虑的另一件事是消费者过载。 手动确认模式通常与有限的信道预取一起使用,限制信道上未完成(“进行中”)传送的数量。 然而,对于自动确认,根据定义没有这样的限制。 因此,消费者可能会被交付速度所压倒,可能积压在内存中,堆积如山,或者被操作系统终止。 某些客户端库将应用TCP反压(直到未处理的交付积压下降超过一定的限制时才停止从套接字读取)。 因此,只建议当消费者可以有效且稳定地处理投递时才使用自动投递方式。

    //处理消息 
    //autoAck: true 自动确认
    //channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: true, consumer: consumer);
    

    消费者直接对这8条消息自动确认,不管是否处理,Broker 都会直接删除所有的这8条消息。

    6.2.2 手动确认

    //处理消息 
    //autoAck: false  手动确认
    channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: false, consumer: consumer);
    
    void BasicAck(ulong deliveryTag, bool multiple);
    
    void BasicReject(ulong deliveryTag, bool requeue);
    
    void BasicNack(ulong deliveryTag, bool multiple, bool requeue);
    
    • deliveryTag:可以看作消息的编号,它是一个64位的长整型值,最大值是9223372036854775807。
    • requeue:如果requeue 参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue 参数设置为false,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。
    • multiple:在BasicAck中,multiple 参数设置为true 则表示确认deliveryTag编号之前所有已被当前消费者确认的消息。在BasicNack中,multiple 参数设置为true 则表示拒绝deliveryTag 编号之前所有未被当前消费者确认的消息。
    • BasicReject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。
    6.2.2.1 成功确认
     //手动确认  消息正常消费  告诉Broker:你可以把当前这条消息删除掉了
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    
    6.2.2.2 失败确认
    //否定:告诉Broker,这个消息我没有正常消费;  
    //requeue: true:重新写入到队列里去; false:你还是删除掉;
    channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
    

    参考代码

    using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        #region EventingBasicConsumer
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        int i = 0;
                        consumer.Received += (model, ea) =>
                        { 
                            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                            //如果在这里处理消息的手,异常了呢? 
                            //Console.WriteLine($"接收到消息:{message}"); ;
    
                            if (i < 50)
                            {
                                //手动确认  消息正常消费  告诉Broker:你可以把当前这条消息删除掉了
                                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                                Console.WriteLine(message);
                            }
                            else
                            {
                                //否定:告诉Broker,这个消息我没有正常消费;  requeue: true:重新写入到队列里去; false:你还是删除掉;
                                channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                            }
                            i++;
                        };
                        Console.WriteLine("消费者准备就绪...."); 
                        {
                            //处理消息 
                            //autoAck: true 自动确认; 
                            //channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: true, consumer: consumer);
                        } 
                        {
                            //处理消息 
                            //autoAck: false  手动确认 
                            channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: false, consumer: consumer);
                        }
    
    
                        Console.ReadKey();
                        #endregion
                    }
                }
    

    参考

    分布式系统消息中间件——RabbitMQ的使用思考篇

  • 相关阅读:
    Struts2的原理,配置和使用
    tomcat启动异常之----A child container failed during start
    Oracle两表关联更新其中一张表的数据
    jsp隐藏字符串中间部分信息,只显示前后字段
    浏览器会缓存js文件
    tomcat启动项目很快,且不报错,访问报404,项目实际上没起来,起的是空tomcat
    jd-eclipse插件的安装
    Junit4使用总结
    json中dump()与dumps()里的参数解释
    python时间函数和常用格式化
  • 原文地址:https://www.cnblogs.com/tangge/p/14146597.html
Copyright © 2011-2022 走看看