原文地址:
http://www.cnblogs.com/nevermorewang/p/5611807.html
选主原理介绍:zookeeper的节点有两种类型,持久节点跟临时节点。临时节点有个特性,就是如果注册这个节点的机器失去连接(通常是宕机),那么这个节点会被zookeeper删除。选主过程就是利用这个特性,在服务器启动的时候,去zookeeper特定的一个目录下注册一个临时节点(这个节点作为master,谁注册了这个节点谁就是master),注册的时候,如果发现该节点已经存在,则说明已经有别的服务器注册了(也就是有别的服务器已经抢主成功),那么当前服务器只能放弃抢主,作为从机存在。同时,抢主失败的当前服务器需要订阅该临时节点的删除事件,以便该节点删除时(也就是注册该节点的服务器宕机了或者网络断了之类的)进行再次抢主操作。从机具体需要去哪里注册服务器列表的临时节点,节点保存什么信息,根据具体的业务不同自行约定。选主的过程,其实就是简单的争抢在zookeeper注册临时节点的操作,谁注册了约定的临时节点,谁就是master。
主要有两个类,WorkServer为主服务类,RunningData用于记录运行数据。因为是简单的demo,我们只做抢master节点的编码,对于从节点应该去哪里注册服务列表信息,不作编码。
采用zkClient实现,代码如下:
WorkServer类
package com.zookeeper.master; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.zookeeper.CreateMode; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * Created by nevermore on 16/6/22. */ public class WorkServer { //客户端状态 private volatile boolean running = false; private ZkClient zkClient; //zk主节点路径 public static final String MASTER_PATH = "/master"; //监听(用于监听主节点删除事件) private IZkDataListener dataListener; //服务器基本信息 private RunningData serverData; //主节点基本信息 private RunningData masterData; //调度器 private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); //延迟时间5s private int delayTime = 5; public WorkServer(RunningData runningData){ this.serverData = runningData; this.dataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { //takeMaster(); if(masterData != null && masterData.getName().equals(serverData.getName())){ //若之前master为本机,则立即抢主,否则延迟5秒抢主(防止小故障引起的抢主可能导致的网络数据风暴) takeMaster(); }else{ delayExector.schedule(new Runnable() { @Override public void run() { takeMaster(); } },delayTime, TimeUnit.SECONDS); } } }; } //启动 public void start() throws Exception{ if(running){ throw new Exception("server has startup...."); } running = true; zkClient.subscribeDataChanges(MASTER_PATH,dataListener); takeMaster(); } //停止 public void stop() throws Exception{ if(!running){ throw new Exception("server has stopped....."); } running = false; delayExector.shutdown(); zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener); releaseMaster(); } //抢注主节点 private void takeMaster(){ if(!running) return ; try { zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL); masterData = serverData; System.out.println(serverData.getName()+" is master"); delayExector.schedule(new Runnable() {//测试抢主用,每5s释放一次主节点 @Override public void run() { if(checkMaster()){ releaseMaster(); } } },5,TimeUnit.SECONDS); }catch (ZkNodeExistsException e){//节点已存在 RunningData runningData = zkClient.readData(MASTER_PATH,true); if(runningData == null){//读取主节点时,主节点被释放 takeMaster(); }else{ masterData = runningData; } } catch (Exception e) { // ignore; } } //释放主节点 private void releaseMaster(){ if(checkMaster()){ zkClient.delete(MASTER_PATH); } } //检验自己是否是主节点 private boolean checkMaster(){ try { RunningData runningData = zkClient.readData(MASTER_PATH); masterData = runningData; if (masterData.getName().equals(serverData.getName())) { return true; } return false; }catch (ZkNoNodeException e){//节点不存在 return false; }catch (ZkInterruptedException e){//网络中断 return checkMaster(); }catch (Exception e){//其它 return false; } } public void setZkClient(ZkClient zkClient) { this.zkClient = zkClient; } public ZkClient getZkClient() { return zkClient; } }
RunningData类:
package com.zookeeper.master; import java.io.Serializable; /** * Created by nevermore on 16/6/22. */ public class RunningData implements Serializable { private static final long serialVersionUID = 4260577459043203630L; //服务器id private long cid; //服务器名称 private String name; public long getCid() { return cid; } public void setCid(long cid) { this.cid = cid; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
说明:在实际生产环境中,可能会由于插拔网线等导致网络短时的不稳定,也就是网络抖动。由于正式生产环境中可能server在zk上注册的信息是比较多的,而且server的数量也是比较多的,那么每一次切换主机,每台server要同步的数据量(比如要获取谁是master,当前有哪些salve等信息,具体视业务不同而定)也是比较大的。那么我们希望,这种短时间的网络抖动最好不要影响我们的系统稳定,也就是最好选出来的master还是原来的机器,那么就可以避免发现master更换后,各个salve因为要同步数据等导致的zk数据网络风暴。所以在WorkServer中,54-63行,我们抢主的时候,如果之前主机是本机,则立即抢主,否则延迟5s抢主。这样就给原来主机预留出一定时间让其在新一轮选主中占据优势,从而利于环境稳定。
测试代码:
package com.zookeeper.master; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; /** * Created by nevermore on 16/6/23. */ public class LeaderSelectorZkClient { //启动的服务个数 private static final int CLIENT_QTY = 10; //zookeeper服务器的地址 private static final String ZOOKEEPER_SERVER = "localhost:2181"; public static void main(String[] args) throws Exception{ //保存所有zkClient的列表 List<ZkClient> clients = new ArrayList<ZkClient>(); //保存所有服务的列表 List<WorkServer> workServers = new ArrayList<WorkServer>(); try{ for ( int i = 0; i < CLIENT_QTY; ++i ){ //创建zkClient ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer()); clients.add(client); //创建serverData RunningData runningData = new RunningData(); runningData.setCid(Long.valueOf(i)); runningData.setName("Client #" + i); //创建服务 WorkServer workServer = new WorkServer(runningData); workServer.setZkClient(client); workServers.add(workServer); workServer.start(); } System.out.println("敲回车键退出! "); new BufferedReader(new InputStreamReader(System.in)).readLine(); }finally{ System.out.println("Shutting down..."); for ( WorkServer workServer : workServers ){ try { workServer.stop(); } catch (Exception e) { e.printStackTrace(); } } for ( ZkClient client : clients ){ try { client.close(); } catch (Exception e) { e.printStackTrace(); } } } } }