zoukankan      html  css  js  c++  java
  • Spring Boot 2.x 集成 RocketMQ 实现事务消息

    1.引入相关Maven依赖:

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build descriptor for the RocketMQ + Spring Boot demo module. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <!-- Coordinates of this module. -->
        <groupId>com.xxx</groupId>
        <artifactId>rocket</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <!-- Inherits from the Spring Boot 2.0.5.RELEASE parent POM; the empty relativePath
             means the parent is not looked up in a sibling directory. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.5.RELEASE</version>
            <relativePath />
        </parent>
    
        <!-- UTF-8 for sources and reports, Java 1.8. -->
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <!-- Spring Boot web starter (no version given here). -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- Test support, limited to the test scope. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <!-- Core Spring Boot starter.
                 NOTE(review): presumably already brought in by spring-boot-starter-web; confirm before removing. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <!-- NOTE(review): none of the Java classes in this article reference actuator types directly;
                 confirm whether this dependency is actually required. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-actuator-autoconfigure</artifactId>
            </dependency>
    
            <!-- RocketMQ Spring integration, version pinned explicitly. Source of the
                 org.apache.rocketmq.spring.* types (RocketMQTemplate, listener annotations) used by the Java classes. -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                  <version>2.0.1</version>
            </dependency>
    
    
            <!-- Lombok, used for the @Slf4j annotation on the Java classes; version pinned explicitly.
                 NOTE(review): the Spring Boot parent may already manage a Lombok version; confirm whether the pin is needed. -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.14</version>
            </dependency>
            <!-- NOTE(review): TransactionListenerImpl imports com.alibaba.fastjson.JSON, but fastjson is not declared
                 in this file; presumably it arrives transitively. Confirm, or declare it explicitly. -->
        </dependencies>
    
        <build>
            <plugins>
                <!-- Spring Boot Maven plugin, configured by the parent POM defaults. -->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    View Code

    2.配置生产者:

      2.1 application.properties 配置如下:

    # ---- RocketMQ producer ----
    # NameServer address the client connects to.
    rocketmq.name-server=127.0.0.1:9876
    # Default producer group.
    rocketmq.producer.group=my-group

    # Send timeout and message size limits.
    rocketmq.producer.send-message-timeout=300000
    rocketmq.producer.compress-message-body-threshold=4096
    rocketmq.producer.max-message-size=4194304

    # Retry policy for failed sends.
    rocketmq.producer.retry-times-when-send-failed=2
    rocketmq.producer.retry-times-when-send-async-failed=0
    rocketmq.producer.retry-next-server=true

      2.2 事务监听:

    package com.xxx.listener;
    
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.springframework.messaging.Message;
    
    /**
     * Demo transaction listener registered for the {@code "rocket"} transactional producer group.
     *
     * <p>The local-transaction step always reports {@code UNKNOWN} and the check-back step always
     * reports {@code COMMIT}. Neither method performs any real local work; both only log the
     * message they were handed.
     */
    @Slf4j
    @RocketMQTransactionListener(txProducerGroup = "rocket")
    public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

        /**
         * Logs the message and reports the local transaction state as undecided.
         *
         * @param message the message being sent in the transaction
         * @param o       the extra argument supplied by the sender (unused here)
         * @return always {@link RocketMQLocalTransactionState#UNKNOWN}
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
            // Use the Lombok-generated logger instead of stdout, consistent with the other classes.
            log.info("本地事务和消息发送:{}", JSON.toJSONString(message));

            // NOTE(review): returning UNKNOWN presumably forces the broker to call
            // checkLocalTransaction later; confirm against the RocketMQ documentation.
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        /**
         * Logs the message being checked and reports the transaction as committed.
         *
         * @param message the message whose local transaction state is being queried
         * @return always {@link RocketMQLocalTransactionState#COMMIT}
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            log.info("回查信息:{}", JSON.toJSONString(message));

            return RocketMQLocalTransactionState.COMMIT;
        }
    }
    View Code

      2.3  发送事务消息:

    package com.xxx;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.producer.TransactionSendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    
    /**
     * Boots the Spring application and then sends the same transactional demo message in an
     * endless loop, pausing briefly between sends.
     */
    @Slf4j
    @SpringBootApplication
    public class SpringBootRocketMqApplication {

        /** Transactional producer group passed to {@code sendMessageInTransaction}. */
        private static final String TX_PRODUCER_GROUP = "rocket";

        /** Destination topic of the demo messages. */
        private static final String TOPIC = "ts";

        /** Payload carried by every demo message. */
        private static final String PAYLOAD = "demo msg test";

        /** Pause between two consecutive sends, in milliseconds. */
        private static final long SEND_INTERVAL_MILLIS = 10L;

        /**
         * Starts the application context and sends transactional messages until the process is
         * stopped or the thread is interrupted.
         *
         * @param args command-line arguments forwarded to {@link SpringApplication#run}
         * @throws InterruptedException if the sending thread is interrupted while sleeping
         */
        public static void main(String[] args) throws InterruptedException {
            ConfigurableApplicationContext context = SpringApplication.run(SpringBootRocketMqApplication.class, args);
            RocketMQTemplate template = context.getBean(RocketMQTemplate.class);

            while (true) {
                log.info("开始发送消息:{}", PAYLOAD);

                // Build a fresh message per iteration so each send gets its own headers.
                Message<String> message = MessageBuilder.withPayload(PAYLOAD).build();
                TransactionSendResult result =
                        template.sendMessageInTransaction(TX_PRODUCER_GROUP, TOPIC, message, null);
                log.info("消息发送响应信息:{}", result);

                Thread.sleep(SEND_INTERVAL_MILLIS);
            }
        }
    }
    View Code

    3. 配置消费者:

      3.1 application.properties 配置如下:

        rocketmq.name-server=127.0.0.1:9876

      3.2 消费者监听:

    package com.xxx.listener;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    
    /**
     * Consumer for topic {@code "ts"} in group {@code "my-consumer-group"}.
     *
     * <p>{@link #prepareStart} registers its own concurrent message listener on the underlying
     * push consumer. That listener logs the retry count and body of every delivered message and
     * acknowledges the batch.
     */
    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "ts", consumerGroup = "my-consumer-group")
    public class ConsumerLifecycleListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

        /**
         * Required by {@link RocketMQListener}; only logs a notice.
         *
         * @param s the message payload
         */
        @Override
        public void onMessage(String s) {
            // Per the original author: once RocketMQPushConsumerLifecycleListener is implemented
            // (and prepareStart registers its own listener), this method is not invoked.
            log.info("实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用");
        }

        /**
         * Registers a concurrent listener that logs each message and reports success.
         *
         * @param consumer the push consumer on which the listener is registered
         */
        @Override
        public void prepareStart(DefaultMQPushConsumer consumer) {
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    // Process every message in the batch. The previous version returned from
                    // inside the loop, so only the first message was handled while the whole
                    // batch was acknowledged.
                    for (MessageExt messageExt : msgs) {
                        log.info("重试次数:{}", messageExt.getReconsumeTimes());

                        // Decode with an explicit charset rather than the platform default.
                        log.info("接受到的消息:{}",
                                new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
        }
    }
    View Code

    4. 延时消息
      RocketMQ 目前只支持固定精度的定时消息。

      延迟级别(18个等级)

      1到18分别对应:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

      Message message = MessageBuilder.withPayload(msg).build();

      rocketMQTemplate.syncSend(topic, message,1000,2);//表示延时5秒

    5. 顺序消息
      asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback)

         通过指定hashKey实现顺序消费,相同hashKey的消息会按顺序消费

      

  • 相关阅读:
    Redis实战之Redis + Jedis[转]
    FastDFS、nginx配置手记
    服务器后端开发系列——《实战FastDFS分布式文件系统》[转]
    FastDFS分布文件系统[转]
    在XMPP的JAVA开源实现Openfire中,增加LBS 附近的人功能
    FASTDFS 5X安装
    助力互联网创业融资
    lucene索引并搜索mysql数据库[转]
    ZooKeeper监控
    光驱在资源管理器显示黄色感叹号的解决方法BIOS内有 系统下没有
  • 原文地址:https://www.cnblogs.com/yx88/p/11146484.html
Copyright © 2011-2022 走看看