zoukankan      html  css  js  c++  java
  • redis缓存和mysql数据库同步

    redis缓存和mysql数据库同步

    解决方案

    一、对强一致要求比较高的,应采用实时同步方案,即查询缓存查询不到再从DB查询,保存到缓存;更新缓存时,先更新数据库,再将缓存的设置过期(建议不要去更新缓存内容,直接设置缓存过期)。

    二、对于并发程度较高的,可采用异步队列的方式同步,可采用kafka等消息中间件处理消息生产和消费。

    三、使用阿里的同步工具canal,canal实现方式是模拟mysql slave和master的同步机制,监控DB bitlog的日志更新来触发缓存的更新,此种方法可以解放程序员双手,减少工作量,但在使用时有些局限性。

    四、采用UDF自定义函数的方式,面对mysql的API进行编程,利用触发器进行缓存同步,但UDF主要是c/c++语言实现,学习成本高。

    实时同步

    spring3+提供了注解的方式进行缓存编程

    @Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null")
    @CachePut(key = "caches[0].name + T(String).valueOf(#user.userId)")
    @CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" )
    @Caching(evict = {@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" ),
    @CacheEvict(key = "caches[0].name + #result.name" )})
    @Cacheable:查询时使用,注意Long类型需转换为Sting类型,否则会抛异常
    @CachePut:更新时使用,使用此注解,一定会从DB上查询数据
    @CacheEvict:删除时使用;
    @Caching:组合用法      具体注解的使用可参考官网

    注意:注解方式虽然能使我们的代码简洁,但是注解方式有局限性:对key的获取,以及嵌套使用时注解无效,如下所示
    public class User {
        private Long userId;
        private String name;
        private Integer age;
        private String sex;
        private String addr;
      //get set ..... }

    service接口

    public interface UserService {
        User getUser(Long userId);
        User updateUser(User user);
        User getUserByName(String name);
        int insertUser(User user);
        User  delete (Long userId);
    }
    //实现类
    //假设有需求是由name查询user的,一般我们是先由name->id,再由id->user,这样会减少redis缓存的冗余信息
    @Service(value = "userSerivceImpl")
    @CacheConfig(cacheNames = "user")
    public class UserServiceImpl implements UserService {
    private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class);
    @Autowired
    UserMapper userMapper;

    @Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null")
    public User getUser(Long userId) {
    User user = userMapper.selectByPrimaryKey(userId);
    return user;
    }
    @Cacheable(key = "caches[0].name + #name")
    public String getIdByName(String name){
    Long userId = userMapper.getIdByName(name);
    return String.valueOf(userId);
    }

    //使用getUserByName方式调用getIdByName 和getUser方法来实现查询,但是如果用此方式在controller中直接调用
    //getUserByName方法,缓存效果是不起作用的,必须是直接调用getIdByName和getUser方法才能起作用
        public User getUserByName(String name) {
    //通过name 查询到主键 再由主键查询实体
    return getUser(Long.valueOf(getIdByName(name)));
    }

    非注解方式实现

    1.先定义一个RedisCacheConfig类用于生成RedisTemplate和对CacheManager的管理

    @Configuration
    public class RedisCacheConfig  extends CachingConfigurerSupport {
    
        /*定义缓存数据 key 生成策略的bean
         *包名+类名+方法名+所有参数
        */
        @Bean
        public KeyGenerator keyGenerator() {
            return new KeyGenerator() {
                @Override
                public Object generate(Object target, Method method, Object... params) {
                    StringBuilder sb = new StringBuilder();
                    sb.append(target.getClass().getName());
                    sb.append(method.getName());
                    for (Object obj : params) {
                        sb.append(obj.toString());
                    }
                    return sb.toString();
                }
            };
        }
    
         //@Bean
         public CacheManager cacheManager(
                 @SuppressWarnings("rawtypes") RedisTemplate redisTemplate) {
             //RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate);
               //cacheManager.setDefaultExpiration(60);//设置缓存保留时间(seconds)
             return cacheManager;
         }
    
        //1.项目启动时此方法先被注册成bean被spring管理
        @Bean
        public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
    
            StringRedisTemplate template = new StringRedisTemplate(factory);
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            template.setValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            template.setConnectionFactory(connectionFactory);
    
            //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
            Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);
    
            System.out.println("==============obj:"+Object.class.getName());
            ObjectMapper mapper = new ObjectMapper();
            mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            serializer.setObjectMapper(mapper);
    
            template.setValueSerializer(serializer);
            //使用StringRedisSerializer来序列化和反序列化redis的key值
            template.setKeySerializer(new StringRedisSerializer());
            template.afterPropertiesSet();
            return template;
        }
    }

    2.定义一个redisUtil类用于存取缓存值

    @Component
    public class RedisCacheUtil {
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        /**
         * 存储字符串
         * @param key string类型的key
         * @param value String类型的value
         */
        public void set(String key, String value) {
            stringRedisTemplate.opsForValue().set(key, value);
        }
    
        /**
         * 存储对象
         * @param key String类型的key
         * @param value Object类型的value
         */
        public void set(String key, Object value) {
            redisTemplate.opsForValue().set(key, value);
        }
    
        /**
         * 存储对象
         * @param key String类型的key
         * @param value Object类型的value
         */
        public void set(String key, Object value,Long timeOut) {
            redisTemplate.opsForValue().set(key, value,timeOut, TimeUnit.SECONDS);
        }
    
        /**
         * 根据key获取字符串数据
         * @param key
         * @return
         */
        public String getValue(String key) {
            return stringRedisTemplate.opsForValue().get(key);
        }
    
    //    public Object getValue(String key) {
    //        return redisTemplate.opsForValue().get(key);
    //    }
        /**
         * 根据key获取对象
         * @param key
         * @return
         */
        public Object getValueOfObject(String key) {
            return redisTemplate.opsForValue().get(key);
        }
        /**
         * 根据key删除缓存信息
         * @param key
         */
        public void delete(String key) {
            redisTemplate.delete(key);
        }
        /**
         * 查询key是否存在
         * @param key
         * @return
         */
        @SuppressWarnings("unchecked")
        public boolean exists(String key) {
            return redisTemplate.hasKey(key);
        }
    }

    3.实现类

    /**
     * Created by yexin on 2017/9/8.
     *
     * 在Impl基础上+ 防止缓存雪崩和缓存穿透功能
     */
    @Service(value = "userServiceImpl4")
    public class UserServiceImpl4 implements UserService {
    
        @Autowired
        UserMapper userMapper;
    
        @Autowired
        RedisCacheUtil redisCacheUtil;
    
        @Value("${timeOut}")
        private long timeOut;
    
        @Override
        public User getUser(Long userId) {
    
            String key = "user" + userId;
            User user = (User) redisCacheUtil.getValueOfObject(key);
            String keySign = key + "_sign";
            String valueSign = redisCacheUtil.getValue(keySign);
            if(user == null){//防止第一次查询时返回时空结果
                //防止缓存穿透
                if(redisCacheUtil.exists(key)){
                    return  null;
                }
                user = userMapper.selectByPrimaryKey(userId);
    
                redisCacheUtil.set(key,user);
                redisCacheUtil.set(keySign,"1",timeOut *(new Random().nextInt(10) + 1));
    //            redisCacheUtil.set(keySign,"1",0L);  //过期时间不能设置为0,必须比0大的数
                return user;
            }
    
            if(valueSign != null){
                return user;
            }else {
                //设置标记的实效时间
                Long tt = timeOut * (new Random().nextInt(10) + 1);
                System.out.println("tt:"+tt);
                redisCacheUtil.set(keySign,"1",tt);
                //异步处理缓存更新  应对与高并发的情况,会产生脏读的情况
                ThreadPoolUtil.getExecutorService().execute(new Runnable(){
                    public void run() { //
                        System.out.println("-----执行异步操作-----");
                        User user1 = userMapper.selectByPrimaryKey(userId);
                        redisCacheUtil.set(key,user1);
                    }
                });
    
    //            new Thread(){
    //                public void run() { //应对与高并发的情况,会产生脏读的情况
    //                    System.out.println("-----执行异步操作-----");
    //                    User user1 = userMapper.selectByPrimaryKey(userId);
    //                    redisCacheUtil.set(key,user1);
    //                }
    //            }.start();
            }
            return user;
        }
    }

    异步实现

    异步实现通过kafka作为消息队列实现,异步只针对更新操作,查询无需异步,实现类如下

    1.pom文件需依赖

    <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

    2.生产着代码

    @EnableBinding(Source.class)
    public class SendService {
        @Autowired
        private Source source;
        public void sendMessage(String msg) {
            try{
                source.output().send(MessageBuilder.withPayload(msg).build());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    //接受的是一个实体类,具体配置在application.yml
        public void sendMessage(TransMsg msg) {
            try {
                //MessageBuilder.withPayload(msg).setHeader(KafkaHeaders.TOPIC,"111111").build();
                source.output().send(MessageBuilder.withPayload(msg).build());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    3.消费者代码

    @EnableBinding(Sink.class)
    public class MsgSink {
        @Resource(name = "userSerivceImpl3")
        UserService userService;
        @StreamListener(Sink.INPUT)
        public void process(TransMsg<?> msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException {
            System.out.println("sink......"+msg);
            System.out.println("opt db strat ----");
            userService.updateUser((User) msg.getParams());
            System.out.println("执行db结束------");
        }
    }

    4.application.yml配置

    spring:
      application:
        name: demo-provider
      redis:
        database: 0
        host: 192.168.252.128
        #host: localhost
        port: 6379
        password:
        pool:
          max-active: 50
          max-wait: -1
          max-idle: 50
        timeout: 0
    #kafka
      cloud:
          stream:
            kafka:
              binder:
                brokers: 192.168.252.128:9092
                zk-nodes: 192.168.252.128:2181
                minPartitionCount: 1
                autoCreateTopics: true
                autoAddPartitions: true
            bindings:
              input:
                destination: topic-02
    #            content-type: application/json
                content-type: application/x-java-object   #此种类型配置在消费端接受到的为一个实体类
                group: t1
                consumer:
                  concurrency: 1
                  partitioned: false
              output:
                destination: topic-02
                content-type: application/x-java-object             
                producer:
                  partitionCount: 1
            instance-count: 1
            instance-index: 0

    5.实现类

    @Service(value = "userServiceImpl2")
    public class UserServiceImpl2  implements UserService{
        @Autowired
        UserMapper userMapper;
        @Autowired
        RedisCacheUtil redisCacheUtil;
        private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class);
        @Autowired
        SendService sendService;
        public User updateUser(User user) {
            System.out.println("   impl2   active   ");
            String key = "user"+ user.getUserId();
            System.out.println("key:"+key);
            //是否存在key
            if(!redisCacheUtil.exists(key)){
             return userMapper.updateByPrimaryKeySelective(user) == 1 ? user : null;
            }
            /*  更新key对应的value
                更新队列
             */
            User user1 = (User)redisCacheUtil.getValueOfObject(key);
            try {
                redisCacheUtil.set(key,user);
                TransMsg<User> msg = new TransMsg<User>(key,user,this.getClass().getName(),"updateUser",user);
                sendService.sendMessage(msg);
    
            }catch (Exception e){
                redisCacheUtil.set(key,user1);
            }
            return user;
        }
    }

    注意:kafka与zookeeper的配置在此不介绍

    canal实现方式

    先要安装canal,配置canal的example文件等,配置暂不介绍

    package org.example.canal;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.common.utils.AddressUtils;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    import org.example.canal.util.RedisUtil;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    public class CanalClient {
    
        public static void main(String[] args) {
            // 创建链接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                    11111), "example", "", "");
            int batchSize = 1000;
    
            try {
                connector.connect();
                connector.subscribe(".*\..*");
                connector.rollback();
                while (true) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        printEntry(message.getEntries());
                    }
                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
            } finally {
                connector.disconnect();
            }
        }
    
        private static void printEntry( List<Entry> entrys) {
            for (Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }
                RowChange rowChage = null;
                try {
                    System.out.println("tablename:"+entry.getHeaderOrBuilder().getTableName());
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
                EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
    
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        redisDelete(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        redisInsert(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        redisUpdate(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn( List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    
        private static void redisInsert( List<Column> columns){
            JSONObject json=new JSONObject();
            for (Column column : columns) {
                json.put(column.getName(), column.getValue());
            }
            if(columns.size()>0){
                RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
            }
        }
    
        private static  void redisUpdate( List<Column> columns){
            JSONObject json=new JSONObject();
            for (Column column : columns) {
                json.put(column.getName(), column.getValue());
            }
            if(columns.size()>0){
                RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
            }
        }
    
        private static  void redisDelete( List<Column> columns){
            JSONObject json=new JSONObject();
            for (Column column : columns) {
                json.put(column.getName(), column.getValue());
            }
            if(columns.size()>0){
                RedisUtil.delKey("user:"+ columns.get(0).getValue());
            }
        }
    
    }
    package org.example.canal.util;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    public class RedisUtil {
    
        // Redis服务器IP
        private static String ADDR = "192.168.252.128";
        // Redis的端口号
        private static int PORT = 6379;
        // 访问密码
        //private static String AUTH = "admin";
        // 可用连接实例的最大数目,默认值为8;
        // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
        private static int MAX_ACTIVE = 1024;
        // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
        private static int MAX_IDLE = 200;
        // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
        private static int MAX_WAIT = 10000;
        // 过期时间
        protected static int  expireTime = 60 * 60 *24;
        // 连接池
        protected static JedisPool pool;
    
        static {
            JedisPoolConfig config = new JedisPoolConfig();
            //最大连接数
            config.setMaxTotal(MAX_ACTIVE);
            //最多空闲实例
            config.setMaxIdle(MAX_IDLE);
            //超时时间
            config.setMaxWaitMillis(MAX_WAIT);
            //
            config.setTestOnBorrow(false);
            pool = new JedisPool(config, ADDR, PORT, 1000);
        }
        /**
         * 获取jedis实例
         */
        protected static synchronized Jedis getJedis() {
            Jedis jedis = null;
            try {
                jedis = pool.getResource();
            } catch (Exception e) {
                e.printStackTrace();
                if (jedis != null) {
                    pool.returnBrokenResource(jedis);
                }
            }
            return jedis;
        }
    
        /**
         * 释放jedis资源
         * @param jedis
         * @param isBroken
         */
        protected static void closeResource(Jedis jedis, boolean isBroken) {
            try {
                if (isBroken) {
                    pool.returnBrokenResource(jedis);
                } else {
                    pool.returnResource(jedis);
                }
            } catch (Exception e) {
    
            }
        }
    
        /**
         * 是否存在key
         * @param key
         */
        public static boolean existKey(String key) {
            Jedis jedis = null;
            boolean isBroken = false;
            try {
                jedis = getJedis();
                jedis.select(0);
                return jedis.exists(key);
            } catch (Exception e) {
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
            return false;
        }
    
        /**
         * 删除key
         * @param key
         */
        public static void delKey(String key) {
            Jedis jedis = null;
            boolean isBroken = false;
            try {
                jedis = getJedis();
                jedis.select(0);
                jedis.del(key);
            } catch (Exception e) {
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
        }
    
        /**
         * 取得key的值
         * @param key
         */
        public static String stringGet(String key) {
            Jedis jedis = null;
            boolean isBroken = false;
            String lastVal = null;
            try {
                jedis = getJedis();
                jedis.select(0);
                lastVal = jedis.get(key);
                jedis.expire(key, expireTime);
            } catch (Exception e) {
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
            return lastVal;
        }
    
        /**
         * 添加string数据
         * @param key
         * @param value
         */
        public static String stringSet(String key, String value) {
            Jedis jedis = null;
            boolean isBroken = false;
            String lastVal = null;
            try {
                jedis = getJedis();
                jedis.select(0);
                lastVal = jedis.set(key, value);
                jedis.expire(key, expireTime);
            } catch (Exception e) {
                e.printStackTrace();
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
            return lastVal;
        }
    
        /**
         *  添加hash数据
         * @param key
         * @param field
         * @param value
         */
        public static void hashSet(String key, String field, String value) {
            boolean isBroken = false;
            Jedis jedis = null;
            try {
                jedis = getJedis();
                if (jedis != null) {
                    jedis.select(0);
                    jedis.hset(key, field, value);
                    jedis.expire(key, expireTime);
                }
            } catch (Exception e) {
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
        }
    
    }

    附redis关于缓存雪崩和缓存穿透,热点key

    穿透

    穿透:频繁查询一个不存在的数据,由于缓存不命中,每次都要查询持久层。从而失去缓存的意义。

    解决办法: 持久层查询不到就缓存空结果,查询时先判断缓存中是否exists(key) ,如果有直接返回空,没有则查询后返回,

                      注意insert时需清除查询的key,否则即便DB中有值也查询不到(当然也可以设置空缓存的过期时间)

    雪崩

    雪崩:缓存大量失效的时候,引发大量查询数据库。
    解决办法:①用锁/分布式锁或者队列串行访问

                      ②缓存失效时间均匀分布

    热点key

    热点key:某个key访问非常频繁,当key失效的时候有打量线程来构建缓存,导致负载增加,系统崩溃。

    解决办法:

    ①使用锁,单机用synchronized,lock等,分布式用分布式锁。

    ②缓存过期时间不设置,而是设置在key对应的value里。如果检测到存的时间超过过期时间则异步更新缓存。

    ③在value设置一个比过期时间t0小的过期时间值t1,当t1过期的时候,延长t1并做更新缓存操作。

    4设置标签缓存,标签缓存设置过期时间,标签缓存过期后,需异步地更新实际缓存  具体参照userServiceImpl4的处理方式

    总结

    一、查询redis缓存时,一般查询如果以非id方式查询,建议先由条件查询到id,再由id查询pojo

    二、异步kafka在消费端接受信息后,该怎么识别处理那张表,调用哪个方法,此问题暂时还没解决

    三、比较简单的redis缓存,推荐使用canal

    参考文档

    http://blog.csdn.net/fly_time2012/article/details/50751316

    http://blog.csdn.net/kkgbn/article/details/60576477

    http://www.cnblogs.com/fidelQuan/p/4543387.html

  • 相关阅读:
    连接数据库方法
    jdbc加载驱动方法
    MySQL数据库事务隔离级别
    事务和JDBC事务隔离级别
    IO
    java中从键盘输入的三种方法
    Java IO流学习总结八:Commons IO 2.5-IOUtils
    Java IO流学习总结七:Commons IO 2.5-FileUtils
    Java IO流学习总结五:转换流-InputStreamReader、OutputStreamWriter
    Java Code Examples
  • 原文地址:https://www.cnblogs.com/lanbo203/p/7494587.html
Copyright © 2011-2022 走看看