zoukankan      html  css  js  c++  java
  • Curator leader 选举(一)

    要想使用Leader选举功能,需要添加recipes包,可以在maven中添加如下依赖:

    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.9.0</version>
    </dependency>

    当然了,由于recipes需要使用framework,所以你肯定还要添加如下依赖:


    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.9.0</version>
    </dependency>

    最后,为了简化测试也为了便于学习,可以添加test依赖:


    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.9.0</version>
    </dependency>

    LeaderLatch使用流程

      recipes包里面提供了Leader选举实现,Spark中的master选举使用的就是reciples包里面的LeaderLatch,使用他们可以极大的简化代码,使你将注意力更多的放在核心业务逻辑上。Leader选举的实现在org.apache.curator.framework.recipes.leader包中,这个包提供了两组Leader选举:

      1.LeaderLatch,LeaderLatchListener

      2.LeaderSelector,LeaderSelectorListener,LeaderSelectorListenerAdapter

    这两组类都可以实现Leader选举,spark 使用的是第一种。再这篇文章里,只介绍第一种。

    第一组使用起来非常简单,使用思路大致如下:假设你有3个节点,姑且叫做node0,node1,node2。你需要为每一个node创建一个CuratorFramework,LeaderLatch,LeaderLatchListener,如下:

    node0:

      1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();

      2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()

    node1:

      1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();

      2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()

    node2:

      1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();

      2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()

    你首先要创建CuratorFramework,然后并启动它,一个CuratorFramework就是一个ZooKeeper客户端。然后创建LeaderLatch,并制定刚才创建的CuratorFramework和一个leaderPath,leaderPath是一个ZooKeepe路径,node0,node1,node2中的leaderPath必须一致。创建好LeaderLatch之后,需要为他注册一个LeaderLatchListener回掉,如果某个node成为leader,那么会调用这个node的LeaderLatchListener的isLeader(),因此你可以在这里写自己的业务逻辑。最后,调用LeaderLatch的start(),这个LeaderLatch将参加选举了。

    可以参考如下代码:

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.leader.LeaderLatch;
    import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.curator.test.TestingServer;
    import org.apache.curator.utils.CloseableUtils;
    
    public class LeaderDemo {
        public static void main(String[]args) throws Exception{
            List<LeaderLatch>leaders=new ArrayList<LeaderLatch>();
            List<CuratorFramework>clients=new ArrayList<CuratorFramework>();
            
            TestingServer server=new TestingServer();
            
            
            try{
                for(int i=0;i<10;i++){
                  CuratorFramework client=CuratorFrameworkFactory.newClient(server.getConnectString(),new ExponentialBackoffRetry(20000,3));
                  clients.add(client);
                  
                  LeaderLatch leader=new LeaderLatch(client,"/francis/leader");
                  leader.addListener(new LeaderLatchListener(){
    
                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }
    
                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }});
                  
                  
                  leaders.add(leader);
            
                  client.start();
                  leader.start();
                }
                
                Thread.sleep(Integer.MAX_VALUE);
            }finally{
                
                for(CuratorFramework client:clients){
                  CloseableUtils.closeQuietly(client);    
                }
                
                for(LeaderLatch leader:leaders){
                    CloseableUtils.closeQuietly(leader);
                }
                
                CloseableUtils.closeQuietly(server);
            }
            
            
            Thread.sleep(Integer.MAX_VALUE);
        }
    
    }

    LeaderLatch和LeaderLatchListener方法介绍

    LeaderLatch提供了如下方法:

     start()/close():启动/停止LeaderLatch

     addListener(LeaderLatchListener)/removeListener(LeaderLatchListener):添加/移除LeaderLatchListener

     hasLeadership():如果LeaderLatch是Leader,那么返回true,否则false。

     getLeader():

     await:等待Leaderlatch成为Leader。

    LeaderLatchListener提供了如下方法:

      isLeader():当LeaderLatch的hasLeaderShip()从false到true后,就会调用isLeader(),表明这个LeaderLatch成为leader了。

      notLeader():当LeaderLatch的hahLeaderShip从true到false后,就会调用notLeader(),表明这个LeaderLatch不再是leader了。

    LeaderLatch在Master-Slave中的应用

    在一个典型的master-slave场景下。你可以在isLeader中做如下处理:

      1.每一个master类都有一个state属性,初始值为standby.

      2.在isLeader中,从持久话引擎中读取要恢复的数据到一个临时的内存缓存中

      3.将这个master的state修改为recovering

      4.通知所有worker将其内部的master修改为当前master。

      5.将临时内存缓存中的数据恢复到master内部。

      6.将master状态修改为alive,然后这个master就可以对外服务了。

    注意第5步,由于将持久话引擎中的数据添加到了master内部的内存中,所以需要确保之多恢复一次语义。

  • 相关阅读:
    Codeforces Round #249 (Div. 2) D. Special Grid 枚举
    图论二
    C语言中的atan和atan2(转)
    BestCoder Round #79 (div.2)
    数学
    LCA
    二分图
    动态规划
    线段树
    树状数组
  • 原文地址:https://www.cnblogs.com/francisYoung/p/5464789.html
Copyright © 2011-2022 走看看