zoukankan      html  css  js  c++  java
  • diamond的设计思路

    diamond主要包含四个包:diamond-client、diamond-sdk、diamond-server和diamond-util
    client就非常简单的进行http的调用server拿数据
    server查数据库返回给client

    其中server是集群
    当一台server服务接收到写请求的时候:
    1、先写数据库;
    2、更新缓存(hashmap)
    3、再写磁盘;
    4、通知其他节点的服务器http

    diamond的消息传输是通过http长链接来传输的。

    diamond-client读取配置的context的时候是通过 DataId和GroupId来查询,本地Context是存在本地服务器文件的。文件路径根据一定规则来配置。

    private Map<String, DiamondManager> dmMap = new HashMap<>();
    	
    	private DiamondManager getDiamondManager(String dataId, String group) {
    		String key = dataId + "#_#" + group;
    		DiamondManager dm = dmMap.get(key);
    		if (dm == null) {
    			dm = new DefaultDiamondManager(group, dataId, new ArrayList<ManagerListener>());
    			dmMap.put(key, dm);
    		}
    		return dm;
    	}
    

    获取远程服务器列表并保持到本地

     protected void synAcquireServerAddress() {
            if (!isRun) {
                throw new RuntimeException("ServerAddressProcessor不在运行状态,无法同步获取服务器地址列表");
            }
            if (MockServer.isTestMode()) {
                diamondConfigure.addDomainName("测试模式,没有使用的真实服务器");
                return;
            }
    
            int acquireCount = 0;
            if (diamondConfigure.getDomainNameList().size() == 0) {
                if (!acquireServerAddressOnce(acquireCount)) {
                    acquireCount++;
                    if (acquireServerAddressOnce(acquireCount)) {
                        // 存入本地文件
                        storeServerAddressesToLocal();
                        log.info("在同步获取服务器列表时,向日常ConfigServer服务器获取到了服务器列表");
                    }
                    else {
                        log.info("从本地获取Diamond地址列表");
                        reloadServerAddresses();
                        if (diamondConfigure.getDomainNameList().size() == 0)
                            throw new RuntimeException("当前没有可用的服务器列表,请检查~/diamond/ServerAddress文件");
                    }
                }
                else {
                    log.info("在同步获取服务器列表时,向线上ConfigServer服务器获取到了服务器列表");
                    // 存入本地文件
                    storeServerAddressesToLocal();
                }
            }
        }
    

    如果在本地缓存map找不到就执行远程查找,这个代码是有一定问题的,应该是先从本地服务器找,新版已经解决了这个问题

    /**
         * 使用指定的集群类型clusterType
         * 
         * @param group
         * @param dataId
         * @param managerListenerList
         * @param clusterType
         */
        public DefaultDiamondManager(String group, String dataId, List<ManagerListener> managerListenerList) {
            this.dataId = dataId;
            this.group = group;
    
            diamondSubscriber = DiamondClientFactory.getSingletonDiamondSubscriber();
    
            this.managerListeners.addAll(managerListenerList);
            ((DefaultSubscriberListener) diamondSubscriber.getSubscriberListener()).addManagerListeners(this.dataId,
                this.group, this.managerListeners);
            diamondSubscriber.addDataId(this.dataId, this.group);
            diamondSubscriber.start();
        }
    

    接下来看这个订阅者start是如何执行的

     /**
         * 启动DiamondSubscriber:<br>
         * 1.阻塞主动获取所有的DataId配置信息<br>
         * 2.启动定时线程定时获取所有的DataId配置信息<br>
         */
        public synchronized void start() {
            if (isRun) {
                return;
            }
    
            if (null == scheduledExecutor || scheduledExecutor.isTerminated()) {
                scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
            }
    
            localConfigInfoProcessor.start(this.diamondConfigure.getFilePath() + "/" + DATA_DIR);
            serverAddressProcessor = new ServerAddressProcessor(this.diamondConfigure, this.scheduledExecutor);
            serverAddressProcessor.start();
    
            this.snapshotConfigInfoProcessor =
                    new SnapshotConfigInfoProcessor(this.diamondConfigure.getFilePath() + "/" + SNAPSHOT_DIR);
            // 设置domainNamePos值
            randomDomainNamePos();
            initHttpClient();
    
            // 初始化完毕
            isRun = true;
    
            if (log.isInfoEnabled()) {
                log.info("当前使用的域名有:" + this.diamondConfigure.getDomainNameList());
            }
    
            if (MockServer.isTestMode()) {
                bFirstCheck = false;
            }
            else {
                // 设置轮询间隔时间
                this.diamondConfigure.setPollingIntervalTime(Constants.POLLING_INTERVAL_TIME);
            }
            // 轮询
            rotateCheckConfigInfo();
    
            addShutdownHook();
        }
    

    我们重点看下 serverAddressProcessor.start();这个有点像继承runable,然后执行run,我们看下具体逻辑

     public synchronized void start() {
            if (isRun) {
                return;
            }
            isRun = true;
            initHttpClient();
            if (this.diamondConfigure.isLocalFirst()) {
                acquireServerAddressFromLocal();
            }
            else {
                synAcquireServerAddress();
                asynAcquireServerAddress();
            }
    
        }
    
    

    我们可以看到判断是否先从本地服务器获取,这个我上面说过了,新版的逻辑已经改掉了,信保就不适用hashMap作为缓存,的确hashMap容易被jvm回收,也占用内存空间。那么我们看下synACquireServerAddress方法

    
        protected void synAcquireServerAddress() {
            if (!isRun) {
                throw new RuntimeException("ServerAddressProcessor不在运行状态,无法同步获取服务器地址列表");
            }
            if (MockServer.isTestMode()) {
                diamondConfigure.addDomainName("测试模式,没有使用的真实服务器");
                return;
            }
    
            int acquireCount = 0;
            if (diamondConfigure.getDomainNameList().size() == 0) {
                if (!acquireServerAddressOnce(acquireCount)) {
                    acquireCount++;
                    if (acquireServerAddressOnce(acquireCount)) {
                        // 存入本地文件
                        storeServerAddressesToLocal();
                        log.info("在同步获取服务器列表时,向日常ConfigServer服务器获取到了服务器列表");
                    }
                    else {
                        log.info("从本地获取Diamond地址列表");
                        reloadServerAddresses();
                        if (diamondConfigure.getDomainNameList().size() == 0)
                            throw new RuntimeException("当前没有可用的服务器列表,请检查~/diamond/ServerAddress文件");
                    }
                }
                else {
                    log.info("在同步获取服务器列表时,向线上ConfigServer服务器获取到了服务器列表");
                    // 存入本地文件
                    storeServerAddressesToLocal();
                }
            }
        }
    

    我们看diamand是怎么获取服务器列表的

    /**
         * 获取diamond服务器地址列表
         * 
         * @param acquireCount
         *            根据0或1决定从日常或线上获取
         * @return
         */
        private boolean acquireServerAddressOnce(int acquireCount) {
            HostConfiguration hostConfiguration = configHttpClient.getHostConfiguration();
            String configServerAddress;
            int port;
            if (null != diamondConfigure.getConfigServerAddress()) {
                configServerAddress = diamondConfigure.getConfigServerAddress();
                port = diamondConfigure.getConfigServerPort();
            }
            else {
                if (acquireCount == 0) {
                    configServerAddress = Constants.DEFAULT_DOMAINNAME;
                    port = Constants.DEFAULT_PORT;
                }
                else {
                    configServerAddress = Constants.DAILY_DOMAINNAME;
                    port = Constants.DEFAULT_PORT;
                }
            }
            hostConfiguration.setHost(configServerAddress, port);
    
            String serverAddressUrl = Constants.CONFIG_HTTP_URI_FILE;
    
            HttpMethod httpMethod = new GetMethod(serverAddressUrl);
            // 设置HttpMethod的参数
            HttpMethodParams params = new HttpMethodParams();
            params.setSoTimeout(diamondConfigure.getOnceTimeout());
            // ///////////////////////
            httpMethod.setParams(params);
    
            try {
                if (SC_OK == configHttpClient.executeMethod(httpMethod)) {
                    InputStreamReader reader = new InputStreamReader(httpMethod.getResponseBodyAsStream());
                    BufferedReader bufferedReader = new BufferedReader(reader);
                    String address = null;
                    List<String> newDomainNameList = new LinkedList<String>();
                    while ((address = bufferedReader.readLine()) != null) {
                        address = address.trim();
                        if (StringUtils.isNotBlank(address)) {
                            newDomainNameList.add(address);
                        }
                    }
                    if (newDomainNameList.size() > 0) {
                        log.debug("更新使用的服务器列表");
                        this.diamondConfigure.setDomainNameList(newDomainNameList);
                        return true;
                    }
                }
                else {
                    log.warn("没有可用的新服务器列表");
                }
            }
            catch (HttpException e) {
                log.error(getErrorMessage(configServerAddress) + ", " + e);
            }
            catch (IOException e) {
                log.error(getErrorMessage(configServerAddress) + ", " + e);
            }
            catch (Exception e) {
                log.error(getErrorMessage(configServerAddress) + ", " + e);
            }
            finally {
                httpMethod.releaseConnection();
            }
            return false;
        }
    

    思路简单清晰,但是这边有一个设计思想就是订阅者DiamondSubscriber和监听SubscriberListener的思想是值得研究下,我们研究下他们是怎么关联起来的。

    /**
     * Diamond订阅者的配置信息监听器
     * 
     * @author aoqiong
     * 
     */
    public interface SubscriberListener {
    
        public Executor getExecutor();
    
    
        /**
         * 接收到一次配置信息
         * 
         * @param configureInfomation
         */
        public void receiveConfigInfo(final ConfigureInfomation configureInfomation);
    }
    
    

    这个监听器只有一个默认的实现DefaultSubscriberListener

    public class DefaultSubscriberListener implements SubscriberListener {
    
        // 回调日志单独记录
        private static final Log dataLog = LogFactory.getLog(LoggerInit.LOG_NAME_CONFIG_DATA);
    
        private final ConcurrentMap<String/* dataId + group */, CopyOnWriteArrayList<ManagerListener>/* listeners */> allListeners =
                new ConcurrentHashMap<String, CopyOnWriteArrayList<ManagerListener>>();
    
    
        public Executor getExecutor() {
            return null;
        }
    

    那么这个DefaultSubscriberListener和DefaultDiamondSubscriber有什么关系呢?如我们上面的代码,为了方便我这边重写贴下:

     /**
         * 使用指定的集群类型clusterType
         * 
         * @param group
         * @param dataId
         * @param managerListenerList
         * @param clusterType
         */
        public DefaultDiamondManager(String group, String dataId, List<ManagerListener> managerListenerList) {
            this.dataId = dataId;
            this.group = group;
    
            diamondSubscriber = DiamondClientFactory.getSingletonDiamondSubscriber();
    
            this.managerListeners.addAll(managerListenerList);
            ((DefaultSubscriberListener) diamondSubscriber.getSubscriberListener()).addManagerListeners(this.dataId,
                this.group, this.managerListeners);
            diamondSubscriber.addDataId(this.dataId, this.group);
            diamondSubscriber.start();
        }
    
    

    我们是通过diamondSubscriber的start去获取服务器列表的数据的,但是在start之前的addManagerListeners是干嘛的呢?原因很简单,那就是跟这个diamondSubscriber增加一个listeners的监听。

    this.managerListeners.addAll(managerListenerList);
            ((DefaultSubscriberListener) diamondSubscriber.getSubscriberListener()).addManagerListeners(this.dataId,
                this.group, this.managerListeners);
    

    diamondSubscriber 是通过DiamondClientFactory.getSingletonDiamondSubscriber()获取的,其实他就是一个DefaultDiamondSubscriber类,那么就是这个监听就是加到这个DefaultDiamondSubscriber类上的,这个监听其实就是DefaultSubscriberListener。

    public class DiamondClientFactory {
    
        private static DiamondSubscriber diamondSubscriber = new DefaultDiamondSubscriber(new DefaultSubscriberListener());
    
    
        public static DiamondSubscriber getSingletonDiamondSubscriber() {
            return diamondSubscriber;
        }
    
    }
    

    以上我们是从一个client的角度解析diamand的设计思想和逻辑处理,思想其实不是很复杂,但是订阅监听的设计思想是亮点,本文也重点讲了订阅和监听,已经如何获取服务器列表的方法。

  • 相关阅读:
    【从0开始学架构】架构设计三原则
    【Linux网络】端口
    【Python连接数据库】Python连接Teradata数据库-TD SQL Driver 方式(teradatasql包)
    【Python实战】python中含有中文字符无法运行
    【Linux基础】linux updatedb命令
    【Linux基础】查看和更改当前系统字符集(LC_ALL、LC_TYPE和LANG)
    【数据库】ODBC与JDBC
    LNMP安装目录及配置文件位置
    Linux Vi 的使用
    C#VS2017添加ReportViewer控件
  • 原文地址:https://www.cnblogs.com/huaizuo/p/6611589.html
Copyright © 2011-2022 走看看