zoukankan      html  css  js  c++  java
  • springboot+redis实现消息队列

    文章参考

    SpringBoot(9) 基于Redis消息队列实现异步操作

    https://blog.csdn.net/wilsonsong1024/article/details/80573611

    所做的改进

    • 博客中实用的是jedis操作,在springboot的年代,我们不需要去写redis的操作工具类了。
    • 直接上redisTemplate的使用。
    • handler的处理需要根据业务需求改造。
    • 增加了测试部分

    觉得后期的改进

    1. 消费redis的时候,看看有没有阻塞的策略(我的代码中是一直查询,感觉不太好)
    2. 消费线程,直接使用的是new thread。这个不太好管理(后期用线程池优化)

     涉及spring和springboot使用的部分

    • RedisTemplate序列化的配置,以及api相关的应用
    • fastjson的JSONObject等相关使用
    • InitializingBean,ApplicationContextAware是使用,以及其实现接口的作用
    • 线程池相关的问题学习
    • 模板模式的抽象能力


    代码

    pom.xml文件

        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.6.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.pig</groupId>
        <artifactId>about-redis</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>about-redis</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.62</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    AboutRedisApplication(这种redis序列化,可以在redis客户端中看到字符串)
    package com.pig.aboutredis;
    
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    @SpringBootApplication
    public class AboutRedisApplication {
    
        public static void main(String[] args) {
            try{
    
                SpringApplication.run(AboutRedisApplication.class, args);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    
    
    
        @Bean
        public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
            RedisTemplate<String,Object> template=new RedisTemplate<>();
            template.setConnectionFactory(factory);
    
            Jackson2JsonRedisSerializer serializer=new Jackson2JsonRedisSerializer(Object.class);
    
            ObjectMapper om=new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            serializer.setObjectMapper(om);
    
            template.setValueSerializer(serializer);
            template.setKeySerializer(new StringRedisSerializer());
            template.setHashKeySerializer(new StringRedisSerializer());
            // 如果这里不设置的话,hash的类型如果是自定义类型,就会报错,序列化问题
            template.setHashValueSerializer(serializer);
            template.afterPropertiesSet();
            return template;
        }
    
    }
    EventModel (return this 值得思考学习)
    package com.pig.aboutredis.messagequeue.common;
    
    
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.RequiredArgsConstructor;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class EventModel {
    
    
        private EventType type;
        private int actorId;
        private int entityType;
        private int entityId;
        private int entityOwerId;
        public Map<String,String> exts=new HashMap<>();
    
        public EventModel() {
        }
    
        public EventModel(EventType type) {
            this.type = type;
        }
    
        public EventType getType() {
            return type;
        }
    
        public EventModel setType(EventType type) {
            this.type = type;
            return this;
        }
    
        public int getActorId() {
            return actorId;
        }
    
        public EventModel setActorId(int actorId) {
            this.actorId = actorId;
            return this;
        }
    
        public int getEntityType() {
            return entityType;
        }
    
        public EventModel setEntityType(int entityType) {
            this.entityType = entityType;
            return this;
        }
    
        public int getEntityId() {
            return entityId;
        }
    
        public EventModel setEntityId(int entityId) {
            this.entityId = entityId;
            return this;
        }
    
        public int getEntityOwerId() {
            return entityOwerId;
        }
    
        public EventModel setEntityOwerId(int entityOwerId) {
            this.entityOwerId = entityOwerId;
            return this;
        }
    
        public String getExts(String key) {
            return exts.get(key);
        }
    
        public EventModel setExts(String key,String value) {
            exts.put(key,value);
            return this;
        }
    
        @Override
        public String toString() {
            return "EventModel{" +
                    "type=" + type +
                    ", actorId=" + actorId +
                    ", entityType=" + entityType +
                    ", entityId=" + entityId +
                    ", entityOwerId=" + entityOwerId +
                    '}';
        }
    }
    EventType
    package com.pig.aboutredis.messagequeue.common;
    
    //事件的各种类型
    public enum EventType {
        LIKE(0),COMMENT(1),LOGIN(2),MAIL(3);
    
        private int value;
        EventType(int value){
            this.value=value;
        }
    
        public int getValue(){
            return value;
        }
    
    }
    EventProducer
    package com.pig.aboutredis.messagequeue.producer;
    
    import com.alibaba.fastjson.JSONObject;
    import com.pig.aboutredis.messagequeue.common.EventModel;
    import com.pig.aboutredis.messagequeue.utils.RedisKeyUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    import java.security.Key;
    
    @Component
    @Slf4j
    public class EventProducer {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
    
        // 把事件分发出去
        // 就是存储到redis中的list数据类型中
        public boolean fireEvent(EventModel model){
            try{
                String jsonString = JSONObject.toJSONString(model);
                String queueKey = RedisKeyUtil.getEventQueueKey();
                redisTemplate.opsForList().leftPush(queueKey,jsonString);
                return true;
            }catch (Exception e){
                e.printStackTrace();
                return false;
            }
        }
    
    }
    RedisKeyUtil 这个在我的代码没有任何意义,只需要一个共同的key(消费者和生产者)
    package com.pig.aboutredis.messagequeue.utils;
    
    public class RedisKeyUtil {
    
    
        private static final String SPLIT=":";
        private static final String BIZ_LIKE="LIKE";
        private static final String BIZ_DISLIKE="DISLIKE";
        private static final String BIZ_EVENTQUEUE="EVENTQUEUE";
    
    
        public static String getLikeKey(int entityType,int entityId){
            return BIZ_LIKE+SPLIT+String.valueOf(entityType)+SPLIT+String.valueOf(entityId);
        }
    
        public static String getDisLikeKey(int entityType,int entityId){
            return BIZ_DISLIKE+SPLIT+String.valueOf(entityType)+SPLIT+String.valueOf(entityId);
        }
        public static String getEventQueueKey(){
            return BIZ_EVENTQUEUE;
        }
    
    }
    EventHandler 这里用到了模板设计模式,很多抽象接口,使用到了模板模式
    package com.pig.aboutredis.messagequeue.consumer;
    
    import com.pig.aboutredis.messagequeue.common.EventModel;
    import com.pig.aboutredis.messagequeue.common.EventType;
    
    import java.util.List;
    
    // 模板设计模式
    public interface EventHandler {
    
        void doHandler(EventModel model);
    
        List<EventType> getSupportEventTypes();
    }
    EventConsumer 这种方法也可以自动注入所有EventHandler类型的bean
    @Autowired
    private Map<String,EventHandler> beans;
    package com.pig.aboutredis.messagequeue.consumer;
    
    import com.alibaba.fastjson.JSON;
    import com.pig.aboutredis.messagequeue.common.EventModel;
    import com.pig.aboutredis.messagequeue.common.EventType;
    import com.pig.aboutredis.messagequeue.utils.RedisKeyUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;
    import org.springframework.util.StringUtils;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.logging.Handler;
    
    @Component
    @Slf4j
    public class EventConsumer implements InitializingBean,ApplicationContextAware {
    
        private Map<EventType,List<EventHandler>> config=new HashMap<>();
    
        // 获取spring的context
        private ApplicationContext context;
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.context=applicationContext;
        }
    
    
        public void afterPropertiesSet() throws Exception {
            Map<String,EventHandler> beans=context.getBeansOfType(EventHandler.class);
    
            if(!CollectionUtils.isEmpty(beans)){
                for (Map.Entry<String,EventHandler> entry:beans.entrySet()){
                    // 当前handler能够处理的类型
                    // 例如:LikeHandler 能够处理 EventType.LIKE
                    List<EventType> eventTypes = entry.getValue().getSupportEventTypes();
                    // 初始化 config
                    // key: EventType.LIKE类型 value: LikeHandler能够处理type的handler
                    for (EventType type : eventTypes) {
                        // 如果不包含,就创建一个可以
                        // 如果包含,就add进入list
                        if(!config.containsKey(type)){
                            config.put(type,new ArrayList<EventHandler>());
                        }
                        config.get(type).add(entry.getValue());
                    }
                }
            }
    
            // 消费策略
            // 有机会,优化为线程池类型。
            // 真正的线上环境,是不可能这样创建线程的,这样不好控制,没有高可用,不方便管理
            Thread thread=new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        String key = RedisKeyUtil.getEventQueueKey();
                        // producer是从left进入,consumer从right消费
                        String rightPop = (String) redisTemplate.opsForList().rightPop(key);
                        // 需要优化,redis api是否可以阻塞,如果有阻塞的可能,那么这里就不用判断了
                        if(StringUtils.isEmpty(rightPop)){
                            continue;
                        }
                        EventModel eventModel = JSON.parseObject(rightPop, EventModel.class);
                        if(!config.containsKey(eventModel.getType())){
                            log.error("不能识别的事件");
                            continue;
                        }
                        // 一个类型,多个handler可以消费
                        // 这里的消费策略,需要根据业务情况设置
                        for (EventHandler handler : config.get(eventModel.getType())) {
                            handler.doHandler(eventModel);
                        }
    
                    }
                }
            });
            thread.start();
    
        }
    }
    LikeHandler
    package com.pig.aboutredis.messagequeue.consumer.handler;
    
    import com.pig.aboutredis.messagequeue.common.EventModel;
    import com.pig.aboutredis.messagequeue.common.EventType;
    import com.pig.aboutredis.messagequeue.consumer.EventHandler;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import java.util.Arrays;
    import java.util.List;
    
    
    @Component
    @Slf4j
    public class LikeHandler implements EventHandler {
    
    
        @Override
        public void doHandler(EventModel model) {
            log.info("LikeHandler 消费了你的数据,开始消费");
            log.info("LikeHandler 消费了你的数据,消费{}",model);
            log.info("LikeHandler 消费了你的数据,结束消费");
    
        }
    
        @Override
        public List<EventType> getSupportEventTypes() {
            return Arrays.asList(EventType.LIKE);
        }
    }

    测试

    ProducerController
    package com.pig.aboutredis.messagequeue.controller;
    
    
    import com.pig.aboutredis.messagequeue.common.EventModel;
    import com.pig.aboutredis.messagequeue.common.EventType;
    import com.pig.aboutredis.messagequeue.producer.EventProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class ProducerController {
    
    
        @Autowired
        private EventProducer producer;
    
    
        @GetMapping("/queue")
        public String redisQueue(){
            EventModel model=new EventModel();
            model.setType(EventType.LIKE);
            model.setActorId(11);
            producer.fireEvent(model);
            model.setActorId(22);
            producer.fireEvent(model);
            model.setActorId(33);
            producer.fireEvent(model);
            model.setActorId(44);
            producer.fireEvent(model);
            model.setActorId(55);
            producer.fireEvent(model);
            return "i am your father";
        }
    }

    http://localhost:8080/aboutredis/queue

    日志结果

    2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
    2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=11, entityType=0, entityId=0, entityOwerId=0}
    2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
    2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
    2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=22, entityType=0, entityId=0, entityOwerId=0}
    2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
    2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
    2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=33, entityType=0, entityId=0, entityOwerId=0}
    2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
    2020-04-02 22:28:53.538 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
    2020-04-02 22:28:53.539 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=44, entityType=0, entityId=0, entityOwerId=0}
    2020-04-02 22:28:53.539 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
    2020-04-02 22:28:53.623 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
    2020-04-02 22:28:53.626 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=55, entityType=0, entityId=0, entityOwerId=0}
    2020-04-02 22:28:53.626 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费



    目的

    写这个博客的目的是:熟悉redis和springboot
    熟悉redis和springboot不能只是停留在api的使用阶段。
    而是要使用起来得心应手,就是想用redis和springboot能干的事情,都能够很轻松的写出来。
    一句话:可以用它来展现自己所想。

    后期有机会研究一下下面专题:(百度“redis实现”出现的关键词)

    1. redis实现session共享
    2. redis实现分布式锁原理
    3. redis实现分布式事务
    4. redis实现布隆过滤器
  • 相关阅读:
    《Geometric Deep Learning综述介绍》
    《和想象不太一样的'Graph Neural Network + Zero Shot'》
    《几何深度学习前沿》
    《【Paper Live】滴滴出行-探索资源约束的Contextual Bandits问题 & KDD Cup滴滴出行比赛解读>
    《PDP: 解决约束满足问题的神经网络架构 | 刘晶 | 集智俱乐部图网络论文读书会20190819》
    《NeuroSAT: Learning a SAT Solver from Single-Bit Supervision》
    《OR Talk NO.5 | Facebook 田渊栋:用深度(强化)学习为组合优化寻找更好的启发式搜索策略》
    《OR Talk NO.15 | 在业界实验室做AI设计师是一种什么体验?》
    超级跳马 —— 矩阵快速幂优化DP
    图SLAM:Noob的同时本地化和映射指南
  • 原文地址:https://www.cnblogs.com/windy13/p/12623609.html
Copyright © 2011-2022 走看看