zoukankan      html  css  js  c++  java
  • SpringKafka——生产者消息发送

    生产者消息发送方式

    发送既忘方式(异步发送)

    不考虑回调,不考虑消息重复,消息漏发,速度最快。常用作收集操作日志、用户浏览痕迹等不重要的信息。

        @Autowired
        @Qualifier("producertemplate")
        private KafkaTemplate<String, String> producertemplate;
    
    
    public String testKafka(){
     		transactionalTemplate.send("topicname","sendMessage")
            return null;
        }
    

    异步发送 带回调

    有回调函数,可以对失败结果或者成功结果进行二次处理。可靠性比发送既忘的方式高,效率比发送既忘方式低(但还是很高,测试过20W条数据眨眼的时间)

        @Autowired
        @Qualifier("producertemplate")
        private KafkaTemplate<String, String> producertemplate;
        public String testKafka(){
      
            producertemplate.send("orderInfo",sendMessage).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    log.info("msg send fail! exception : "+ throwable.getMessage());  //失败回调
                }
                @Override
                public void onSuccess(SendResult<String, String> stringStringSendResult) {
                    log.info("msg send successful! topic : "+stringStringSendResult.getRecordMetadata().topic()+"msg : " +stringStringSendResult.getProducerRecord().value() ); //成功回调
                }
            });
    
            return null;
        }
    

    同步发送

    所谓的同步发送就是阻塞,当发送一条消息后,阻塞该线程,等broker的回调结果,知道broker回调成功后才会继续进行,否则阻塞当前线程。个人觉得没必要使用,异步发送的回调可以解决大部分问题,线程阻塞的方式实在是不怎么友好、

        @Autowired
        @Qualifier("producertemplate")
        private KafkaTemplate<String, String> producertemplate;
        private KafkaTemplate<String, String> transactionalTemplate;
        public String testKafka(){
            try {
                SendResult<String, String> sendResult = producertemplate.send("hello", "message").get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    
            return null;
        }
    

    事务发送

    为什么要使用事务发送呢?

    事务发送可以有效的保证消息的不重复,再加上有效的回调手段可以很好的保证消息不丢失也不重复,比如订单类型的信息、支付类型的信息。而且在添加trantional注解的方法上,这个方法如果发生错误,该消息也不会发送,事务除了保证消息的不重复,同样的也复合数据库事务的大部分特性。

    所以本次的测试使用事务+异步发送待回调的方式进行测试。

    大家可能已经注意 我的注入有两种

        @Autowired
        @Qualifier("producertemplate")
        private KafkaTemplate<String, String> producertemplate;
        @Autowired
        @Qualifier("transactionalTemplate")
        private KafkaTemplate<String, String> transactionalTemplate;
    

    这两种的区别就是一个带事务一个不带事务

    使用事务发送把注入对象修改一下 换成id名为transactionalTemplate的实例进行注入即可。

        @Autowired
        @Qualifier("transactionalTemplate")
        private KafkaTemplate<String, String> transactionalTemplate;
        public String testKafka(){
      
            transactionalTemplate.send("orderInfo",sendMessage).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    log.info("msg send fail! exception : "+ throwable.getMessage());  //失败回调
                }
                @Override
                public void onSuccess(SendResult<String, String> stringStringSendResult) {
                    log.info("msg send successful! topic : "+stringStringSendResult.getRecordMetadata().topic()+"msg : " +stringStringSendResult.getProducerRecord().value() ); //成功回调
                }
            });
    
            return null;
        }
    

    综上所述具体使用哪一种发送方式根据业务场景进行使用,目前我们使用的是事务+异步带回调的方式进行发送,因为业务明细未定以及保证灵活性我们就不编写工具类了

  • 相关阅读:
    SpringBoot项目启动遇到的问题记录
    关于点击按钮提交前进行数据校验
    idea插件的位置
    sqlite
    xamarin.forms 使用依赖注入
    vs2019查找替换,使用正则表达式功能
    EFCore 事务提交
    windows磁盘【文件和文件夹遇到-打不开or不能删除or损坏】的解决方式
    xamarin.forms 中使用Forms9Patch插件来显示图片遇到的问题
    net5 依赖注入的时候,遇到的问题:Cannot consume scoped service from singleton IHostedService
  • 原文地址:https://www.cnblogs.com/luckyhui28/p/12582054.html
Copyright © 2011-2022 走看看