zoukankan      html  css  js  c++  java
  • Redis:解决分布式高并发修改同一个Key的问题

    本篇文章是通过watch(监控)+mutil(事务)实现应用于在分布式高并发处理等相关场景。下边先通过redis-cli.exe来测试多个线程修改时,遇到问题及解决问题。

    高并发下修改同一个key遇到的问题:

    1)定义一个hash类型的key,key为:lock_test,元素locker的值初始化为0。

    2)实现高并发下对locker元素的值递增:定义64个多线程,并发的对lock_test元素locker的值进行修改。

    package com.dx.es;
    
    import java.util.concurrent.CountDownLatch;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    public class Test_UnLock {
        public static void main(String[] args) {
            final JedisPool pool = RedisUtil.getPool();
    
            // 获得jedis对象
            Jedis jedis = pool.getResource();
            jedis.hset("lock_test", "locker", "0");
            String val = jedis.hget("lock_test", "locker");
            System.out.println("lock_test.locker的初始值為:" + val);
            jedis.close();
    
            int threahSize = 64;
            final CountDownLatch threadsCountDownLatch = new CountDownLatch(threahSize);
    
            Runnable handler = new Runnable() {
                public void run() {
                    Jedis jedis = pool.getResource();
    
                    Integer integer = Integer.valueOf(jedis.hget("lock_test", "locker"));
                    jedis.hset("lock_test", "locker", String.valueOf(integer + 1));
    
                    jedis.close();
                    threadsCountDownLatch.countDown();
                }
            };
    
            for (int i = 0; i < threahSize; i++) {
                new Thread(handler).start();
            }
    
            // 等待所有并行子线程任务完成。
            try {
                threadsCountDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("complete");
    
            val = jedis.hget("lock_test", "locker");
            System.out.println(val);
        }
    }

    RedisUtil.java

    package com.dx.es;
    
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    public class RedisUtil {
        public static JedisPool getPool() {
            // 简单创建 Jedis的方法:
            // final Jedis jedis = new Jedis("127.0.0.1",6379);
    
            // 下边使用线程池的方案:jedis对象是线程不安全的,因此在并发情况下要使用JedisPool,默认情况下jedisPool只支持8个连接,因此在声明JedisPool时要先修改JedisPool的最大连接数
            JedisPoolConfig config = new JedisPoolConfig();
            // 修改最大连接数
            config.setMaxTotal(32);
            
            // 声明一个线程池
            JedisPool pool = new JedisPool(config, "127.0.0.1", 6379);
    
            return pool;
        }
    }

    此时,会出现以下问题:

    1. A线程获取key的值为0,而B线程也获取jkey的值0,则A把key值递增为1,B线程也实现把key值递增为1。两个线程都执行了key值修改:0到1。
    2. 在1)中最终key修改为了1,但是c线程获取key的值为0(因为c线程读取key值时,a、b线程还未触发修改,因此c线程读取到的值为0),此时d线程读取到的值为1(因为d线程读取key值时,a、b线程已触发修改,一次d线程取到的值为1)。
    3. 此时假设d线程优先触发递增,则在c线程未触发提交之前d线程已经把值修改了2,但是c此时并不知道在它获取到值到修改之前这段时间发生了什么,直接把值修改1。

    此时执行打印结果为:

    lock_test.locker的初始值為:0
    complete
    24 #备注:也可能是其他值,可能是正确值64的可能性比较小。

    通过watch+mutil解决并发修改的问题:

    需要掌握Redis 事务命令:

    下表列出了 redis 事务的相关命令:

    序号 命令 描述 可用版本
     1  DISCARD

    Redis Discard 命令用于取消事务,放弃执行事务块内的所有命令。

    语法 redis Discard 命令基本语法如下:

    redis 127.0.0.1:6379> DISCARD
     >= 2.0.0
     2  EXEC

    Redis Exec 命令用于执行所有事务块内的命令。

    语法 redis Exec 命令基本语法如下:

    redis 127.0.0.1:6379>Exec
     >= 1.2.0
     3  MULTI

    Redis Multi 命令用于标记一个事务块的开始。

    事务块内的多条命令会按照先后顺序被放进一个队列当中,最后由 EXEC 命令原子性(atomic)地执行。

    语法 redis Multi 命令基本语法如下:

    redis 127.0.0.1:6379>Multi
     >= 1.2.0
     4  UNWATCH

    Redis Unwatch 命令用于取消 WATCH 命令对所有 key 的监视。

    语法 redis Unwatch 命令基本语法如下:

    redis 127.0.0.1:6379> UNWATCH 
     >= 2.2.0
     5  WATCH

    Redis Watch 命令用于监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断

    语法 redis Watch 命令基本语法如下:

    WATCH key [key ...]
     >= 2.2.0

     Redis 事务可以一次执行多个命令, 并且带有以下两个重要的保证:

    • 批量操作在发送 EXEC 命令前被放入队列缓存。
    • 收到 EXEC 命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行。
    • 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中。

    一个事务从开始到执行会经历以下三个阶段:

    • 开始事务。
    • 命令入队。
    • 执行事务。

    备注:概念性摘自《http://www.runoob.com/redis/redis-transactions.html》

    redis-cli.exe下的事务操作:

    # 事务被成功执行
    redis 127.0.0.1:6379> MULTI
    OK
    
    redis 127.0.0.1:6379> INCR user_id
    QUEUED
    
    redis 127.0.0.1:6379> INCR user_id
    QUEUED
    
    redis 127.0.0.1:6379> INCR user_id
    QUEUED
    
    redis 127.0.0.1:6379> PING
    QUEUED
    
    redis 127.0.0.1:6379> EXEC
    1) (integer) 1
    2) (integer) 2
    3) (integer) 3
    4) PONG

    并发情况下使用watch+mutil操作:

    事务块内所有命令的返回值,按命令执行的先后顺序排列。 当操作被打断时,返回空值 nil 。

    A线程:

    # 监视 key ,且事务成功执行
    redis 127.0.0.1:6379> WATCH lock lock_times
    OK
    
    redis 127.0.0.1:6379> MULTI
    OK
    
    redis 127.0.0.1:6379> SET lock "huangz"
    QUEUED
    
    redis 127.0.0.1:6379> INCR lock_times
    QUEUED
    
    redis 127.0.0.1:6379> EXEC
    1) OK
    2) (integer) 1

    B线程:

    # 监视 key ,且事务被打断
    redis 127.0.0.1:6379> WATCH lock lock_times
    OK
    
    redis 127.0.0.1:6379> MULTI
    OK
    
    redis 127.0.0.1:6379> SET lock "joe"        # 就在这时,另一个客户端修改了 lock_times 的值
    QUEUED
    
    redis 127.0.0.1:6379> INCR lock_times
    QUEUED
    
    redis 127.0.0.1:6379> EXEC                  # 因为 lock_times 被修改, joe 的事务执行失败
    (nil)

    上边演示了A、B线程并发下的watch+mutil操作情况。

    解决高并发下修改同一个key遇到的问题:

    package com.dx.es;
    
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.Transaction;
    
    public class Test_Lock3 {
        public static void main(String[] args) {
            final JedisPool pool = RedisUtil.getPool();
    
            // 对测试key赋初始值
            Jedis jedis = pool.getResource();
            jedis.hset("lock_test", "locker", "0");
            String val = jedis.hget("lock_test", "locker");
            System.out.println("lock_test.locker的初始值為:" + val);
            jedis.close();
    
            int threahSize = 64;
            final CountDownLatch threadsCountDownLatch = new CountDownLatch(threahSize);
    
            Runnable handler = new Runnable() {
                public void run() {
                    Jedis jedis = pool.getResource();
    
                    while (true) {
                        jedis.watch("lock_test");
                        String val = jedis.hget("lock_test", "locker");
                        Integer integer = Integer.valueOf(val);
                        Transaction tx = jedis.multi();
    
                        tx.hset("lock_test", "locker", String.valueOf(integer + 1));
    
                        List<Object> exec = tx.exec();
    
                        if (exec == null || exec.isEmpty()) {
                            System.out.println(Thread.currentThread().getName() + ":" + "Error:(" + val + "=>" + (integer + 1) + ")");
                        } else {
                            String values = "";
                            for (int i = 0; i < exec.size(); i++) {
                                values += exec.get(i).toString();
                            }
                            System.out.println(Thread.currentThread().getName() + ":" + values + ":(" + val + "=>" + (integer + 1) + ")");
                            break;
                        }
    
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    jedis.close();
                    threadsCountDownLatch.countDown();
                }
            };
    
            for (int i = 0; i < threahSize; i++) {
                new Thread(handler).start();
            }
    
            // 等待所有并行子线程任务完成。
            try {
                threadsCountDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("complete");
    
            val = jedis.hget("lock_test", "locker");
            System.out.println(val);
        }
    }

    打印结果:

    lock_test.locker的初始值為:0
    Thread-8:0:(0=>1)
    Thread-56:Error:(1=>2)
    Thread-53:0:(1=>2)
    Thread-25:Error:(0=>1)
    Thread-18:Error:(0=>1)
    Thread-3:Error:(0=>1)
    Thread-30:Error:(2=>3)
    Thread-11:Error:(2=>3)
    Thread-9:Error:(0=>1)
    Thread-63:0:(2=>3)
    Thread-7:Error:(0=>1)
    Thread-10:0:(3=>4)
    Thread-34:0:(4=>5)
    Thread-65:Error:(4=>5)
    Thread-24:Error:(0=>1)
    Thread-17:Error:(0=>1)
    Thread-62:0:(5=>6)
    Thread-29:Error:(6=>7)
    Thread-61:0:(6=>7)
    Thread-64:0:(7=>8)
    Thread-16:Error:(0=>1)
    Thread-19:Error:(8=>9)
    Thread-6:Error:(0=>1)
    Thread-28:Error:(0=>1)
    Thread-21:Error:(0=>1)
    Thread-14:Error:(0=>1)
    Thread-20:Error:(0=>1)
    Thread-5:Error:(0=>1)
    Thread-13:Error:(0=>1)
    Thread-15:Error:(0=>1)
    Thread-22:Error:(0=>1)
    Thread-4:Error:(0=>1)
    Thread-12:Error:(0=>1)
    Thread-23:Error:(0=>1)
    Thread-54:Error:(0=>1)
    Thread-57:Error:(0=>1)
    Thread-26:0:(8=>9)
    Thread-27:Error:(8=>9)
    Thread-32:Error:(9=>10)
    Thread-35:Error:(9=>10)
    Thread-56:0:(9=>10)
    Thread-33:Error:(10=>11)
    Thread-50:0:(10=>11)
    Thread-31:0:(11=>12)
    Thread-38:0:(12=>13)
    Thread-25:Error:(13=>14)
    Thread-36:0:(13=>14)
    Thread-39:Error:(14=>15)
    Thread-14:0:(14=>15)
    Thread-19:Error:(14=>15)
    Thread-17:Error:(14=>15)
    Thread-6:Error:(14=>15)
    Thread-9:Error:(14=>15)
    Thread-33:Error:(14=>15)
    Thread-35:Error:(14=>15)
    Thread-23:Error:(14=>15)
    Thread-18:Error:(14=>15)
    Thread-15:Error:(14=>15)
    Thread-11:Error:(14=>15)
    Thread-7:Error:(14=>15)
    Thread-57:Error:(14=>15)
    Thread-27:Error:(14=>15)
    Thread-16:Error:(14=>15)
    Thread-65:Error:(14=>15)
    Thread-24:Error:(14=>15)
    Thread-13:Error:(14=>15)
    Thread-32:Error:(15=>16)
    Thread-28:Error:(14=>15)
    Thread-21:Error:(14=>15)
    Thread-30:Error:(14=>15)
    Thread-54:Error:(14=>15)
    Thread-22:Error:(14=>15)
    Thread-25:Error:(14=>15)
    Thread-3:Error:(14=>15)
    Thread-29:Error:(14=>15)
    Thread-5:Error:(14=>15)
    Thread-12:Error:(14=>15)
    Thread-20:Error:(14=>15)
    Thread-40:0:(15=>16)
    Thread-4:Error:(14=>15)
    Thread-41:0:(16=>17)
    Thread-44:0:(17=>18)
    Thread-45:0:(18=>19)
    Thread-47:0:(19=>20)
    Thread-43:0:(20=>21)
    Thread-48:0:(21=>22)
    Thread-37:0:(22=>23)
    Thread-49:0:(23=>24)
    Thread-55:0:(24=>25)
    Thread-60:0:(25=>26)
    Thread-42:0:(26=>27)
    Thread-52:0:(27=>28)
    Thread-46:0:(28=>29)
    Thread-58:0:(29=>30)
    Thread-51:0:(30=>31)
    Thread-66:0:(31=>32)
    Thread-59:0:(32=>33)
    Thread-17:0:(33=>34)
    Thread-19:Error:(33=>34)
    Thread-39:Error:(33=>34)
    Thread-28:0:(34=>35)
    Thread-54:Error:(34=>35)
    Thread-65:Error:(34=>35)
    Thread-25:Error:(34=>35)
    Thread-30:Error:(34=>35)
    Thread-5:Error:(35=>36)
    Thread-13:Error:(35=>36)
    Thread-16:Error:(34=>35)
    Thread-6:Error:(34=>35)
    Thread-9:Error:(34=>35)
    Thread-21:Error:(35=>36)
    Thread-29:Error:(35=>36)
    Thread-33:Error:(34=>35)
    Thread-57:Error:(35=>36)
    Thread-24:Error:(34=>35)
    Thread-22:Error:(34=>35)
    Thread-32:Error:(35=>36)
    Thread-23:Error:(34=>35)
    Thread-7:Error:(34=>35)
    Thread-15:Error:(34=>35)
    Thread-4:0:(35=>36)
    Thread-20:Error:(35=>36)
    Thread-12:Error:(35=>36)
    Thread-35:0:(36=>37)
    Thread-18:Error:(36=>37)
    Thread-11:Error:(36=>37)
    Thread-3:Error:(36=>37)
    Thread-27:Error:(37=>38)
    Thread-39:Error:(37=>38)
    Thread-19:0:(37=>38)
    Thread-7:Error:(38=>39)
    Thread-33:0:(38=>39)
    Thread-29:Error:(38=>39)
    Thread-16:Error:(38=>39)
    Thread-22:Error:(38=>39)
    Thread-65:Error:(38=>39)
    Thread-54:Error:(38=>39)
    Thread-57:Error:(38=>39)
    Thread-30:Error:(38=>39)
    Thread-21:Error:(38=>39)
    Thread-24:Error:(38=>39)
    Thread-32:Error:(39=>40)
    Thread-5:Error:(39=>40)
    Thread-13:Error:(39=>40)
    Thread-6:Error:(38=>39)
    Thread-25:Error:(38=>39)
    Thread-9:Error:(38=>39)
    Thread-20:0:(39=>40)
    Thread-12:Error:(39=>40)
    Thread-15:Error:(38=>39)
    Thread-23:Error:(38=>39)
    Thread-18:Error:(40=>41)
    Thread-3:Error:(40=>41)
    Thread-27:0:(40=>41)
    Thread-11:Error:(40=>41)
    Thread-39:0:(41=>42)
    Thread-7:Error:(42=>43)
    Thread-54:0:(42=>43)
    Thread-22:Error:(42=>43)
    Thread-30:Error:(42=>43)
    Thread-57:Error:(42=>43)
    Thread-65:Error:(42=>43)
    Thread-32:Error:(43=>44)
    Thread-24:Error:(43=>44)
    Thread-5:Error:(42=>43)
    Thread-21:Error:(42=>43)
    Thread-16:Error:(43=>44)
    Thread-29:Error:(43=>44)
    Thread-6:0:(43=>44)
    Thread-9:Error:(43=>44)
    Thread-23:Error:(43=>44)
    Thread-25:Error:(43=>44)
    Thread-15:Error:(43=>44)
    Thread-12:Error:(43=>44)
    Thread-13:Error:(44=>45)
    Thread-11:0:(44=>45)
    Thread-18:Error:(44=>45)
    Thread-3:Error:(44=>45)
    Thread-57:0:(45=>46)
    Thread-7:Error:(45=>46)
    Thread-22:Error:(45=>46)
    Thread-30:Error:(45=>46)
    Thread-9:0:(46=>47)
    Thread-65:Error:(46=>47)
    Thread-25:Error:(46=>47)
    Thread-24:Error:(46=>47)
    Thread-21:Error:(47=>48)
    Thread-32:Error:(47=>48)
    Thread-5:Error:(47=>48)
    Thread-15:Error:(47=>48)
    Thread-16:0:(47=>48)
    Thread-29:Error:(47=>48)
    Thread-12:Error:(48=>49)
    Thread-23:Error:(47=>48)
    Thread-13:0:(48=>49)
    Thread-18:Error:(49=>50)
    Thread-3:Error:(49=>50)
    Thread-7:0:(49=>50)
    Thread-30:Error:(49=>50)
    Thread-22:Error:(49=>50)
    Thread-25:0:(50=>51)
    Thread-65:Error:(50=>51)
    Thread-12:0:(51=>52)
    Thread-21:Error:(51=>52)
    Thread-32:Error:(52=>53)
    Thread-24:Error:(52=>53)
    Thread-29:Error:(52=>53)
    Thread-5:Error:(52=>53)
    Thread-23:Error:(52=>53)
    Thread-15:Error:(52=>53)
    Thread-18:0:(52=>53)
    Thread-3:Error:(52=>53)
    Thread-30:0:(53=>54)
    Thread-22:Error:(53=>54)
    Thread-65:0:(54=>55)
    Thread-24:0:(55=>56)
    Thread-21:Error:(55=>56)
    Thread-32:Error:(55=>56)
    Thread-5:0:(56=>57)
    Thread-29:Error:(57=>58)
    Thread-15:0:(57=>58)
    Thread-23:Error:(57=>58)
    Thread-3:0:(58=>59)
    Thread-22:0:(59=>60)
    Thread-32:0:(60=>61)
    Thread-21:Error:(60=>61)
    Thread-29:0:(61=>62)
    Thread-23:Error:(61=>62)
    Thread-21:0:(62=>63)
    Thread-23:0:(63=>64)
    complete
    64

    备注:实际应用场景中我这里不是实现自增,上边代码自增目的:测试是否能够在高并发下修改同一个key时实现锁的功能。

    实际应用:高并发使用redis同一个key存储统计TOPN结果

    需求:

    1)拥有多个记录,每条记录数据格式:日期,统计指标1,统计指标2,统计指标3,统计指标4

    2)要求分别统计出4个指标的最大值,并记录各自指标最大值时对应的记录。

    3)记录中各个字段的模拟公式:

    // 数据格式:
    // 第一列:日期
    // 第二列:f(x) = x*0.7+1(x自然数)      最大值 45.1
    // 第三列:f(x)= -x*x+20x+1(自然数)    a=-1,b=20,c=1 , 当x=-b/2a时 y 最大值:-10*10+20*10+1=101
    // 第四列:f(x)= -x*x+30x+1(自然数)    a=-1,b=30,c=1 , 当x=-b/2a时 y 最大值:-15*15+30*15+1=445-225+1=221
    // 第五列:f(x)= 640/(x+1)(x自然数)    最大值 640    

    使用mutil+watch实现:

    首先,把最终统计结果存储到redis的key(lock_test)中,lock_test是一个hash类型,其中存储四个键值对,每个键值对分别存储各自的指标最大值时对应的记录。

    指标配置项存储到FeildConfig.java

        static class FieldConfig {
            private String key;
            private Integer index;
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
    
            public Integer getIndex() {
                return index;
            }
    
            public void setIndex(Integer index) {
                this.index = index;
            }
    
            public FieldConfig(String key, Integer index) {
                super();
                this.key = key;
                this.index = index;
            }
        }

    在测试类的static构造函数中初始化各自指标在一条记录的中索引位置,及一共包含的指标类型:

    public class Test_Lock3 {
        。。。
        final static List<FieldConfig> keys = new ArrayList<FieldConfig>();
    
        static {
            keys.add(new FieldConfig("101", 1));
            keys.add(new FieldConfig("102", 2));
            keys.add(new FieldConfig("103", 3));
            keys.add(new FieldConfig("104", 4));
        }
        。。。
    }

    备注:101代表指标1,对应在被统计记录中的第1索引位置(备注:这里索引是对记录按照“,”分割后的数组索引位置)。

    制作测试数据:

            List<String> dataItems=new ArrayList<String>();
    
            // 数据格式:
            // 第一列:日期
            // 第二列:f(x) = x*0.7+1(x自然数)      最大值 45.1
            // 第三列:f(x)= -x*x+20x+1(自然数)    a=-1,b=20,c=1 , 当x=-b/2a时 y 最大值:-10*10+20*10+1=101
            // 第四列:f(x)= -x*x+30x+1(自然数)    a=-1,b=30,c=1 , 当x=-b/2a时 y 最大值:-15*15+30*15+1=445-225+1=221
            // 第五列:f(x)= 640/(x+1)(x自然数)    最大值 640        
            for(int x=0;x<threahSize;x++){
                String dataItem = "2018-07-29 00:00:00," + (x*0.7d+1) + "," + (-1d*x*x+20*x+1) + "," + (-1d*x*x+30*x+1) + "," + (640d/(x+1));
                dataItems.add(dataItem);
            }

    以下是整体测试代码:

    package com.dx.es;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.Transaction;
    
    public class Test_Lock3 {
        final static JedisPool pool = RedisUtil.getPool();
        final static List<FieldConfig> keys = new ArrayList<FieldConfig>();
    
        static {
            keys.add(new FieldConfig("101", 1));
            keys.add(new FieldConfig("102", 2));
            keys.add(new FieldConfig("103", 3));
            keys.add(new FieldConfig("104", 4));
        }
    
        public static void main(String[] args) {
            int threahSize = 64;
            final CountDownLatch threadsCountDownLatch = new CountDownLatch(threahSize);
            Jedis jedis = pool.getResource();
            jedis.del("lock_test");
            
            List<String> dataItems=new ArrayList<String>();
    
            // 数据格式:
            // 第一列:日期
            // 第二列:f(x) = x*0.7+1(x自然数)      最大值 45.1
            // 第三列:f(x)= -x*x+20x+1(自然数)    a=-1,b=20,c=1 , 当x=-b/2a时 y 最大值:-10*10+20*10+1=101
            // 第四列:f(x)= -x*x+30x+1(自然数)    a=-1,b=30,c=1 , 当x=-b/2a时 y 最大值:-15*15+30*15+1=445-225+1=221
            // 第五列:f(x)= 640/(x+1)(x自然数)    最大值 640        
            for(int x=0;x<threahSize;x++){
                String dataItem = "2018-07-29 00:00:00," + (x*0.7d+1) + "," + (-1d*x*x+20*x+1) + "," + (-1d*x*x+30*x+1) + "," + (640d/(x+1));
                dataItems.add(dataItem);
            }
            
            
            for (int i = 0; i < threahSize; i++) {
                Jedis jedisT = pool.getResource();
                
                Runnable handler = new MyRunable(jedisT, threadsCountDownLatch, dataItems.get(i));
    
                new Thread(handler).start();
            }
    
            // 等待所有并行子线程任务完成。
            try {
                threadsCountDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("complete");
    
            Map<String, String> values = jedis.hgetAll("lock_test");
            jedis.close();
    
            System.out.println(values);
        }
    
        static class MyRunable implements Runnable {
            private Jedis jedis = null;
            private CountDownLatch threadsCountDownLatch = null;
            private String newLine = null;
    
            public MyRunable(Jedis jedis, CountDownLatch threadsCountDownLatch, String newLine) {
                this.jedis = jedis;
                this.threadsCountDownLatch = threadsCountDownLatch;
                this.newLine = newLine;
            }
    
            public void run() {
                while (true) {
                    this.jedis.watch("lock_test");
    
                    Map<String, String> newValues = new HashMap<String, String>();
                    Map<String, String> oldValues = this.jedis.hgetAll("lock_test");
                    
                    String newValueStr="";
    
                    for (FieldConfig key : keys) {
                        if (oldValues.containsKey(key.getKey())) {
                            Double oldValue = Double.valueOf(oldValues.get(key.getKey()).split(",")[key.getIndex()]);
                            Double newValue = Double.valueOf(newLine.split(",")[key.getIndex()]);
                            if (newValue > oldValue) {
                                newValues.put(key.getKey(), newLine);
                                newValueStr=newLine;
                            } else {
                                newValues.put(key.getKey(), oldValues.get(key.getKey()));
                                newValueStr=oldValues.get(key.getKey());
                            }
                        } else {
                            newValues.put(key.getKey(), newLine);
                            newValueStr=newLine;
                        }
                    }
    
                    Transaction tx = this.jedis.multi();
                    tx.hmset("lock_test", newValues);
    
                    List<Object> exec = tx.exec();
    
                    if (exec == null || exec.isEmpty()) {
                        //System.out.println(Thread.currentThread().getName() + ":" + "Error:(" + oldValueStr + "->"+newValueStr+")");
                    } else {
                        String values = "";
                        for (int i = 0; i < exec.size(); i++) {
                            values += exec.get(i).toString();
                        }
                        System.out.println(Thread.currentThread().getName() + ":" + values + ":(" + newLine +"->"+newValueStr+")");
                        break;
                    }
    
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                this.jedis.close();
                this.threadsCountDownLatch.countDown();
            }
        }
    
        static class FieldConfig {
            private String key;
            private Integer index;
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
    
            public Integer getIndex() {
                return index;
            }
    
            public void setIndex(Integer index) {
                this.index = index;
            }
    
            public FieldConfig(String key, Integer index) {
                super();
                this.key = key;
                this.index = index;
            }
        }
    }

    测试:

    Thread-23:OK:(2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
    Thread-29:OK:(2018-07-29 00:00:00,19.2,-155.0,105.0,23.703703703703702->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
    Thread-33:OK:(2018-07-29 00:00:00,22.0,-299.0,1.0,20.64516129032258->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
    Thread-24:OK:(2018-07-29 00:00:00,15.7,-20.0,190.0,29.09090909090909->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
    Thread-36:OK:(2018-07-29 00:00:00,24.099999999999998,-428.0,-98.0,18.823529411764707->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
    Thread-32:OK:(2018-07-29 00:00:00,21.299999999999997,-260.0,30.0,21.333333333333332->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
    Thread-39:OK:(2018-07-29 00:00:00,26.2,-575.0,-215.0,17.2972972972973->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
    Thread-40:OK:(2018-07-29 00:00:00,26.9,-628.0,-258.0,16.842105263157894->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
    Thread-25:OK:(2018-07-29 00:00:00,16.4,-43.0,177.0,27.82608695652174->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
    Thread-3:OK:(2018-07-29 00:00:00,1.0,1.0,1.0,640.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-16:OK:(2018-07-29 00:00:00,10.1,92.0,222.0,45.714285714285715->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-21:OK:(2018-07-29 00:00:00,13.6,37.0,217.0,33.68421052631579->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-45:OK:(2018-07-29 00:00:00,30.4,-923.0,-503.0,14.883720930232558->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-28:OK:(2018-07-29 00:00:00,18.5,-124.0,126.0,24.615384615384617->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-46:OK:(2018-07-29 00:00:00,31.099999999999998,-988.0,-558.0,14.545454545454545->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-12:OK:(2018-07-29 00:00:00,7.3,100.0,190.0,64.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-34:OK:(2018-07-29 00:00:00,22.7,-340.0,-30.0,20.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-49:OK:(2018-07-29 00:00:00,33.199999999999996,-1195.0,-735.0,13.617021276595745->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-50:OK:(2018-07-29 00:00:00,33.9,-1268.0,-798.0,13.333333333333334->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-52:OK:(2018-07-29 00:00:00,35.3,-1420.0,-930.0,12.8->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-53:OK:(2018-07-29 00:00:00,36.0,-1499.0,-999.0,12.549019607843137->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-54:OK:(2018-07-29 00:00:00,36.699999999999996,-1580.0,-1070.0,12.307692307692308->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-55:OK:(2018-07-29 00:00:00,37.4,-1663.0,-1143.0,12.075471698113208->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-56:OK:(2018-07-29 00:00:00,38.099999999999994,-1748.0,-1218.0,11.851851851851851->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-57:OK:(2018-07-29 00:00:00,38.8,-1835.0,-1295.0,11.636363636363637->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-58:OK:(2018-07-29 00:00:00,39.5,-1924.0,-1374.0,11.428571428571429->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-59:OK:(2018-07-29 00:00:00,40.199999999999996,-2015.0,-1455.0,11.228070175438596->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-60:OK:(2018-07-29 00:00:00,40.9,-2108.0,-1538.0,11.03448275862069->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-61:OK:(2018-07-29 00:00:00,41.599999999999994,-2203.0,-1623.0,10.847457627118644->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-62:OK:(2018-07-29 00:00:00,42.3,-2300.0,-1710.0,10.666666666666666->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-63:OK:(2018-07-29 00:00:00,43.0,-2399.0,-1799.0,10.491803278688524->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-64:OK:(2018-07-29 00:00:00,43.699999999999996,-2500.0,-1890.0,10.32258064516129->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-65:OK:(2018-07-29 00:00:00,44.4,-2603.0,-1983.0,10.158730158730158->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-11:OK:(2018-07-29 00:00:00,6.6,97.0,177.0,71.11111111111111->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-41:OK:(2018-07-29 00:00:00,27.599999999999998,-683.0,-303.0,16.41025641025641->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-47:OK:(2018-07-29 00:00:00,31.799999999999997,-1055.0,-615.0,14.222222222222221->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-37:OK:(2018-07-29 00:00:00,24.799999999999997,-475.0,-135.0,18.285714285714285->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-48:OK:(2018-07-29 00:00:00,32.5,-1124.0,-674.0,13.91304347826087->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-5:OK:(2018-07-29 00:00:00,2.4,37.0,57.0,213.33333333333334->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-42:OK:(2018-07-29 00:00:00,28.299999999999997,-740.0,-350.0,16.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-8:OK:(2018-07-29 00:00:00,4.5,76.0,126.0,106.66666666666667->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-9:OK:(2018-07-29 00:00:00,5.199999999999999,85.0,145.0,91.42857142857143->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-43:OK:(2018-07-29 00:00:00,29.0,-799.0,-399.0,15.609756097560975->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-7:OK:(2018-07-29 00:00:00,3.8,65.0,105.0,128.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-44:OK:(2018-07-29 00:00:00,29.7,-860.0,-450.0,15.238095238095237->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-18:OK:(2018-07-29 00:00:00,11.5,76.0,226.0,40.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-20:OK:(2018-07-29 00:00:00,12.899999999999999,52.0,222.0,35.55555555555556->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-15:OK:(2018-07-29 00:00:00,9.399999999999999,97.0,217.0,49.23076923076923->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-27:OK:(2018-07-29 00:00:00,17.799999999999997,-95.0,145.0,25.6->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-17:OK:(2018-07-29 00:00:00,10.799999999999999,85.0,225.0,42.666666666666664->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-10:OK:(2018-07-29 00:00:00,5.8999999999999995,92.0,162.0,80.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-66:OK:(2018-07-29 00:00:00,45.099999999999994,-2708.0,-2078.0,10.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-19:OK:(2018-07-29 00:00:00,12.2,65.0,225.0,37.64705882352941->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-26:OK:(2018-07-29 00:00:00,17.099999999999998,-68.0,162.0,26.666666666666668->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-4:OK:(2018-07-29 00:00:00,1.7,20.0,30.0,320.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-35:OK:(2018-07-29 00:00:00,23.4,-383.0,-63.0,19.393939393939394->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-22:OK:(2018-07-29 00:00:00,14.299999999999999,20.0,210.0,32.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-31:OK:(2018-07-29 00:00:00,20.599999999999998,-223.0,57.0,22.06896551724138->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-51:OK:(2018-07-29 00:00:00,34.599999999999994,-1343.0,-863.0,13.061224489795919->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-30:OK:(2018-07-29 00:00:00,19.9,-188.0,82.0,22.857142857142858->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-6:OK:(2018-07-29 00:00:00,3.0999999999999996,52.0,82.0,160.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-14:OK:(2018-07-29 00:00:00,8.7,100.0,210.0,53.333333333333336->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-13:OK:(2018-07-29 00:00:00,8.0,101.0,201.0,58.18181818181818->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    Thread-38:OK:(2018-07-29 00:00:00,25.5,-524.0,-174.0,17.77777777777778->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
    complete
    {
    101=2018-07-29 00:00:00,45.099999999999994,-2708.0,-2078.0,10.0,
    102
    =2018-07-29 00:00:00,8.0,101.0,201.0,58.18181818181818,
    103=2018-07-29 00:00:00,11.5,76.0,226.0,40.0,
    104=2018-07-29 00:00:00,1.0,1.0,1.0,640.0

    }

    参考:《通过Jedis的setnx、multi事务及watch实现三种分布式跨JVM锁的方法代码示例》注意:本章文章中介绍了三种锁的方案,但是这三种方案并不能解决topn的问题,仅供参考。

  • 相关阅读:
    log4j1修改DailyRollingFileAppender支持日志最大数量
    log4j1 修改FileAppender解决当天的文件没有日期后缀
    log4j生成有日期的日志文件名
    Java删除List和Set集合中元素
    Java并发编程:并发容器之ConcurrentHashMap
    Java并发编程:并发容器之CopyOnWriteArrayList
    java.util.ConcurrentModificationException解决详解
    Handshake failed due to invalid Upgrade header: null 解决方案
    web项目Log4j日志输出路径配置问题
    log4j.properties 的使用详解
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/9383713.html
Copyright © 2011-2022 走看看