zoukankan      html  css  js  c++  java
  • (九)分布式服务----Zookeeper注册中心

     ==>>点击查看本系列文章目录

    首先看一下几种注册中心:

    最老的就是Zookeeper了, 比较新的有Eureka,Consul 都可以做注册中心。可以自行搜索对比三者的优缺点。

    Zookeeper 最开始就是hadoop大家族中的一员,用于做协调的框架,后来已经是apache的子项目了。

    几年前大数据很火的时候,只要学hadoop必学zookeeper,当然还有其他成员。

    大数据简单说就是分布式,比如分布式文件存储hdfs,分布式数据库hbase,分布式协调zookeeper,还有kafka,Flume等等都是hadoop大家族。

    zookeeper,现在更多被用来做注册中心,比如阿里的开源SOA框架dubbo就经常搭配zookeeper做注册中心。

    Eureka:java的微服务框架Spring Cloud中内部已经集成了Eureka注册中心。

    我选择zookeeper,不是因为他比另外两个强,而是因为我几年前就已经学习过一些zookeeper的原理,上手更容易。网络上学习书籍、资料、视频教程也特别多,学习资料完善。

    注册中心的基本功能:

    1. 注册服务,有点类似DNS,所有的服务注册到注册中心,包含服务的地址等信息。

    2. 服务订阅,客户端请求服务,注册中心就要把那些能用的服务器地址告诉客户端,服务端有变动时,注册中心也能及时通知到客户端。

    3. 性能好且高可用,注册中心自身也是一个集群,如果只有一个注册中心机器的话那岂不是把注册中心累死啊,而且他一旦坏了以后,那客户端都找不到服务器了。所有注册中心就有很多台,其中只有一个老大(leader),老大用来写,小弟用来读。就是说老大来决定一台服务器能不能注册进来,小弟负责帮助客户端查找服务器。因为注册服务的次数是很少的,通常有新服务器加入才需要注册,但是客户端订阅那就很多了,所以注册中心只有一个leader。leader万一坏掉的话,会从小弟中选举出一个来当老大接替工作。

    上面提到说zookeeper集群,就是说有很多台机器做zookeeper机器,但是这些机器里存储的东西基本上都是一样的,就是说客户端不管连到哪个zookeeper 都是一样的,能做服务订阅。

    每一个zookeeper 中都有很多节点(Znode)。

    接下来说的zookeeper节点和集群完全不是一回事。 有些人喜欢吧集群中的每一台zookeeper机器称为一个节点,但是这个节点(zookeeper机器)和我说的节点(Znode)完全不是一回事。

    如下图:

     本例的图中可以看到,一共有5台机器,每台机器都有5个znode,Znode下面的子节点就更多了。

    先看5台机器:

    一台leader,老大,上文已经介绍,服务都从这些注册写入。

    两台follower,小弟,平时用于服务订阅,老大挂掉以后,follower内部就会自行选出老大。

    两台observer,观察者,就是属于无业游民,只能看,没有选老大的资格,不能参与竞选也不能投票,唯一的功能就是服务订阅。

      observer模式需要手动开启,为什么会出现observer呢,是因为机器太多的话,每个机器都有选举权的话特别影响性能。全中国14亿人口,每个人都参与国家竞选的话,效率极低。所以呢,选举的工作就交给follower完成就行了,只需要确保一直都有leader接班人就好。

    再看看zookeeper有什么基本功能:

    基本功能很简单,组合以后却可以完成各种复杂工作。

    1. 可以创建:临时节点(断开连接时便删除节点) 和 持久化节点(必须手动删除节点)。

    2. 可以创建:无序节点 和 有序节点。

    3. 节点上可以添加watcher监听功能,监听该节点的增删改,然后触发自定义的事件。

    看看这些功能怎么用:

    1. 节点: 每次注册一个服务就创建一个节点,节点的名称(Key)就是服务的名称,服务的详细信息存储在节点value中,客户端通过key找到对应的节点,再找打节点中的value。

    2. 临时节点:服务端注册一个服务时创建一个临时节点,服务断开时,临时节点自动销毁,自动完成服务注销。

    3. watcher监听: 客户端在注册中心订阅了一个服务的时候,同时在这个服务所在的节点上加一个监听事件,每当服务节点信息有变化的时候,注册中心会自动回调通知客户端。

    4. 有序临时节点:分布式锁或者分布式队列(这里与服务注册无关),客户端1想要操作一条数据的时候,在A节点下创建一个有序临时节点,自动分配编号001;客户端1也要操作该数据的时候,在A节点下也创建一个有序临时节点,自动分配编号002。只有编号最小的子节点才会被执行,因此001节点会被执行,客户端1执行完毕后,自动删除001节点,此时002编号为最小子节点。即锁的概念,不能同时操作同一数据;也可以做队列,按照先后顺序依次执行。

    5. 有序临时节点+watcher监听: 上面第4条中说到每次执行编号最小的节点,因此需要有一个程序,每次都需要遍历全部节点,然后找出最小的节点,假如是002节点,这时客户端2开始执行。但是添加监听机制以后就不一样了,002监听001,003监听比他小一号的002,这样001销毁的同时通知002开始执行,002销毁的时候通知003开始执行,不需要遍历最小节点,也能有序依次执行。

    6. 临时节点+watcher监听: 集群master选举以及高可用。比如hadoop集群,也有一个resourcemanager资源管理器,负责调度其它节点机器,相当于hadoop集群的leader节点。这个leader就可以交由zookeeper管理,所有的hadoop机器同时在zookeeper中创建一个同名的临时节点,由于是同名互斥的节点,因此只有一个节点能被创建,成功创建这个节点的hadoop机器就是leader。同时添加Watcher监听,这个leader只要断开连接,临时节点自动销毁,触发监听,其它hadoop开始新一轮的master选举。这也是zookeeper最初在hadoop家族中的重要使命。

    7....... 还要很多地方都能用zookeeper,简直无所不能,而且自身也是高可用,高性能,牛x

    zookeeper本身的操作还是很简单的,无非就是节点的增删改查,可以选择要创建节点的类型,还有就是在节点上添加watcher监听器。就这些。

    文件结构:

    上代码:

    zookeeper客户端管理类:

    public class ZookeeperClientProvider
        {
            private ConfigInfo _config;
            private readonly ILogger<ZookeeperClientProvider> _logger;
            private readonly Dictionary<string, ZooKeeper> _zookeeperClients = new Dictionary<string, ZooKeeper>();
    
            public ZookeeperClientProvider(ConfigInfo config, ILogger<ZookeeperClientProvider> logger)
            {
                _config = config;
                _logger = logger;
            }
    
            public async Task<ZooKeeper> GetZooKeeper()
            {
                return await CreateZooKeeper(_config.Addresses.FirstOrDefault());
            }
            public async Task<ZooKeeper> CreateZooKeeper(string address)
            {
                if (!_zookeeperClients.TryGetValue(address, out ZooKeeper result))
                {
                    await Task.Run(() =>
                    {
                        result = new ZooKeeper(address, (int)_config.SessionTimeout.TotalMilliseconds,
                            new ReconnectionWatcher(
                                async () =>
                                {
                                    if (_zookeeperClients.Remove(address, out ZooKeeper value))
                                    {
                                        await value.closeAsync();
                                    }
                                    await CreateZooKeeper(address);
                                }));
                        _zookeeperClients.TryAdd(address, result);
                    });
                }
                return result;
            }
    
            public async Task<IEnumerable<ZooKeeper>> GetZooKeepers()
            {
                var result = new List<ZooKeeper>();
                foreach (var address in _config.Addresses)
                {
                    result.Add(await CreateZooKeeper(address));
                }
                return result;
            }
        }
    ZookeeperClientProvider

    zookeeper服务注册类:

    /// <summary>
        /// 一个抽象的服务路由发现者。
        /// </summary>
        public interface IServiceRouteManager
        {
    
            /// <summary>
            /// 服务路由被创建。
            /// </summary>
            event EventHandler<ServiceRouteEventArgs> Created;
    
            /// <summary>
            /// 服务路由被删除。
            /// </summary>
            event EventHandler<ServiceRouteEventArgs> Removed;
    
            /// <summary>
            /// 服务路由被修改。
            /// </summary>
            event EventHandler<ServiceRouteChangedEventArgs> Changed;
    
            /// <summary>
            /// 获取所有可用的服务路由信息。
            /// </summary>
            /// <returns>服务路由集合。</returns>
            Task<IEnumerable<ServiceRoute>> GetRoutesAsync();
    
            /// <summary>
            /// 设置服务路由。
            /// </summary>
            /// <param name="routes">服务路由集合。</param>
            /// <returns>一个任务。</returns>
            Task SetRoutesAsync(IEnumerable<ServiceRoute> routes);
    
            /// <summary>
            /// 移除地址列表
            /// </summary>
            /// <param name="routes">地址列表。</param>
            /// <returns>一个任务。</returns>
            Task RemveAddressAsync(IEnumerable<string> Address);
            /// <summary>
            /// 清空所有的服务路由。
            /// </summary>
            /// <returns>一个任务。</returns>
            Task ClearAsync();
        }
    
        /// <summary>
        /// 服务路由事件参数。
        /// </summary>
        public class ServiceRouteEventArgs
        {
            public ServiceRouteEventArgs(ServiceRoute route)
            {
                Route = route;
            }
    
            /// <summary>
            /// 服务路由信息。
            /// </summary>
            public ServiceRoute Route { get; private set; }
        }
    
        /// <summary>
        /// 服务路由变更事件参数。
        /// </summary>
        public class ServiceRouteChangedEventArgs : ServiceRouteEventArgs
        {
            public ServiceRouteChangedEventArgs(ServiceRoute route, ServiceRoute oldRoute) : base(route)
            {
                OldRoute = oldRoute;
            }
    
            /// <summary>
            /// 旧的服务路由信息。
            /// </summary>
            public ServiceRoute OldRoute { get; set; }
        }
    IServiceRouteManager
    public class ZooKeeperServiceRouteManager : IServiceRouteManager, IDisposable
        {
            private readonly ConfigInfo _configInfo;
            private readonly ISerializer<byte[]> _serializer;
            private readonly ILogger<ZooKeeperServiceRouteManager> _logger;
            private ServiceRoute[] _routes;
            private readonly ZookeeperClientProvider _zookeeperClientProvider;
    
            public ZooKeeperServiceRouteManager(ConfigInfo configInfo, ISerializer<byte[]> serializer,
                ISerializer<string> stringSerializer,
                ILogger<ZooKeeperServiceRouteManager> logger,
                ZookeeperClientProvider zookeeperClientProvider)
            {
                _configInfo = configInfo;
                _serializer = serializer;
                _logger = logger;
                _zookeeperClientProvider = zookeeperClientProvider;
                EnterRoutes().Wait();
            }
    
            private EventHandler<ServiceRouteEventArgs> _created;
            private EventHandler<ServiceRouteEventArgs> _removed;
            private EventHandler<ServiceRouteChangedEventArgs> _changed;
    
            /// <summary>
            /// 服务路由被创建。
            /// </summary>
            public event EventHandler<ServiceRouteEventArgs> Created
            {
                add { _created += value; }
                remove { _created -= value; }
            }
    
            /// <summary>
            /// 服务路由被删除。
            /// </summary>
            public event EventHandler<ServiceRouteEventArgs> Removed
            {
                add { _removed += value; }
                remove { _removed -= value; }
            }
    
            /// <summary>
            /// 服务路由被修改。
            /// </summary>
            public event EventHandler<ServiceRouteChangedEventArgs> Changed
            {
                add { _changed += value; }
                remove { _changed -= value; }
            }
    
    
    
            protected void OnCreated(params ServiceRouteEventArgs[] args)
            {
                if (_created == null)
                    return;
    
                foreach (var arg in args)
                    _created(this, arg);
            }
    
            protected void OnChanged(params ServiceRouteChangedEventArgs[] args)
            {
                if (_changed == null)
                    return;
    
                foreach (var arg in args)
                    _changed(this, arg);
            }
    
            protected void OnRemoved(params ServiceRouteEventArgs[] args)
            {
                if (_removed == null)
                    return;
    
                foreach (var arg in args)
                    _removed(this, arg);
            }
    
    
            /// <summary>
            /// 获取所有可用的服务路由信息。
            /// </summary>
            /// <returns>服务路由集合。</returns>
            public async Task<IEnumerable<ServiceRoute>> GetRoutesAsync()
            {
                await EnterRoutes();
                return _routes;
            }
    
            /// <summary>
            /// 清空所有的服务路由。
            /// </summary>
            /// <returns>一个任务。</returns>
            public async Task ClearAsync()
            {
                if (_logger.IsEnabled(LogLevel.Information))
                    _logger.LogInformation("准备清空所有路由配置。");
                var zooKeepers = await _zookeeperClientProvider.GetZooKeepers();
                foreach (var zooKeeper in zooKeepers)
                {
                    var path = _configInfo.RoutePath;
                    var childrens = path.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
    
                    var index = 0;
                    while (childrens.Count() > 1)
                    {
                        var nodePath = "/" + string.Join("/", childrens);
    
                        if (await zooKeeper.existsAsync(nodePath) != null)
                        {
                            var result = await zooKeeper.getChildrenAsync(nodePath);
                            if (result?.Children != null)
                            {
                                foreach (var child in result.Children)
                                {
                                    var childPath = $"{nodePath}/{child}";
                                    if (_logger.IsEnabled(LogLevel.Debug))
                                        _logger.LogDebug($"准备删除:{childPath}。");
                                    await zooKeeper.deleteAsync(childPath);
                                }
                            }
                            if (_logger.IsEnabled(LogLevel.Debug))
                                _logger.LogDebug($"准备删除:{nodePath}。");
                            await zooKeeper.deleteAsync(nodePath);
                        }
                        index++;
                        childrens = childrens.Take(childrens.Length - index).ToArray();
                    }
                    if (_logger.IsEnabled(LogLevel.Information))
                        _logger.LogInformation("路由配置清空完成。");
                }
            }
    
            /// <summary>
            /// 设置服务路由。
            /// </summary>
            /// <param name="routes">服务路由集合。</param>
            /// <returns>一个任务。</returns>
            public async Task SetRoutesAsync(IEnumerable<ServiceRoute> routes)
            {
                var hostAddr = NetUtils.GetHostAddress();
                var serviceRoutes = await GetRoutes(routes.Select(p => p.serviceRouteDescriptor.Id));
                if (serviceRoutes.Count() > 0)
                {
                    foreach (var route in routes)
                    {
                        var serviceRoute = serviceRoutes.Where(p => p.serviceRouteDescriptor.Id == route.serviceRouteDescriptor.Id).FirstOrDefault();
                        if (serviceRoute != null)
                        {
                            var addresses = serviceRoute.Address.Concat(
                              route.Address.Except(serviceRoute.Address)).ToList();
    
                            foreach (var address in route.Address)
                            {
                                addresses.Remove(addresses.Where(p => p.ToString() == address.ToString()).FirstOrDefault());
                                addresses.Add(address);
                            }
                            route.Address = addresses;
                        }
                    }
                }
                await RemoveExceptRoutesAsync(routes, hostAddr);
    
                if (_logger.IsEnabled(LogLevel.Information))
                    _logger.LogInformation("准备添加服务路由。");
                var zooKeepers = await _zookeeperClientProvider.GetZooKeepers();
                foreach (var zooKeeper in zooKeepers)
                {
                    await CreateSubdirectory(zooKeeper, _configInfo.RoutePath);
    
                    var path = _configInfo.RoutePath;
                    if (!path.EndsWith("/"))
                        path += "/";
    
                    routes = routes.ToArray();
    
                    foreach (var serviceRoute in routes)
                    {
                        var nodePath = $"{path}{serviceRoute.serviceRouteDescriptor.Id}";
                        var nodeData = _serializer.Serialize(serviceRoute);
                        if (await zooKeeper.existsAsync(nodePath) == null)
                        {
                            if (_logger.IsEnabled(LogLevel.Debug))
                                _logger.LogDebug($"节点:{nodePath}不存在将进行创建。");
    
                            await zooKeeper.createAsync(nodePath, nodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                        }
                        else
                        {
                            if (_logger.IsEnabled(LogLevel.Debug))
                                _logger.LogDebug($"将更新节点:{nodePath}的数据。");
    
                            var onlineData = (await zooKeeper.getDataAsync(nodePath)).Data;
                            if (!DataEquals(nodeData, onlineData))
                                await zooKeeper.setDataAsync(nodePath, nodeData);
                        }
                    }
                    if (_logger.IsEnabled(LogLevel.Information))
                        _logger.LogInformation("服务路由添加成功。");
                }
            }
    
            public async Task RemveAddressAsync(IEnumerable<string> Address)
            {
                var routes = await GetRoutesAsync();
                foreach (var route in routes)
                {
                    route.Address = route.Address.Except(Address);
                }
                await SetRoutesAsync(routes);
            }
    
            private async Task RemoveExceptRoutesAsync(IEnumerable<ServiceRoute> routes, string hostAddr)
            {
                var path = _configInfo.RoutePath;
                if (!path.EndsWith("/"))
                    path += "/";
                routes = routes.ToArray();
                var zooKeepers = await _zookeeperClientProvider.GetZooKeepers();
                foreach (var zooKeeper in zooKeepers)
                {
                    if (_routes != null)
                    {
                        var oldRouteIds = _routes.Select(i => i.serviceRouteDescriptor.Id).ToArray();
                        var newRouteIds = routes.Select(i => i.serviceRouteDescriptor.Id).ToArray();
                        var deletedRouteIds = oldRouteIds.Except(newRouteIds).ToArray();
                        foreach (var deletedRouteId in deletedRouteIds)
                        {
                            var addresses = _routes.Where(p => p.serviceRouteDescriptor.Id == deletedRouteId).Select(p => p.Address).FirstOrDefault();
                            if (addresses.Contains(hostAddr))
                            {
                                var nodePath = $"{path}{deletedRouteId}";
                                await zooKeeper.deleteAsync(nodePath);
                            }
                        }
                    }
                }
            }
    
            private async Task CreateSubdirectory(ZooKeeper zooKeeper, string path)
            {
                if (await zooKeeper.existsAsync(path) != null)
                    return;
    
                if (_logger.IsEnabled(LogLevel.Information))
                    _logger.LogInformation($"节点{path}不存在,将进行创建。");
    
                var childrens = path.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
                var nodePath = "/";
    
                foreach (var children in childrens)
                {
                    nodePath += children;
                    if (await zooKeeper.existsAsync(nodePath) == null)
                    {
                        await zooKeeper.createAsync(nodePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                    nodePath += "/";
                }
            }
    
            private async Task<ServiceRoute> GetRoute(byte[] data)
            {
                if (_logger.IsEnabled(LogLevel.Debug))
                    _logger.LogDebug($"准备转换服务路由,配置内容:{Encoding.UTF8.GetString(data)}。");
    
                if (data == null)
                    return null;
    
                return await Task.Run(() =>
                {
                    return _serializer.Deserialize<ServiceRoute>(data);
                });
            }
    
            private async Task<ServiceRoute> GetRoute(string path)
            {
                ServiceRoute result = null;
                var zooKeeper = await GetZooKeeper();
                var watcher = new NodeMonitorWatcher(GetZooKeeper(), path,
                     async (oldData, newData) => await NodeChange(oldData, newData));
                if (await zooKeeper.existsAsync(path) != null)
                {
                    var data = (await zooKeeper.getDataAsync(path, watcher)).Data;
                    watcher.SetCurrentData(data);
                    result = await GetRoute(data);
                }
                return result;
            }
    
            private async Task<ServiceRoute[]> GetRoutes(IEnumerable<string> childrens)
            {
                var rootPath = _configInfo.RoutePath;
                if (!rootPath.EndsWith("/"))
                    rootPath += "/";
    
                childrens = childrens.ToArray();
                var routes = new List<ServiceRoute>(childrens.Count());
    
                foreach (var children in childrens)
                {
                    if (_logger.IsEnabled(LogLevel.Debug))
                        _logger.LogDebug($"准备从节点:{children}中获取路由信息。");
    
                    var nodePath = $"{rootPath}{children}";
                    var route = await GetRoute(nodePath);
                    if (route != null)
                        routes.Add(route);
                }
    
                return routes.ToArray();
            }
    
            private async Task EnterRoutes()
            {
                if (_routes != null)
                    return;
                var zooKeeper = await GetZooKeeper();
                var watcher = new ChildrenMonitorWatcher(GetZooKeeper(), _configInfo.RoutePath,
                 async (oldChildrens, newChildrens) => await ChildrenChange(oldChildrens, newChildrens));
                if (await zooKeeper.existsAsync(_configInfo.RoutePath, watcher) != null)
                {
                    var result = await zooKeeper.getChildrenAsync(_configInfo.RoutePath, watcher);
                    var childrens = result.Children.ToArray();
                    watcher.SetCurrentData(childrens);
                    _routes = await GetRoutes(childrens);
                }
                else
                {
                    if (_logger.IsEnabled(LogLevel.Warning))
                        _logger.LogWarning($"无法获取路由信息,因为节点:{_configInfo.RoutePath},不存在。");
                    _routes = new ServiceRoute[0];
                }
            }
    
            private static bool DataEquals(IReadOnlyList<byte> data1, IReadOnlyList<byte> data2)
            {
                if (data1.Count != data2.Count)
                    return false;
                for (var i = 0; i < data1.Count; i++)
                {
                    var b1 = data1[i];
                    var b2 = data2[i];
                    if (b1 != b2)
                        return false;
                }
                return true;
            }
    
            public async Task NodeChange(byte[] oldData, byte[] newData)
            {
                if (DataEquals(oldData, newData))
                    return;
    
                var newRoute = await GetRoute(newData);
                //得到旧的路由。
                var oldRoute = _routes.FirstOrDefault(i => i.serviceRouteDescriptor.Id == newRoute.serviceRouteDescriptor.Id);
    
                lock (_routes)
                {
                    //删除旧路由,并添加上新的路由。
                    _routes =
                        _routes
                            .Where(i => i.serviceRouteDescriptor.Id != newRoute.serviceRouteDescriptor.Id)
                            .Concat(new[] { newRoute }).ToArray();
                }
    
                //触发路由变更事件。
                OnChanged(new ServiceRouteChangedEventArgs(newRoute, oldRoute));
            }
    
            public async Task ChildrenChange(string[] oldChildrens, string[] newChildrens)
            {
                if (_logger.IsEnabled(LogLevel.Debug))
                    _logger.LogDebug($"最新的节点信息:{string.Join(",", newChildrens)}");
    
                if (_logger.IsEnabled(LogLevel.Debug))
                    _logger.LogDebug($"旧的节点信息:{string.Join(",", oldChildrens)}");
    
                //计算出已被删除的节点。
                var deletedChildrens = oldChildrens.Except(newChildrens).ToArray();
                //计算出新增的节点。
                var createdChildrens = newChildrens.Except(oldChildrens).ToArray();
    
                if (_logger.IsEnabled(LogLevel.Debug))
                    _logger.LogDebug($"需要被删除的路由节点:{string.Join(",", deletedChildrens)}");
                if (_logger.IsEnabled(LogLevel.Debug))
                    _logger.LogDebug($"需要被添加的路由节点:{string.Join(",", createdChildrens)}");
    
                //获取新增的路由信息。
                var newRoutes = (await GetRoutes(createdChildrens)).ToArray();
    
                var routes = _routes.ToArray();
                lock (_routes)
                {
                    _routes = _routes
                        //删除无效的节点路由。
                        .Where(i => !deletedChildrens.Contains(i.serviceRouteDescriptor.Id))
                        //连接上新的路由。
                        .Concat(newRoutes)
                        .ToArray();
                }
                //需要删除的路由集合。
                var deletedRoutes = routes.Where(i => deletedChildrens.Contains(i.serviceRouteDescriptor.Id)).ToArray();
                //触发删除事件。
                OnRemoved(deletedRoutes.Select(route => new ServiceRouteEventArgs(route)).ToArray());
    
                //触发路由被创建事件。
                OnCreated(newRoutes.Select(route => new ServiceRouteEventArgs(route)).ToArray());
    
                if (_logger.IsEnabled(LogLevel.Information))
                    _logger.LogInformation("路由数据更新成功。");
            }
    
    
            /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
            public void Dispose()
            {
            }
    
            private async Task<ZooKeeper> GetZooKeeper()
            {
                return await _zookeeperClientProvider.GetZooKeeper();
            }
    
        }
    ZooKeeperServiceRouteManager

    zookeeper连接配置类:

    public class ConfigInfo
        {
            /// <summary>
            /// 初始化会话超时为20秒的Zookeeper配置信息。
            /// </summary>
            /// <param name="connectionString">连接字符串。</param>
            /// <param name="routePath">路由配置路径。</param>
            /// <param name="subscriberPath">订阅者配置路径</param>
            /// <param name="commandPath">服务命令配置路径</param>
            /// <param name="cachePath">缓存中心配置路径</param>
            /// <param name="mqttRoutePath">mqtt路由配置路径</param>
            /// <param name="chRoot">根节点。</param>
            public ConfigInfo(string connectionString, string routePath = "/services/serviceRoutes",
                string subscriberPath = "/services/serviceSubscribers",
                string commandPath = "/services/serviceCommands",
                string cachePath = "/services/serviceCaches",
                string mqttRoutePath = "/services/mqttServiceRoutes",
                string chRoot = null,
                bool reloadOnChange = false, bool enableChildrenMonitor = false) : this(connectionString,
                    TimeSpan.FromSeconds(20),
                    routePath,
                    subscriberPath,
                    commandPath,
                    cachePath,
                    mqttRoutePath,
                    chRoot,
                    reloadOnChange, enableChildrenMonitor)
            {
            }
    
            /// <summary>
            /// 初始化Zookeeper配置信息。
            /// </summary>
            /// <param name="connectionString">连接字符串。</param>
            /// <param name="routePath">路由配置路径。</param>
            /// <param name="commandPath">服务命令配置路径</param>
            /// <param name="subscriberPath">订阅者配置路径</param>
            /// <param name="sessionTimeout">会话超时时间。</param>
            /// <param name="cachePath">缓存中心配置路径</param>
            /// <param name="mqttRoutePath">mqtt路由配置路径</param>
            /// <param name="chRoot">根节点。</param>
            public ConfigInfo(string connectionString, TimeSpan sessionTimeout, string routePath = "/services/serviceRoutes",
                string subscriberPath = "/services/serviceSubscribers",
                string commandPath = "/services/serviceCommands",
                string cachePath = "/services/serviceCaches",
                string mqttRoutePath = "/services/mqttServiceRoutes",
                string chRoot = null,
                bool reloadOnChange = false, bool enableChildrenMonitor = false)
            {
                CachePath = cachePath;
                ReloadOnChange = reloadOnChange;
                ChRoot = chRoot;
                CommandPath = commandPath;
                SubscriberPath = subscriberPath;
                ConnectionString = connectionString;
                RoutePath = routePath;
                SessionTimeout = sessionTimeout;
                MqttRoutePath = mqttRoutePath;
                EnableChildrenMonitor = enableChildrenMonitor;
                Addresses = connectionString?.Split(",");
            }
    
            public bool EnableChildrenMonitor { get; set; }
    
            public bool ReloadOnChange { get; set; }
    
            /// <summary>
            /// 连接字符串。
            /// </summary>
            public string ConnectionString { get; set; }
    
            /// <summary>
            /// 命令配置路径
            /// </summary>
            public string CommandPath { get; set; }
    
            /// <summary>
            /// 路由配置路径。
            /// </summary>
            public string RoutePath { get; set; }
    
            /// <summary>
            /// 订阅者配置路径
            /// </summary>
            public string SubscriberPath { get; set; }
    
            /// <summary>
            /// 会话超时时间。
            /// </summary>
            public TimeSpan SessionTimeout { get; set; }
    
            /// <summary>
            /// 根节点。
            /// </summary>
            public string ChRoot { get; set; }
    
    
            public IEnumerable<string> Addresses { get; set; }
    
            /// <summary>
            /// 缓存中心配置中心
            /// </summary>
            public string CachePath { get; set; }
    
    
            /// <summary>
            /// Mqtt路由配置路径。
            /// </summary>
            public string MqttRoutePath { get; set; }
        }
    ConfigInfo

    路由和路由描述:

    public class ServiceRoute
        {
            /// <summary>
            /// 服务可用地址。
            /// </summary>
            public IEnumerable<string> Address { get; set; }
            /// <summary>
            /// 服务描述符。
            /// </summary>
            public ServiceRouteDescriptor serviceRouteDescriptor { get; set; }
    
            #region Equality members
    
            /// <summary>Determines whether the specified object is equal to the current object.</summary>
            /// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
            /// <param name="obj">The object to compare with the current object. </param>
            public override bool Equals(object obj)
            {
                var model = obj as ServiceRoute;
                if (model == null)
                    return false;
    
                if (obj.GetType() != GetType())
                    return false;
    
                if (model.serviceRouteDescriptor != serviceRouteDescriptor)
                    return false;
    
                return model.Address.Count() == Address.Count() && model.Address.All(addressModel => Address.Contains(addressModel));
            }
    
            /// <summary>Serves as the default hash function. </summary>
            /// <returns>A hash code for the current object.</returns>
            public override int GetHashCode()
            {
                return ToString().GetHashCode();
            }
    
            public static bool operator ==(ServiceRoute model1, ServiceRoute model2)
            {
                return Equals(model1, model2);
            }
    
            public static bool operator !=(ServiceRoute model1, ServiceRoute model2)
            {
                return !Equals(model1, model2);
            }
    
            #endregion Equality members
        }
    ServiceRoute
    /// <summary>
        /// 服务描述符。
        /// </summary>
        [Serializable]
        public class ServiceRouteDescriptor
        {
            /// <summary>
            /// 初始化一个新的服务描述符。
            /// </summary>
            public ServiceRouteDescriptor()
            {
                Metadatas = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
            }
    
            /// <summary>
            /// 服务Id。
            /// </summary>
            public string Id { get; set; }
    
            /// <summary>
            /// 访问的令牌
            /// </summary>
            public string Token { get; set; }
    
            /// <summary>
            /// 路由
            /// </summary>
            public string RoutePath { get; set; }
    
            /// <summary>
            /// 元数据。
            /// </summary> 
            public IDictionary<string, object> Metadatas { get; set; }
    
            /// <summary>
            /// 获取一个元数据。
            /// </summary>
            /// <typeparam name="T">元数据类型。</typeparam>
            /// <param name="name">元数据名称。</param>
            /// <param name="def">如果指定名称的元数据不存在则返回这个参数。</param>
            /// <returns>元数据值。</returns>
            public T GetMetadata<T>(string name, T def = default(T))
            {
                if (!Metadatas.ContainsKey(name))
                    return def;
    
                return (T)Metadatas[name];
            }
    
            #region Equality members
    
            /// <summary>Determines whether the specified object is equal to the current object.</summary>
            /// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
            /// <param name="obj">The object to compare with the current object. </param>
            public override bool Equals(object obj)
            {
                var model = obj as ServiceRouteDescriptor;
                if (model == null)
                    return false;
    
                if (obj.GetType() != GetType())
                    return false;
    
                if (model.Id != Id)
                    return false;
    
                return model.Metadatas.Count == Metadatas.Count && model.Metadatas.All(metadata =>
                {
                    object value;
                    if (!Metadatas.TryGetValue(metadata.Key, out value))
                        return false;
    
                    if (metadata.Value == null && value == null)
                        return true;
                    if (metadata.Value == null || value == null)
                        return false;
    
                    return metadata.Value.Equals(value);
                });
            }
    
            /// <summary>Serves as the default hash function. </summary>
            /// <returns>A hash code for the current object.</returns>
            public override int GetHashCode()
            {
                return ToString().GetHashCode();
            }
    
            public static bool operator ==(ServiceRouteDescriptor model1, ServiceRouteDescriptor model2)
            {
                return Equals(model1, model2);
            }
    
            public static bool operator !=(ServiceRouteDescriptor model1, ServiceRouteDescriptor model2)
            {
                return !Equals(model1, model2);
            }
    
            #endregion Equality members
        }
    ServiceRouteDescriptor

    Watcher监听器:

    子节点监听器:

    internal class ChildrenMonitorWatcher : Watcher
        {
            private readonly Task<ZooKeeper> _zooKeeperCall;
            private readonly string _path;
            private readonly Action<string[], string[]> _action;
            private string[] _currentData = new string[0];
    
            public ChildrenMonitorWatcher(Task<ZooKeeper> zooKeeperCall, string path, Action<string[], string[]> action)
            {
                _zooKeeperCall = zooKeeperCall;
                _path = path;
                _action = action;
            }
    
            public ChildrenMonitorWatcher SetCurrentData(string[] currentData)
            {
                _currentData = currentData ?? new string[0];
    
                return this;
            }
    
            #region Overrides of WatcherBase
    
            public override async Task process(WatchedEvent watchedEvent)
            {
                if (watchedEvent.getState() != Event.KeeperState.SyncConnected || watchedEvent.getPath() != _path)
                    return;
                var zooKeeper = await _zooKeeperCall;
                //Func<ChildrenMonitorWatcher> getWatcher = () => new ChildrenMonitorWatcher(_zooKeeperCall, path, _action);
                Task<ChildrenMonitorWatcher> getWatcher =  Task.Run(() => {return new ChildrenMonitorWatcher(_zooKeeperCall, _path, _action); });
                switch (watchedEvent.get_Type())
                {
                    //创建之后开始监视下面的子节点情况。
                    case Event.EventType.NodeCreated:
                        await zooKeeper.getChildrenAsync(_path, await getWatcher);
                        break;
    
                    //子节点修改则继续监控子节点信息并通知客户端数据变更。
                    case Event.EventType.NodeChildrenChanged:
                        try
                        {
                            var watcher = await getWatcher;
                            var result = await zooKeeper.getChildrenAsync(_path, watcher);
                            var childrens = result.Children.ToArray();
                            _action(_currentData, childrens);
                            watcher.SetCurrentData(childrens);
                        }
                        catch (KeeperException.NoNodeException)
                        {
                            _action(_currentData, new string[0]);
                        }
                        break;
    
                    //删除之后开始监控自身节点,并通知客户端数据被清空。
                    case Event.EventType.NodeDeleted:
                        {
                            var watcher = await getWatcher;
                            await zooKeeper.existsAsync(_path, watcher);
                            _action(_currentData, new string[0]);
                            watcher.SetCurrentData(new string[0]);
                        }
                        break;
                }
            }
            #endregion Overrides of WatcherBase
        }
    ChildrenMonitorWatcher

    当前节点监听器:

    internal class NodeMonitorWatcher : Watcher
        {
            private readonly Task<ZooKeeper> _zooKeeperCall;
            private readonly string _path;
            private readonly Action<byte[], byte[]> _action;
            private byte[] _currentData;
    
            public NodeMonitorWatcher(Task<ZooKeeper> zooKeeperCall, string path, Action<byte[], byte[]> action)
            {
                _zooKeeperCall = zooKeeperCall;
                _path = path;
                _action = action;
            }
    
            public NodeMonitorWatcher SetCurrentData(byte[] currentData)
            {
                _currentData = currentData;
    
                return this;
            }
    
            #region Overrides of WatcherBase
    
            public override async Task process(WatchedEvent watchedEvent)
            {
                switch (watchedEvent.get_Type())
                {
                    case Event.EventType.NodeDataChanged:
                        var zooKeeper = await _zooKeeperCall;
                        var watcher = new NodeMonitorWatcher(_zooKeeperCall, _path, _action);
                        var data = await zooKeeper.getDataAsync(_path, watcher);
                        var newData = data.Data;
                        _action(_currentData, newData);
                        watcher.SetCurrentData(newData);
                        break;
                }
            }
    
            #endregion Overrides of WatcherBase
        }
    NodeMonitorWatcher

    连接断开监听器:

    internal class ReconnectionWatcher : Watcher
        {
            private readonly Action _reconnection;
    
            public ReconnectionWatcher(Action reconnection)
            {
                _reconnection = reconnection;
            }
    
            #region Overrides of Watcher
    
            /// <summary>Processes the specified event.</summary>
            /// <param name="watchedEvent">The event.</param>
            /// <returns></returns>
            public override async Task process(WatchedEvent watchedEvent)
            {
                var state = watchedEvent.getState();
                switch (state)
                {
                    case Event.KeeperState.Expired:
                    case Event.KeeperState.Disconnected:
                        {
                            _reconnection();
                            break;
                        }
                }
                await Task.CompletedTask;
            }
    
            #endregion Overrides of Watcher
        }
    ReconnectionWatcher
  • 相关阅读:
    MSSQL安全审计文件执行Rootkit-WarSQLKit
    组建自己的局域网服务器
    python openpyxl表格样式设置
    ssh 和 scp 命令访问非默认22端口。
    set | grep IFS
    Python: check if key exists in dictionary (6 Ways)
    在线Jinja2解析器
    FastApi教程|测试WebSockets
    QT Qstring的用法
    QT UI拖拽方法
  • 原文地址:https://www.cnblogs.com/hongwei918/p/11617770.html
Copyright © 2011-2022 走看看