zoukankan      html  css  js  c++  java
  • curator监听zk临时节点实现信息中心服务的监控

    我们使用curator建立连接,curator有session维护,重试机制,对递归创建节点和删除节点有较好的支持:

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client =
                    CuratorFrameworkFactory.builder()
                            .connectString("10.10.210.123:2181")
                            .sessionTimeoutMs(50000000)
                            .connectionTimeoutMs(50000000)
                            .retryPolicy(retryPolicy)
                            .build();
    client.start();//start以后就可以使用Curator操作zk了,用完了以后不要忘记释放调用close方法。

    常规代码可以先判断是否存在后,创建消息通讯的根节点:

     Stat s = client.checkExists().forPath("/msg_node_list");
                if (s == null) {
                    String o = client.create().withMode(CreateMode.PERSISTENT).forPath("/msg_node_list");
                }

    针对我们的每一个netty服务,根据自己的服务id向msg_node_list注册临时节点:

    client.create().withMode(CreateMode.EPHEMERAL).forPath("/msg_node_list/b");

    而我们的监控中心,负责给客户端分发具体的netty链接服务器(固定10个节点进行hash分发,服务端向kafka中选择topic依然根据用户id与10按位&计算获得具体的partition,因为partiton和netty机器编号一致),则需要轮询监听每一个临时子节点的状态,并且通过watch监听异常下线节点:

    List<String> list = client.getChildren().forPath("/msg_node_list");
    //遍历list中的每一个临时子节点后调用如下代码,参数为list中取出的临时节点名称:
    client.getChildren().usingWatcher(w).forPath("/msg_node_list/b");
     Watcher w = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                Event.EventType type = watchedEvent.getType();
                if (type.equals(Event.EventType.NodeDeleted)) {
                    System.out.println(watchedEvent.getPath());
                }
            }
        };

    对于节点监听处理,可以使用curator的高级策略:

       1) NodeCache: 对一个节点进行监听,监听事件包括指定的路径节点的增、删、改的操作。

       2) PathChildrenCache: 对指定的路径节点的一级子目录进行监听,不对该节点的操作进行监听,对其子目录的节点进行增、删、改的操作监听

       3) TreeCache:  可以将指定的路径节点作为根节点(祖先节点),对其所有的子节点操作进行监听,呈现树形目录的监听,可以设置监听深度,最大监听深度为2147483647(int类型的最大值)。

      PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/msg_server_list", false);//监听msg_server_list路径下的所有节点
                pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);//异步初始化
                pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                        PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
                        if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                            System.out.println(pathChildrenCacheEvent.getData().getPath());
                        }
                    }
                });

    =========================================================================================================

    如果客户端连接的netty服务异常下线,则在次访问监控服务器获取可用的netty地址。

    redis中维护着一个list列表,每个连接到netty服务器对应的客户端id,保存在一个redis的set中(也可以使用redis的bit按位置存储,然后用BITCOUNT统计总数

    监控服务器在给客户端分配netty服务器时,会从zk中拉取可用的netty服务列表,并根据redis中netty服务对应的BITCount取出最小值,返回给客户端,如果客户端发现分配的netty服务不可用,则带着该netty服务的id继续请求监控服务,监控服务会从zk中再次拉取netty服务列表,并将不可用的netty服务器id去除后,在根据服务器列表中对应的BITCount选出最小值并返回客户端。

    ps:进入正常流程后,消息通信通过redis的list实现,lpush插入待办信息,rpop取出信息。

    =================================================使用kafka重构========================================================

    每个netty服务器,监控一个kafka partition。然后每个客户端直接连接到netty以后,只要客户端不异常下线,它的channel就会被保存在一个Map中,key为客户端首次发送过来的id。

    当从kafka中取出数据时会带有id,根据id判断channel是否还存在于map中,如果客户端已经下线则会在对应是事件中将该channel从map删除,如果存在则将消息解析出来以后通过channel.writeAndFlush推送给客户端。

    每个netty服务对应一个partition。所有的消息放入一个topic中,当客户端被分配了具体的某个netty服务以后,服务器以同样的hash算法向具体partition中推入数据

    如果netty服务消费kafka的指定partition后,没有发现该channel连接上来则,将消息直接丢弃掉  “因为消息中心,只负责客户端在线时的消息推送,客户端下线后重新启动将会从服务中获取,此处与消息中心无关”*

    (1)服务端:只要kafka不宕机,服务端调用永远没有问题,消息发布端会将消息发送到所有的partition,保证客户端故障切换时能够消费到数据,监控中心会根据redis中保存的连接客户端的多少做出排序,找出可用的并持有最少客户端连接的netty服务分配给客户端。

    (2)客户端:如果客户端的网络异常情况下,不发送运维短信通知,如果客户端正常,还是连不上服务端的netty则发送短信通知运维,同时去监控中心重新获取其它可用的netty服务,只要服务端netty恢复正常运行则马上可以对客户端提供服务。

    关键代码如下:

    public Map<String, Channel> channelMap = new ConcurrentHashMap<>();
    
    @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            //如果该客户端刚刚连接上来,则带有连接标识。将其存入Map中,这里仍然只是示例,实际不应该写在这个方法中,而是写在channelActive方法中
            .......
            channelMap.put(msg, ctx.channel());
            .......
        } 
    
    
     //这里给出获取消息后的示例代码,用queue来模拟kafka,当然仅供参考。
     @Override
        public void run() {
            for (; ; ) {
                try {//模拟消息格式为,“用户id,实际消息内容”,再声明这里仅仅是演示,实际的消息格式需要严谨定义
                    String o = linkedBlockingQueue.take();
                    String msg[] = o.split("\,");
                    Channel channel = channelMap.get(msg[0]);
                    if (channel != null) {
                        channel.writeAndFlush(msg[1]);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
  • 相关阅读:
    ubuntu18+k8s单机版+kuboard+harbor安装笔记
    Apache commons StringSubstitutor 替换占位符
    Kafka消费与心跳机制
    本地机器如何访问服务器上的docker容器内的tensorboard?
    Pytorch cuDNN error: CUDNN_STATUS_NOT_SUPPORTED.解决办法
    Docker常用方法总结
    SpringBoot
    新版chrome中非https无法打开摄像头
    DDIA----笔记(不定时更新)
    Windows 无法验证此设备所需的驱动程序的数字签名。最近的硬件或软件更改安装的文件可能未正确签名或已损坏,或者可能是来自未知来源的恶意软件。 (代码 52)
  • 原文地址:https://www.cnblogs.com/zzq-include/p/12124898.html
Copyright © 2011-2022 走看看