zoukankan      html  css  js  c++  java
  • Redis系列(三)--消息队列、排行榜、慢查询、pipeline等实现

    Redis命令执行生命周期:

      发送命令--->排队(单线程)--->执行命令--->返回结果

    慢查询:

      只是针对命令执行阶段

      慢查询日志通过一个固定长度的FIFO queue,这个queue保存在内存中,通过设置命令执行时间慢查询范围,超过这个范围进入慢查询范围,就

    会保存到queue中

    慢查询有两个相关参数:

      slowlog-log-slower-than 1000

      slowlog-max-len 1000

    可以通过修改redis.conf或者命令config set slowlog-log-slower-than 1000设置,通过config get获取参数

    慢查询命令:

    slowlog get:

    127.0.0.1:6379> slowlog get
    1) 1) (integer) 2
       2) (integer) 1558081229
       3) (integer) 293
       4) 1) "COMMAND"
       5) "127.0.0.1:58194"
       6) ""
    2) 1) (integer) 1
       2) (integer) 1552552609
       3) (integer) 15589
       4) 1) "save"
       5) "127.0.0.1:54516"
       6) ""

    参数说明:

    1、慢查询记录id

    2、发起命令的时间戳

    3、 命令耗时,单位为微秒

    4、 该条记录的命令及参数

    5、客户端网络套接字(ip: port)

    slowlog len:慢查询队列长度

    127.0.0.1:6379> slowlog len
    (integer) 2

    slowlog reset:清空慢查询队列

    127.0.0.1:6379> slowlog reset
    OK

    慢查询日志优化:

      1、slowlog-log-slower-than默认10000微秒,就是10ms,通常设置1ms

      2、slowlog-max-lan默认128,通常设置1000,当超过最大queue长度,最先进入的记录被剔除,最新的一条记录加入slow log

      3、参数可以动态设置,前面说了

      4、可以定期将慢查询日志进行持久化,因为它保存在内存中

    pipeline:

      1次网络+n次命令时间

      pipeline也就是流水线,将多个命令进行打包,在Redis server端计算出来,然后依次将结果返回

    简单应用:

    @Test
    public void countTimes() {
        SimpleDateFormat format = new SimpleDateFormat("hh:mm:ss");
        String startTime = format.format(new Date());
        log.info("开始时间:{}", startTime);
        String realKey = "test";
        Pipeline pipeline = redisService.pipeline();
        for (int i=0; i<10000; i++) {
            pipeline.set(realKey + i, "a" );
        }
        for (int i=0; i<10000; i++) {
            pipeline.del(realKey +i);
        }
        pipeline.sync();
        log.info("结束时间:{}", format.format(new Date()));
    }
    2019-05-17 16:57:14.357  INFO 11132 --- [           main] com.it.RedisServiceTest                  : 开始时间:04:57:14
    2019-05-17 16:57:16.184  INFO 11132 --- [           main] com.it.RedisServiceTest                  : 结束时间:04:57:16

      如果使用set和del各自操作10000次,由于本人在上海,买的阿里云归属地是北京,加上配置太渣,网络问题等,1分钟过后才插入5000条。是

    在等不下去了可以看出pipeline的速度是有多快

    注意点:

      m相关命令是原子操作,而pipeline不是,会拆分为很多子命令

    计数器:

      通过incr、incrby实现

    应用场景:

      用户登录次数记录

      社交点赞等

    消息队列:

    一般可以用来单对单消息队列,这不是Redis本身的功能,而是通过list实现,不保证可靠性投递。如果真的需要消息队列,还是通过MQ实现

    实现:

    127.0.0.1:6379> lpush list 1
    (integer) 1
    127.0.0.1:6379> blpop list 10                    //blpop,从左边弹出一个元素,在timeout时间内如果没有元素就阻塞
    1) "list"
    2) "1"
    (4.98s)
    127.0.0.1:6379> brpop list 10                    //brpop,从右边谈,和blpop相同
    1) "list"
    2) "1"
    (4.98s)

    java代码实现:

    Redis基本方法:lpush、rpush、lpop、rpop、brpop、blpop

    public void lpush(byte[] key, byte[] value) {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.lpush(key, value);
            } finally {
                returnToPool(jedis);
            }
        }
    
        public void rpush(byte[] key, byte[] value) {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.rpush(key, value);
            } finally {
                returnToPool(jedis);
            }
        }
    
        public Object lpop(String key) {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                String message = jedis.lpop(key);
                return message;
            } finally {
                returnToPool(jedis);
            }
        }
    
        public byte[] rpop(byte[] key) {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                return jedis.rpop(key);
            } finally {
                returnToPool(jedis);
            }
        }
    
        public List<byte[]> brpop(int timeout, String key) {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                return jedis.brpop(timeout, key.getBytes());
            } finally {
                returnToPool(jedis);
            }
        }
    @Data
    @AllArgsConstructor
    public class RedisMessage implements Serializable {
    
        private int id;
        private String message;
    
    }
    @Slf4j
    @Service
    public class RedisQueue {
        @Autowired
        private RedisService redisService;
    
        public void sendRedisMessage(int id, String message) {
            RedisMessage redisMessage = new RedisMessage(id, message);
            String key = RedisConstant.LIST_KEY + id;
            try {
                redisService.lpush(key.getBytes(), ObjectUtils.object2Bytes(redisMessage));
            } catch (IOException e) {
                log.error("Redis消息发送失败:{}",e);
            }
        }
    
        public RedisMessage receiveMessage(int id){
            String key = RedisConstant.LIST_KEY + id;
            List<byte[]> list = redisService.brpop(0, key);
            RedisMessage message = null;
            try {
                message = (RedisMessage)ObjectUtils.bytes2Object(list.get(1));
            } catch (IOException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            return message;
    
        }
    }
    @Slf4j
    public class RedisServiceTest extends ApplicationTests{
    
        @Autowired
        private RedisQueue redisQueue;
    
        @Test
        public void sendMessage() {
            for (int i=0; i<5; i++)
            redisQueue.sendRedisMessage(1, "send redis message!!!");
        }
    
        @Test
        public void receiveMessage() {
            RedisMessage message = redisQueue.receiveMessage(1);
            log.info("接收redis message:{}",message.getMessage());
        }
    }

    发送消息:

    127.0.0.1:6379> lrange redisQueue1 0 -1
    1) "xacxedx00x05srx00x19com.it.redis.RedisMessage$xd4D'>[gxf8x02x00x02Ix00x02idLx00amessagetx00x12Ljava/lang/String;xpx00x00x00x01tx00x15send redis message!!!"
    2) "xacxedx00x05srx00x19com.it.redis.RedisMessage$xd4D'>[gxf8x02x00x02Ix00x02idLx00amessagetx00x12Ljava/lang/String;xpx00x00x00x01tx00x15send redis message!!!"
    3) "xacxedx00x05srx00x19com.it.redis.RedisMessage$xd4D'>[gxf8x02x00x02Ix00x02idLx00amessagetx00x12Ljava/lang/String;xpx00x00x00x01tx00x15send redis message!!!"
    4) "xacxedx00x05srx00x19com.it.redis.RedisMessage$xd4D'>[gxf8x02x00x02Ix00x02idLx00amessagetx00x12Ljava/lang/String;xpx00x00x00x01tx00x15send redis message!!!"
    5) "xacxedx00x05srx00x19com.it.redis.RedisMessage$xd4D'>[gxf8x02x00x02Ix00x02idLx00amessagetx00x12Ljava/lang/String;xpx00x00x00x01tx00x15send redis message!!!"

    接收消息:

    2019-05-20 10:35:04.576  INFO 3780 --- [           main] com.it.RedisServiceTest                  : 接收redis message:send redis message!!!

    解释:

    RedisMessage:实体,实现Serializable,作为收发消息载体

    RedisQueue:消息队列,包含收发消息方法

    RedisServiceTest:测试类

    brpop(timeout, key),timeout取0,表示如果无法取到消息,就会一直阻塞

    发布订阅:

      一个新的订阅者订阅一个频道是无法收到以前的消息的,没有消息堆积的能力

    角色:

      发布者publisher、订阅者subscriber、频道channel

    命令:

    127.0.0.1:6379> subscribe myChannel 
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "myChannel"
    3) (integer) 1
    1) "message"
    2) "myChannel"
    3) "aaa"
    127.0.0.1:6379> psubscribe channel*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "channel*"
    3) (integer) 1
    1) "pmessage"
    2) "channel*"
    3) "channel2"
    4) "bbb"
    1) "pmessage"
    2) "channel*"
    3) "channel1"
    4) "aaa"
    127.0.0.1:6379> unsubscribe myChannel
    1) "unsubscribe"
    2) "myChannel"
    3) (integer) 0
    
    127.0.0.1:6379> publish myChannel aaa
    (integer) 1

    psubscribe pattern:安装某种方式进行订阅,可以使用通配符

    抽奖功能:set实现

    127.0.0.1:6379> sadd choujiang zhangsan lisi wanger                //添加抽奖名单到set
    (integer) 3
    127.0.0.1:6379> smembers choujiang                                //获取抽奖名单
    1) "lisi"
    2) "zhangsan"
    3) "wanger"
    127.0.0.1:6379> srandmember choujiang 2                            //从名单中随机抽取2名,并且不删除已中奖名单
    1) "zhangsan"
    2) "wanger"
    127.0.0.1:6379> spop choujiang 2                                //从名单中随机抽取2名,并且删除已中奖名单
    1) "wanger"
    2) "lisi"
    127.0.0.1:6379> smembers choujiang
    1) "zhangsan"

    实现点赞、签到具体用户列表:set实现

    127.0.0.1:6379> sadd article:1001 zhangsan
    (integer) 1
    127.0.0.1:6379> sadd article:1001 lisi                        //lisi给1001文章点赞
    (integer) 1
    127.0.0.1:6379> srem article:1001 lisi                        //lisi给1001文章取消点赞
    (integer) 1
    127.0.0.1:6379> sismember article:1001 zhangsan                //检查lisi是否给1001文章点过赞,个人觉得sadd也是一样的,如果返回0,证明set已经包含了
    (integer) 1
    127.0.0.1:6379> sadd article:1001 lisi1
    (integer) 1
    127.0.0.1:6379> sadd article:1001 lisi2
    (integer) 1
    127.0.0.1:6379> smembers article:1001                        //获取点赞列表
    1) "zhangsan"
    2) "lisi2"
    3) "lisi1"
    127.0.0.1:6379> scard article:1001                            //点赞人数
    (integer) 3

    共同关注列表:set实现

    通过sinter实现

    127.0.0.1:6379> sadd zhangsanlist jesen kobe
    (integer) 2
    127.0.0.1:6379> sadd lisilist jesen gakki hebe
    (integer) 3
    127.0.0.1:6379> sinter zhangsanlist lisilist
    1) "jesen"

    排行榜:zset实现

    127.0.0.1:6379> zadd NouthAmercianMovieRanking 5702 Speed_preparation 2841 The_Avengers 2482 Big_detective_Pikachu
    (integer) 3
    127.0.0.1:6379> zrevrange NouthAmercianMovieRanking 0 2 withscores
    1) "Speed_preparation"
    2) "5702"
    3) "The_Avengers"
    4) "2841"
    5) "Big_detective_Pikachu"
    6) "2482"

    部分内容参考:https://mp.weixin.qq.com/s/FyYhLS3X7LDe0PLxooz_cQ

  • 相关阅读:
    Oracle笔记(十五) 数据库备份
    Oracle笔记(十四) 用户管理
    Oracle笔记(十三) 视图、同义词、索引
    Oracle笔记(十二) 集合、序列
    Oracle笔记(十一) 建表、更新、查询综合练习
    Oracle笔记(十) 约束
    Oracle笔记(八) 复杂查询及总结
    Oracle笔记(九) 表的创建及管理
    06-流程控制
    05-数据类型转换
  • 原文地址:https://www.cnblogs.com/huigelaile/p/10881537.html
Copyright © 2011-2022 走看看