zoukankan      html  css  js  c++  java
  • Rocketmq<二>springboot集成rocktmq

     前提:对于sringboot来说 集成任何框架,无非就是三个步骤:1、添加pom依赖 , 2、修改配置文件 , 3、启动类添加注解和配置 。

    一、pom依赖、配置文件

    pom依赖:

     <!-- SpringBoot集成RocketMQ  https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.0.3</version>
            </dependency>

    配置文件:

    server:
      port: 1004
    spring:
      application:
        name: lcn-user
    
    eureka: client: service
    -url: defaultZone: http://localhost:7900/eureka/
    rocketmq: name
    -server: 192.168.10.17:9876 producer: group: ${spring.application.name} send-message-timeout: 3000 retry-times-when-send-failed: 3 mq: user: topic: tpk02 group: name: lcn-pay tag: tag02 key: key02

    二、消息发送、消费消息

    1、发送同步消息:

    /**
     * @author D-L
     * @version 1.0.0
     * @ClassName UserService.java
     * @Description 消息发送
     * @createTime 2021-06-22 13:48:00
     */
    @Service
    @Slf4j
    public class UserService {
    
        @Value("${mq.user.topic}")
        private String topic;
        @Value("${mq.user.tag}")
        private String tag;
        @Value("${mq.user.key}")
        private String key;
        @Value("${mq.user.group.name}")
        private String group;
    
    
        @Autowired
        private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * 发送同步消息
         * @return
         */
        public String sendMsg01(){
            threadPoolTaskExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    Message message = MessageBuilder.withPayload("发送同步消息").build();
                    String destination = String.format("%s:%s",topic ,tag);
                    SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
                    if(sendResult.getSendStatus().equals(SendStatus.SEND_OK)){
                        log.info("消息发送成功【SendStatus】 :{}" ,sendResult);
                    }else {
                        log.info("消息发送失败【SendStatus】 :{}" ,sendResult);
                    }
                }
            });
            return "ok";
        }
    
        /**
         * 发送异步消息
         * @return
         */
        public String sendMsg02(){
            threadPoolTaskExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    Message message = MessageBuilder.withPayload("发送异步消息").build();
                    String destination = String.format("%s:%s",topic ,tag);
                    rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
                        /**
                         * 发送成功回调函数
                         * @param sendResult
                         */
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            log.info("消息发送成功   【发送结果】 :{}" ,sendResult);
                        }
                        /**
                         * 发送失败回调函数
                         * @param e
                         */
                        @Override
                        public void onException(Throwable e) {
                            log.info("异常:{} ,消息:{}" ,e ,message);
                            //todo 可以对异常消息进行其他操作,重新发送或者存入db
                        }
                    });
                }
            });
            return "ok";
        }
    
        /**
         * 发送单项消息
         * @return
         */
        public String sendOneWayMsg(){
            log.info("发送单项消息------------------------------------");
            Message<String> message = MessageBuilder.withPayload("发送单项消息").build();
            String destination = String.format("%s:%s", topic, tag);
            rocketMQTemplate.sendOneWay(destination , message);
            return "ok";
        }
    
    
        /**
         * 发送顺序消息
         * @return
         */
        public String sendOrderlyMsg(){
            log.info("发送顺序消息------------------------------------");
            for (int i = 0; i < 100; i++) {
                int defaultTopicQueueNums = rocketMQTemplate.getProducer().getDefaultTopicQueueNums();
                Message<String> message = MessageBuilder.withPayload("发送顺序消息" + i + "   【队列数取模】:" + String.valueOf(i%defaultTopicQueueNums)).build();
                String destination = String.format("%s:%s", topic, tag);
                //根据i对topic中queue的数量取模后的值   放入对应的队列中  只能保证对应
                SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, String.valueOf(i%defaultTopicQueueNums));
                if(sendResult.getSendStatus().equals(SendStatus.SEND_OK)){
                    log.info("发送顺序消息成功 【消息内容】 :{}" ,sendResult);
                }else {
                    log.info("发送顺序消息失败--------------------");
                }
            }
            return "ok";
        }
    
    
        /**
         * 发送事务消息
         * @param msgStr
         * @return
         */
        public String sendTransactionMessage(String msgStr){
            log.info("【发送消息】-------------");
            Future<TransactionSendResult> submit = threadPoolTaskExecutor.submit(new Callable<TransactionSendResult>() {
                @Override
                public TransactionSendResult call() {
                    Message message = MessageBuilder.withPayload(msgStr).build();
                    TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(group, topic, message, tag);
                    log.info("【发送状态】:{}", result.getLocalTransactionState());
                    return result;
                }
            });
            LocalTransactionState localTransactionState = null;
            try {
                localTransactionState = submit.get().getLocalTransactionState();
            } catch (Exception e) {
                log.error("获取消息发送状态失败------------------");
            }
            return localTransactionState.toString();
        }
    
        /**
         *
         * @return
         */
        public String updateUserInfo(){
            log.info("操作用户信息------------------------------------");
            try {
    //            int count = 1/0;
            }catch (Exception e) {
                log.info("操作用户信息失败---------------");
                return "fail";
            }
            return "ok";
        }
    }

    2、事务消息(本地事务执行、消息回查):

    /**
     * @author D-L
     * @version 1.0.0
     * @ClassName SyncProducerListener.java
     * @Description 事务消息
     * @createTime 2021-06-22 14:51:00
     */
    @Slf4j
    @RocketMQTransactionListener(txProducerGroup = "lcn-user01")
    public class SyncProducerListener implements RocketMQLocalTransactionListener {
        private ConcurrentHashMap<Integer, RocketMQLocalTransactionState> map=new ConcurrentHashMap<>();
    
    
        @Autowired
        private UserService userService;
    
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
            map.put(message.hashCode(),RocketMQLocalTransactionState.UNKNOWN);
    
            String result = userService.updateUserInfo();
    
            if(!result.equalsIgnoreCase("OK")){
                System.out.println("本地事务出错,回滚事务消息--------");
                map.put(message.hashCode(),RocketMQLocalTransactionState.ROLLBACK);
            }else {
                map.put(message.hashCode(),RocketMQLocalTransactionState.COMMIT);
            }
            log.info(map.get(message.hashCode()).toString());
            return map.get(message.hashCode());
        }
    
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            System.out.println("没有获得消息ack  -----  进行消息回查   消息的Tag:" + message);
            return map.get(message.hashCode());
        }
    }

    3、消息消费:

    /**
     * @author D-L
     * @version 1.0.0
     * @ClassName SyncProducerListener.java
     * @Description 消息接收者
     * @createTime 2021-06-22 22:51:00
     */
    
    @Component
    @RocketMQMessageListener(topic = "${mq.user.topic}",consumerGroup = "${mq.user.group.name}", selectorExpression = "*" ,
            //消费模式:广播
            messageModel = MessageModel.BROADCASTING,
            //消费模式:集群消费模式(默认情况)
    //        messageModel = MessageModel.CLUSTERING,
    
    
            //同时接收异步传递的消息
            consumeMode = ConsumeMode.CONCURRENTLY
            //有序接收异步传递的消息
    //         consumeMode = ConsumeMode.ORDERLY
    )
    public class PaymentListener implements RocketMQListener<MessageExt> {
        private static final Logger log = LoggerFactory.getLogger(PaymentListener.class);
    
        @Override
        public void onMessage(MessageExt messageExt) {
    
            log.info("开始接收到消息---------------------------------------");
            //1.解析消息内容
            try {
                String body = new String(messageExt.getBody(),"UTF-8");
    //            User user = JSON.parseObject(body, User.class);
    //            log.info("消息内容-------------:" + user);
                log.info("消息内容-------------:" + body);
            } catch (UnsupportedEncodingException e) {
                log.error("接收到消息失败 ,{}" ,e);
            }
            log.info("消息消费完成-----------------------------------------");
        }
    }
  • 相关阅读:
    xScrapBook
    使用STL仿函数和判断式来降低复杂性并改善可读[转]
    C++ 开源程序库[转]
    资源泄漏的悲剧
    Excel导入的HDR=YES; IMEX=1详解
    largeint.lib
    共享刚写的简单DirectUI库 只实现了思想
    document.body.scrollTop的值总为零的解决办法
    CDCHandle谨慎使用
    C++中std::tr1::function和bind 组件的使用
  • 原文地址:https://www.cnblogs.com/dongl961230/p/14923364.html
Copyright © 2011-2022 走看看