zoukankan      html  css  js  c++  java
  • 并发与高并发(十九) 高并发の消息队列思路

    前言

    这一章节我们将讲解高并发解决方案中的队列。消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。

    主体概要

    • 高并发の消息队列基本介绍
    • 消息队列特性

    主体内容

    一、高并发の消息队列基本介绍

    1.例子

    在购物商城下单后,希望购买者能收到短信或者邮件通知。有一种做法时在下单逻辑执行后调用短信发送的API,如果此时服务器响应较慢、短信客户端出现问题等诸多原因购买者不能正常收到短信,那么此时是不断重试呢还是直接放弃发送呢?不管选择哪一种,在实现上都会变得复杂。

    消息队列是如何解决的呢?可以讲发送短信这个过程封装成一条消息,发送到消息队列,消息队列按照一定顺序依次处理队列中的消息,在某一个时刻就会处理刚才收到的发送短信的消息。消息队列会通知一个服务去发送这个短信,顺利的话这个消息刚被放进队列就会被处理,这种情况一次性就发送成功了。如果出现了什么问题,可以再次将该消息放进消息队列中等待处理。上面的例子中如果使用消息队列,其好处是将发送短信这个流程与其他功能解耦,发送短信时只需要保证将这条消息发送到消息队列就行了,然后就可以处理发送短信后的其他事情了;其次,系统设计变得简单,不用在下单的场景下过多的考虑发送短信的问题,而是交给了消息队列来处理这个事。而且可以保证消息一定会被发送出去,消息只要没有发送成功会不断被重新加入到消息队列。如果短信服务出现问题,那么等到服务恢复了,消息队列再发送出去即可,只是发送的不那么及时而已。

    最后一点,我们假设在发送短信完成之后还要发送邮件。有了消息队列,我们就不要做同步等待了,我们可以直接并行处理,直接下单的核心流程可以更快的结束这样就可以增加应用的异步处理能力,减少甚至不可能出现并发现象。回顾一下我们平时在网站上输入手机号发送验证码的时候,半天都收不到短信,算着短信接口时间已经超时了,其实这时后台极有可能通过消息队列的方式发送短信,而正好碰到短信发送出了点问题,或者服务器网络开了小差,也有可能某段时间内消息队列里消息太多了需要处理。

    2.好处

    1.成功完成了一个异步解耦的过程。短信发送时只要保证放到消息队列中就可以了,接着做后面的事情就行。一个事务只关心本质的流程,需要依赖其他事情但是不那么重要的时候,有通知即可,无需等待结果。每个成员不必受其他成员影响,可以更独立自主,只通过一个简单的容器来联系。

    对于我们的订单系统,订单最终支付成功之后可能需要给用户发送短信积分什么的,但其实这已经不是我们系统的核心流程了。如果外部系统速度偏慢(比如短信网关速度不好),那么主流程的时间会加长很多,用户肯定不希望点击支付过好几分钟才看到结果。那么我们只需要通知短信系统“我们支付成功了”,不一定非要等待它处理完成。

    3.应用场景

    可以使用消息队列的场景非常多

    主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。

    使用场景的话,举个例子:
    假设用户在你的软件中注册,服务端收到用户的注册请求后,它会做这些操作:

    • 校验用户名等信息,如果没问题会在数据库中添加一个用户记录
    • 如果是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信
    • 分析用户的个人信息,以便将来向他推荐一些志同道合的人,或向那些人推荐他
    • 发送给用户一个包含操作指南的系统通知

    等等……

    但是对于用户来说,注册功能实际只需要第一步,只要服务端将他的账户信息存到数据库中他便可以登录上去做他想做的事情了。至于其他的事情,非要在这一次请求中全部完成么?值得用户浪费时间等你处理这些对他来说无关紧要的事情么?所以实际当第一步做完后,服务端就可以把其他的操作放入对应的消息队列中然后马上返回用户结果,由消息队列异步的进行这些操作。

    或者还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。

    二、消息队列特性

    1.四大特性

    • 业务无关:只做消息分发
    • FIFO(First In First Out):先投递先到达
    • 容灾:节点的动态增删和消息的持久化
    • 性能:吞吐量提升,系统内部通信效率提高

    2.为什么需要消息队列?

    • 【生产】和【消费】的速度或者稳定性不一致。

    3.消息队列的好处

    • 业务解耦:它是消息队列解决的最本质的问题,所谓解耦就是一个事物之关心核心的流程,而需要依赖其他系统,但不那么重要的事情,有通知即可,无需等待结果,换句话说就是基于消息的模型关心的是通知而不是处理。比如一个l旅游平台内部有一个产品中心,产品中心对接的是主站,移动后台,旅游供应链等多个数据源,下游对接的是推荐系统,API系统等展示系统,当上游的数据发生变更的时候,如果不使用消息队列,势必不停的需要调用接口来更新数据,这就特别依赖产品中心接口的稳定性和处理能力,但是其实作为旅游的产品中心,也许只有对于旅游自建的供应链产品中心更新成功,才是他们关心的事情,而对于团购等外部系统,产品中心更新的成功也好,失败也罢并不是他们的职责所在,他们只需要保证在信息变更的时候通知一下就可以了;而对于下游,可能有更新索引,更新缓存等一系列需求,对于产品中心而言,这些也不是职责所在。说白了,如果他们定时的拉取数据,也能保证数据的更新,只是实时性没有那么强,但是如果使用接口方式去更新数据,显然对于产品中心太过于重量级了,这时只需要发布一个产品ID变更的通知,由下游系统进行处理就更合理了。我们再举一个例子:对于订单系统,订单最后支付成功之后,我们可能要给用户发送一个短信通知,但其实这已经不是系统的核心流程了,如果外部系统速度偏慢,比如短信网关速度不好,那么主流程的时间就会加长很多,用户肯定不希望点击好几分钟之后才看到结果,那么我们只需要通知短信系统我们支付成功了,你去发个短信通知就好了,并不一定要等待它处理完成才结束。

    • 最终一致性:最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。
      业界有一些为“最终一致性”而生的消息队列,如Notify(阿里)、QMQ(去哪儿)等,其设计初衷,就是为了交易系统中的高可靠通知。
      以一个银行的转账过程来理解最终一致性,转账的需求很简单,如果A系统扣钱成功,则B系统加钱一定成功。反之则一起回滚,像什么都没发生一样。
      然而,这个过程中存在很多可能的意外:

      (1)A扣钱成功,调用B加钱接口失败。

      (2)A扣钱成功,调用B加钱接口虽然成功,但获取最终结果时网络异常引起超时。

      (3)A扣钱成功,B加钱失败,A想回滚扣的钱,但A机器down机。

      可见,想把这件看似简单的事真正做成,真的不那么容易。所有跨JVM的一致性问题,从技术的角度讲通用的解决方案是:

      (1)强一致性,分布式事务,但落地太难且成本太高,这里不再具体介绍,想要了解百度一下。

      (2)最终一致性,主要是用“记录”和“补偿”的方式。在做所有的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果可能是:成功、失败或是不确定,“不确定”(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把所有失败的事情重新搞一遍,直到成功为止。
      回到刚才的例子,系统在A扣钱成功的情况下,把要给B“通知”这件事记录在库里(为了保证最高的可靠性可以把通知B系统加钱和扣钱成功这两件事维护在一个本地事务里),通知成功则删除这条记录,通知失败或不确定则依靠定时任务补偿性地通知我们,直到我们把状态更新成正确的为止。需要注意的是,像Kafka等消息队列,它的设计在设计层面上具有丢失消息的可能,比如定时刷盘,会有丢失消息的可能,哪怕是只丢千分之一的消息,业务用其他手段也必须保证结果正确。

    • 广播:消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。

    • 提速:假设我们还需要发送邮件,有了消息队列就不需要同步等待,我们可以直接并行处理,而下单核心任务可以更快完成。增强业务系统的异步处理能力。甚至几乎不可能出现并发现象。

    • 削峰和流控:对于不需要实时处理的请求来说,当并发量特别大的时候,可以先在消息队列中作缓存,然后陆续发送给对应的服务去处理。试想上下游对于事情的处理能力是不同的。比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等即可。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的机器数量追上前端。这种问题同样存在于系统和系统之间,如短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,一般是不会有太大问题的。如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用中间系统转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。总而言之,消息队列不是万能的。对于需要强事务保证而且延迟敏感的,RPC是优于消息队列的。对于一些无关痛痒,或者对于别人非常重要但是对于自己不是那么关心的事情,可以利用消息队列去做。支持最终一致性的消息队列,能够用来处理延迟不那么敏感的“分布式事务”场景,而且相对于笨重的分布式事务,可能是更优的处理方式。当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。如果下游有很多系统关心你的系统发出的通知的时候,果断地使用消息队列吧。

    4.消息队列举例

    (1)这里我们只针对Kafka,RabbitMQ举例

    • Kafka
    • RabbitMQ

    ...

    (2)Kafka是一个apache项目,是一个高性能,跨语言,分布式发布订阅消息队列系统。

    结构图

    特性

    • 快速持久化。以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
    • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。它是完全的分布式系统,它的Broker,Producer,Consumer(参考基本术语)都原生,自动支持分布式和自动实现负载均衡,它支持Hadoop数据并行加载。
    • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。
    • 同时支持离线数据处理和实时数据处理。
    • Scale out:支持在线水平扩展。

    基本术语

    • Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker。
    • Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
    • Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition。
    • Producer:负责发布消息到Kafka broker。
    • Consumer:消息消费者,向Kafka broker读取消息的客户端。
    • Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

    (3)接下来,我们看一下RabbitMQ。

    结构图

    RabbitMQ里的基本定义

    RabbitMQ Server:提供消息一条从Producer到Consumer的处理。
    Exchange:一边从发布者方接收消息,一边把消息推送到队列。
    producer只能将消息发送给exchange。而exchange负责将消息发送到queues。Procuder Publish的Message进入了exchange,exchange会根据routingKey处理接收到的消息,判断消息是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的主要的type有direct,topic,headers,fanout。具体针对不同的场景使用不同的type。
    queue也是通过这个routing keys来做的绑定。交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。
    Queue:消息队列。接收来自exchange的消息,然后再由consumer取出。exchange和queue可以一对一,也可以一对多,它们的关系通过routingKey来绑定。
    Producer:Client A & B,生产者,消息的来源,消息必须发送给exchange。而不是直接给queue
    Consumer:Client 1,2,3消费者,直接从queue中获取消息进行消费,而不是从exchange中获取消息进行消费。

    (4)Kafka,rabbitmq使用springboot举例,为了简约内容,两者同时运行测试。

    windows下安装kafka请参考:https://www.jianshu.com/p/d64798e81f3b

    包架构

    KafkaReceiver.java

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    /**
     * 接收端
     */
    @Component
    @Slf4j
    public class KafkaReceiver {
        @KafkaListener(topics={TopicConstants.TEST})
        public void receive(ConsumerRecord<?,?> record){
            log.info("record:{}",record);
        }
    }
    

    KafkaSender.java

    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.practice.mq.Message;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    import javax.annotation.Resource;
    import java.util.Date;
    
    /**
     * 发送端
     */
    @Component
    @Slf4j
    public class KafkaSender {
        @Resource
        private KafkaTemplate<String,String> kafkaTemplate;
    
        private Gson gson = new GsonBuilder().create();
    
        public  void send(String msg) {
            Message message  = new Message();
            message.setId(System.currentTimeMillis());
            message.setMsg(msg);
            message.setSendTime(new Date());
            log.info("send Message:{}",message);
            kafkaTemplate.send(TopicConstants.TEST,gson.toJson(message));
        }
    }
    

    TopicConstants.java

    public interface TopicConstants {
        //定义一下我们需要使用Topic的字符串
        String TEST = "test";
        String MESSAGE = "message";
    }
    

    QueuesContants.java

    public interface QueuesConstants {
        String TEST="test";
        String MESSAGE="message";
    }
    

    RabbitMQClient.java

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    import javax.annotation.Resource;
    
    @Component
    public class RabbitMQClient {
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        public void send(String message){
            //发送到指定队列
            rabbitTemplate.convertAndSend(QueuesConstants.TEST,message);
        }
    }
    

    RabbitMQServer.java

    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    @Configuration
    public class RabbitMQConfig {
        @Bean
        public Queue queue(){
            //定义好要发送的队列
            return new Queue(QueuesConstants.TEST);
        }
    }
    

    Message.java

    import lombok.Data;
    import java.util.Date;
    @Data
    public class Message {
        private Long id;
        private String msg;
        private Date sendTime;
    }
    

    MQController.java

    import com.practice.mq.kafka.KafkaSender;
    import com.practice.mq.rabbitmq.RabbitMQClient;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    import javax.annotation.Resource;
    
    @Controller
    @RequestMapping("/mq")
    public class MQController {
        @Resource
        private RabbitMQClient rabbitMQClient;
    
        @Resource
        private KafkaSender kafkaSender;
    
        @RequestMapping("/send")
        @ResponseBody
        public String send(){
            String message = "message";
            rabbitMQClient.send(message);
            kafkaSender.send(message);
            return "success";
        }
    }
    

    用到的Maven依赖(这里结合springboot)

    	   <!--kafka-->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <!--Gson-->
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.7</version>
            </dependency>
            <!-- rabbitmq依赖 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    

    application.properties配置

    #============== kafka ===================
    # 指定kafka server的地址,集群配多个,中间,逗号隔开
    spring.kafka.bootstrap-servers=127.0.0.1:9092
    spring.kafka.consumer.group-id=test
    #=============== provider  =======================
    # 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
    # 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
    spring.kafka.producer.retries=0
    # 每次批量发送消息的数量,produce积累到一定数据,一次发送
    spring.kafka.producer.batch-size=16384
    # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
    spring.kafka.producer.buffer-memory=33554432
    #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
    #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
    #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
    #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
    #可以设置的值为:all, -1, 0, 1
    spring.kafka.producer.acks=1
    # 指定消息key和消息体的编解码方式
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    

    浏览器运行“http://127.0.0.1:8090/mq/send”

    控制台打印

    rabbitmq:

    ...
    2020-04-19 02:09:12.040  INFO 31676 --- [nio-8090-exec-1] com.practice.mq.kafka.KafkaSender        : send Message:Message(id=1587233352040, msg=message, sendTime=Sun Apr 19 02:09:12 GMT+08:00 2020)
    2020-04-19 02:09:12.048  INFO 31676 --- [cTaskExecutor-1] com.practice.mq.rabbitmq.RabbitMQServer  : message:message
    2020-04-19 02:09:12.054  INFO 31676 --- [nio-8090-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    

    kafka:

    ...
    2020-04-19 02:09:12.070  INFO 31676 --- [nio-8090-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.0
    2020-04-19 02:09:12.070  INFO 31676 --- [nio-8090-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 3402a8361b734732
    2020-04-19 02:09:12.076  INFO 31676 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: i1-NXUmvQRyaT-E27LPozQ
    2020-04-19 02:09:12.106  INFO 31676 --- [ntainer#0-0-C-1] com.practice.mq.kafka.KafkaReceiver      : record:ConsumerRecord(topic = test, partition = 0, offset = 5, CreateTime = 1587233352082, serialized key size = -1, serialized value size = 73, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1587233352040,"msg":"message","sendTime":"Apr 19, 2020 2:09:12 AM"})
    
    

    OK,那么关于这两个队列的简单示例到此结束!

  • 相关阅读:
    python 中关于kafka的API
    python 中对json的操作
    python 错误--UnboundLocalError: local variable '**' referenced before assignment
    storm问题记录(1) python 不断向kafka中写消息,spout做为消费者从kafka中读消息并emit给bolt,但是部分消息没有得到bolt的处理
    nodejs+kafka+storm+hbase 开发
    python构造数据
    Head first java中提到的学习方法,很受用
    【机器学习 第2章 学习笔记】模型评估与选择
    路书
    二分搜索
  • 原文地址:https://www.cnblogs.com/xusp/p/12729624.html
Copyright © 2011-2022 走看看