zoukankan      html  css  js  c++  java
  • zookeeper curator处理会话过期session expired

    本文介绍在使用curator框架的时候如何handle session expire。

    1、什么是zookeeper的会话过期?

     一般来说,我们使用zookeeper是集群形式,如下图,client和zookeeper集群(3个实例)建立一个会话session。

     在这个会话session当中,client其实是随机与其中一个zk provider建立的链接,并且互发心跳heartbeat。zk集群负责管理这个session,并且在所有的provider上维护这个session的信息,包括这个session中定义的临时数据和监视点watcher。

    如果再网络不佳或者zk集群中某一台provider挂掉的情况下,有可能出现connection loss的情况,例如client和zk provider1连接断开,这时候client不需要任何的操作(zookeeper api已经给我们做好了),只需要等待client与其他provider重新连接即可。这个过程可能导致两个结果:

    1)在session timeout之内连接成功

    这个时候client成功切换到连接另一个provider例如是provider2,由于zk在所有的provider上同步了session相关的数据,此时可以认为无缝迁移了。

    2)在session timeout之内没有重新连接

    这就是session expire的情况,这时候zookeeper集群会任务会话已经结束,并清除和这个session有关的所有数据,包括临时节点和注册的监视点Watcher。

    在session超时之后,如果client重新连接上了zookeeper集群,很不幸,zookeeper会发出session expired异常,且不会重建session,也就是不会重建临时数据和watcher。

    2、如何使用curator实现session expired异常的捕获和处理?

    1)首先我们先创建一个链接

    这里设置了重试策略retryPolicy和会话超时时间sessionTimeoutMs,并打开链接。

    public void init() {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
            client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy)
                    .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).build();
            client.start();
        }

    2)客户端注册

    public void register() {
            try {
                String rootPath = "/services";
                String hostAddress = InetAddress.getLocalHost().getHostAddress();
                String serviceInstance = "/prometheus" + "-" +  hostAddress + "-";
                String path = rootPath + serviceInstance;
                SessionConnectionListener sessionConnectionListener = new SessionConnectionListener(path, "");
                client.getConnectionStateListenable().addListener(sessionConnectionListener);
                client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
            } catch (Exception e) {
                logger.error("注册出错", e);
            }
        }

     这里我们创建了一个临时有序节点node,这个节点将会在session expired触发的时候被自动删除。当session又重新恢复的时候,client只会收到session expired异常和不会自动将临时节点添加到zookeeper中。

    为了解决这个问题,我们增加了一个监听器,

    client.getConnectionStateListenable().addListener(sessionConnectionListener)

    这个监听器监听session expired事件,并且在事件发生的时候进行处理,监听器处理的流程如下。

    注意:这个监听器注册是可以复用的,即如果多次session expired,不用重复注册监听器。

    3、监听器sessionConnectionListener

    package com.xiaoju.dqa.prometheus.client.zookeeper;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.state.ConnectionState;
    import org.apache.curator.framework.state.ConnectionStateListener;
    import org.apache.zookeeper.CreateMode;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class SessionConnectionListener implements ConnectionStateListener {
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        private String path;
        private String data;
    
        public SessionConnectionListener(String path, String data) {
            this.path = path;
            this.data = data;
        }
    
        @Override
        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState){
            if(connectionState == ConnectionState.LOST){
                logger.error("[负载均衡失败]zk session超时");
                while(true){
                    try {
                        if(curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut()){
                            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, data.getBytes("UTF-8"));
                            logger.info("[负载均衡修复]重连zk成功");
                            break;
                        }
                    } catch (InterruptedException e) {
                        break;
                    } catch (Exception e){
    
                    }
                }
            }
        }
    }

     这里的ConnectionState.LOST等同于session expired事件,对这个事件的处理是,在一个死循环中重试链接zk,知道链接成功才退出循环。

    需要注意的是:一旦重新创建了会话,那么之前会话的所有观察点都会失效,需要重新初始化观察点。

  • 相关阅读:
    HDFS datanode源码分析
    hive udaf开发入门和运行过程详解
    hive原生和复合类型的数据加载和使用
    tomcat部署web应用(转)
    HDFS namenode源码分析
    HDFS dfsclient写文件过程 源码分析
    hive中UDTF编写和使用(转)
    HDFS dfsclient读文件过程 源码分析
    MapReduce源码分析总结(转)
    DataRabbit 轻量的数据访问框架(09) -- IDataSchemaAccesser
  • 原文地址:https://www.cnblogs.com/kangoroo/p/7538314.html
Copyright © 2011-2022 走看看