zoukankan      html  css  js  c++  java
  • redis教程(二)-----redis事务、记录日志到redis、分布式锁

    redis事务

    Redis 事务可以一次执行多个命令, 并且带有以下两个重要的保证:

    • 批量操作在发送 EXEC 命令前被放入队列缓存。
    • 收到 EXEC 命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行。
    • 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中。

    一个事务从开始到执行会经历以下三个阶段:

    • 开始事务。
    • 命令入队。
    • 执行事务。

    示例:

    //根据multi开启事务
    redis 127.0.0.1:6379> MULTI OK redis 127.0.0.1:6379> SET book-name "Mastering C++ in 21 days" QUEUED redis 127.0.0.1:6379> GET book-name QUEUED redis 127.0.0.1:6379> SADD tag "C++" "Programming" "Mastering Series" QUEUED redis 127.0.0.1:6379> SMEMBERS tag QUEUED
    //触发事务 redis
    127.0.0.1:6379> EXEC 1) OK 2) "Mastering C++ in 21 days" 3) (integer) 3 4) 1) "Mastering Series" 2) "C++" 3) "Programming"

    注:

    单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。
    事务可以理解为一个打包的批量执行脚本,但批量指令并非原子化的操作,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。

    redis 127.0.0.1:7000> multi
    OK
    redis 127.0.0.1:7000> set a aaa
    QUEUED
    redis 127.0.0.1:7000> set b bbb
    QUEUED
    redis 127.0.0.1:7000> set c ccc
    QUEUED
    redis 127.0.0.1:7000> exec
    1) OK
    2) OK
    3) OK

    如果在 set b bbb 处失败,set a 已成功不会回滚,set c 还会继续执行。

    redis事务命令

    multi:标记一个事务块的开始。
    exec:执行所有事务块内的命令。
    discard:取消事务,放弃执行事务块内的所有命令。
    watch:监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
    unwatch:取消 WATCH 命令对所有 key 的监视。

    将日志记录到redis

    1、需求

    • 获取日志的产生的线程名称,记录器名称,上下文产生时间,日志发生时间,自定义日志的信息
    • 将获取的信息以json的形式保存到redis中

    2、思路

    • 配置logback使用自定义Appender实现,来获取对应的日志信息
    • 配置一个单列的redis工具类(不影响其他业务),将获取的日志信息保存起来

    3、配置依赖

    <!-- 日志:slf4j是接口,log4j是具体实现 -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.12</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.1.1</version>
    </dependency>
    <!-- 实现slf4j接口并整合 -->
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.1.1</version>
    </dependency>
    <!--redis-->
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.7.3</version>
    </dependency>

    4、配置redis以及logback

    a、logback.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
    <!--自定义日志-->
        <appender name="MyAppender" class="log.RedisAppender" />
        <root level="info">
            <appender-ref ref="MyAppender" />
        </root>
    </configuration>

    b、redis.properties

    maxIdle=100
    maxWait=3000
    testOnBorrow=true
    host=127.0.0.1
    port=6379
    timeout=3000
    pass=你的密码

    5、MyAppender

    package log;
    
    import ch.qos.logback.classic.spi.LoggingEvent;
    import ch.qos.logback.core.AppenderBase;
    import dto.log.LogData;
    import redis.clients.jedis.Jedis;
    import tool.JsonBuilder;
    import tool.RedisBuilder;
    
    /**
     * 自定义日志处理
     */
    public class RedisAppender extends AppenderBase<LoggingEvent> {
    
        @Override
        protected void append(LoggingEvent loggingEvent) {
            //获取日志数据
            LogData logData=new LogData(loggingEvent);
            //设置日志保存数据库
            Jedis jedis=RedisBuilder.getSingleJedis(2);
            //设置日志的key
            String key="logData";
            //获取日志条数
            Integer index=RedisBuilder.getRedisIndexByName(key);
            //保存日志
            jedis.set(key+index, JsonBuilder.getString(logData));
            //关闭链接
            jedis.close();
    
        }
    }

    6、LogData

    package dto.log;
    
    import ch.qos.logback.classic.spi.LoggingEvent;
    import tool.DataFormatBuilder;
    
    /**
     * Created by yuyu on 2018/3/15.
     * 用于保存日志数据
     */
    public class LogData {
    
        private String message;//日志的信息
        private String loggerTime;//上下文产生时间
        private String loggerName;//记录器名称
        private String threadName;//线程名称
        private String happenStamp;//日志发生时间
    
        public LogData() {
        }
    
        public LogData(LoggingEvent loggingEvent) {
    
            this.message=loggingEvent.toString();
            this.loggerTime=DataFormatBuilder.getTimeStampFormat(null, loggingEvent.getContextBirthTime());
            this.loggerName=loggingEvent.getLoggerName();
            this.threadName=loggingEvent.getThreadName();
            this.happenStamp=DataFormatBuilder.getTimeStampFormat(null, loggingEvent.getTimeStamp());
        }
        //getter,setter略
    }

    7、RedisBuilder

    package tool;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    import java.util.Properties;
    
    /**
     * Created by yuyu on 2018/3/15.
     * redis相关的操作,获取一个单例的连接池
     */
    public class RedisBuilder {
    
        private static JedisPool jedisPool;
    
        private static RedisBuilder singleRedisBuilder=new RedisBuilder();
    
        //单利模式
        private RedisBuilder(){
            //获取配置信息
            Properties properties=PropertiesGetter.get("redis");
            //设置连接池参数
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxIdle(Integer.parseInt(properties.getProperty("maxIdle")));//最大的jedis实例
            config.setMaxWaitMillis(Integer.parseInt(properties.getProperty("maxWait")));//最大等待时间(毫秒)
            config.setTestOnBorrow(Boolean.parseBoolean(properties.getProperty("testOnBorrow")));//借用时测试
            //redis链接
            jedisPool=new JedisPool(config, properties.getProperty("host")
                    , Integer.parseInt(properties.getProperty("port"))
                    , Integer.parseInt(properties.getProperty("timeout"))
                    ,  properties.getProperty("pass"));
        }
    
        /**
         * 从连接池中获取一个Jedis对象
         * @param db 数据库[0,15]
         * @return
         */
        public static Jedis getSingleJedis(Integer db) {
            if (db==null||(db<0&&db>15)){
                return null;
            }
            Jedis back=jedisPool.getResource();
            back.select(db);
            return back;
        }
    
        /**
         *传入名称获取保存在redis中的Index号码
         * @param name
         * @return
         */
        public static Integer getRedisIndexByName(String name){
            if (name==null){
                return null;
            }
            Jedis jedis=RedisBuilder.getSingleJedis(15);
            Integer index;
            String value=jedis.get(name);
            //获取保存的index数据,没有的时候取0
            if (null==value){
                index=0;
            }else{
                index =Integer.parseInt(value)+1;
            }
            //将index保存
            jedis.set(name,index.toString());
            jedis.close();
            return index;
        }
    }

    8、DateFormatBuilder

    package tool;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * Created by yuyu on 2018/3/15.
     * 用于日期处理相关函数
     */
    public class DataFormatBuilder {
    
        /**
         * 根据传进来的时间戳 获取对应的时间格式
         * @param format 时间格式
         * @param stamp 时间戳
         * @return
         */
        public static  String getTimeStampFormat(String format,Long stamp){
            if (stamp==null){
                return null;
            }
            if (format==null){
                format="yyyy-MM-dd HH:mm:ss/SSS";
            }
            SimpleDateFormat df = new SimpleDateFormat(format);//设置日期格式
            return df.format(stamp);//传进来的时间戳为获取当前系统时间
        }
    }

    9、PropertiesGetter

    package tool;
    
    import exception.DobeoneException;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    
    /**
     * Created by yuyu on 2018/3/12.
     * 用于参数配置文件中的数据获取
     */
    public class PropertiesGetter {
        /**
         * 获取配置文件的配置信息
         * @param name
         * @return
         */
        public synchronized static Properties get(String name){
            String file="/properties/"+name+".properties";
            Properties prop = new Properties();
            InputStream in = PropertiesGetter.class.getResourceAsStream(file);
            try {
                prop.load(in);
            } catch (IOException e) {
                throw new DobeoneException("获取配置文件异常!-file-"+file,e);
            }
            return prop;
        }
    }

    10、JsonBuilder 

    package tool;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import exception.DobeoneException;
    
    /**
     * Created by yuyu on 2018/3/15.
     * json相关的函数
     */
    public class JsonBuilder {
        /**
         * 将一个实体类转换成json字符串
         * @param object
         * @return
         */
        public static String getString(Object object){
            //安全判断
            if (object==null){
                return null;
            }
            ObjectMapper mapper = new ObjectMapper();
            String back;
            try{
                back=mapper.writeValueAsString(object);
            }catch (Exception e){
                //抛出一个自定义异常
                throw new DobeoneException("json字符转换失败!-object-"+object,e);
            }
            return back;
        }
    }

    测试:

    package tool;
    
    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Created by yuyu on 2018/3/15.
     * 测试日志到redis
     */
    public class TestLogger {
    
        private Logger logger= LoggerFactory.getLogger(this.getClass());
    
        @Test
        public void testLogger(){
            logger.info("hahha");
        }
    }

    redis分布式锁

    一般生产环境都是服务器集群,那么如果希望某个操作只能由一台服务器去运行,那么如何实现呢?例如本人之前做过的一个需求,通过kettle将数据同步到我们的数据库中,这个同步是每天凌晨3点,每天一次,数据到达我们数据库之后我们要进行分发,并做后续的处理。因此这个分发就只能由一台服务器去操作,如果同时有多台服务器分发,就会出现任务重复的问题。

    1、分布式锁要求

    • 互斥性。在任意时刻,只有一个客户端能持有锁。
    • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
    • 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
    • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

    2、分布式锁

    a、添加依赖

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>

    b、代码实现

    public class RedisTool {
    
        private static final String LOCK_SUCCESS = "OK";
        private static final String SET_IF_NOT_EXIST = "NX";
        private static final String SET_WITH_EXPIRE_TIME = "PX";
    
        /**
         * 尝试获取分布式锁
         * @param jedis Redis客户端
         * @param lockKey 锁
         * @param requestId 请求标识
         * @param expireTime 超期时间
         * @return 是否获取成功
         */
        public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
            //第一个为key,我们使用key来当锁,因为key是唯一的;        
        
    //第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,
          通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
    //第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作; //第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。 //第五个为time,与第四个参数相呼应,代表key的过期时间。 //总结:利用redis单个操作的原子性,将加锁操作放在一条命令中 //1、String set(String key, String value) //2、String set(String key, String value, String nxxx) //3、String set(String key, String value, String nxxx, String expx, int time) //4、String set(String key, String value, String nxxx, String expx, long time) String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } }

    关于分布式锁的各种问题可以参考这篇文章:Redis分布式锁的正确实现方式(Java版)

  • 相关阅读:
    Unique Binary Search Trees 解答
    Unique Paths II 解答
    Unique Paths 解答
    Maximum Subarray 解答
    Climbing Stairs 解答
    House Robber II 解答
    House Robber 解答
    Valid Palindrome 解答
    Container With Most Water 解答
    Remove Duplicates from Sorted List II 解答
  • 原文地址:https://www.cnblogs.com/alimayun/p/10896906.html
Copyright © 2011-2022 走看看