zoukankan      html  css  js  c++  java
  • motan源码分析十:流量切换

    motan提供了流量切换的功能,可以实现把一个group的流量切换到另一个group(一个或多个服务都可以)。大家可以使用tomcat部署motan的管理工具,并设置几个组,例如可以参考demo代码:motan_demo_server_commandRegistry.xml。分析源码时可以发现,流量切换是在客户端完成的,与服务端没什么关系,在实际的工作中,可以解决很多问题,例如:某个集群出了问题,可以马上将流量切换到其它集群;在系统升级的过程中,将带升级集群的流量切换到其它集群,实现了24小时随时升级等。

    1.motan的流量切换是通过command来实现的,每次我们在motan管理器上进行设置的时候,其实是写入信息到注册中心的command节点,而motan又监听了这些command节点,下面是motan的客户端监听command相关的代码

        protected void subscribeCommand(final URL url, final CommandListener commandListener) {
            try {
                clientLock.lock();//对clientLock进行上锁
                ConcurrentHashMap<CommandListener, IZkDataListener> dataChangeListeners = commandListeners.get(url);//数据变更监听器
                if (dataChangeListeners == null) {
                    commandListeners.putIfAbsent(url, new ConcurrentHashMap<CommandListener, IZkDataListener>());
                    dataChangeListeners = commandListeners.get(url);
                }
                IZkDataListener zkDataListener = dataChangeListeners.get(commandListener);
                if (zkDataListener == null) {
                    dataChangeListeners.putIfAbsent(commandListener, new IZkDataListener() {//增加新的listener
                        @Override
                        public void handleDataChange(String dataPath, Object data) throws Exception {
                            commandListener.notifyCommand(url, (String) data);//调用commandListener的notifyCommand方法
                            LoggerUtil.info(String.format("[ZookeeperRegistry] command data change: path=%s, command=%s", dataPath, (String) data));
                        }
    
                        @Override
                        public void handleDataDeleted(String dataPath) throws Exception {
                            commandListener.notifyCommand(url, null);
                            LoggerUtil.info(String.format("[ZookeeperRegistry] command deleted: path=%s", dataPath));
                        }
                    });
                    zkDataListener = dataChangeListeners.get(commandListener);
                }
    
                String commandPath = ZkUtils.toCommandPath(url);
                zkClient.subscribeDataChanges(commandPath, zkDataListener);//向zookeeper注册监听事件
                LoggerUtil.info(String.format("[ZookeeperRegistry] subscribe command: path=%s, info=%s", commandPath, url.toFullStr()));
            } catch (Throwable e) {
                throw new MotanFrameworkException(String.format("Failed to subscribe %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
            } finally {
                clientLock.unlock();
            }
        }

    2.CommandServiceManager实现了上节中的commandListener

        public void notifyCommand(URL serviceUrl, String commandString) {
            LoggerUtil.info("CommandServiceManager notify command. service:" + serviceUrl.toSimpleString() + ", command:" + commandString);
    
            if (!MotanSwitcherUtil.isOpen(MOTAN_COMMAND_SWITCHER) || commandString == null) {//判断命令开关是否打开
                LoggerUtil.info("command reset empty since swither is close.");
                commandString = "";
            }
    
            List<URL> finalResult = new ArrayList<URL>();
            URL urlCopy = serviceUrl.createCopy();//serviceurl的副本
    
            if (!StringUtils.equals(commandString, commandStringCache)) {
                commandStringCache = commandString;
                commandCache = RpcCommandUtil.stringToCommand(commandStringCache);//将字符串转换为命令
                Map<String, Integer> weights = new HashMap<String, Integer>();
    
                if (commandCache != null) {
                    commandCache.sort();
                    finalResult = discoverServiceWithCommand(refUrl, weights, commandCache);
                } else {
                    // 如果是指令有异常时,应当按没有指令处理,防止错误指令导致服务异常
                    if (StringUtils.isNotBlank(commandString)) {
                        LoggerUtil.warn("command parse fail, ignored! command:" + commandString);
                        commandString = "";
                    }
                    // 没有命令时,只返回这个manager实际group对应的结果
                    finalResult.addAll(discoverOneGroup(refUrl));
    
                }
    
                // 指令变化时,删除不再有效的缓存,取消订阅不再有效的group
                Set<String> groupKeys = groupServiceCache.keySet();
                for (String gk : groupKeys) {
                    if (!weights.containsKey(gk)) {
                        groupServiceCache.remove(gk);
                        URL urlTemp = urlCopy.createCopy();
                        urlTemp.addParameter(URLParamType.group.getName(), gk);
                        registry.unsubscribeService(urlTemp, this);
                    }
                }
            } else {
                LoggerUtil.info("command not change. url:" + serviceUrl.toSimpleString());
                // 指令没有变化,什么也不做
                return;
            }
    
            for (NotifyListener notifyListener : notifySet) {
                notifyListener.notify(registry.getUrl(), finalResult);
            }
    
            // 当指令从有改到无时,会触发取消订阅所有的group,需要重新订阅本组的service
            if ("".equals(commandString)) {
                LoggerUtil.info("reSub service" + refUrl.toSimpleString());
                registry.subscribeService(refUrl, this);
            }
        }
    

    3.discoverServiceWithCommand的相关代码

        public List<URL> discoverServiceWithCommand(URL serviceUrl, Map<String, Integer> weights, RpcCommand rpcCommand, String localIP) {
            if (rpcCommand == null || CollectionUtil.isEmpty(rpcCommand.getClientCommandList())) {
                return discoverOneGroup(serviceUrl);
            }
    
            List<URL> mergedResult = new LinkedList<URL>();
            String path = serviceUrl.getPath();//获取路径
    
            List<RpcCommand.ClientCommand> clientCommandList = rpcCommand.getClientCommandList();
            boolean hit = false;
            for (RpcCommand.ClientCommand command : clientCommandList) {
                mergedResult = new LinkedList<URL>();
                // 判断当前url是否符合过滤条件
                boolean match = RpcCommandUtil.match(command.getPattern(), path);
                if (match) {
                    hit = true;
                    if (!CollectionUtil.isEmpty(command.getMergeGroups())) {
                        // 计算出所有要合并的分组及权重
                        try {
                            buildWeightsMap(weights, command);
                        } catch (MotanFrameworkException e) {
                            LoggerUtil.warn("build weights map fail!" + e.getMessage());
                            continue;
                        }
                        // 根据计算结果,分别发现各个group的service,合并结果
                        mergedResult.addAll(mergeResult(serviceUrl, weights));
                    } else {
                        mergedResult.addAll(discoverOneGroup(serviceUrl));
                    }
    
                    LoggerUtil.info("mergedResult: size-" + mergedResult.size() + " --- " + mergedResult.toString());
    
                    if (!CollectionUtil.isEmpty(command.getRouteRules())) {
                        LoggerUtil.info("router: " + command.getRouteRules().toString());
    
                        for (String routeRule : command.getRouteRules()) {
                            String[] fromTo = routeRule.replaceAll("\s+", "").split("to");
    
                            if (fromTo.length != 2) {
                                routeRuleConfigError();
                                continue;
                            }
                            String from = fromTo[0];
                            String to = fromTo[1];
                            if (from.length() < 1 || to.length() < 1 || !IP_PATTERN.matcher(from).find() || !IP_PATTERN.matcher(to).find()) {
                                routeRuleConfigError();
                                continue;
                            }
                            boolean oppositeFrom = from.startsWith("!");
                            boolean oppositeTo = to.startsWith("!");
                            if (oppositeFrom) {
                                from = from.substring(1);
                            }
                            if (oppositeTo) {
                                to = to.substring(1);
                            }
                            int idx = from.indexOf('*');
                            boolean matchFrom;
                            if (idx != -1) {
                                matchFrom = localIP.startsWith(from.substring(0, idx));
                            } else {
                                matchFrom = localIP.equals(from);
                            }
    
                            // 开头有!,取反
                            if (oppositeFrom) {
                                matchFrom = !matchFrom;
                            }
                            LoggerUtil.info("matchFrom: " + matchFrom + ", localip:" + localIP + ", from:" + from);
                            if (matchFrom) {
                                boolean matchTo;
                                Iterator<URL> iterator = mergedResult.iterator();
                                while (iterator.hasNext()) {
                                    URL url = iterator.next();
                                    if (url.getProtocol().equalsIgnoreCase("rule")) {
                                        continue;
                                    }
                                    idx = to.indexOf('*');
                                    if (idx != -1) {
                                        matchTo = url.getHost().startsWith(to.substring(0, idx));
                                    } else {
                                        matchTo = url.getHost().equals(to);
                                    }
                                    if (oppositeTo) {
                                        matchTo = !matchTo;
                                    }
                                    if (!matchTo) {
                                        iterator.remove();
                                        LoggerUtil.info("router To not match. url remove : " + url.toSimpleString());
                                    }
                                }
                            }
                        }
                    }
                    // 只取第一个匹配的 TODO 考虑是否能满足绝大多数场景需求
                    break;
                }
            }
    
            List<URL> finalResult = new ArrayList<URL>();
            if (!hit) {
                finalResult = discoverOneGroup(serviceUrl);
            } else {
                finalResult.addAll(mergedResult);
            }
            return finalResult;
        }
    

      

  • 相关阅读:
    洛咕11月月赛部分题解 By cellur925
    POJ 2411 Mondriaan's Dream 【状压Dp】 By cellur925
    Luogu P1637 三元上升子序列【权值线段树】By cellur925
    Luogu P1438无聊的序列【线段树/差分】By cellur925
    Luogu P1558 色板游戏【线段树/状态压缩】By cellur925
    Luogu P4403 [BJWC2008]秦腾与教学评估【二分答案】By cellur925
    Luogu P3941 入阵曲【前缀和】By cellur925
    查询事件状态,mysql查看事件是否开启,设置启动时自动开启方法
    Logback详细整理,基于springboot的日志配置
    使用release自动打包发布正式版详细教程
  • 原文地址:https://www.cnblogs.com/mantu/p/5887158.html
Copyright © 2011-2022 走看看