zoukankan      html  css  js  c++  java
  • spring boot使用rocketmq

    依赖

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
    </dependency>
    

    配置

    rocketmq:
      name-server: 127.0.0.1:9876
      # 纯消费者不需要以下配置
      producer:
        group: test-group
    

    消息生产者

    • 获取客户端模板
    @Autowired
    private final RocketMQTemplate rocketMQTemplate;
    
    • 发送消息
    // 默认使用同步发送, 但拿不到回执, 源码见下文org.apache.rocketmq.spring.core.RocketMQTemplate.doSent
    rocketMQTemplate.convertAndSend("test-topic", entity);
    rocketMQTemplate.send("test-topic", MessageBuilder.withPayload(entity).build());
    // 带tag
    rocketMQTemplate.convertAndSend("test-topic:tag1", entity);
    rocketMQTemplate.send("test-topic:tag2", MessageBuilder.withPayload(entity).build());
    
    • 发送单向消息(不关心发送结果)
    rocketMQTemplate.sendOneWay("test-topic", MessageBuilder.withPayload("oneway message").build());
    
    • 同步发送
      可选参数:delayLevel - 延迟等级
      可用延迟等级可在broker服务器配置中指定 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    SendResult result = rocketMQTemplate.syncSend("test-topic", entity, timeout, delayLevel);
    
    • 发送顺序消息
      可选参数:hashkey - 用于选择消息队列,只有在相同队列的消息能保持顺序
    SendResult result = rocketMQTemplate.syncSendOrderly("test-topic", "order message", "hashkey", timeout);
    
    • 异步发送
    rocketMQTemplate.asyncSend("test-topic", MessageBuilder.withPayload(entity).build(), new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {}
        @Override
        public void onException(Throwable e) {}
    }, timeout);
    

    消息消费者

    @Component
    @RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-consumer", consumeMode = ConsumeMode.ORDERLY)
    public class DemoListener implements RocketMQListener<UserEntity> {
    	...
        @Override
        public void onMessage(MyEntity entity) {
            logger.info("---->收到了消息了!");
            logger.info("---->" + entity.toString());
        }
    }
    

    常用RocketMQMessageListener参数

    • consumeMode - 消费模式
      默认值:ConsumeMode.CONCURRENTLY并行接受
      ConsumeMode.ORDERLY每个队列使用一个线程按顺序接收
    • messageModel - 消息模式
      默认值:MessageModel.CLUSTERING集群模式
      MessageModel.BROADCASTING广播模式

    重试

    • 当onMessage方法抛出异常时会触发重试,默认为无限重试

    源码

    org.apache.rocketmq.spring.core.RocketMQTemplate.doSent

    @Override
    protected void doSend(String destination, Message<?> message) {
        SendResult sendResult = syncSend(destination, message);
        log.debug("send message to `{}` finished. result:{}", destination, sendResult);
    }
    
  • 相关阅读:
    solr schema.xml配置
    solrconfig.xml配置文件
    solr 使用edismax来控制评分
    solr 打分和排序机制(转载)
    VMware vCenter Converter Standalone使用,物理机转换虚拟机
    Windows server开机登录取消按ctrl+alt+delete组合键
    pycharm专业版2020.1.1版本最新亲测激活,其他版本也复用
    jmeter服务器监控和插件管理
    windows下安装navicat15破解,详解最全
    linux解压汉字文件是乱码
  • 原文地址:https://www.cnblogs.com/luguojun/p/14294718.html
Copyright © 2011-2022 走看看