zoukankan      html  css  js  c++  java
  • 最简单的SpringBoot集成RocketMQ(二)进阶

    1.准备

    本想改造第一篇,但是感觉改来改去,会越来越迷糊,索性就重新启个项目

    第一篇说的是消息队列传输Json字符串,一般在企业中夸平台时常用,现在在说如何直接把传过来的json字符串反序列化成实体类

    新建SpringBoot项目

    2.集成RocketMQ

    1)pom依赖引入

    <dependencies>
            <!--rocketmq-->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.3.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>

    2)application.properties配置文件如下

    (说明一下,除去端口号是系统中有的,其他均是生成后我们自己在类中方便引用的)

    server.port=8088
    
    rocketmq.producer.groupName=ProducerGroup
    rocketmq.producer.namesrvAddr=127.0.0.1:9876
    rocketmq.producer.instanceName=ProducerGroup
    rocketmq.producer.topic=topic2020
    rocketmq.producer.tag=test2020
    rocketmq.producer.maxMessageSize=131072
    rocketmq.producer.sendMsgTimeout=10000
    
    rocketmq.consumer.namesrvAddr=127.0.0.1:9876
    rocketmq.consumer.groupName=ConsumerGroup
    rocketmq.consumer.topic=topic2020
    rocketmq.consumer.tag=test2020
    rocketmq.consumer.consumeThreadMin=20
    rocketmq.consumer.consumeThreadMax=64
    rocketmq.consumer.topic.topic=2020test
    rocketmq.consumer.tag.tag=animal

    3)创建实体类User、Animal

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Accessors(chain = true)
    public class User implements Serializable {
    
        private String name;
    
        private Integer age;
    
        private String sex;
    
        private String time;
    
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Accessors(chain = true)
    public class Animal implements Serializable {
    
        private String name;
    
        private Integer age;
    
        private String time;
    
    }

    4)引入序列胡及反序列化工具

    package com.niuben.springboot.utils;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    import java.io.IOException;
    
    /**
     * Description:jackson序列化工具
     */
    public class JsonUtils {
    
        public static final ObjectMapper MAPPER = new ObjectMapper();
    
        /**
         * 将对象转为json
         */
        public static String toJson(Object obj) {
            try {
                return MAPPER.writeValueAsString(obj);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }
    
        /**
         * 反序列化
         */
        public static <T> T fromJson(String json, Class<T> presentClass) {
            try {
                return MAPPER.readValue(json, presentClass);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    
    }

    5)创建MessageProcessor消息处理接口

    package com.niuben.springboot.message;
    
    import com.niuben.springboot.utils.JsonUtils;
    
    public interface MessageProcessor<T> {
    
        boolean handle(T messageExt);
    
        Class<T> getPresentClass();
    
        // 将String类型的message反序列化成对应的对象
        default T transferMessage(String message) {
            return JsonUtils.fromJson(message, getPresentClass());
        }
    
    }

    6)实现MessageProcessorImpl消息处理类

    本篇中用两到个2个实体类(就用2个实体类演示一下),在实现消息处理接口时,就实现2个

    6.1)AnimalMessageProcessorImpl消息处理类

    package com.niuben.springboot.message.impl;
    
    import com.niuben.springboot.entity.Animal;
    import com.niuben.springboot.message.MessageProcessor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    
    /**
     * Description:
     */
    @Service
    @Slf4j
    public class AnimalMessageProcessorImpl implements MessageProcessor<Animal> {
    
        @Override
        public boolean handle(Animal animal) {
            log.info("接收Animal类消息:" + animal.toString());
            return true;
        }
    
        @Override
        public Class<Animal> getPresentClass() {
            return Animal.class;
        }
    }

    6.1)UserMessageProcessorImpl消息处理类

    package com.niuben.springboot.message.impl;
    
    import com.niuben.springboot.entity.User;
    import com.niuben.springboot.message.MessageProcessor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    
    /**
     * Description:
     */
    @Service
    @Slf4j
    public class UserMessageProcessorImpl implements MessageProcessor<User> {
    
        @Override
        public boolean handle(User user) {
            log.info("接收User类消息:" + user.toString());
            return true;
        }
    
        @Override
        public Class<User> getPresentClass() {
            return User.class;
        }
    }

    7)创建MessageListen消息监听类

    package com.niuben.springboot.message;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Description:监听类
     */
    @Component
    @Slf4j
    public class MessageListen implements MessageListenerConcurrently {
    
        private Map<String, MessageProcessor> handleMap = new HashMap<>();
    
        // MessageProcessor消息处理接口的实现类放进Map集合
        // key:tag,value:MessageProcessor实体类
        public void registerHandler(String tags, MessageProcessor messageProcessor) {
            handleMap.put(tags, messageProcessor);
        }
    
        @SuppressWarnings("unchecked")
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt ext = list.get(0);
            String message = new String(ext.getBody());
            // 获取到tag
            String tags = ext.getTags();
            // 根据tag获取到消息处理处理类
            MessageProcessor messageProcessor = handleMap.get(tags);
    
            Object obj = null;
            try {
                // 将String类型的message反序列化成对应的对象
                obj = messageProcessor.transferMessage(message);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("反序列化失败");
            }
            // 消息处理
            boolean result = messageProcessor.handle(obj);
            if (!result) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    
    }

    8)创建消息生产者RocketMQProducer

    package com.niuben.springboot.consumer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Description:生产者配置
     */
    @Configuration
    @Slf4j
    public class RocketMQProducer {
    
        @Value("${rocketmq.producer.groupName}")
        private String groupName;
    
        @Value("${rocketmq.producer.namesrvAddr}")
        private String nameserAddr;
    
        @Value("${rocketmq.producer.instanceName}")
        private String instanceName;
    
        @Value("${rocketmq.producer.maxMessageSize}")
        private int maxMessageSize;
    
        @Value("${rocketmq.producer.sendMsgTimeout}")
        private int sendMsgTimeout;
    
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public DefaultMQProducer getRocketMQProducer() {
            DefaultMQProducer producer = new DefaultMQProducer(groupName);
            producer.setNamesrvAddr(nameserAddr);
            producer.setInstanceName(instanceName);
            producer.setMaxMessageSize(maxMessageSize);
            producer.setSendMsgTimeout(sendMsgTimeout);
            producer.setVipChannelEnabled(false);
            log.info("================>生产者创建完成,ProducerGroupName{}<================", groupName);
            return producer;
        }
    
    }

    (@Bean(initMethod = "start", destroyMethod = "shutdown"),在bean注解中开启/销毁生成者)

    9)创建消息消费者RocketMQConsumer

    package com.niuben.springboot.consumer;
    
    import com.niuben.springboot.message.MessageListen;
    import com.niuben.springboot.message.impl.AnimalMessageProcessorImpl;
    import com.niuben.springboot.message.impl.UserMessageProcessorImpl;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Description:消费者配置
     */
    @Configuration
    @Slf4j
    public class RocketMQConsumer {
    
        @Autowired
        private MessageListen messageListen;
    
        @Autowired
        private UserMessageProcessorImpl userMessageProcessor;
    
        @Autowired
        private AnimalMessageProcessorImpl animalMessageProcessor;
    
        @Value("${rocketmq.consumer.namesrvAddr}")
        private String namesrvAddr;
    
        @Value("${rocketmq.consumer.groupName}")
        private String groupName;
    
        @Value("${rocketmq.consumer.topic}")
        private String topic;
    
        @Value("${rocketmq.consumer.tag}")
        private String tag;
    
        @Value("${rocketmq.consumer.consumeThreadMin}")
        private int consumeThreadMin;
    
        @Value("${rocketmq.consumer.consumeThreadMax}")
        private int consumeThreadMax;
    
        @Value("${rocketmq.consumer.topic.topic}")
        private String topic2;
    
        @Value("${rocketmq.consumer.tag.tag}")
        private String tag2;
    
        /**
         * 如果只想消费topic下的某几个tag,以用 “||”隔开。比如:consumer.subscribe("topic2020", "Tag1 || Tag2");
         * 如果想消费topic下所有的tag,用“*”。比如:consumer.subscribe("topic2020", "*");
         */
    
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public DefaultMQPushConsumer getRocketMQConsumer() {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
            consumer.setNamesrvAddr(namesrvAddr);
            consumer.setConsumeThreadMin(consumeThreadMin);
            consumer.setConsumeThreadMax(consumeThreadMax);
            consumer.setVipChannelEnabled(false);
    
            messageListen.registerHandler(tag, userMessageProcessor);
            messageListen.registerHandler(tag2, animalMessageProcessor);
            consumer.registerMessageListener(messageListen);
            try {
                consumer.subscribe(topic, tag);
                consumer.subscribe(topic2, "*");
                log.info("===============>消费者创建完成,ConsumerGroupName:{}<==============", groupName);
                log.info("=====>消费者监听开始,groupName:{},topic:{},namesrvAddr:{}<=======", groupName, topic, namesrvAddr);
            } catch (MQClientException e) {
                log.error("======>消费者监听失败,groupName:{},topic:{},namesrvAddr:{}<======", groupName, topic, namesrvAddr, e);
                e.printStackTrace();
            }
            return consumer;
        }
    
    }

    (@Bean(initMethod = "start", destroyMethod = "shutdown"),在bean注解中开启/销毁消费者)

    10)创建controller类,RocketMqController

    package com.niuben.springboot.controller;
    
    import com.alibaba.fastjson.JSON;
    import com.niuben.springboot.consumer.RocketMQProducer;
    import com.niuben.springboot.entity.Animal;
    import com.niuben.springboot.entity.User;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * Description:
     */
    @RestController
    @Slf4j
    public class RocketMqController {
    
        @Autowired
        @Qualifier("rocketMQProducer")
        RocketMQProducer rocketMQProducer;
    
        @GetMapping("/test")
        public void TestSend() {
            DefaultMQProducer producer = rocketMQProducer.getRocketMQProducer();
    
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            User user = new User("小明", 20, "", sdf.format(new Date()));
            Message messageToUser = new Message("topic2020", "test2020", JSON.toJSONString(user).getBytes());
            try {
                producer.send(messageToUser);
            } catch (Exception e) {
                log.error("消息发送异常");
                e.printStackTrace();
            }
    
            Animal animal = new Animal("金毛", 3, sdf.format(new Date()));
            Message messageToAnimal = new Message("2020test", "animal", JSON.toJSONString(animal).getBytes());
            try {
                producer.send(messageToAnimal);
            } catch (Exception e) {
                log.error("消息发送异常");
                e.printStackTrace();
            }
    
        }
    
    }

    3.测试

    1)首先启动本地RocketMQ(启动参考

     2)启动主启动类,在浏览器输入 localhost:8088/test,可以看到消费者已经创建并开始监听,并且已经消费消息

    源码:https://gitee.com/niugit_admin/springboot-rocketmq-advanced

  • 相关阅读:
    Jmeter跨线程组传值
    python基础之高级函数
    Python基础之函数的应用
    python基础之文件的输入输出
    python基础练习之猜数字
    折腾了两天的跨站脚本提交问题,与IIS7有关
    python简介和环境搭建
    python paramiko
    Linux rsync 企业级应用
    Linux find 命令详解
  • 原文地址:https://www.cnblogs.com/niudaben/p/12517147.html
Copyright © 2011-2022 走看看