zoukankan      html  css  js  c++  java
  • 消息队列

    在我们团队内部,随着消息应用中心(任务中心)的广泛应用,有时候我们感觉不到消息队列的存在,但这不影响消息队列在高可用、分布式、高并发架构下的核心地位。

    消息队列都应用到了哪些实际的应用场景中?

    一、再谈消息队列的应用场景

    1. 异步处理:例如短信通知、终端状态推送、App推送、用户注册等
    2. 数据同步:业务数据推送同步
    3. 重试补偿:记账失败重试
    4. 系统解耦:通讯上下行、终端异常监控、分布式事件中心
    5. 流量消峰:秒杀场景下的下单处理
    6. 发布订阅:HSF的服务状态变化通知、分布式事件中心
    7. 高并发缓冲:日志服务、监控上报

    但是,我们对消息队列的底层技术和原理还是不了解,那么我们马上开始吧…

    二、消息队列的一些基本概念和简单原理

    1. Broker

    Broker的概念来自与Apache ActiveMQ,通俗的讲就是MQ的服务器。

    2. 消息的生产者、消费者

    消息生产者Producer:发送消息到消息队列。

    消息消费者Consumer:从消息队列接收消息。

    3. 点对点消息队列模型

    消息生产者向一个特定的队列发送消息,消息消费者从该队列中接收消息;

    消息的生产者和消费者可以不同时处于运行状态。

    每一个成功处理的消息都由消息消费者签收确认(Acknowledge)。如图:

    4. 发布订阅消息模型-Topic

    发布订阅消息模型中,支持向一个特定的主题Topic发布消息,0个或多个订阅者接收来自这个消息主题的消息。在这种模型下,发布者和订阅者彼此不知道对方。实际操作过程中,

    发布订阅消息模型中,支持向一个特定的主题Topic发布消息,0个或多个订阅者接收来自这个消息主题的消息。在这种模型下,发布者和订阅者彼此不知道对方。实际操作过程中,

    必须先订阅,再发送消息,而后接收订阅的消息,这个顺序必须保证。

    5. 消息的顺序性保证

    基于Queue消息模型,利用FIFO先进先出的特性,可以保证消息的顺序性。

    6. 消息的ACK确认机制

    即消息的Ackownledge确认机制,

    为了保证消息不丢失,消息队列提供了消息Acknowledge机制,即ACK机制,当Consumer确认消息已经被消费处理,发送一个ACK给消息队列,此时消息队列便可以删除这个消息了。如果Consumer宕机/关闭,没有发送ACK,消息队列将认为这个消息没有被处理,会将这个消息重新发送给其他的Consumer重新消费处理。

    7. 消息的持久化

    消息的持久化,对于一些关键的核心业务来说是非常重要的,启用消息持久化后,消息队列宕机重启后,消息可以从持久化存储恢复,消息不丢失,可以继续消费处理。

    8. 消息的同步和异步收发

    同步:消息的收发支持同步收发的方式。

    同时还有另一种同步方式:同步收发场景下,消息生产者和消费者双向应答模式,例如:张三写封信送到邮局中转站,然后李四从中转站获得信,然后在写一份回执信,放到中转站,然后张三去取,当然张三写信的时候就得写明回信地址

    消息的接收如果以同步的方式(Pull)进行接收,如果队列中为空,此时接收将处于同步阻塞状态,会一直等待,直到消息的到达。

    异步:消息的收发同样支持异步方式:异步发送消息,不需要等待消息队列的接收确认;异步接收消息,以Push的方式触发消息消费者接收消息。

    9. 消息的事务支持

    消息的收发处理支持事务,例如:在任务中心场景中,一次处理可能涉及多个消息的接收、处理,这处于同一个事务范围内,如果一个消息处理失败,事务回滚,消息重新回到队列中。

    三、我们对消息队列的实际使用

    我们使用了两种消息队列组件:

    RabbitMQ:高可用、高可靠消息应用场景,例如记账失败重试、通知服务,消息不允许丢

    Kafka:高性能消息应用场景,例如日志、监控,消息允许丢失。

    在此之上,我们封装了消息应用中心、日志服务等核心组件和服务。那么,消息应用中心和日志都用到了消息队列什么技术? 干货来了…

    1.     消息应用中心

    消息应用中心(任务中心)使用了消息队列的异步处理、数据同步、重试补偿、系统解耦、流量消峰等特性。其中:

    消息应用中心(任务中心),支持RabbitMQ和Kafka两种消息通道,支持在任务元数据层面设置

    任务:就是一个包含了任务执行上下文的消息,同时代表了异步处理

    任务发送者(ITaskSender)发送任务:消息的生产者将任务消息发送的消息队列

    任务类型:消息队列名称,例如:HaKeepAcco***Queue,充电补偿记账队列

    消息队列:任务的临时存储

    任务中心:任务集中处理,消息消费者

    任务处理完成:消息Ack确认

    任务的多级重试:多个重试消息队列,HaSysTaskStore2Queue

    2.     日志组件

    日志组件,使用了消息队列的高并发缓冲和发布订阅特性。其中:

    日志组件使用Kafka作为消息通道,因为Kafka的性能好,吞吐量大, 可以容忍偶尔的消息数据丢失

    日志组件使用发布订阅的消息模型

    日志组件包含日志服务SDK和日志HSF服务,二者都是消息的生产者Producer

    日志类型:消息的Topic主题

    日志处理器:消息的消费者、Topic的订阅、日志数据处理(HbaseES其他)

    3.     RPC服务状态变化通知

    RPC服务状态变化通知,使用了消息队列的发布订阅特性。其中:

    RPC服务状态变化通知,使用了RabbitMQ消息队列技术

    使用发布订阅的消息模型

    Topic:RPCServiceState

    RPCService.Proxy:RPC服务状态变化消息的订阅者

    RPC服务注册、发布:消息的生产者,发送RPC服务状态变化消息。

    四、消息队列使用的最佳实践

    1.  RabbitMQ的连接,底层都是Socket连接,长连接 or 短连接?

    RabbitMQ每个在创建每个连接的同时,会自动创建一个监视线程来定时(默认60s)侦测连接的状态,如果连接断开,触发ConnectionShutdown事件。

        用长连接,还是用短连接??

        发送端:建议使用短连接,用完即释放,避免长连接带来的端口占用,因为发送端无处不在,发送操作短而急促。

    接收端:建议使用长连接,时刻接收处理消息,因为消息的接收消费比较集中,接收操作久而弥坚。

    2. 网络是有抖动的,连接的断开是正常的,如何应对?

        发送端:发送失败重试

    接收端:注册ConnectionShutdown事件同时捕获消息接收异常,重新建立连接,接收消费消息

    3. RabbitMQ Exchange(Topic)模式下带来的消息队列数量激增

        只是创建了一个Exchange(Topic),为什么会增加这么多Queue。

       因为,每个Topic的订阅都是绑定一个Queue用作消息的消费。

    4. 需求的演变,消息结构的变更,如何平滑过度?

        消息是byte[]数组,我们将复杂对象消息二进制序列化。

        接收到消息后,我们将二进制数组反序列化为实体类。

        当我们的实体类消息体的结构发生变化后,因为受二进制序列化处理的

    影响,导致无法反序列化。

        解决方案:

        消息体预留一些string类型的扩展字段

       消息队列版本化,支持多个版本的消息体。

    5. Kafka Consumer Group

       同一Topic的一条消息只能被同一个Group内的一个Consumer消费

       多个Consumer Group可同时消费同一条消息

       

    6. 消息的积压

    消息的积压产生的原因:消息接收消费的速率低,发送的速度>接收的速度。

    消息积压后的影响:

    消息大量积压后,当新的消费者连接上MQ并开始接收消息时,发送速率会大幅降低。

    消息队列集群的压力增加,大量的消息要持久化存储和同步。

    如何减少消息积压:快速消费消息,同时保持消息体的不要过大

  • 相关阅读:
    【leetcode】1630. Arithmetic Subarrays
    【leetcode】1629. Slowest Key
    【leetcode】1624. Largest Substring Between Two Equal Characters
    【leetcode】1620. Coordinate With Maximum Network Quality
    【leetcode】1619. Mean of Array After Removing Some Elements
    【leetcode】1609. Even Odd Tree
    【leetcode】1608. Special Array With X Elements Greater Than or Equal X
    【leetcode】1603. Design Parking System
    【leetcode】1598. Crawler Log Folder
    Java基础加强总结(三)——代理(Proxy)Java实现Ip代理池
  • 原文地址:https://www.cnblogs.com/studynode/p/10364504.html
Copyright © 2011-2022 走看看