zoukankan      html  css  js  c++  java
  • RocketMQ(6)---发送普通消息(三种方式)

    发送普通消息(三种方式)

    RocketMQ 发送普通消息有三种实现方式:可靠同步发送可靠异步发送单向(Oneway)发送

    注意顺序消息只支持可靠同步发送

    GitHub地址: https://github.com/yudiandemingzi/SpringBootBlog

    一、概念

    1、可靠同步发送

    原理:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。

    应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

    2、可靠异步发送

    原理:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 消息队列 RocketMQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。

    应用场景:异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如批量发货等操作。

    3、单向(Oneway)发送

    原理:单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。

    应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

    4、三种对比

    下表概括了三者的特点和主要区别。

    发送方式 发送 TPS 发送结果反馈 可靠性
    同步发送 不丢失
    异步发送 不丢失
    单向发送 最快 可能丢失

    二、代码示例

    1、三种方式代码示例

    @Slf4j
    @RestController
    public class Controller {
        /**
         * 生产者组
         */
        private static String PRODUCE_RGROUP = "test_producer";
        /**
         * 创建生产者对象
         */
        private static DefaultMQProducer producer = null;
    
        static {
            producer = new DefaultMQProducer(PRODUCE_RGROUP);
            //不开启vip通道 开通口端口会减2
            producer.setVipChannelEnabled(false);
            //绑定name server
            producer.setNamesrvAddr("47.99.03.25:9876");
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
    
        }
    
        @GetMapping("/message")
        public  void  message() throws Exception {
            //1、同步
            sync();
            //2、异步
            async();
            //3、单项发送
            oneWay();
        }
        /**
         * 1、同步发送消息
         */
        private  void sync() throws Exception {
            //创建消息
            Message message = new Message("topic_family", ("  同步发送  ").getBytes());
            //同步发送消息
            SendResult sendResult = producer.send(message);
            log.info("Product-同步发送-Product信息={}", sendResult);
        }
        /**
         * 2、异步发送消息
         */
        private  void async() throws Exception {
            //创建消息
            Message message = new Message("topic_family", ("  异步发送  ").getBytes());
            //异步发送消息
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("Product-异步发送-输出信息={}", sendResult);
                }
                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                    //补偿机制,根据业务情况进行使用,看是否进行重试
                }
            });
        }
        /**
         * 3、单项发送消息
         */
        private  void oneWay() throws Exception {
            //创建消息
            Message message = new Message("topic_family", (" 单项发送 ").getBytes());
            //同步发送消息
            producer.sendOneway(message);
        }
    }
    

    2、测试结果

    这里消费者代码就不贴出来了。

    通过这个很明显可以看出三种方式都被 Consumer 消费了。只不过对于 Product 同步和异步发送是有返回信息的,单项发送是没有返回信息的。


    三、SendStatus状态

    当Product发送消息的时候,会返回SendResult对象,该对象又包含了一个SendStatus对象。

    package org.apache.rocketmq.client.producer;
    public enum SendStatus {
        SEND_OK,
        FLUSH_DISK_TIMEOUT,
        FLUSH_SLAVE_TIMEOUT,
        SLAVE_NOT_AVAILABLE,
    }
    

    下面对这几种状态进行说明

    SEND_OK

    代表发送成功!但并不保证它是可靠的。要确保不会丢失任何消息,还应启用SYNC_MASTER或SYNC_FLUSH。

    SLAVE_NOT_AVAILABLE

    如果Broker的角色是SYNC_MASTER(同步复制)(默认为异步),但没有配置Slave Broker,将获得此状态。

    FLUSH_DISK_TIMEOUT

    如果Broker设置为 SYNC_FLUSH(同步刷盘)(默认为ASYNC_FLUSH),并且Broker的syncFlushTimeout(默认为5秒)内完成刷新磁盘,将获得此状态。

    FLUSH_SLAVE_TIMEOUT

    如果Broker的角色是SYNC_MASTER(同步复制)(默认为ASYNC_MASTER),并且从属Broker的syncFlushTimeout(默认为5秒)内完成与主服务器的同步,将获得此状态。


    参考

    1、RocketMQ 阿里云官网文档



    只要自己变优秀了,其他的事情才会跟着好起来(上将3)
    
  • 相关阅读:
    事务
    MySQL删除表的方式
    建立索引的原则
    对表设置引擎
    运算符
    数据库锁简介
    为什么对表设置主键
    php苹果原生apns推送接口
    华为推送
    php操作redis
  • 原文地址:https://www.cnblogs.com/qdhxhz/p/11121981.html
Copyright © 2011-2022 走看看