Overview
- zk是一个典型的发布/订阅模式的分布式数据管理与协调框架,开发人员可以使用它来进行分布式数据的发布与订阅。
- 另一方面,通过对zk中丰富的数据节点进行交叉使用,配合watcher事件通知机制,可以非常方便地构建一系列分布式应用中都会涉及的核心功能,如数据发布/订阅、负载均衡、命名服务、分布式协调通知、集群管理、Master选举、分布式锁和分布式队列等。
典型应用场景及实现
- zk是一个高可用的分布式数据管理与协调框架。基于对ZAB算法的实现,该框架能很好地保证分布式环境中数据的一致性。 --> 因为zk成为了解决分布式一致性问题的利器。
- 大量的分布式系统将zk作为核心组件使用,如Hadoop、HBase和Kafka等。
数据发布/订阅
- Publish/Subscribe系统,即所谓的配置中心,即是发布者将数据发布到zk的一个或一系列节点上,供订阅者进行数据订阅,从而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新。
- 发布/订阅系统,一般有两种设计模式:推Push模式和拉Pull模式。
- push模式中,服务器主动将数据更新发送给所有订阅的client;
- pull模式则是由client主动发送请求来获取最新的数据。
- zk采用的是推拉结合的方式:client向server注册自己需要关注的节点,一旦该节点的数据发生变更,那么server就会向相应的client发送Watcher事件通知,client接收到消息后,需要主动到server端获取最新的消息。
- 如果将配置信息存放到zk上进行集中管理,那么通常情况下,应用在启动的时候都会主动到zk server上进行一次配置信息的获取,同时在指定节点上注册一个Watcher监听,从而可以实时获取最新的配置信息。
应用实例
- 考虑这样一个需求:系统中需要使用一些通用的配置信息,例如机器列表信息等。这些配置信息通常:数据量较小、数据内容运行时动态变化、集群中个机器共享,配置一致。
- 对于这类配置,可以选择将其存储在本地配置文件或是内存变量中。
- 使用zk的解决方案:
- 配置存储:首先将初始化配置存储到zk上 --> 选取一个数据节点用于配置的存储,如/app1/database_config。并将配置信息写入该数据节点。
- 配置获取:集群中每台机器在启动初始化阶段,首先会从相应的zk节点上读取数据库信息,同时client向该节点注册一个数据变更的Watcher。
- 配置变更:在系统运行过程中,只需要对zk上节点的内容进行更新即可。zk会把数据变更的通知发送到各个client。
均衡负载
- Load Balance: 用来对多个计算机(计算机集群)、网络连接、CPU、磁盘驱动器或其他资源进行分配负载,以达到优化资源使用、最大化吞吐率、最小化响应时间和避免过载的目的。
- 分布式系统具有对等性,为了保证系统的高可用性,通常采用副本的方式来对数据和服务进行部署。对消费者而言,需要在这些对等的服务中,选择一个来执行相关的业务逻辑,其中最典型的就是DNS服务。
一种动态的DNS服务
- DNS即 Domain Name System,用于将域名和IP地址进行一一映射。
- 通常情况下,我们可以向域名注册服务商申请域名注册,但只能注册有限的域名。
- 因此,实际开发中,往往使用本地HOST绑定来实现域名解析工作。(本地HOST绑定:访问一个域名,一般是通过dns服务器得到该域名对应的IP地址。显然本地HOST绑定就是直接走本地的。)然而,该方法的问题在于,如果需要在应用的每台机器上去绑定域名,在机器规模相当庞大的情况下会相当不方便,无法保证实时性。现在介绍一种基于zk实现的动态DNS方案。即DDNS(Dynamic DNS)。
- 使用zk的解决方案:
- 域名配置:创建数据节点,如/DDNS/app1/server.app1.company1.com,称为域名节点。我们可以为每个app应用创建一个属于自己的数据节点作为域名配置的根节点,如/DDNS/app1。
配置实例:
# 单个IP:PORT 192.168.0.1:8080 # 多个IP:PORT 192.168.0.1:8080, 192.168.0.2:8080
- 域名解析:在传统的DNS解析中,我们不需要关心域名的解析过程(由操作系统的域名和IP地址映射机制(本地HOST绑定)或是专门的域名解析服务器)。而DDNS中,域名的解析过程都是每个应用自己负责的。通常应用都会首先从域名节点中获取一份IP地址和端口的配置,进行自行解析。同时,每个应用还会在域名节点上注册一个数据变更Watcher监听。
- 域名变更:只需要对指定的域名节点进行更新操作,接着zk向订阅的client发送事件通知,client接收通知后,会再次进行域名配置的获取。
- 域名配置:创建数据节点,如/DDNS/app1/server.app1.company1.com,称为域名节点。我们可以为每个app应用创建一个属于自己的数据节点作为域名配置的根节点,如/DDNS/app1。
- 以上,通过DDNS可以:
- 避免域名无线增长带来的集中式维护的成本;
- 在域名变更时,能够避免因逐台机器更新本地HOST而带来的繁琐工作。
自动化的DNS服务
- 上述DDNS存在一个问题:在域名变更过程中,仍需要手动的修改域名节点上的IP地址和端口。
- 为了避免这种手动更新,我们可以使用zk实现的更为自动化的DNS服务。
- 自动化的DNS服务系统主要是为了实现服务的自动化定位。
- 系统架构:
- 组件:
- Register cluster: 负责域名的动态注册
- Dispatcher cluster: 负责域名解析
- Scanner cluster: 负责检测以及维护服务状态(探测服务的可用性、屏蔽异常服务节点等)
- SDK提供各种语言的系统接入协议,提供服务注册以及查询接口
- Monitor负责收集服务信息以及对DDNS自身状态的监控
- Controller是一个后台管理的Console,负责授权管理、流量控制、静态配置服务和手动屏蔽服务等功能。运维人员可以在其上管理Register、Dispatcher和Scanner集群。
- 域名注册:针对服务提供者而言。每个服务provider在启动过程中,都会把自己的域名信息注册到Register集群中去。
- 服务provider提供SDK提供的API接口,将域名、IP地址和端口发送给Register集群。例如,A机器通过serviceA.xxx.com,于是它向Register发送一个域名到IP:PORT的映射。
- Register收到映射后,根据域名将信息写入相对应的zk域名节点。
- 域名解析:针对消费者而言。服务consumer在使用域名时,会向Dispatcher发送域名解析请求。Dispatcher读取指定域名节点,通过一定的策略选取其中一个返回给前端应用。
- 域名探测:域名探测是指DDNS系统需要对域名下所有注册的IP地址和端口可用性进行检测。
一般可用性检测的方式有两种:- server端主动发起心跳检测,该方式一般需要在server和client之间建立TCP长连接
- client端主动(定时)向Scanner进行状态汇报。
Scanner会记录每个service provider最近一次状态汇报时间,超过5s则认为该地址不可用,于是开始域名清理过程。
命名服务
- Name Service是分布式系统中最基本的公共服务之一。在分布式系统中,被命名的实体可以是集群中的机器、提供的服务地址或远程对象等。
- 较常见的场景就是一些分布式框架(如RPC、RMI)中的服务地址列表,通过使用命名服务,client可以根据指定名字来获取资源的实体、服务地址和提供者的信息等。
- zk提供的命名服务能够帮助应用系统通过一个资源引用的方式来实现对资源的定位与使用。广义上命名服务的资源定位都不是真正意义上的实体资源,而是在分布式环境中的一个全局唯一的名字,类似于数据库中的唯一主键。
- 说到全局ID,最常见的就是UUID(Universally Unique Identifier,通用唯一识别码),最典型的实现就是GUID(Globally Unique Identifier,全局唯一标识符)。
- UUID是指在一台机器上生成的数字,它保证对在同一时空中的所有机器都是唯一的。
- UUID的主要缺陷就在于生成的字符串过长。其次是含义不明,类似于"ef0f1357-f260-46ff-a32d-53a086c57ade"。
使用zk的解决方案
- 前面提到,可以调用zk节点创建的api创建一个顺序节点,并且在API返回值中会返回该节点的完整名字。
- 该方案利用了zk的一个特性:在zk中,每个数据节点都能够维护一份子节点的顺序序列,当client对其创建一个顺序子节点时zk会自动以后缀的形式在其子节点后添加一个序号。(关于顺序节点,之后会有详细介绍)。
分布式协调/通知
- 分布式协调/通知是将不同的分布式组件有机结合的关键所在。
- 对于一个在多台机器上部署运行的应用而言,通常需要一个协调者(coordinator)来控制整个系统的运行流程,例如分布式事务的处理、机器间的互相协调等。同时,引入coordinator便于将分布式协调的职责从应用中分离出来,从而可以大大减少系统之间的耦合性,而且能够显著提高系统的可扩展性。
- zk中特有的Watcher注册与异步通知机制,能够很好地实现分布式环境下不同机器,甚至是不同系统之间的协调与通知,从而实现对数据变更的实时处理。
- 基于zk实现分布式协调与通知功能,通常的做法是不同的client都对zk上的同一个数据节点进行watcher注册,监听数据节点的变化(包括数据节点本身及其子节点),如果数据节点发生变化,那么所有订阅的client都能接收到相应的watcher通知并做出相应的处理。
MySQL数据复制总线:Mysql_Relicator
- MySQL数据复制总线是一个实时数据复制框架,用于在不同的MySQL数据库实例之间进行异步数据复制和数据变化通知。
- 整个系统是一个由MySQL数据库集群、消息队列系统、任务管理监控平台以及zk集群等组件共同构成的一个包含数据生产者、复制管道和数据消费者等部分的数据总线系统。
- zk在该系统中负责进行一系列的分布式协调工作。
任务注册
- Core进程(实现数据复制的核心逻辑,将数据复制封装成管道,并抽象出生产者和消费者两个概念)启动的时候,首先会向/mysql_replicator/tasks节点(任务列表节点)注册任务。例如任务A(复制热门商品)会在启动是注册子节点/mysql_replicator/tasks/copy_hot_item。
任务热备份
- 为了应对复制任务故障或者复制任务所在主机故障,复制组件采用“热备份”的容灾方式。即将同一个复制任务部署在不同主机上,主备任务通过zk互相监测运行健康状况。
- 每台机器都需要在/mysql_replicator/tasks/copy_hot_item/instances节点上将自己的主机名注册上去。注意,这里注册的是临时节点。(临时顺序节点!)
- 在完成子节点的创建后,每台机器都可以获取到自己创建的节点的完成节点名以及所有子节点的列表,然后通过对比判断自己是否是所有子节点中序号最小的。序号最小的子节点会将自己运行状态设置为RUNNING,其余的任务机器则将自己设置为STANDBY。我们将这样的热备份策略称为“小序号优先”策略。
热备切换
- 完成运行状态标识后,任务的客户端机器就可以正常工作了,(RUNNING的client机器进行正常的数据复制,STANDBY机器则进入待命状态)。
- 所有STANDBY机器都需要在/mysql_replicator/tasks/copy_hot_item/instances节点上注册一个“子节点列表变更”的watcher监听,用来订阅所有任务执行机器的变化情况:一旦RUNNING机器宕机与zk断开连接后,对应的节点就会消失,STANDBY收到变更通知后开启新一轮的RUNNING选举。
记录执行状态
- 既然是热备份,那么RUNNING任务机器就需要将运行时的上下文状态保留给STANDBY机器。
- 在该场景中,最主要的上下文状态就是数据复制过程中的一些进度信息,因此需要将这些信息保存在zk上以便于共享。
- RUNNING机器会定时将信息写入某个zk数据节点。
控制台协调
-
在Mysql_Replicator中,server主要工作是进行任务的控制,通过zk来对不同任务进行控制与协调。
- server会将每个复制任务对应生产者的元数据,即库名、表名、用户名与密码等数据库信息以及消费者的相关信息,以配置的形式写入任务节点/mysql_replicator/tasks/copy_hot_item中去,以便该任务的所有任务机器都能共享该复制任务的配置。
一种通用的分布式系统机器间通信方式
- 在绝大部分的分布式系统中,系统机器间通信无外乎心跳检测、工作进度汇报和系统调度三种类型。
心跳检测
- 传统开发中,我们通常通过主机之间是否可以互相PING通来判断。更复杂的做法是在机器之间建立长连接,通过TCP连接固有的心跳检测机制来判断上层机器的心跳检测。
- 使用zk的解决方案:基于zk的临时节点特性,可以让不同的机器都在zk的一个指定节点下创建临时子节点,不同机器之间可以根据该临时节点来判断对应的client机器是否存活。
- 通过这种方式,检测系统和被检测系统之间不需要直接相关联,而是通过zk上的某个节点进行关联,大大减少了系统耦合。
工作进度汇报
- 在一个常见的任务分发系统中,通常任务呗分发到不同的机器上执行后,需要实时地将自己的任务执行进度汇报给分发系统。
- 使用zk的解决方案:在zk上选择一个节点,每个任务client都在这个节点下创建临时节点,这样:
- 可以通过判断临时节点是否存在来确定任务机器是否存活;
- 各个任务机器会实时地将自己的任务写到这个临时节点上,以便中心系统能实时获取到任务的执行进度。
系统调度
- 使用zk,能够实现另一种系统调度模式:一个分布式系统由控制台和一些客户端系统两部分组成,控制台的职责就是需要将一些指令信息发送给所有client,以控制他们进行相应的业务逻辑。
- 实现方式就是修改zk上某些节点的数据,而zk进一步把这些数据变更以事件通知的形式发送给对于的订阅客户端。
总之,使用zk来实现分布式系统机器间的通信,不仅能省去大量底层网络通信和协议涉及上重复的工作,更为重要的是大大降低了系统之间的耦合,能够非常方便地实现异构系统之间的灵活通信。
集群管理
- 集群管理包括集群监控和集群控制两大块,前者侧重对集群运行时状态的收集,后者则是对集群进行操作与控制。
- 一些典型需求:
- 集群中有多少机器在工作
- 对集群中每台机器的运行时状态进行数据收集
- 对集群中机器进行上下线操作
- 传统的解决方案:在每台机器上部署一个agent,由这个agent负责主动向指定的一个监控中心系统汇报自己所在机器的状态。
该方案在大规模集群下的弊端:- 大规模升级困难:以客户端形式存在的agent升级成本很高;
- 统一的agent无法满足多样的需求:对于机器的CPU使用率、负载、内存使用率、网络吞吐以及磁盘容量等机器基本的物理状态,使用统一的agnet来进行监控或许可以满足。但是对于与业务紧密的监控需求则不适合由一个统一的agent来提供。
- 而zk有这样的特性:
- client可以注册watcher监听;
- zk的临时节点,一旦会话失效,那么该临时节点也会被自动清除。
- 利用上述两大特性,可以实现另一种集群机器存活性监控的系统。例如:监控系统在/clusterServers节点上注册一个Watcher监听,那么但凡进行动态添加机器的操作,就会在/clusterServers节点下创建一个临时节点:/clusterServers/[Hostname]。
分布式日志收集系统
- 在一个典型的日志系统的架构设计中,整个日志系统会把所有需要收集的日志机器分为多个组别,每个组别对应一个收集器,用于收集日志。
- 在这个场景中需要解决如下两个问题:
- 变化的日志源机器
- 变化的收集器机器
对于上述两个问题,其本质都是如何快读、合理、动态地为每个收集器分配对应的日志源机器。
注册收集器机器
- 注册/logs/collector/[Hostname]
任务分发
- 系统根据收集器节点下子节点的个数,将所有日志源机器分为若干组,分发到收集器节点。
- 这里要注意的是,一般情况下我们会使用临时节点来判断收集器机器的存活情况。但在这个情景下,收集器节点上还会存放所有已经分配给该收集器的日志源机器列表。
状态汇报
- 考虑到收集器机器随时可能挂掉,我们需要一个收集器状态汇报机制,即在每个收集器在其子节点下还要一个状态子节点:/logs/collector/host1/status。
- 每个收集器机器都需要定期向该节点写入自己的状态信息,类似于心跳检测机制。
- 日志系统根据该状态子节点的最后更新时间来判断对应的收集器机器是否存活。
- 这里要注意的是,一般情况下我们会在每个节点下注册watcher监听。但在该情景下,收集器数量可能非常大,变更的消息会非常多,且大部分变更通知是无效的。因此我们放弃监听设置,而是采用日志系统主动轮训收集器节点的策略。这样可以节省不少网卡流量,但是会存在一定的延时。
全局动态分配
- 在收集器挂掉或者是扩容时,就需要动态地进行收集任务的分配。
- 有两种做法:
- 全局动态分配:一种简单粗暴的做法,即对所有的日志源机器重新进行一次分组。
- 局部动态分配:明显全局动态分配的影响面太太,存在的风险就比较大。在局部分配的策略中, 每个收集器机器会在汇报的时候同时汇报自己的负载(可以采用某种评估算法)。
Master选举
- Master选举可以说是zk最典型的应用场景了。
- Master选举的需求:在集群所有机器中选举出一台机器作为Master。
- 通常,我们可以选择常见的关系型数据库中的主键特性来实现:集群中所有机器向数据库中插入一条相同主键ID的记录,那么只有一台机器能够成功,该机器成为Master。
但该方法存在的问题是:如果当前选举出的master挂了,该如何处理? - 我们知道,利用zk的强一致性,能够很好地保证在分布式高并发情况下节点创建一定能够保证全局唯一性,即zk将会保证client无法重复创建一个已经存在的数据节点。
- 利用zk的解决方案:client集群每天在zk上创建一个临时节点,例如/master_election/2013-09-20/binding。只有一个client能成功创建该节点,则该client所在机器成为Master。而其他client会在节点/master_election/2013-09-20上注册一个子节点变更的watcher,一旦当前master挂了,其余client将会重新进行Master选举。
分布式锁
- 分布式锁是控制分布式系统之间同步访问共享资源的一种方式。
- 通常,我们很少会去在意分布式锁,而是依赖于关系型数据库固有的排他性来实现不同进程之间的互斥。这就引入了绝大多数大型分布式系统的主要性能瓶颈:数据库操作。
- 因此,如果上层业务再给数据库添加一些额外的锁,例如行锁、表锁等就会使数据库更佳不堪重负。
排他锁
- 排他锁(Exclusive Locks,简称X锁),又称为写锁或独占锁,是一种基本的锁类型。
- 如果事务T1给数据对象O1加了拍他锁,那么在整个加锁期间,只允许事务T1对O1进行读取和更新操作,其他任何事务都不能对O1进行任何类型的操作,知道T1释放了锁。
- 接下来看如何用zk实现排他锁:
定义锁
- 定义锁:在Java中我们有两种常见的方式来定义锁:synchronized机制和ReentrantLock。具体参考link。
- 然而zk中直接通过数据节点来表示一个锁。例如/exclusive_lock/lock节点就可以被定义为一个锁
获取锁
- 在需要获取拍他锁时,所有client都会试图通过调用creat()接口,在/exclusive_lock节点下创建临时子节点/exclusive_lock/lock。zk会保证最终只有一个client能够创建成功。
- 同时,没有获取锁的client需要在节点上注册一个watcher,以便实时监听到lock节点的变更情况。
释放锁
- /exclusive_lock/lock是一个临时节点,因此在以下两种情况下,都有可能释放锁:
- 当前获取锁的client机器宕机
- 正常执行完业务逻辑后,client主动将自己创建的临时节点删除。
共享锁
- 共享锁(Shared Locks,简称S锁),又称为读锁。
- 如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁,直到该数据对象上的所有共享锁都被释放。
- 共享锁和排他锁的根本区别在于:加排他锁后,数据对象只对一个事务可见,而加共享锁后,数据对所有事务都可见。
- 接下来看如何用zk实现排他锁:
定义锁
- 同样通过数据节点来表示一个锁,是一个类似于/shared_lock/[Hostname]-请求类型-序号的临时顺序节点。
获取锁
- 在需要获取锁时,所有client都会到/shared_lock这个节点下创建一个临时顺序节点。
判断读写顺序
- 根据共享锁的定义,不同事务都可以同时对同一个数据对象进行读取操作,而更新操作必须在当前没有任何事务进行读写操作的情况下进行。
- 基于这个原则,我们看看如何通过zk的节点来确定分布式读写顺序:
- 创建完节点后,获取/shared_lock节点下的所有子节点,并对该节点注册子节点变更的Watcher监听;
- 确定自己的节点序号在所有子节点中的顺序;
- 对于读请求:
如果没有比自己小的子节点 or 所有比自己序号小的子节点都是读请求,那么表明已经成功获取共享锁,同时开始执行读取逻辑。
如果比自己小的子节点中有读请求,那么就需要进入等待。 - 对于写请求:如果自己不是最小的子节点,那么就需要进入等待。
- 接收到watcher通知后,重复步骤1。
释放锁
- 与共享锁相同。
羊群效应
- 当集群规模比较大的时候,节点会接收到过多与自己无关的事件通知。从而对zk服务器造成巨大的性能影响和网络冲击,更严重的是,zk server会在短时间内向其余客户端发送大量的事件通知(羊群效应)。
- 对上述例子,一个很直接的优化方案就是,每个节点对应的client只需要关注比自己序号小的节点的变更情况就可以了。
改进后的分布式锁实现
- 主要的改动在于:每个锁竞争者,只需要关注/shared_lock节点下序号比自己小的那个节点是否存在即可。
- 具体实现:
- client调用create()方法创建一个类似于"/shared_lock/[Hostname]-请求类型-序号"的临时顺序节点;
- client调用getChildern()接口来获取所有已创建的子节点列表。注意,这里不注册任何watcher;
- 若无法获取共享锁,那么就调用exist()来对比自己小的那个节点注册Watcher。
对于读请求:向比自己序号小的最后一个写请求节点注册watcher监听;
对于写请求:向比自己序号小的最后一个节点注册watcher监听。 - 等待watcher通知,继续进入步骤2。
分布式队列
- 业界有不少分布式队列产品,不过大多都是类似于ActiveMQ、Metamorphosis、Kafka和HornetQ等的消息中间件(或称为消息队列)。
- 分布式队列,简单说来分为两大类:
- FIFO队列
- 等到队列元素集聚后才统一安排执行的Barrier模型。
FIFO队列
- 用zk实现FIFO队列就类似于一个全写的共享锁模型。
- 所有client都会到/queue_fifo 这个节点下面创建一个临时顺序节点,例如 /queue_fifo/192.168.0.1-00001
- 创建完节点之后,根据如下4个步骤来确定执行顺序:
- 通过调用getChildren()接口来获取 /queue_fifo节点下的所有子节点,即获取队列中所有元素;
- 确定自己的节点序号在所有子节点中的顺序;
- 如果自己不是最小的子节点,则进入等待,同时向比自己小的最后一个子节点注册Watcher监听;
- 接收到watcher通知后,重复步骤1。
Barrier:分布式屏障
- 在分布式系统中,barrier特指系统之间的一个协调条件,规定了一个队列的元素必须都聚集之后才能统一进行安排,否则一直等待。这往往出现在那些大规模分布式并行计算的应用场景上:最终的合并需要基于很多并行计算的子结果。
- /queue_barrier是一个已存在的默认节点,并且将其节点的数据内容赋值为一个数字n来代表barrier值,例如10表示子节点个数达到10后,才会打开Barrier。
- 创建完节点后,根据如下5个步骤来确定执行顺序:
- 通过调用getData()接口获取/queue_barrier节点的数据内容:10;
- 通过调用getChildren()获取/queue_barrier节点下的所有子节点,同时注册监听;
- 统计子节点的个数;
- 如果子节点个数还不足10个,那么就需要进入等待;
- 接收到watcher通知后,重复步骤2。