zoukankan      html  css  js  c++  java
  • Lettuce在Spring boot中的使用方式

    Lettuce是一个可伸缩线程安全的Redis客户端。多个线程可以共享同一个RedisConnection.本文是基于Lettuce5,主要介绍的知识点如下:

    1. Lettuce在Spring Boot中的配置
    2. Lettuce的同步,异步,响应式使用方式
    3. 事件的订阅
    4. 发布自定义事件
    5. 读写分离
    6. 读写分离策略实现源码
    7. 客户端分片实现
    @Configuration
    public class LettuceConfig {
    
        /**
         * 配置客户端资源
         * @return
         */
        @Bean(destroyMethod = "shutdown")
        ClientResources clientResources() {
            return DefaultClientResources.builder().ioThreadPoolSize(8).computationThreadPoolSize(10).build();
        }
    
    
        /**
         * 配置Socket选项
         * keepAlive=true
         * tcpNoDelay=true
         * connectionTimeout=5秒
         * @return
         */
        @Bean
        SocketOptions socketOptions(){
           return SocketOptions.builder().keepAlive(true).tcpNoDelay(true).connectTimeout(Duration.ofSeconds(5)).build(); 
        }
        /**
         * 配置客户端选项
         * @return
         */
        @Bean
        ClientOptions clientOptions(SocketOptions socketOptions) {
            return ClientOptions.builder().socketOptions(socketOptions).build();
        }
    
        /**
         * 创建RedisClient
         * @param clientResources 客户端资源
         * @param clientOptions 客户端选项
         * @return 
         */
        @Bean(destroyMethod = "shutdown")
        RedisClient redisClient(ClientResources clientResources, ClientOptions clientOptions) {
            RedisURI uri = RedisURI.builder().withSentinel("xx.xx.xx.xx", 26009).withPassword("abcd1234").withSentinelMasterId("xxx").build();
            RedisClient client = RedisClient.create(clientResources, uri);
            client.setOptions(clientOptions);
            return client;
        }
    
        /**
         * 创建连接
         * @param redisClient
         * @return
         */
        @Bean(destroyMethod = "close")
        StatefulRedisConnection<String, String> connection(RedisClient redisClient) {
            return redisClient.connect();
        }
    }
    

      

     基本使用

    public Mono<ServerResponse> hello(ServerRequest request) throws  Exception {
        //响应式使用
        Mono<String> resp = redisConnection.reactive().get("gxt_new");
        //同步使用
        redisConnection.sync().get("test");
        redisConnection.async().get("test").get(5, TimeUnit.SECONDS);
        return ServerResponse.ok().body(resp, String.class);
    }
    

     客户端订阅事件

         客户端使用事件总线传输运行期间产生的事件;EventBus可以从客户端资源进行配置和获取,并用于客户端和自定义事件。  

    如下事件可以被客户端发送:

    • 连接事件
    • 测量事件
    • 集群拓扑事件

      

    client.getResources().eventBus().get().subscribe(e -> {
                System.out.println("client 订阅事件: " + e);
            });
    

      

    client 订阅事件: ConnectionActivatedEvent [/xx:49910 -> /xx:6008]
    client 订阅事件: ConnectionActivatedEvent [/xx:49911 -> /xx:6018]
    client 订阅事件: ConnectedEvent [/xx:49912 -> /xx:6018]

     发布事件

        发布使用也是通过使用eventBus进行发布事件,Event接口只是一个标签接口

     eventBus.publish(new Event() {
                @Override
                public String toString() {
                    return "自定义事件";
                }
            });
    

       订阅者就可以订阅到这个自定义事件了  

    client 订阅事件: 自定义事件
    

     读写分离 

    @Bean(destroyMethod = "close")
        StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) {
            StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI);
            connection.setReadFrom(ReadFrom.NEAREST);
            return connection;
        }
    }
    

      StatefulRedisMasterSlaveConnection 支持读写分离,通过设置ReadFrom控制读是从哪个节点读取.

    参数 含义
    MASTER 从master节点读取
    SLAVE 从slave节点读取
    MASTER_PREFERRED
    从master节点读取,如果master节点不可以则从slave节点读取
    SLAVE_PREFERRED
    从slave节点读取,如果slave节点不可用则倒退到master节点读取
    NEAREST
    从最近到节点读取

    具体是如何实现到呢? 下面看一下MasterSlaveConnectionProvider相关源码

     //根据意图获取连接
        public StatefulRedisConnection<K, V> getConnection(Intent intent) {
    
            if (debugEnabled) {
                logger.debug("getConnection(" + intent + ")");
            }
            //如果readFrom不为null且是READ
            if (readFrom != null && intent == Intent.READ) {
                //根据readFrom配置从已知节点中选择可用节点描述
                List<RedisNodeDescription> selection = readFrom.select(new ReadFrom.Nodes() {
                    @Override
                    public List<RedisNodeDescription> getNodes() {
                        return knownNodes;
                    }
    
                    @Override
                    public Iterator<RedisNodeDescription> iterator() {
                        return knownNodes.iterator();
                    }
                });
                //如果可选择节点集合为空则抛出异常
                if (selection.isEmpty()) {
                    throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s",
                            knownNodes, readFrom));
                }
                try {
                    //遍历所有可用节点
                    for (RedisNodeDescription redisNodeDescription : selection) {
                        //获取节点连接
                        StatefulRedisConnection<K, V> readerCandidate = getConnection(redisNodeDescription);
                        //如果节点连接不是打开到连接则继续查找下一个连接
                        if (!readerCandidate.isOpen()) {
                            continue;
                        }
                        //返回可用连接
                        return readerCandidate;
                    }
                    //如果没有找到可用连接,默认返回第一个
                    return getConnection(selection.get(0));
                } catch (RuntimeException e) {
                    throw new RedisException(e);
                }
            }
            //如果没有配置readFrom或者不是READ 则返回master连接
            return getConnection(getMaster());
        }
    

     我们可以看到选择连接到逻辑是通用的,不同的处理就是在selection的处理上,下面看一下不同readFrom策略对于selection的处理

    ReadFromSlavePerferred和ReadFromMasterPerferred都是有优先级到概念,看看相关逻辑的处理

    static final class ReadFromSlavePreferred extends ReadFrom {
    
            @Override
            public List<RedisNodeDescription> select(Nodes nodes) {
    
                List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());
                //优先添加slave节点
                for (RedisNodeDescription node : nodes) {
                    if (node.getRole() == RedisInstance.Role.SLAVE) {
                        result.add(node);
                    }
                }
                //最后添加master节点
                for (RedisNodeDescription node : nodes) {
                    if (node.getRole() == RedisInstance.Role.MASTER) {
                        result.add(node);
                    }
                }
    
                return result;
            }
    

      

     static final class ReadFromMasterPreferred extends ReadFrom {
    
            @Override
            public List<RedisNodeDescription> select(Nodes nodes) {
    
                List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());
                //优先添加master节点
                for (RedisNodeDescription node : nodes) {
                    if (node.getRole() == RedisInstance.Role.MASTER) {
                        result.add(node);
                    }
                }
                //其次在添加slave节点
                for (RedisNodeDescription node : nodes) {
                    if (node.getRole() == RedisInstance.Role.SLAVE) {
                        result.add(node);
                    }
                }
    
                return result;
            }
        }
    

    对于ReadFromMaster和ReadFromSlave都是获取指定角色的节点  

     static final class ReadFromSlave extends ReadFrom {
    
            @Override
            public List<RedisNodeDescription> select(Nodes nodes) {
    
                List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());
                //只获取slave节点
                for (RedisNodeDescription node : nodes) {
                    if (node.getRole() == RedisInstance.Role.SLAVE) {
                        result.add(node);
                    }
                }
    
                return result;
            }
        }
    

      

    static final class ReadFromMaster extends ReadFrom {
    
            @Override
            public List<RedisNodeDescription> select(Nodes nodes) {
    
                for (RedisNodeDescription node : nodes) {
                    if (node.getRole() == RedisInstance.Role.MASTER) {
                        return LettuceLists.newList(node);
                    }
                }
    
                return Collections.emptyList();
            }
        }
    

      获取最近的节点这个就有点特殊了,它对已知对节点没有做处理,直接返回了它们的节点描述,也就是谁在前面就优先使用谁

    static final class ReadFromNearest extends ReadFrom {
    
            @Override
            public List<RedisNodeDescription> select(Nodes nodes) {
                return nodes.getNodes();
            }
        }
    

      在SentinelTopologyProvider中可以发现,获取nodes节点总是优先获取Master节点,其次是slave节点,这样Nearest效果就等效与MasterPreferred

    public List<RedisNodeDescription> getNodes() {
    
            logger.debug("lookup topology for masterId {}", masterId);
    
            try (StatefulRedisSentinelConnection<String, String> connection = redisClient.connectSentinel(CODEC, sentinelUri)) {
    
                RedisFuture<Map<String, String>> masterFuture = connection.async().master(masterId);
                RedisFuture<List<Map<String, String>>> slavesFuture = connection.async().slaves(masterId);
    
                List<RedisNodeDescription> result = new ArrayList<>();
                try {
                    Map<String, String> master = masterFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
                    List<Map<String, String>> slaves = slavesFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
                    //添加master节点    
                    result.add(toNode(master, RedisInstance.Role.MASTER));
                    //添加所有slave节点
                    result.addAll(slaves.stream().filter(SentinelTopologyProvider::isAvailable)
                            .map(map -> toNode(map, RedisInstance.Role.SLAVE)).collect(Collectors.toList()));
    
                } catch (ExecutionException | InterruptedException | TimeoutException e) {
                    throw new RedisException(e);
                }
    
                return result;
            }
        }
    

       自定义负载均衡  

           通过上文可以发现只需要实现 ReadFrom接口,就可以通过该接口实现Master,Slave负载均衡;下面的示例是通过将nodes节点进行打乱,进而实现

     @Bean(destroyMethod = "close")
        StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) {
            StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI);
            connection.setReadFrom(new ReadFrom() {
                @Override
                public List<RedisNodeDescription> select(Nodes nodes) {
                    List<RedisNodeDescription> list = nodes.getNodes();
                    Collections.shuffle(list);
                    return list;
                }
            });
            return connection;
        }
    

      

       在大规模使用的时候会使用多组主备服务,可以通过客户端分片的方式将部分请求路由到指定的服务器上,但是Lettuce没有提供这样的支持,下面是自定义的实现:

    public class Sharded< C extends StatefulRedisConnection,V> {
    
        private TreeMap<Long, String> nodes;
        private final Hashing algo = Hashing.MURMUR_HASH;
        private final Map<String, StatefulRedisConnection> resources = new LinkedHashMap<>();
        private RedisClient redisClient;
        private String password;
        private Set<HostAndPort> sentinels;
        private RedisCodec<String, V> codec;
    
        public Sharded(List<String> masters, RedisClient redisClient, String password, Set<HostAndPort> sentinels, RedisCodec<String, V> codec) {
            this.redisClient = redisClient;
            this.password = password;
            this.sentinels = sentinels;
            this.codec = codec;
            initialize(masters);
        }
    
        private void initialize(List<String> masters) {
            nodes = new TreeMap<>();
    
            for (int i = 0; i != masters.size(); ++i) {
                final String master = masters.get(i);
                for (int n = 0; n < 160; n++) {
                    nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), master);
                }
                RedisURI.Builder builder = RedisURI.builder();
                for (HostAndPort hostAndPort : sentinels) {
                    builder.withSentinel(hostAndPort.getHostText(), hostAndPort.getPort());
                }
    
                RedisURI redisURI = builder.withPassword(password).withSentinelMasterId(master).build();
                resources.put(master, MasterSlave.connect(redisClient, codec, redisURI));
            }
    
        }
    
        public StatefulRedisConnection getConnectionBy(String key) {
            return resources.get(getShardInfo(SafeEncoder.encode(key)));
        }
    
        public Collection<StatefulRedisConnection> getAllConnection(){
            return Collections.unmodifiableCollection(resources.values());
        }
    
        public String getShardInfo(byte[] key) {
            SortedMap<Long, String> tail = nodes.tailMap(algo.hash(key));
            if (tail.isEmpty()) {
                return nodes.get(nodes.firstKey());
            }
            return tail.get(tail.firstKey());
        }
    
    
        public void close(){
           for(StatefulRedisConnection connection:  getAllConnection()){
                connection.close();
            }
        }
    
        private static  class SafeEncoder {
    
             static byte[] encode(final String str) {
                try {
                    if (str == null) {
                        throw new IllegalArgumentException("value sent to redis cannot be null");
                    }
                    return str.getBytes("UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        private interface Hashing {
            Hashing MURMUR_HASH = new MurmurHash();
    
            long hash(String key);
    
            long hash(byte[] key);
        }
    
    
        private static  class MurmurHash implements Hashing {
    
             static long hash64A(byte[] data, int seed) {
                return hash64A(ByteBuffer.wrap(data), seed);
            }
    
    
             static long hash64A(ByteBuffer buf, int seed) {
                ByteOrder byteOrder = buf.order();
                buf.order(ByteOrder.LITTLE_ENDIAN);
    
                long m = 0xc6a4a7935bd1e995L;
                int r = 47;
    
                long h = seed ^ (buf.remaining() * m);
    
                long k;
                while (buf.remaining() >= 8) {
                    k = buf.getLong();
    
                    k *= m;
                    k ^= k >>> r;
                    k *= m;
    
                    h ^= k;
                    h *= m;
                }
    
                if (buf.remaining() > 0) {
                    ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
                    // for big-endian version, do this first:
                    // finish.position(8-buf.remaining());
                    finish.put(buf).rewind();
                    h ^= finish.getLong();
                    h *= m;
                }
    
                h ^= h >>> r;
                h *= m;
                h ^= h >>> r;
    
                buf.order(byteOrder);
                return h;
            }
    
            public long hash(byte[] key) {
                return hash64A(key, 0x1234ABCD);
            }
    
            public long hash(String key) {
                return hash(SafeEncoder.encode(key));
            }
        }
    
    
    
    
    }
    

      

     @Bean(destroyMethod = "close")
        Sharded<StatefulRedisMasterSlaveConnection,String> sharded(RedisClient redisClient) {
    
            Set<HostAndPort> hostAndPorts=new HashSet<>();
            hostAndPorts.add(HostAndPort.parse("1xx:26009"));
            hostAndPorts.add(HostAndPort.parse("1xx:26009"));
    
    
            return new Sharded<>(Arrays.asList("te009","test68","test67"),redisClient,"password",hostAndPorts, new Utf8StringCodec());
        }
    

      使用方式

      //只从slave节点中读取
                StatefulRedisMasterSlaveConnection redisConnection = (StatefulRedisMasterSlaveConnection) sharded.getConnectionBy("key");
                //使用异步模式获取缓存值
                System.out.println(redisConnection.sync().get("key"));
    

      

  • 相关阅读:
    WPF线程中获取控件的值和给控件赋值
    sublime text3 安装以及主要插件安装
    云服务IaaS,PaaS,SaaS
    What is JSON
    Core Java
    英语单词及语义
    设置PyCharm创建文件时自动添加头文件
    【练习】字典的循环遍历:实现多层级节点存取
    字符串格式化
    常用数据类型的方法--str、int、list、dict
  • 原文地址:https://www.cnblogs.com/wei-zw/p/9147938.html
Copyright © 2011-2022 走看看