zoukankan      html  css  js  c++  java
  • java基础之----zookeeper

    概述

      zookeeper,拆开来就是zoo  keeper,翻译就是动物管理员,为什么这么命名呢?因为当时yahoo开发zookeeper的时候,已经开发很多其他的产品,都是以动物命名的,而zookeeper的作用就是要协调这些产品,所以最后就命名为了动物管理员。相信每个人都不陌生,无论是阿里的dubbo把zookeeper作为注册中心,无论是在使用kafka还有hbase的时候都离不开zookeeper,那zookeeper到底是什么,又可以干什么呢?

    zookeeper是什么?

    简单来说就是zookeeper是一个分布式协调服务。

    zookeeper可以干什么?

    可用于服务发现,分布式锁,配置管理等

    zookeeper的架构

    zookeeper是一个集群,集群中由3中角色。

      Leader: 维护和Follower,Observer的心跳,写请求必须通过leader,并且同步到Follower,Observer,集群中只有一个Leader

           Follower: 处理读请求,将写请求转发给Leader,集群中一般由多个Follower

           Observer: 功能和Follower类似,只是不参与投票。

    由以上可知,zookeeper只有Leader负责写操作,如果有多个线程同时执行写操作,会导致Leader节点的负载太大,有崩溃的风险,而Leader如果崩溃,zookeeper就会触发重新选主过程,在这个过程中zookeeper是不能对外提供服务的,这个过程一般会持续几十秒,可想而知,这个后果有多严重,但是zookeeper有多个Follower和Oberver可以处理读请求,所以zookeeper适合那种读多写少的场景。 

    zookeeper的选主过程

    zookeeper触发选主过程有两种场景,一种是zookeeper集群刚刚启动时,第二种时运行期间主节点出现问题,出现什么问题呢?比如主节点崩溃,主节点网络出现问题等都会触发重新选主过程。zookeeper选主采用的是FastLeaderElection算法,下面就介绍一下该算法。

    先介绍一下选票的数据结构

    • logicClock 每个服务器会维护一个自增的整数,名为logicClock,它表示这是该服务器发起的第多少轮投票

    • state 当前服务器的状态

    • self_id 当前服务器的myid

    • self_zxid 当前服务器上所保存的数据的最大zxid

    • vote_id 被推举的服务器的myid

    • vote_zxid 被推举的服务器上所保存的数据的最大zxid

    这里解释一下myid,zxid

    myid

    安装过集群模式zookeeper都知道,我们需要自己创建一个名为myid的文件,里面会写上这个节点的编号,比如在文件里面写上1,就表示该节点的编号为1; 

    zxid

    下面再说说zxid,这玩意其实是一个64位的字符,前面32位epoch是用来标识主节点的,因为每次选举出来一个主节点,都会生成一个不同的epoch,后面32位是一个递增计数,那记什么数呢,是这样的,zookeeper的每个节点发生改变都会对后32进行加1处理。那我的问题又来了,为什么节点改动就要进行加1操作呢?举个例子说明,现在有一个写操作,主节点这个zxid进行了加1操作,现在主节点要把这个写操作同步到Follower和Observer,那如果有些同步成功了,那成功的zxid就会加1,而同步失败就不会,这样又有什么用呢?这个的用处在下面介绍的选举过程就会体现出来。

    投票流程

    自增选举轮次

    ZooKeeper规定所有有效的投票都必须在同一轮次中。每个服务器在开始新一轮投票时,会先对自己维护的logicClock进行自增操作。

    初始化选票

    每个服务器在广播自己的选票前,会将自己的投票箱清空。该投票箱记录了所收到的选票。例:服务器2投票给服务器3,服务器3投票给服务器1,则服务器1的投票箱为(2, 3), (3, 1), (1, 1)。票箱中只会记录每一投票者的最后一票,如投票者更新自己的选票,则其它服务器收到该新选票后会在自己票箱中更新该服务器的选票。

    发送初始化选票

    每个服务器最开始都是通过广播把票投给自己。

    接收外部投票

    服务器会尝试从其它服务器获取投票,并记入自己的投票箱内。如果无法获取任何外部投票,则会确认自己是否与集群中其它服务器保持着有效连接。如果是,则再次发送自己的投票;如果否,则马上与之建立连接。

    判断选举轮次

    收到外部投票后,首先会根据投票信息中所包含的logicClock来进行不同处理:

    • 外部投票的logicClock大于自己的logicClock。说明该服务器的选举轮次落后于其它服务器的选举轮次,立即清空自己的投票箱并将自己的logicClock更新为收到的logicClock,然后再对比自己之前的投票与收到的投票以确定是否需要变更自己的投票,最终再次将自己的投票广播出去。

    • 外部投票的logicClock小于自己的logicClock。当前服务器直接忽略该投票,继续处理下一个投票。

    • 外部投票的logickClock与自己的相等。当时进行选票PK。

    选票PK

    选票PK是基于(self_id, self_zxid)与(vote_id, vote_zxid)的对比:

    • 外部投票的logicClock大于自己的logicClock,则将自己的logicClock及自己的选票的logicClock变更为收到的logicClock
    • 若logicClock一致,则对比二者的vote_zxid,若外部投票的vote_zxid比较大,则将自己的票中的vote_zxid与vote_myid更新为收到的票中的vote_zxid与vote_myid并广播出去,另外将收到的票及自己更新后的票放入自己的票箱。如果票箱内已存在(self_myid, self_zxid)相同的选票,则直接覆盖

    • 若二者vote_zxid一致,则比较二者的vote_myid,若外部投票的vote_myid比较大,则将自己的票中的vote_myid更新为收到的票中的vote_myid并广播出去,另外将收到的票及自己更新后的票放入自己的票箱

    这六步是重中之重,上面在介绍zxid的时候说这玩意会在选举过程用到,第五步就用到了,而且可以看出,当选票轮次相同时,时先根据zxid进行优先判断的,只有当zxid相等时才会根据myid判断,上面也介绍到触发选主的两种情况,第一种时集群刚刚启动时,这是其实说有节点的zxid都是一样的,所以这时都是根据myid判断的,但是如果是因为主节点崩溃触发的选主,那就要优先根据zxid进行判断了。 

    统计选票

    如果已经确定有过半服务器认可了自己的投票(可能是更新后的投票),则终止投票。否则继续接收其它服务器的投票。

    更新服务器状态

    投票终止后,服务器开始更新自身状态。若过半的票投给了自己,则将自己的服务器状态更新为LEADING,否则将自己的状态更新为FOLLOWING。

    zookeeper的应用

    zookeeper作为分布式锁

    之前看过一篇文章,写的非常好,在这里我就总结一下,然后把那篇文章引用进来,总结下来来说就是,zookeeper会维护一个临时顺序节点,当有进程或者线程申请加锁时,zookeeper会创建一个临时节点,当多线程竞争锁时,zookeeper会为每个线程或者进程创建一个临时节点,并别时有顺序的,然后,zookeeper会这样做,就是后一个节点会监听前一个节点,举个例子,第一个节点监听发现他前面没有节点了,就会直接获取锁,当锁释放时会删除这个临时节点,这时第二个节点发现他前面也没有节点了,那相应的第二个节点就获取到锁。

    public class CuratorLock {
    
        private static final String CONNECT_PATH = "192.168.66.110:2181,192.168.66.110:2182,192.168.66.110:2183";
    
        // Session 超时时间
        private static final int SESSION_TIME_OUT = 60000;
    
        //连接超时
        private static final int CONNECT_TIME_OUT = 5000;
    
        //定义
        private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
        public static void main(String[] args) throws Exception {
            /*
            */
    
            final AtomicInteger cnt = new AtomicInteger(10);
    
            for(int i=0;i<10;i++){
                new Thread(new Runnable() {
    
                    public void run() {
                        //分布式锁
                        CuratorFramework cf = getConenction();
                        InterProcessMutex lock = null;
                        try {
                            System.out.println(Thread.currentThread().getName()+"	线程准备");
    
                            //等10个线程都准备好了,再启动
                            cyclicBarrier.await();
    
                            //获取到锁
                            lock = new InterProcessMutex(cf,"/lock");
                            if(cnt.get() >= 0){
                                //分布式锁
                                lock.acquire();
                                Thread.sleep(1000);
                                System.out.println(Thread.currentThread().getName()+"	"+cnt.decrementAndGet());
                            }
                        } catch (Exception e) {
                            // TODO: handle exception
                        }finally{
                            //释放锁
                            try {
                                if(lock != null){
                                    lock.release();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start();
            }
        }
    
        private static CuratorFramework getConenction(){
            RetryPolicy retryPolicy = new   ExponentialBackoffRetry(5000, 10);
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .retryPolicy(retryPolicy)
                    .connectString(CONNECT_PATH)
                    .sessionTimeoutMs(SESSION_TIME_OUT)
                    .connectionTimeoutMs(CONNECT_TIME_OUT)
                    .build();
    
            cf.start();
    
            return cf ;
        }
    
    
    }

    文章地址:https://www.itcodemonkey.com/article/11887.html

    zookeeper作为注册中心

    zookeeper作为注册中心,可以提供服务发现,具体是这样做的,zookeeper可以充当一个服务注册表(Service Registry),让多个服务提供者形成一个集群,让服务消费者通过服务注册表获取具体的服务访问地址(ip+端口)去访问具体的服务提供者。

    在zookeeper中,进行服务注册,实际上就是在zookeeper中创建了一个znode节点,该节点存储了该服务的IP、端口、调用方式(协议、序列化方式)等。该节点承担着最重要的职责,它由服务提供者(发布服务时)创建,以供服务消费者获取节点中的信息,从而定位到服务提供者真正网络拓扑位置以及得知如何调用。RPC服务注册、发现过程简述如下:

    服务提供者启动时,会将其服务名称,ip地址注册到配置中心。
    服务消费者在第一次调用服务时,会通过注册中心找到相应的服务的IP地址列表,并缓存到本地,以供后续使用。当消费者调用服务时,不会再去请求注册中心,而是直接通过负载均衡算法从IP列表中取一个服务提供者的服务器调用服务。
    当服务提供者的某台服务器宕机或下线时,相应的ip会从服务提供者IP列表中移除。同时,注册中心会将新的服务IP地址列表发送给服务消费者机器,缓存在消费者本机。
    当某个服务的所有服务器都下线了,那么这个服务也就下线了。
    同样,当服务提供者的某台服务器上线时,注册中心会将新的服务IP地址列表发送给服务消费者机器,缓存在消费者本机。
    服务提供方可以根据服务消费者的数量来作为服务下线的依据。

    感知服务的下线&上线

    zookeeper提供了“心跳检测”功能,它会定时向各个服务提供者发送一个请求(实际上建立的是一个 socket 长连接),如果长期没有响应,服务中心就认为该服务提供者已经“挂了”,并将其剔除,比如100.19.20.02这台机器如果宕机了,那么zookeeper上的路径就会只剩/HelloWorldService/1.0.0/100.19.20.01:16888。

    服务消费者会去监听相应路径(/HelloWorldService/1.0.0),一旦路径上的数据有任务变化(增加或减少),zookeeper都会通知服务消费方服务提供者地址列表已经发生改变,从而进行更新。

    更为重要的是zookeeper 与生俱来的容错容灾能力(比如leader选举),可以确保服务注册表的高可用性。

    使用 zookeeper 作为注册中心时,客户端订阅服务时会向 zookeeper 注册自身;主要是方便对调用方进行统计、管理。但订阅时是否注册 client 不是必要行为,和不同的注册中心实现有关,例如使用 consul 时便没有注册。

    熟悉springcloud的应该知道,springcloud的注册中心默认使用的是Eureka,这篇文章就是将这两个注册中心的区别的:京东面试官让你谈谈 zookeeper 和 eureka 哪个更好使?

    zookeeper配置管理

    这个不在赘述,看网上的解决方案,很多使用zookeeper作为配置中心的,做个配置管理还是很简单的事。

    参考文章:

    https://dbaplus.cn/news-141-1875-1.html

     

  • 相关阅读:
    Git详解之四 服务器上的Git
    Git详解之三 Git分支
    Git详解之二 Git基础
    Git详解之一 Git起步
    Spring Data JPA 梳理
    Spring Data JPA 梳理
    Spring boot 梳理
    Apache和Tomcat 配置负载均衡(mod-proxy方式)-粘性session
    Apache和Tomcat 配置负载均衡(mod-proxy方式)-无session共享、无粘性session
    Spring boot 官网学习笔记
  • 原文地址:https://www.cnblogs.com/gunduzi/p/12382201.html
Copyright © 2011-2022 走看看