zoukankan      html  css  js  c++  java
  • springboot2.X 集成redis+消息发布订阅

    需求场景:分布式项目中,每个子项目有各自的 user 数据库, 在综合管理系统中存放这所有用户信息, 为了保持综合管理系统用户的完整性,

        子系统添加用户后将用户信息以json格式保存至redis,然后发布到消息到消息通道,综合管理系统监控到子系统发布的消息前往redis 

        获取出用户信息保存到自己的数据库

    1)redis配置

     1 spring:
     2   redis:
     3     #数据库索引
     4     database: 5
     7     host: 127.0.0.1
     8     port: 6379
     9     password: 123456
    10     jedis:
    11       pool:
    12         #最大连接数
    13         max-active: 8
    14         #最大阻塞等待时间(负数表示没限制)
    15         #最大空闲
    16         max-idle: 8
    17         #最小空闲
    18         min-idle: 0

    2)集成redis , 初始化redis组件

      1 package com.bigcustomer.configs;
      2 
      3 
      4 import com.bigcustomer.utils.redisUtil.RedisService;
      5 import com.fasterxml.jackson.annotation.JsonAutoDetect;
      6 import com.fasterxml.jackson.annotation.PropertyAccessor;
      7 import com.fasterxml.jackson.databind.ObjectMapper;
      8 import org.slf4j.Logger;
      9 import org.slf4j.LoggerFactory;
     10 import org.springframework.beans.factory.annotation.Autowired;
     11 import org.springframework.cache.annotation.CachingConfigurerSupport;
     12 import org.springframework.cache.annotation.EnableCaching;
     13 import org.springframework.context.annotation.Bean;
     14 import org.springframework.context.annotation.Configuration;
     15 import org.springframework.data.redis.connection.RedisConnectionFactory;
     16 import org.springframework.data.redis.core.RedisTemplate;
     17 import org.springframework.data.redis.core.StringRedisTemplate;
     18 import org.springframework.data.redis.listener.PatternTopic;
     19 import org.springframework.data.redis.listener.RedisMessageListenerContainer;
     20 import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
     21 import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
     22 
     23 
     24 /**
     25  * @author :CX
     26  * @Date :Create in 2018/8/15 14:03
     27  * @Effect :
     28  */
     29 
     30 @Configuration
     31 @EnableCaching//开启注解
     32 public class RedisConfig extends CachingConfigurerSupport {
     33 
     34 
     35     private static Logger logger = LoggerFactory.getLogger(RedisConfig.class);
     36     // 自定义的配置类, 存放了通道地址
     37     @Autowired
     38     private BaseConfig baseConfig;
     39 
     40     /**
     41      *@参数
     42      *@返回值
     43      *@创建人  cx
     44      *@创建时间
     45      *@描述   //初始化监听器
     46      */
     47     @Bean
     48     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
     49                                             MessageListenerAdapter listenerAdapter) {
     50 
     51         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
     52         container.setConnectionFactory(connectionFactory);
     53         //配置监听通道
     54         container.addMessageListener(listenerAdapter, new PatternTopic(baseConfig.getRedisAisle()));// 通道的名字
     55         logger.info("初始化监听成功,监听通道:【"+baseConfig.getRedisAisle()+"】");
     56         return container;
     57     }
     58 
     59     /**
     60      *@参数
     61      *@返回值
     62      *@创建人  cx
     63      *@创建时间
     64      *@描述 利用反射来创建监听到消息之后的执行方法
     65      */
     66     @Bean
     67     MessageListenerAdapter listenerAdapter(RedisService receiver) {
     68         return new MessageListenerAdapter(receiver, "receiveMessage");
     69     }
     70 
     71 //    /**
     72 //     *@参数
     73 //     *@返回值
     74 //     *@创建人  cx
     75 //     *@创建时间
     76 //     *@描述 控制线程用的
     77 //     */
     78 //    @Bean
     79 //    Receiver receiver(CountDownLatch latch) {
     80 //        return new Receiver(latch);
     81 //    }
     82 //
     83 //    @Bean
     84 //    CountDownLatch latch() {
     85 //        return new CountDownLatch(1);
     86 //    }
     87 
     88     /**
     89      *@参数
     90      *@返回值
     91      *@创建人  cx
     92      *@创建时间
     93      *@描述   //使用默认的工厂初始化redis操作String模板
     94      */
     95     @Bean
     96     StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
     97         return new StringRedisTemplate(connectionFactory);
     98     }
     99     /**
    100      *@参数
    101      *@返回值
    102      *@创建人  cx
    103      *@创建时间
    104      *@描述   //使用默认的工厂初始化redis操作map模板
    105      */
    106     @Bean
    107     RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {
    108 
    109         Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
    110         ObjectMapper om = new ObjectMapper();
    111         om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    112         om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    113         jackson2JsonRedisSerializer.setObjectMapper(om);
    114         RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
    115         template.setConnectionFactory(connectionFactory);
    116         template.setKeySerializer(jackson2JsonRedisSerializer);
    117         template.setValueSerializer(jackson2JsonRedisSerializer);
    118         template.setHashKeySerializer(jackson2JsonRedisSerializer);
    119         template.setHashValueSerializer(jackson2JsonRedisSerializer);
    120         template.afterPropertiesSet();
    121         return template;
    122 
    123     }
    124 }

    3 ) 操作string 和 map 的dao封装

      1 package com.bigcustomer.utils.redisUtil;
      2 
      3 import com.alibaba.fastjson.JSON;
      4 import com.bigcustomer.biguser.service.BigUserService;
      5 import org.slf4j.Logger;
      6 import org.slf4j.LoggerFactory;
      7 import org.springframework.data.redis.core.RedisTemplate;
      8 import org.springframework.data.redis.core.StringRedisTemplate;
      9 import org.springframework.stereotype.Component;
     10 
     11 import javax.annotation.Resource;
     12 import java.util.Map;
     13 
     14 /**
     15  * @author :CX
     16  * @Date :Create in 2018/8/15 14:19
     17  * @Effect : redisDAO封裝
     18  */
     19 @Component
     20 public class MyRedisDao {
     21 
     22     private static Logger logger = LoggerFactory.getLogger(BigUserService.class);
     23     @Resource
     24     private StringRedisTemplate template;
     25 
     26     @Resource
     27     private RedisTemplate redisTemplate;
     28 
     29     //大客户信息同步到redis时保存的map的key
     30     private final String BIG_USER_REDIS_KEY = "CM:CHANNELCUSTOMER";
     31 
     32     /**
     33      * @参数
     34      * @返回值
     35      * @创建人 cx
     36      * @创建时间
     37      * @描述 大客户添加成功后存到redis
     38      */
     39     public boolean setMap(Map<String , Object> map) {
     40 
     41         try {
     42             redisTemplate.opsForHash().putAll(BIG_USER_REDIS_KEY
     43                     , map);
     44             logger.info("同步大客户信息到redis 成功!userId【" + map.get("funiqueid")+ "】");
     45             return true;
     46         } catch (Exception e) {
     47             e.printStackTrace();
     48         }
     49         logger.info("同步大客户信息到redis 失败!userId【" + map.get("funiqueid")+ "】");
     50         return false;
     51     }
     52 
     53 
     54     public Object getMap(String key) {
     55 
     56         try {
     57             Object o = redisTemplate.opsForHash().get(BIG_USER_REDIS_KEY, key);
     58             if (null != o) {
     59                 return o;
     60             }
     61         } catch (Exception e) {
     62             e.printStackTrace();
     63         }
     64         logger.info("获取大客户信息到失败!");
     65         return null;
     66     }
     67 
     68 
     69     /**
     70      * @参数
     71      * @返回值 存在 = true , 不纯在false
     72      * @创建人 cx
     73      * @创建时间
     74      * @描述 判断是否存在 该key对应的值
     75      */
     76     public boolean isNull(String key) {
     77         return template.hasKey(key);
     78     }
     79 
     80     /**
     81      * @参数
     82      * @返回值
     83      * @创建人 cx
     84      * @创建时间
     85      * @描述 设置值 和 过期时间 单位秒
     86      */
     87     public boolean setValue(String key, Object val, long expire) {
     88         if (!this.isNull(key)) {
     89             //不存在
     90             String jsonString = JSON.toJSONString(val);
     91             template.opsForValue().set(key, jsonString, expire);
     92             logger.info("***************************成功在缓存中插入:" + key);
     93             return true;
     94         } else {
     95             logger.info("***************************【" + key + "】已经存在缓存");
     96             return false;
     97         }
     98     }
     99 
    100 
    101     /**
    102      * @参数
    103      * @返回值
    104      * @创建人 cx
    105      * @创建时间
    106      * @描述 删除
    107      */
    108     public boolean del(String key) {
    109         return template.delete(key);
    110     }
    111 
    112     /**
    113      * @参数
    114      * @返回值
    115      * @创建人 cx
    116      * @创建时间
    117      * @描述 插入直接覆盖
    118      */
    119     public boolean setValue(String key, Object val) {
    120         //不存在
    121         String jsonString = JSON.toJSONString(val);
    122         // 去掉多余的 “
    123         String replace = jsonString.replace(""", "");
    124         template.opsForValue().set(key, replace);
    125         logger.info("***************************成功在缓存中插入:" + key);
    126         return true;
    127     }
    128 
    129     /**
    130      * @参数
    131      * @返回值
    132      * @创建人 cx
    133      * @创建时间
    134      * @描述 获取对应key 的值
    135      */
    136     public String getValue(String key) {
    137         if (!this.isNull(key)) {
    138 
    139             //不存在
    140             logger.info("***************************【" + key + "】不存在缓存");
    141             return null;
    142         } else {
    143             return template.opsForValue().get(key);//根据key获取缓存中的val
    144         }
    145     }
    146 
    147 
    148 }

    4) 消息发布和监听的服务类

     1 package com.bigcustomer.utils.redisUtil;
     2 
     3 import com.bigcustomer.configs.BaseConfig;
     4 import huashitech.kissucomponent.service.BaseService;
     5 import org.springframework.beans.factory.annotation.Autowired;
     6 import org.springframework.data.redis.core.StringRedisTemplate;
     7 import org.springframework.stereotype.Service;
     8 
     9 /**
    10  * @author :CX
    11  * @Date :Create in 2018/8/23 10:22
    12  * @Effect :  redis 通道消息发送和监听接受
    13  */
    14 @Service
    15 public class RedisService extends BaseService {
    16 
    17     @Autowired
    18     private StringRedisTemplate template;
    19     @Autowired
    20     private BaseConfig baseConfig;
    21     @Autowired
    22     RedisService redisService;
    23 
    24     /**
    25      *@参数
    26      *@返回值
    27      *@创建人  cx
    28      *@创建时间
    29      *@描述       向默认通道发送消息
    30      */
    31     public void setMessage( Long funiqueid) {
    32 
    33         template.convertAndSend(baseConfig.getRedisAisle(),
    34                 baseConfig.getRedisMessageName() +funiqueid);
    35     }
    36 
    37 
    38     /**
    39      *@参数
    40      *@返回值
    41      *@创建人  cx
    42      *@创建时间
    43      *@描述       接受监听到的消息
    44      */
    45     public void receiveMessage(String message) {
    46         logger.info("接收redis通道消息:"+message);
    47     }
    48 }

    5) 使用

     1 dao.getTransactionManager().doTransaction((TransactionStatus s) -> {
     2                 //插入数据库
     3                 int insert = dao.insert(tbCmChannelcustomerModel);
     4                 // 加入缓存
     5                 HashMap<String, Object> map = new HashMap<>();
     6                 map.put(tbCmChannelcustomerModel.getFuniqueid().toString()
     7                         , JSON.toJSONString(tbCmChannelcustomerModel));
     8                 redisDao.setMap(map);
     9                 // 发布redis通知消息
    10                 redisService.setMessage(tbCmChannelcustomerModel.getFuniqueid());
    11             });
  • 相关阅读:
    Vue 组件化开发之插槽
    Vue 组件化开发
    Vue 双向绑定
    Vue 数组响应
    Vue 分支循环
    万字长文,详解推荐系统领域经典模型FM因子分解机
    操作失误不要慌,这个命令给你的Git一次反悔的机会
    codeforces 1425E,一万种情况的简单题
    计算机专业的学生要怎样做才能避免成为低级的码农?
    推荐系统,深度论文剖析GBDT+LR
  • 原文地址:https://www.cnblogs.com/cx987514451/p/9529611.html
Copyright © 2011-2022 走看看