zoukankan      html  css  js  c++  java
  • zookeeper主节点竞争类

    import com.alibaba.fastjson.JSON;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.imps.CuratorFrameworkState;
    import org.apache.curator.framework.recipes.cache.NodeCache;
    import org.apache.curator.framework.recipes.cache.NodeCacheListener;
    import org.apache.curator.framework.recipes.leader.HIKLeaderLatch;
    import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.Closeable;
    import java.io.IOException;
    import java.io.Serializable;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    /**
     * 主节点竞争工具类
     *
     * @author
     * @version V1.0
     * @modificationHistory=========================逻辑或功能性重大变更记录
     * @modify by user: $author$ $date$
     * @modify by reason:{方法名}:{原因}
     */
    public class LeaderLatchClient implements Closeable {
        private static final Logger LOGGER = LoggerFactory.getLogger(LeaderLatchClient.class);
    
        private static Executor executor = Executors.newCachedThreadPool();
        public static final String LATCH_PATH_PARENT = "/leaderlatch";//默认
        private static final String LEADER_INFO_PATH = "/LEADER_INFO";//节点信息路径
    
        private final HIKLeaderLatch leaderLatch;
        private NodeInfo leaderInfo;//主节点信息
        private final NodeInfo localInfo;//本地节点信息
        private final CuratorFramework client;//zookeeper连接客户端
        private NodeCache nodeCache;//节点缓存
        private final String leaderInfoPath;//主节点信息缓存路径
        private String latchPath;//主节点竞争路径
        private boolean cacheLeaderInd = false;//是否开启缓存主节点信息功能,默认不开启
    
        public LeaderLatchClient(CuratorFramework client, NodeInfo localInfo, String latchNode, LeaderLatchListener... latchListeners) {
            String errMsg;
            if (client == null || !client.getState().equals(CuratorFrameworkState.STARTED)) {
                errMsg = "Zookeeper Client is null or not start.";
                LOGGER.error(errMsg);
                throw new IllegalArgumentException(errMsg);
            }
            latchPath = genLeaderLatchPath(latchNode);
            if (localInfo == null) {
                LOGGER.warn("Local node info is null,cache leader is off.");
                this.localInfo = null;
                this.leaderInfoPath = null;
            } else {
                LOGGER.info("Local node info is not null,cache leader is on.");
                this.localInfo = localInfo;
                leaderInfoPath = genLeaderInfoPath(latchPath);
                cacheLeaderInd = true;
            }
            this.client = client;
            leaderLatch = new HIKLeaderLatch(client, latchPath);
    
            for (LeaderLatchListener latchListener : latchListeners) {
                leaderLatch.addListener(latchListener, executor);
            }
    
            //初始化主节点信息缓存
            initLeaderInfoCache();
        }
    
        /**
         * 开始竞争
         *
         * @throws Exception
         */
        public void start() throws Exception {
            leaderLatch.start();
        }
    
        /*
         * (non-Javadoc)
         *
         * @see java.io.Closeable#close()
         */
        @Override
        public void close() throws IOException {
            leaderLatch.close();
        }
    
        /**
         * 判断本节点是否为主节点
         *
         * @return
         */
        public boolean isLeader() {
            return leaderLatch.hasLeadership();
        }
    
        /**
         * 获取主节点信息
         *
         * @return
         */
        public NodeInfo getLeaderInfo() {
            if (cacheLeaderInd) {
                return leaderInfo;
            } else {
                String errMsg = "This client cache leader info is off can not get leader info.";
                LOGGER.error(errMsg);
                throw new UnsupportedOperationException(errMsg);
            }
        }
    
        /**
         * 初始化主节点信息缓存
         */
        private void initLeaderInfoCache() {
            if (cacheLeaderInd) {
                //添加主节点信息更新监听器
                leaderLatch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        LOGGER.info("node [{}] now is leader node,update node info to zookeeper path [{}]", localInfo.toString(), leaderInfoPath);
                        //创建主节点信息缓存路径
                        try {
                            if (client.checkExists().forPath(leaderInfoPath) == null) {
                                client.create().forPath(leaderInfoPath, JSON.toJSONString(localInfo).getBytes());
                            } else {
                                client.setData().forPath(leaderInfoPath, JSON.toJSONString(localInfo).getBytes());
                            }
                        } catch (Exception e) {
                            LOGGER.error("Update Leader info error.", e);
                        }
                    }
    
                    @Override
                    public void notLeader() {
                    }
                }, executor);
    
                //设置主节点信息缓存
                nodeCache = new NodeCache(client, leaderInfoPath);
                nodeCache.getListenable().addListener(new NodeCacheListener() {
                    @Override
                    public void nodeChanged() throws Exception {
                        String data = new String(nodeCache.getCurrentData().getData());
                        LOGGER.info("watch leader info change, now data is [{}]", data);
                        leaderInfo = JSON.parseObject(data, NodeInfo.class);
                    }
                }, executor);
                try {
                    nodeCache.start();
                } catch (Exception e) {
                    LOGGER.error("Leader info change cache start error.");
                }
            }
        }
    
        /**
         * 生成主节点信息zk缓存路径
         *
         * @param latchPath
         * @return
         */
        private String genLeaderInfoPath(String latchPath) {
            if (latchPath.endsWith("/")) {
                latchPath = latchPath.substring(0, latchPath.length() - 1);
            }
            return latchPath + LEADER_INFO_PATH;
        }
    
        /**
         * 生成主节点竞争路径
         *
         * @param latchNode
         * @return
         */
        private String genLeaderLatchPath(String latchNode) {
            if (!latchNode.startsWith("/")) {
                latchNode = "/" + latchNode;
            }
            return LATCH_PATH_PARENT + latchNode;
        }
    
        /**
         * 主节点信息
         */
        public static class NodeInfo implements Serializable {
            private String nodeHost;
            private int nodePort;
    
            public NodeInfo() {
            }
    
            public NodeInfo(String nodeHost, int nodePort) {
                this.nodeHost = nodeHost;
                this.nodePort = nodePort;
            }
    
            public String getNodeHost() {
                return nodeHost;
            }
    
            public void setNodeHost(String nodeHost) {
                this.nodeHost = nodeHost;
            }
    
            public int getNodePort() {
                return nodePort;
            }
    
            public void setNodePort(int nodePort) {
                this.nodePort = nodePort;
            }
    
            @Override
            public String toString() {
                return JSON.toJSONString(this);
            }
        }
    }
  • 相关阅读:
    September 29th 2017 Week 39th Friday
    September 28th 2017 Week 39th Thursday
    September 27th 2017 Week 39th Wednesday
    September 26th 2017 Week 39th Tuesday
    September 25th 2017 Week 39th Monday
    September 24th 2017 Week 39th Sunday
    angular2 学习笔记 ( Form 表单 )
    angular2 学习笔记 ( Component 组件)
    angular2 学习笔记 ( Http 请求)
    angular2 学习笔记 ( Router 路由 )
  • 原文地址:https://www.cnblogs.com/jinniezheng/p/6384010.html
Copyright © 2011-2022 走看看