zoukankan      html  css  js  c++  java
  • 假如 Redis Cluster 模式用在 T-io 上

    前言

      前几天在学习Redis Cluster 模式的时候,突然想到如果把它的集群模式应用在T-io上也是挺有意思的一件事情。

    Redis 集群简介

       Redis Cluster 中有 N 台实例,每个实例负责部分 Slot,总共有 16384 个Slot,然后客户端连接的时候,需要根据操作的Key计算出所在的Slot和服务实例地址,然后直接执行或者返回MOVE命令等。实例之间的元数据更新使用Gossip协议。简单一张图了解一下:

    Tio集群

       我设计的很简单,就是一个多实例集群,没有主从关系,集群之间通过 伪Gossip(因为我也不知道咋实现,┭┮﹏┭┮)协议通讯。我的设计思路是这样的。

    从图中可以看出,每个实例会和集群中的其他的某几个实例相连,通过信息扩散的方式达到最终集群的完整性。比如A只知道B的存在,但是它最终会根据B知道C的存在。或者C 根据B知道A的存在。

    实现原理

    每个实例在启动之后会加载集群的其他某几个节点。然后上线之后通过 ClientChannelContext 与另外的集群节点进行通讯。模仿Redis设计了如下几个命令:

    MEET

    在一个实例上线之后,会发送 MEET 命令给其他已知实例。其他实例收到命令之后回复PONG命令。

    PING

    实例会定期在实例列表中选择若干实例发送PING命令,对方实例回复PONG命令,交换信息

    PONG

    在接收到 MEET,PING命令之后回复PONG命令

    FAIL

    在创建 ClientChannelContext对象之后,可以通过ClientAioListener获取连接回调,如果连接失败,那么实例将向其他在线实例发送FAIL命令,当本节点收到的FAILE命令数大于节点总数的一半时,认为该节点确实已经宕掉了,那么在执行UNAVAILABLE命令

    UNAVAILABLE

    如果说FAIL命令是病危通知书,那么 UNAVAILABLE 就是 确认死亡。

    代码设计实现

    TioClusterServer

    继承自 TioServer,新增start方法,增加其他集群业务,例如初始化集群上下文信息

    ClusterServerTioConfig

    继承自 ServerTioConfig,为了约束客户端传参类型。

    TioClusterNode

    集群节点,包含 IP,Port,创建时间,状态 等信息,同时负责创建自己的ClientChannelContext

    TioClusterContext

    集群上下文,负责集群节点的创建,集群数据更新工作,同时负责调度各个节点的事件处理。

    ClusterCommandExecutor

    集群命令发送执行器,负责将各个命令发送出去

    ClusterAioHandler,ClusterAioListener,ClusterServerAioHandler,ClusterServerAioListener

    这几个类大家就很熟悉了,就是用于编解码的处理类。

    集群启动流程

    使用Gossip协议有个缺点,集群上线之后,可能会有很大的延迟发现。实例越多,延迟越大。更何况我自己实现的伪 Gossip 协议,暂且不提。

    代码解析

    在集群上下文中,维护了一个节点的集合,节点的增删改查都是通过它来实现。

      /**
         * 当前集群的所有节点
         */
        private ConcurrentHashMap<String, TioClusterNode> clusterNodes = new ConcurrentHashMap<>(10);
    

    初始化集群

    单例,集群实例中只有一个大管家就是它,全局可用。

     /**
         * 初始化集群
         */
        public TioClusterContext(ClusterCommandExecutor clusterCommandExecutor) {
            if (tioClusterContext == null) {
                synchronized (this) {
                    if (tioClusterContext == null) {
                        this.startTime = SystemTimer.currentTimeMillis();
                        this.clusterCommandExecutor = clusterCommandExecutor;
                        this.clusterProperty = TioClusterProperty.getClusterProperty();
    
                        initCurrentNode();
                        initClusterNodes();
                        tioClusterContext = this;
                    }
                }
            }
        }
    

    通知节点选择

    根据配置文件选择要发送消息的N个节点,原则如下,距离上次更新时间超过配置文件中的超时时间的节点优先。然后轮询选择下一个节点,防止某个节点可能较长时间未被选中(也可以通过时上次更新时间排序解决),达到配置文件的节点数,停止。

     public List<TioClusterNode> selectNotifyNodes(boolean containsCurrent,boolean containsUnavailable) {
            List<TioClusterNode> nodes = new LinkedList<>();
            TioClusterNode[] nodesInArray = getClusterNodesInArray(containsCurrent);
            //根据时间间隔查找下一个节点
            for (TioClusterNode node : nodesInArray) {
                if ((SystemTimer.currentTimeMillis() - node.getUpdateTime()) >= clusterProperty.getNotifyTimeInterval()
                        && nodes.size() < clusterProperty.getNotifyNodesMaxCount()
                        && (containsCurrent || !isCurrentNode(node))
                        && !nodes.contains(node)
                        && (containsUnavailable || !node.isUnAvailable())) {
                    nodes.add(node);
                }
            }
    
            int maxCount = nodesInArray.length;
            int lastCount = (clusterProperty.getNotifyNodesMaxCount() > maxCount ? maxCount : clusterProperty.getNotifyNodesMaxCount()) - nodes.size();
    
            int tryMaxCount = lastCount;
            int tryCount = 0;
            //很容易出现死循环,做好边界
            while (lastCount > 0 && tryCount < tryMaxCount) {
                TioClusterNode nextNode = RoundRibbonNodeSelector.nextPingNode(nodesInArray);
                if (nextNode != null && !nodes.contains(nextNode)) {
                    if (containsUnavailable || !nextNode.isUnAvailable()) {
                        nodes.add(nextNode);
                    }
                    lastCount--;
                }
                tryCount++;
            }
            return nodes;
        }
    

    命令发送

    发送命令很简单,就是调用 Tio 的Send方法。

     private void send(TioClusterNode node, Consumer<ChannelContext> consumer){
            ClientChannelContext channelContext = node.getClientChannelContext();
            if (node.connectionActivated()) {
               consumer.accept(channelContext);
            }
        }
    	
    	 @Override
        public void meet(TioClusterNode other) {
            send(other, channelContext -> Tio.send(channelContext, ClusterPacketBuilder.buildMeetPacket()));
        }
    

    命令编码

    由于格式固定,所以命令的内容就不直接用 JSON 格式传输,根据命令的传输内容不同,分别组装不同的命令。

      public ByteBuffer build( final ClusterPacket clusterPacket) {
            this.clusterPacket = clusterPacket;
            byte[] body = body();
            ByteBuffer buffer = ByteBuffer.allocate(bufferLength());
            //command
            buffer.put(clusterPacket.command().getType());
            //fromServerLength
            buffer.putShort(fromServerLength());
            //bodyLength
            buffer.putInt(body.length);
            //fromServer
            buffer.put(fromServerBytes);
            buffer.put(body);
            return buffer;
        }
    	//此方法由具体的命令类实现,每个命令所传输的内容不同,格式也不同
    	protected abstract byte[] body();
    

    AioHandler 解耦

    由于我们在内部处理集群消息的时候需要用到AioHandler,所以这个是避免不了的,但是用户又有自己的消息要处理,所以,我又增加了一层,将ClusterServerAioHandler作为抽象类,增加若干抽象方法,然后具体的用户消息的处理放到新增的抽象方法里,其他类同理,不在阐述。

    public abstract class ClusterServerAioHandler implements ServerAioHandler {
    
        private static final Logger logger = LoggerFactory.getLogger(ClusterServerAioHandler.class);
    
        /**
         * 用户自定义解码
         */
        public abstract Packet clusterDecode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException;
    
        /**
         * 用户自定义编码
         */
        public abstract ByteBuffer clusterEncode(Packet packet, TioConfig tioConfig, ChannelContext channelContext);
    
        /**
         * 用户自定义消息处理
         */
        public abstract void clusterHandler(Packet packet, String packetJsonString, ChannelContext channelContext) throws Exception;
    
        /**
         * 处理集群消息解码
         * */
        @Override
        public final Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
            Packet clusterPacket = ClusterPacketDecoder.decode(buffer, limit, position, readableLength, channelContext);
            if (clusterPacket != null) {
                return clusterPacket;
            }
            //reset buffer because cluster has already read a byte for  cluster command
            buffer.position(position);
            return clusterDecode(buffer, limit, position, readableLength, channelContext);
        }
    
        /**
         * 处理集群消息编码
         * */
        @Override
        public final ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
            ByteBuffer buffer = ClusterPacketEncoder.encode(packet, tioConfig, channelContext);
            if (buffer != null) {
                return buffer;
            }
    		//当集群解析的Buffer为NULL时候,说明是非集群内部消息,交给用户自定义处理
            return clusterEncode(packet, tioConfig, channelContext);
        }
    
        /**
         *
         * 处理集群消息*/
        @Override
        public final void handler(Packet packet, ChannelContext channelContext) throws Exception {
           //这里留空,后边讲解
        }
    }
    

    集群消息的转发

    暂时还没有将 slot 的概念派上用场,先用了普通的取模方式。此方法是根据业务主键获取所在节点的方法。比如 ID为1 的用户连接的服务节点可能为 127.0.0.1:7002,ID为2的用户连接的服务节点为127.0.0.1:7001.至于细节可以屏蔽,比如通过NGINX让用户无感知到底连接了哪台服务器。

     /**
         * 根据 hashCode 获取该值所绑定的节点,如果集群机器增加或者删除可能会导致节点错误,发消息失败(暂不考虑)
         * */
        public static TioClusterNode getClusterServerNode(Object object) {
            int hashCode = object.hashCode();
            //获取集群活跃节点
            List<TioClusterNode> nodes = TioClusterContext.currentContext().getActivateNodes();
            int length = nodes.size();
            //取模
            int index = hashCode % length;
            return nodes.get(index);
        }
    

    发送消息

     private static boolean sendToUserInternal(TioConfig tioConfig,String userId, Packet packet) {
            TioClusterNode node = getClusterServerNode(userId);
            //如果是本机,直接发送到本机即可
            if (TioClusterContext.currentContext().currentServer().equals(node) && tioConfig != null) {
                return Tio.sendToUser(tioConfig, userId, packet);
            } else {
                //这里需要包装成为ClusterPacket,因为集群内部的消息是通过ClusterPacket实现的
                ClusterPacket userPacket = ClusterPacketBuilder.buildUserPacket(packet);
                return Tio.send(node.getClientChannelContext(), userPacket);
            }
        }
    

    上文中的代码很简单,如果获取到的节点是本机节点,那么很幸运,直接执行发送即可。否则,需要将Packet包装到ClusterPacket中,因为集群内部的消息都是通过它来流通的。新增 USER 命令。它的包编码很简单,转JSON。

        @Override
        protected byte[] body() {
            Packet userPacket = ((ClusterUserPacket) clusterPacket).getPacketBody().getUserPacket();
            String json = Json.toJson(userPacket);
            return SafeStringEncoder.stringToBytes(json);
        }
    

    在上文中留空的地方代码如下:

     @Override
        public final void handler(Packet packet, ChannelContext channelContext) throws Exception {
            //第一种情况,内部消息转发
            if (packet instanceof ClusterPacket) {
                ClusterPacket clusterPacket = (ClusterPacket) packet;
                logger.info("[{}] RECEIVED [{}] FROM [{}] ", TioClusterContext.currentContext().currentServer().serverName(),clusterPacket.command().getCmd() ,clusterPacket.getPacketBody().getFromNode().serverName());
                //当命令为USER时,说明此消息是从其他节点过来的用户消息
                if (clusterPacket.command() == ClusterCommand.USER) {
                    ClusterUserPacket clusterUserPacket = (ClusterUserPacket) packet;
                    //由于框架并不知道用户使用了具体的Packet类型,所以,这里提供了 JSON  和 对象的两种传递方式,在这里  getuserPakcet 是为NULL的。
                    clusterHandler(clusterUserPacket.getPacketBody().getUserPacket(), clusterUserPacket.getPacketBody().getUserPacketString(), channelContext);
                } else {
                    ClusterObjectFactory.getPacketHandler(clusterPacket.command()).handle(clusterPacket, channelContext);
                }
            } else {
                //第二种情况,直接走用户消息处理
                clusterHandler(packet, null, channelContext);
            }
        }
    

    测试环节

    • 1 编写 ServerAioHandler,ServerAioListener,正如上文所讲,需要继承ClusterServerAioHandler
    public class MyClusterServerAioHandler extends ClusterServerAioHandler {}
    public class MyClusterServerAioListener extends ClusterServerAioListener{}
    
    • 2 编写starter
      public static void main(String[] args) throws IOException {
            start();
        }
    
        private static void start() throws IOException {
    	   //这里要实例化 ClusterSververTioConfig
            tioConfig = new ClusterServerTioConfig(new MyClusterServerAioHandler(), new MyClusterServerAioListener());
            TioClusterServer tioServer = new TioClusterServer(tioConfig);
            tioServer.start();
        }
    
    • 3 编写配置文件
    tio.server.ip=127.0.0.1
    tio.server.port=7001
    tio.cluster.server.nodes=127.0.0.1:7002
    tio.cluster.server.notify.max.count=4
    tio.cluster.server.notify.interval=20000
    tio.cluster.server.notify.retry.count=1
    tio.cluster.server.notify.timeout=20000
    
    tio.server.ip=127.0.0.1
    tio.server.port=7002
    tio.cluster.server.nodes=127.0.0.1:7001
    tio.cluster.server.notify.max.count=4
    tio.cluster.server.notify.interval=20000
    tio.cluster.server.notify.retry.count=1
    tio.cluster.server.notify.timeout=20000
    
    • 4 启动服务,查看日志

      日志中可以看出,刚开始一个节点是连不通的,但是该节点上线之后发送了MEET命令。

      现在两个节点都上线了,集群目前正常。

    • 5 启动两个客户端,一个用户1,一个用户2 ,用户1 连接 7002,用户2 连接 7001

      public static void main(String[] args) throws Exception {
    
            String name = System.getenv("TIO-CLUSTER-FILE-NAME");
            Prop prop = new Prop(name + ".properties");
    
            ClientTioConfig config = new ClientTioConfig(new MyClientAioHandler(), new MyClientAioListener(), null);
            config.setHeartbeatTimeout(0);
            config.setName("testClient");
            TioClient tioClient = new TioClient(config);
            ClientChannelContext channelContext = tioClient.connect(new Node(prop.get("tio.server.ip"), prop.getInt("tio.server.port")), 20000);
    
            String userId = prop.get("tio.userid");
            Tio.send(channelContext, MyPacket.handshakePacket(userId));
    
            while (true) {
                Thread.sleep(5000);
                Tio.send(channelContext, MyPacket.helloPacket(prop.get("tio.touserid"), "hello there ,I'm user " + userId + ",I come from " + prop.get("tio.server.port")));
            }
        }
    


    总结

    本文通过参考Redis集群模式实现了一个简单的集群功能。开发过程中遇到的问题:

    • 集群节点的发现功能,Gossip协议的实现,需要优化,优化节点算法
    • 细节的处理,写着写着可能发现隐藏着死循环!!!
    • 并发的处理,在这方面由于能力原因处理较少,应该还有优化空间
    • 消息类的解耦和封装,还有很多优化空间

    此集群实现可能出现的问题:

    • 集群节点过多的时候,一个实例可能维护多个 client ,再加上各种命令的发送,会比较浪费系统资源。可以通过合理的设置超时时间,通知节点的个数等来优化,我觉得也可以采用 RocketMQ的NameServer的方式去实现

    • 思路虽然不算错误,但是个人感觉,依赖第三方组件也是一种稳妥的做法,Redis发布订阅他不香吗?,Zookeeper管理集群节点不好吗?

    • 其实这都不是问题,最主要的是享受思考和开发的过程,通常会几个小时最后发现是自己犯了一个小错误导致的。不过还好,结局还是在接受范围内。

    最后本文也花费了我接近两个小时的时间,分享出我的想法和开发历程。希望能给各位带来收获,另外也希望各位大牛提出更好的意见和方案。谢谢大家,回家喽~~

  • 相关阅读:
    [C#]获取指定文件文件名、后缀、所在目录等
    Mysql 存储引擎中InnoDB与Myisam的主要区别
    MySQL的btree索引和hash索引的区别
    Mysql事务的隔离级别
    AE序列号
    mysql索引类型说明
    去除url中自带的jsessionid
    redirect传值非url(springmvc3)
    ueditor的使用
    mysql用户管理(开户、权限)
  • 原文地址:https://www.cnblogs.com/panzi/p/11504832.html
Copyright © 2011-2022 走看看