zoukankan      html  css  js  c++  java
  • 高性能/并发的保证-Netty在Redisson的应用

    背景图

    前言

    ​ Redisson Github: https://github.com/redisson/redisson

    ​ Redisson 官网:https://redisson.pro/

    Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

    以下是Redisson的结构:

    Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。

    客户端初始化

    createBootstrap

    org.redisson.client.RedisClient#createBootstrap

    private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
            Bootstrap bootstrap = new Bootstrap()
                            .resolver(config.getResolverGroup())
              							//1.指定配置中的IO类型
                            .channel(config.getSocketChannelClass())
              							//2.指定配置中的线程模型
                            .group(config.getGroup());
      			//3.IO处理逻辑
            bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
      			//4. 指定bootstrap配置选项
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
            bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
            bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
            config.getNettyHook().afterBoostrapInitialization(bootstrap);
            return bootstrap;
        }
    

    从上面的代码可以看到,客户端启动的引导类是 Bootstrap,负责启动客户端以及连接服务端,引导类创建完成之后,下面我们描述一下客户端启动的流程。

    一. 首先,我们需要给它指定线程模型,驱动着连接的数据读写。然后,redisson默认指定 IO 模型为 NioSocketChannel

    二. 接着,给引导类指定一系列处理链路,这里主要就是定义连接的业务处理逻辑,不理解没关系,在后面我们会详细分析

    RedisChannelInitializer

    org.redisson.client.handler.RedisChannelInitializer

    RedisChannelInitializer

     @Override
        protected void initChannel(Channel ch) throws Exception {
          	// 开启SSL终端识别能力
            initSsl(config, ch);
            
            if (type == Type.PLAIN) {
              	//Redis正常连接处理类
                ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
            } else {
              	//Redis订阅发布处理类
                ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
            }
            
            ch.pipeline().addLast(
              	//链路检测狗
                connectionWatchdog,
              	//Redis协议命令编码器
                CommandEncoder.INSTANCE,
              	//Redis协议命令批量编码器
                CommandBatchEncoder.INSTANCE,
              	//Redis命令队列
                new CommandsQueue());
            
            if (pingConnectionHandler != null) {
               //心跳包连接处理类
                ch.pipeline().addLast(pingConnectionHandler);
            }
            
            if (type == Type.PLAIN) {
              	//Redis协议命令解码器
                ch.pipeline().addLast(new CommandDecoder(config.getExecutor(), config.isDecodeInExecutor()));
            } else {
              	//Redis订阅发布解码器
                ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder(), config.isDecodeInExecutor()));
            }
    
            config.getNettyHook().afterChannelInitialization(ch);
        }
    

    图1 Redisson 链路处理图

    Redisson处理链路

    Redisson的处理链

    Redisson的Pipeline里面的ChannelHandler比较多,我挑选其中CommandEncoderCommandDecoder进行源码剖析。

    CommandDecoder_Encoder

    失败重连

    org.redisson.client.handler.ConnectionWatchdog#reconnect 重连机制

    private void reconnect(final RedisConnection connection, final int attempts){
    		//重试时间越来越久
        int timeout = 2 << attempts;
        if (bootstrap.config().group().isShuttingDown()) {
            return;
        }
        
        try {
            timer.newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1));
                }
            }, timeout, TimeUnit.MILLISECONDS);
        } catch (IllegalStateException e) {
            // skip
        }
    }
    

    netty中的Timer管理,使用了的Hashed time Wheel的模式,Time Wheel翻译为时间轮,是用于实现定时器timer的经典算法。

    这个方法的声明是这样的:

     /**
         * Schedules the specified {@link TimerTask} for one-time execution after
         * the specified delay.
         *
         * @return a handle which is associated with the specified task
         *
         * @throws IllegalStateException       if this timer has been {@linkplain #stop() stopped} already
         * @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
         *                                    can cause instability in the system.
         */
        Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
    

    这个方法需要一个TimerTask对象以知道当时间到时要执行什么逻辑,然后需要delay时间数值和TimeUnit时间的单位。

    Redis协议命令编码器

    ​ Redis 的作者认为数据库系统的瓶颈一般不在于网络流量,而是数据库自身内部逻辑处理上。所以即使 Redis 使用了浪费流量的文本协议,依然可以取得极高的访问性能。Redis 将所有数据都放在内存,用一个单线程对外提供服务,单个节点在跑满一个 CPU 核心的情况下可以达到了 10w/s 的超高 QPS。

    RESP 是 Redis 序列化协议的简写。它是一种直观的文本协议,优势在于实现异常简单,解析性能极好。

    Redis 协议将传输的结构数据分为 5 种最小单元类型,单元结束时统一加上回车换行符号

    1. 单行字符串 以 + 符号开头。
    2. 多行字符串 以 $ 符号开头,后跟字符串长度。
    3. 整数值 以 : 符号开头,后跟整数的字符串形式。
    4. 错误消息 以 - 符号开头。
    5. 数组 以 * 号开头,后跟数组的长度。

    单行字符串 hello world

    +hello world
    
    

    多行字符串 hello world

    $11
    hello world
    
    

    多行字符串当然也可以表示单行字符串。

    整数 1024

    :1024
    
    

    错误 参数类型错误

    -WRONGTYPE Operation against a key holding the wrong kind of value
    
    

    数组 [1,2,3]

    *3
    :1
    :2
    :3
    
    
    

    NULL 用多行字符串表示,不过长度要写成-1。

    $-1
    
    
    

    空串 用多行字符串表示,长度填 0。

    $0
    
    
    
    

    注意这里有两个 。为什么是两个?因为两个 之间,隔的是空串。

    org.redisson.client.handler.CommandEncoder#encode()

    private static final char ARGS_PREFIX = '*';
    private static final char BYTES_PREFIX = '$';
    private static final byte[] CRLF = "
    ".getBytes();
    
    
    @Override
        protected void encode(ChannelHandlerContext ctx, CommandData<?, ?> msg, ByteBuf out) throws Exception {
            try {
              	//redis命令前缀
                out.writeByte(ARGS_PREFIX);
                int len = 1 + msg.getParams().length;
                if (msg.getCommand().getSubName() != null) {
                    len++;
                }
                out.writeCharSequence(Long.toString(len), CharsetUtil.US_ASCII);
                out.writeBytes(CRLF);
                
                writeArgument(out, msg.getCommand().getName().getBytes(CharsetUtil.UTF_8));
                if (msg.getCommand().getSubName() != null) {
                    writeArgument(out, msg.getCommand().getSubName().getBytes(CharsetUtil.UTF_8));
                }
              	......
            } catch (Exception e) {
                msg.tryFailure(e);
                throw e;
            }
        }
    
    private void writeArgument(ByteBuf out, ByteBuf arg) {
        out.writeByte(BYTES_PREFIX);
        out.writeCharSequence(Long.toString(arg.readableBytes()), CharsetUtil.US_ASCII);
        out.writeBytes(CRLF);
        out.writeBytes(arg, arg.readerIndex(), arg.readableBytes());
        out.writeBytes(CRLF);
    }
    
    

    Redis协议命令解码器

    org.redisson.client.handler.CommandDecoder#readBytes

     private static final char CR = '
    ';
     private static final char LF = '
    ';
     private static final char ZERO = '0';
    
    private ByteBuf readBytes(ByteBuf is) throws IOException {
        long l = readLong(is);
        if (l > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "Java only supports arrays up to " + Integer.MAX_VALUE + " in size");
        }
        int size = (int) l;
        if (size == -1) {
            return null;
        }
        ByteBuf buffer = is.readSlice(size);
        int cr = is.readByte();
        int lf = is.readByte();
      	//判断是否以
    开头
        if (cr != CR || lf != LF) {
            throw new IOException("Improper line ending: " + cr + ", " + lf);
        }
        return buffer;
    }
    
    

    数据序列化

    Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。Redisson提供了以下几种的对象编码应用,以供大家选择:

    编码类名称 说明
    org.redisson.codec.JsonJacksonCodec Jackson JSON 编码 默认编码
    org.redisson.codec.AvroJacksonCodec Avro 一个二进制的JSON编码
    org.redisson.codec.SmileJacksonCodec Smile 另一个二进制的JSON编码
    org.redisson.codec.CborJacksonCodec CBOR 又一个二进制的JSON编码
    org.redisson.codec.MsgPackJacksonCodec MsgPack 再来一个二进制的JSON编码
    org.redisson.codec.IonJacksonCodec Amazon Ion 亚马逊的Ion编码,格式与JSON类似
    org.redisson.codec.KryoCodec Kryo 二进制对象序列化编码
    org.redisson.codec.SerializationCodec JDK序列化编码
    org.redisson.codec.FstCodec FST 10倍于JDK序列化性能而且100%兼容的编码
    org.redisson.codec.LZ4Codec LZ4 压缩型序列化对象编码
    org.redisson.codec.SnappyCodec Snappy 另一个压缩型序列化对象编码
    org.redisson.client.codec.JsonJacksonMapCodec 基于Jackson的映射类使用的编码。可用于避免序列化类的信息,以及用于解决使用byte[]遇到的问题。
    org.redisson.client.codec.StringCodec 纯字符串编码(无转换)
    org.redisson.client.codec.LongCodec 纯整长型数字编码(无转换)
    org.redisson.client.codec.ByteArrayCodec 字节数组编码
    org.redisson.codec.CompositeCodec 用来组合多种不同编码在一起

    codec

    Codec

    public interface Codec {
    
      	//返回用于HMAP Redis结构中哈希映射值的对象解码器
        Decoder<Object> getMapValueDecoder();
    
      	//返回用于HMAP Redis结构中哈希映射值的对象编码器
        Encoder getMapValueEncoder();
    
      	//返回用于HMAP Redis结构中哈希映射键的对象解码器
        Decoder<Object> getMapKeyDecoder();
    
      	//返回用于HMAP Redis结构中哈希映射键的对象编码器
        Encoder getMapKeyEncoder();
    
        //返回用于除HMAP之外的任何存储Redis结构的对象解码器
        Decoder<Object> getValueDecoder();
    
        //返回用于除HMAP之外的任何存储Redis结构的对象编码器
        Encoder getValueEncoder();
    
        //返回用于加载解码过程中使用的类的类加载器对象
        ClassLoader getClassLoader();
    
    }
    
    

    BaseCodec

    org.redisson.client.codec.BaseCodec

    BaseCodec

    1. HashMap的键值对的编解码的处理类使用普通的对象编解码处理类进行分解。

      //返回用于除HMAP之外的任何存储Redis结构的对象解码器
          Decoder<Object> getValueDecoder();
      
      //返回用于除HMAP之外的任何存储Redis结构的对象编码器
          Encoder getValueEncoder();
      

    SerializationCodec

    org.redisson.codec.SerializationCodec

    Decoder

    SerializationCodec-decoder

    Encoder

    SerializationCodec-encoder

  • 相关阅读:
    NOIP2015 斗地主
    BZOJ 2120: 数颜色
    BZOJ 1014: [JSOI2008]火星人prefix
    BZOJ 4665: 小w的喜糖
    BZOJ 3665: maths
    BZOJ 3270: 博物馆
    BZOJ 1419: Red is good
    【转】二分图的最大匹配
    POJ 3026 Borg Maze(Prim+BFS建邻接矩阵)
    POJ 2485 Highway(Prim+邻接矩阵)
  • 原文地址:https://www.cnblogs.com/sanshengshui/p/12669011.html
Copyright © 2011-2022 走看看