zoukankan      html  css  js  c++  java
  • RocketMQ的简单使用

    RocketMQ的基础知识

    1-1 RocketMQ的特点

    RocketMQ特点:

    1、支持事务消息
    2、支持延迟消息
    3、天然支持集群、负载均衡
    4、支持指定次数和时间间隔的失败消息重发
    

    1-2 RocketMQ的组成

    RocketMQ的组成

    broker: 经纪人,代理商 ;
    
    1) Producer Cluster: 消息生产者群,负责发送消息,一般由业务系统负责产生消息(从NameServer获取broker信息)
    2) NameServer Cluster: 集群架构中的组织协调员,相当于注册中心,收集broker的工作情况,不负责消息的处理(从NameServer获取broker信息)                                                   
    3) Broker Cluster(消息服务器): RocketMQ的核心,负责消息的接受,存储,发送等。
    4) Consumer Cluster:  负责消费消息,一般是后台系统负责异步消费。
    

    RocketMQ的配置文件(runserver.sh)

    #===========================================================================================
    # JVM Configuration,开发环境可以将内存参数设置小一点
    堆参数:
      -Xmx  最大堆
      -Xms  最小堆
      -Xmn  新生代大小
    #================================默认堆的配置是4G=========================================================
    

    配置命令

    nohup sh mqnamesrv &                        # 启动nameserver
    nohup sh mqbroker -n localhost:9876 &       # 启动broker server并测试
    

    测试消息发送

    export NAMESRV_ADDR=127.0.0.1:9876
    bash tools.sh org.apache.rocketmq.example.quickstart.Producer
    

    1-3 RocketMQ消息的发送模式与消息的结构(重要)

    1-3-1 三种消息发送方式

    方式1:同步消息(sync message )

    producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结果 
    

    方式2:异步消息(async message)

    producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消
    息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。
    

    方式3:单向消息(oneway message)

    producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果
    

    1-3-2 消息结构

    消息结构
    基本属性 topic (一级分类) 消息体(4M) 消息 Flag (通常业务代码使用)
    扩展属性 tag (一般为空,用于消息过滤) keys: Message(运维检索) waitStoreMsgOK (发送是否等待消息存储)

    基本属性三个组成部分

    1)主题:消息的一级分类,具有相同topic的消息将发送至该topic下的消息队列中 
    2)消息体:即消息的内容 ,可以的字符串、对象等类型(可系列化)。消息的最大长度 是4M
    3) 消息flag:消息的一个标记,RocketMQ不处理,留给业务系统使用
    

    扩展属性的三个组成部分

    1)tag :相当于消息的二级分类,用于消费消息时进行过滤,可为空(区别于基本属性flag,扩展属性的tag可以用于过滤消息)
    2)keys: Message 索引键,在运维中可以根据这些 key 快速检索到消息,可为空 。
    3)waitStoreMsgOK :消息发送时是否等消息存储完成后再返回 。
    

    1-3-3 RocketMQ简单实例

    生产者代码

    生产者

    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ProducerSimple {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * 消息发送的模式1: 同步消息
         * 应用场景:
         * 向topic队列发送同步消息
         * @param topic
         * @param msg
         */
        public void sendSyncMsg(String topic, String msg){
            rocketMQTemplate.syncSend(topic,msg,100000);
        }
    
        /**
         * 消息的发送模式2:异步消息
    
         */
        public void sendASyncMsg(String topic,String msg){
            /*异步消息需要设置回调对象,消息发送成功/失败后,会由另外一个线程调用对象中的方法*/
            rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
    
                @Override
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                }
            },100000);
        }
    }
    
    

    调用生产者发送消息

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ProducerSimpleTest {
        @Autowired
        private ProducerSimple producerSimple;
        //测试发送同步消息
        @Test
        public void testSendSyncMsg(){
            this.producerSimple.sendSyncMsg("testTopic", "第3条同步消息");
            System.out.println("end...");
        }
        // 测试发送异步消息
        @Test
        public void testSendASyncMsg(){
            this.producerSimple.sendASyncMsg("testTopic","第一条异步消息");
            try{
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    生产者配置

    server:
      port: 8182 #服务端口
      servlet:
        context‐path: /rocketmq‐consumer
    
    spring:
      application:
        name: rocketmq‐consumer #指定服务名
    rocketmq:
      consumer:
        group: demo_consumer_group           # 必须配置才能注入RocketMQTemplate模板
      name-server: 49.52.10.41:9876
    
    消费者代码

    消费者

    package com.shanjupay.test.rocketmq.message;
    
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    
    /**
     * 监听消息队列需要指定:
     * 1)topic:监听的主题
     * 2)consumerGroup:消费组,相同消费组的消费者共同消费该主题的消息,它们组成一个集群(配置文件中设置),与之对应
     * 生成者需要配置producer group.
     */
    @Component
    @RocketMQMessageListener(topic = "testTopic",consumerGroup = "demo_consumer_group")
    public class ConsumerSimple implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println(s);
        }
    }
    

    消费者配置

    server:
      port: 8181 #服务端口
      servlet:
        context‐path: /rocketmq‐producer
    
    spring:
      application:
        name: rocketmq‐producer
    rocketmq:
      producer:
        group: demo-producer-group
      name-server: 49.52.10.41:9876
    

    1-4 RocketMQ的消息传递流程与消费模式

    消息发送流程

    step1:从Nameserver获取路由信息,选择消息队列: Broker会将信息上报给Nameserver,因此NameServer中存有每个broker的topic以及队列,producer发送前根据topic从NameServer查询所有消息队列。如果该topic没有队列则会新建,通常一个topic会查询到多个队列,因此会按照一定的算法选择一个队列发送。

    根据topic查询的结果如下所示:
    [
        {"brokerName":"Broker‐1","queueId":0},
        {"brokerName":"Broker‐1","queueId":1},
        {"brokerName":"Broker‐2","queueId":0},
        {"brokerName":"Broker‐2","queueId":1}
    ]
    

    step2:检验并发送消息

    • 发送消息前进行校验,比如消息的内容长度不能为0、消息最大长度、消息必要的属性是否具备等

    • 若topic下还没有队列则自动创建,默认一个topic下自动创建4个写队列,4个读队列

    多个队列的动机:高可用(一个队列挂了,还有其他),高性能(并发度高)
    

    问题:为什么设置producer group?

    方便在事务消息中broker(代理)需要回查producer(回调),同一个生产组的producer组成一个集群,提高并发能力
    

    step3:consumer处于监听队列状态,消费消息

    辨析三个概念: topic, consumer group,consumer

    1)一个消费组可以包括多个消费者,一个消费组可以订阅多个主题。
    2)一个队列同时只允许一个消费者消费,一个消费者可以消费多个队列中的消息。
    
    问题:消息队列的消费模式(广播模式的推拉模式)?
    1)集群模式(点对点模式):一个消费组内的消费者组成一个集群,主题下的一条消息只能被一个消费者消费。
    2)广播模式(发布订阅模式):主题下的一条消息能被消费组下的所有消费者消费,消费者和broker之间通过推模式和拉模式接收消息
    

    广播模式下的消息消费方式?

    推模式:broker主动将消息推送给消费者

    拉模式:消费者从broker中查询消息

    1-5 延迟消息的应用与实现

    典型应用场景:订单的关闭

    • 延迟消息也叫做定时消息,比如在电商项目的交易系统中,当用户下单之后超过一段时间之后仍然没有支付,此时就需要将该订单关l闭。

    功能实现:可以在用户创建订单时就发送一条包含订单内容的延迟消息,该消息在一段时间之后投递给消息消费者,当消息消费者接收到该消息后,
    判断该订单的支付状态,如果处于未支付状态,则将该订单关闭,商品回库(删除订单)

    RocketMQ的延迟等级:RocketMQ的延迟消息实现非常简单,只需要发送消息前设置延迟的时间,延迟时间存在十八个等级
    (1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用setDelayTimeLevel()设置与时间相对应的延迟级别即可

    import com.shanjupay.test.rocketmq.model.OrderExt;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;         // spring的message对象
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    @Component
    public class ProducerSimple {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * 消息发送的模式1: 同步消息
         * 应用场景:
         * 向topic队列发送同步消息
         * @param topic
         * @param msg
         */
        public void sendSyncMsg(String topic, String msg){
            rocketMQTemplate.syncSend(topic,msg,100000);
        }
    
        /**
         * 消息的发送模式2:异步消息
    
         */
        public void sendASyncMsg(String topic,String msg){
            /*异步消息需要设置回调对象,消息发送成功/失败后,会由另外一个线程调用对象中的方法*/
            rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
    
                @Override
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                }
            },100000);
        }
    
    
        /**
         * 将对象转换为json字符串作为消息同步发送
         */
        public void sendMsgByJson(String topic, OrderExt orderExt){
            rocketMQTemplate.convertAndSend(topic,orderExt);
        }
    
        /**
         * 发送同步延迟消息(需要将对象转换为spring的message对象)
         * @param topic     broker中队列topic
         * @param orderExt  传递的消息对象内容
         */
        public void sendMsgByJsonDelay(String topic, OrderExt orderExt) {
            Message<OrderExt> message = MessageBuilder.withPayload(orderExt).build();      //发送同步消息,消息内容将orderExt转为json
            this.rocketMQTemplate.syncSend(topic,message,1000,3); //指定发送超时时间(毫秒)和延迟等级
            System.out.printf("send msg : %s",orderExt);
        }
        
    }
    
    延迟队列的实现流程
    1)如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。
    2)消息进入SCHEDULE_TOPIC_XXXX的队列中。
    3)定时任务根据上次拉取的偏移量不断从队列中取出所有消息。
    4)根据消息的物理偏移量和大小再次获取消息。
    5)根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。
    6)重新发送消息到原主题的队列中,供消费者进行消费。
    

    基本思想:通过定时任务+队列来实现消息延时发送到broker

    1-6 消费重试

    消费重试定义:producer线程成功将消息发送到Broker,被consumer消费时,发生意外情况,没有被正常消费,此时需要进行消费重试

    何时需要消费重试?
    1)消息没有被消费者接收,比如消费者与broker存在网络异常。此种情况消息会一直被消费重试。
    2)消息接受成功,但执行时产生异常,无法向broker返回结果,这个时候也会消费重试(实际场景更为常见的问题)。
    
    broker是如何知道消息消费的成功与否的?
    broker会从消费者获取信息消费的结果,如果没有返回消费成功的状态,那么消费者就会进行重试。
    
    消费者抛出异常,该如何处理?
    当消息在消费时出现异常,此时消息被有限次的重试消费。
    默认策略:消息会按照延迟消息的延迟时间等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)从第3级开始重试,每试一次如果还不成功则延迟等级加1,因此最多重试16次,如果依旧无法消费成功,那么该消息会进入到死信队列中。
    
    实际开发中如何处理消费失败的情况?

    默认策略:进行有限次的消费重试,每次重试仍然消费失败的话,延迟下一次重试的时间。

    实际开发策略基本思想:实际生产中不会让消息重试这么多次,通常在重试一定的次数后将消息写入数据库,由另外单独的程序或人工去处

    这种处理失败的情况,通常属于线上的异常情况,当重试次数达到一定的阈值,则首先需要保存消息,便于定位问题,维护系统
    
    /*处理的逻辑如下:*/
    public class ConsumerSimple implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
            	 //取出当前重试次数
            	int reconsumeTimes = messageExt.getReconsumeTimes();
            	    //当大于一定的次数后将消息写入数据库,由单独的程序或人工去处理
                if(reconsumeTimes >=2){
                    //将消息写入数据库,之后正常返回
                    return;
                }
        		throw new RuntimeException(String.format("第%s次处理失败..",reconsumeTimes));
        }
    }
    

    参考资料

    RocketMQ的延迟消息
    消息队列的幂等性
    消息重复,消息丢失,消息积压的解决策略
    RocketMQ的基础课程

  • 相关阅读:
    沉痛悼念乔布斯 (Steven Paul Jobs)
    Linux下文件属性
    window phone开发之动画效果
    简单的UDP收发讯息
    Red Hat Enterprise Linux 5 (红帽子企业版5)下Samba, VSFTP配置与安装
    XNA那些事(六)--WINDOWS PHONE 游戏开发中的3D摄像机
    今天发布iPhone 4s的可能性大一点吧(结果:iPhone4s+CDMA/GSM)
    iPhone开发:如何在iPhone应用中使用自定义字体
    C语言数组与指针详解
    linux 常用命令
  • 原文地址:https://www.cnblogs.com/kfcuj/p/15100566.html
Copyright © 2011-2022 走看看