文章参考
SpringBoot(9) 基于Redis消息队列实现异步操作
https://blog.csdn.net/wilsonsong1024/article/details/80573611
所做的改进
- 博客中实用的是jedis操作,在springboot的年代,我们不需要去写redis的操作工具类了。
- 直接上redisTemplate的使用。
- handler的处理需要根据业务需求改造。
- 增加了测试部分
觉得后期的改进
- 消费redis的时候,看看有没有阻塞的策略(我的代码中是一直查询,感觉不太好)
- 消费线程,直接使用的是new thread。这个不太好管理(后期用线程池优化)
涉及spring和springboot使用的部分
- RedisTemplate序列化的配置,以及api相关的应用
- fastjson的JSONObject等相关使用
- InitializingBean,ApplicationContextAware是使用,以及其实现接口的作用
- 线程池相关的问题学习
- 模板模式的抽象能力
代码
pom.xml文件
<modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.pig</groupId> <artifactId>about-redis</artifactId> <version>0.0.1-SNAPSHOT</version> <name>about-redis</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
AboutRedisApplication(这种redis序列化,可以在redis客户端中看到字符串)
package com.pig.aboutredis; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @SpringBootApplication public class AboutRedisApplication { public static void main(String[] args) { try{ SpringApplication.run(AboutRedisApplication.class, args); }catch (Exception e){ e.printStackTrace(); } } @Bean public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){ RedisTemplate<String,Object> template=new RedisTemplate<>(); template.setConnectionFactory(factory); Jackson2JsonRedisSerializer serializer=new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om=new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(om); template.setValueSerializer(serializer); template.setKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); // 如果这里不设置的话,hash的类型如果是自定义类型,就会报错,序列化问题 template.setHashValueSerializer(serializer); template.afterPropertiesSet(); return template; } }
EventModel (return this 值得思考学习)
package com.pig.aboutredis.messagequeue.common; import lombok.Data; import lombok.NoArgsConstructor; import lombok.RequiredArgsConstructor; import java.util.HashMap; import java.util.Map; public class EventModel { private EventType type; private int actorId; private int entityType; private int entityId; private int entityOwerId; public Map<String,String> exts=new HashMap<>(); public EventModel() { } public EventModel(EventType type) { this.type = type; } public EventType getType() { return type; } public EventModel setType(EventType type) { this.type = type; return this; } public int getActorId() { return actorId; } public EventModel setActorId(int actorId) { this.actorId = actorId; return this; } public int getEntityType() { return entityType; } public EventModel setEntityType(int entityType) { this.entityType = entityType; return this; } public int getEntityId() { return entityId; } public EventModel setEntityId(int entityId) { this.entityId = entityId; return this; } public int getEntityOwerId() { return entityOwerId; } public EventModel setEntityOwerId(int entityOwerId) { this.entityOwerId = entityOwerId; return this; } public String getExts(String key) { return exts.get(key); } public EventModel setExts(String key,String value) { exts.put(key,value); return this; } @Override public String toString() { return "EventModel{" + "type=" + type + ", actorId=" + actorId + ", entityType=" + entityType + ", entityId=" + entityId + ", entityOwerId=" + entityOwerId + '}'; } }
EventType
package com.pig.aboutredis.messagequeue.common; //事件的各种类型 public enum EventType { LIKE(0),COMMENT(1),LOGIN(2),MAIL(3); private int value; EventType(int value){ this.value=value; } public int getValue(){ return value; } }
EventProducer
package com.pig.aboutredis.messagequeue.producer; import com.alibaba.fastjson.JSONObject; import com.pig.aboutredis.messagequeue.common.EventModel; import com.pig.aboutredis.messagequeue.utils.RedisKeyUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.security.Key; @Component @Slf4j public class EventProducer { @Autowired private RedisTemplate redisTemplate; // 把事件分发出去 // 就是存储到redis中的list数据类型中 public boolean fireEvent(EventModel model){ try{ String jsonString = JSONObject.toJSONString(model); String queueKey = RedisKeyUtil.getEventQueueKey(); redisTemplate.opsForList().leftPush(queueKey,jsonString); return true; }catch (Exception e){ e.printStackTrace(); return false; } } }
RedisKeyUtil 这个在我的代码没有任何意义,只需要一个共同的key(消费者和生产者)
package com.pig.aboutredis.messagequeue.utils; public class RedisKeyUtil { private static final String SPLIT=":"; private static final String BIZ_LIKE="LIKE"; private static final String BIZ_DISLIKE="DISLIKE"; private static final String BIZ_EVENTQUEUE="EVENTQUEUE"; public static String getLikeKey(int entityType,int entityId){ return BIZ_LIKE+SPLIT+String.valueOf(entityType)+SPLIT+String.valueOf(entityId); } public static String getDisLikeKey(int entityType,int entityId){ return BIZ_DISLIKE+SPLIT+String.valueOf(entityType)+SPLIT+String.valueOf(entityId); } public static String getEventQueueKey(){ return BIZ_EVENTQUEUE; } }
EventHandler 这里用到了模板设计模式,很多抽象接口,使用到了模板模式
package com.pig.aboutredis.messagequeue.consumer; import com.pig.aboutredis.messagequeue.common.EventModel; import com.pig.aboutredis.messagequeue.common.EventType; import java.util.List; // 模板设计模式 public interface EventHandler { void doHandler(EventModel model); List<EventType> getSupportEventTypes(); }
EventConsumer 这种方法也可以自动注入所有EventHandler类型的bean
@Autowired
private Map<String,EventHandler> beans;
package com.pig.aboutredis.messagequeue.consumer; import com.alibaba.fastjson.JSON; import com.pig.aboutredis.messagequeue.common.EventModel; import com.pig.aboutredis.messagequeue.common.EventType; import com.pig.aboutredis.messagequeue.utils.RedisKeyUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.logging.Handler; @Component @Slf4j public class EventConsumer implements InitializingBean,ApplicationContextAware { private Map<EventType,List<EventHandler>> config=new HashMap<>(); // 获取spring的context private ApplicationContext context; @Autowired private RedisTemplate redisTemplate; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.context=applicationContext; } public void afterPropertiesSet() throws Exception { Map<String,EventHandler> beans=context.getBeansOfType(EventHandler.class); if(!CollectionUtils.isEmpty(beans)){ for (Map.Entry<String,EventHandler> entry:beans.entrySet()){ // 当前handler能够处理的类型 // 例如:LikeHandler 能够处理 EventType.LIKE List<EventType> eventTypes = entry.getValue().getSupportEventTypes(); // 初始化 config // key: EventType.LIKE类型 value: LikeHandler能够处理type的handler for (EventType type : eventTypes) { // 如果不包含,就创建一个可以 // 如果包含,就add进入list if(!config.containsKey(type)){ config.put(type,new ArrayList<EventHandler>()); } config.get(type).add(entry.getValue()); } } } // 消费策略 // 有机会,优化为线程池类型。 // 真正的线上环境,是不可能这样创建线程的,这样不好控制,没有高可用,不方便管理 Thread thread=new Thread(new Runnable() { @Override public void run() { while (true){ String key = RedisKeyUtil.getEventQueueKey(); // producer是从left进入,consumer从right消费 String rightPop = (String) redisTemplate.opsForList().rightPop(key); // 需要优化,redis api是否可以阻塞,如果有阻塞的可能,那么这里就不用判断了 if(StringUtils.isEmpty(rightPop)){ continue; } EventModel eventModel = JSON.parseObject(rightPop, EventModel.class); if(!config.containsKey(eventModel.getType())){ log.error("不能识别的事件"); continue; } // 一个类型,多个handler可以消费 // 这里的消费策略,需要根据业务情况设置 for (EventHandler handler : config.get(eventModel.getType())) { handler.doHandler(eventModel); } } } }); thread.start(); } }
LikeHandler
package com.pig.aboutredis.messagequeue.consumer.handler; import com.pig.aboutredis.messagequeue.common.EventModel; import com.pig.aboutredis.messagequeue.common.EventType; import com.pig.aboutredis.messagequeue.consumer.EventHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.List; @Component @Slf4j public class LikeHandler implements EventHandler { @Override public void doHandler(EventModel model) { log.info("LikeHandler 消费了你的数据,开始消费"); log.info("LikeHandler 消费了你的数据,消费{}",model); log.info("LikeHandler 消费了你的数据,结束消费"); } @Override public List<EventType> getSupportEventTypes() { return Arrays.asList(EventType.LIKE); } }
测试
ProducerController
package com.pig.aboutredis.messagequeue.controller; import com.pig.aboutredis.messagequeue.common.EventModel; import com.pig.aboutredis.messagequeue.common.EventType; import com.pig.aboutredis.messagequeue.producer.EventProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ProducerController { @Autowired private EventProducer producer; @GetMapping("/queue") public String redisQueue(){ EventModel model=new EventModel(); model.setType(EventType.LIKE); model.setActorId(11); producer.fireEvent(model); model.setActorId(22); producer.fireEvent(model); model.setActorId(33); producer.fireEvent(model); model.setActorId(44); producer.fireEvent(model); model.setActorId(55); producer.fireEvent(model); return "i am your father"; } }
http://localhost:8080/aboutredis/queue
日志结果
2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=11, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=22, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=33, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
2020-04-02 22:28:53.538 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
2020-04-02 22:28:53.539 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=44, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.539 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
2020-04-02 22:28:53.623 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
2020-04-02 22:28:53.626 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=55, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.626 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
目的
写这个博客的目的是:熟悉redis和springboot
熟悉redis和springboot不能只是停留在api的使用阶段。
而是要使用起来得心应手,就是想用redis和springboot能干的事情,都能够很轻松的写出来。
一句话:可以用它来展现自己所想。
后期有机会研究一下下面专题:(百度“redis实现”出现的关键词)
- redis实现session共享
- redis实现分布式锁原理
- redis实现分布式事务
- redis实现布隆过滤器