zoukankan      html  css  js  c++  java
  • motan源码分析一:服务发布及注册

    motan是新浪微博开源的服务治理框架,具体介绍请看:http://tech.sina.com.cn/i/2016-05-10/doc-ifxryhhh1869879.shtml.

    本系列的文章将分析它的底层源码,分析的源码版本为:0.1.2。第一篇文章将以服务的发布和注册开始,注册服务使用zookeeper来分析。源码地址:https://github.com/weibocom/motan

    本文涉及到的主要类和接口:MotanApiExportDemo、MotanDemoService、MotanDemoServiceImpl、ServiceConfig、RegistryConfig、ProtocolConfig、DefaultProvider、ZookeeperRegistryFactory、ZookeeperRegistry、SimpleConfigHandler、ProtocolFilterDecorator等。

    1.首先来看demo源码:MotanApiExportDemo 

        demo中先后创建了ServiceConfig、RegistryConfig和ProtocolConfig相关的对象,其中ServiceConfig是我们提供服务的相关配置(每个服务一个配置,例如一个服务接口一个配置,本文中的具体服务是:MotanDemoServiceImpl)、RegistryConfig是注册中心相关的配置信息、ProtocolConfig是应用协议相关的配置(在客户端还负责集群相关的配置)。

            ServiceConfig<MotanDemoService> motanDemoService = new ServiceConfig<MotanDemoService>();
    
            // 设置接口及实现类
            motanDemoService.setInterface(MotanDemoService.class);//设置服务接口,客户端在rpc调用时,会在协议中传递接口名称,从而实现与具体实现类一一对应
            motanDemoService.setRef(new MotanDemoServiceImpl());//设置接口实现类,实际的业务代码
    
            // 配置服务的group以及版本号
            motanDemoService.setGroup("motan-demo-rpc");//服务所属的组
            motanDemoService.setVersion("1.0");
    
            // 配置ZooKeeper注册中心
            RegistryConfig zookeeperRegistry = new RegistryConfig();
            zookeeperRegistry.setRegProtocol("zookeeper");//使用zookeeper作为注册中心
            zookeeperRegistry.setAddress("127.0.0.1:2181");//zookeeper的连接地址
            motanDemoService.setRegistry(zookeeperRegistry);
            
            // 配置RPC协议
            ProtocolConfig protocol = new ProtocolConfig();
            protocol.setId("motan");//使用motan应用协议
            protocol.setName("motan");
            motanDemoService.setProtocol(protocol);
            
            motanDemoService.setExport("motan:8010");//本服务的监控端口号是8010
            motanDemoService.export();//发布及在zookeeper上注册此服务
    

    2.从上面的代码可知ServiceConfig类是服务的发布及注册的核心是motanDemoService.export()方法,我们来看一下此方法的实现细节:

        public synchronized void export()
        {
            if(exported.get())
            {
                LoggerUtil.warn(String.format("%s has already been expoted, so ignore the export request!", new Object[] {
                    interfaceClass.getName()
                }));
                return;
            }
            checkInterfaceAndMethods(interfaceClass, methods);
            List registryUrls = loadRegistryUrls();//加载注册中心的url,支持多个注册中心
            if(registryUrls == null || registryUrls.size() == 0)
                throw new IllegalStateException((new StringBuilder("Should set registry config for service:")).append(interfaceClass.getName()).toString());
            Map protocolPorts = getProtocolAndPort();
            ProtocolConfig protocolConfig;
            Integer port;
            for(Iterator iterator = protocols.iterator(); iterator.hasNext(); doExport(protocolConfig, port.intValue(), registryUrls))//发布服务
            {
                protocolConfig = (ProtocolConfig)iterator.next();
                port = (Integer)protocolPorts.get(protocolConfig.getId());
                if(port == null)
                    throw new MotanServiceException(String.format("Unknow port in service:%s, protocol:%s", new Object[] {
                        interfaceClass.getName(), protocolConfig.getId()
                    }));
            }
    
            afterExport();
        }

     方法中调用了doexport和afterExport方法:

        private void doExport(ProtocolConfig protocolConfig, int port, List registryURLs)
        {
            String protocolName = protocolConfig.getName();//获取协议名称,此处为motan
            if(protocolName == null || protocolName.length() == 0)
                protocolName = URLParamType.protocol.getValue();
            String hostAddress = host;//本机地址
            if(StringUtils.isBlank(hostAddress) && basicServiceConfig != null)
                hostAddress = basicServiceConfig.getHost();
            if(NetUtils.isInvalidLocalHost(hostAddress))
                hostAddress = getLocalHostAddress(registryURLs);
            Map map = new HashMap();
            map.put(URLParamType.nodeType.getName(), "service");
            map.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
            collectConfigParams(map, new AbstractConfig[] {
                protocolConfig, basicServiceConfig, extConfig, this
            });
            collectMethodConfigParams(map, getMethods());
            URL serviceUrl = new URL(protocolName, hostAddress, port, interfaceClass.getName(), map);//组装serviceUrl信息
            if(serviceExists(serviceUrl))//判断服务之前是否已经加载过
            {
                LoggerUtil.warn(String.format("%s configService is malformed, for same service (%s) already exists ", new Object[] {
                    interfaceClass.getName(), serviceUrl.getIdentity()
                }));
                throw new MotanFrameworkException(String.format("%s configService is malformed, for same service (%s) already exists ", new Object[] {
                    interfaceClass.getName(), serviceUrl.getIdentity()
                }), MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);//抛出同名服务异常
            }
            List urls = new ArrayList();
            if("injvm".equals(protocolConfig.getId()))
            {
                URL localRegistryUrl = null;
                for(Iterator iterator2 = registryURLs.iterator(); iterator2.hasNext();)
                {
                    URL ru = (URL)iterator2.next();
                    if("local".equals(ru.getProtocol()))
                    {
                        localRegistryUrl = ru.createCopy();
                        break;
                    }
                }
    
                if(localRegistryUrl == null)
                    localRegistryUrl = new URL("local", hostAddress, 0, com/weibo/api/motan/registry/RegistryService.getName());
                urls.add(localRegistryUrl);
            } else
            {
                URL ru;
                for(Iterator iterator = registryURLs.iterator(); iterator.hasNext(); urls.add(ru.createCopy()))
                    ru = (URL)iterator.next();
    
            }
            URL u;
            for(Iterator iterator1 = urls.iterator(); iterator1.hasNext(); registereUrls.add(u.createCopy()))
            {
                u = (URL)iterator1.next();
                u.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(serviceUrl.toFullStr()));
            }
    
            ConfigHandler configHandler = (ConfigHandler)ExtensionLoader.getExtensionLoader(com/weibo/api/motan/config/handler/ConfigHandler).getExtension("default");//使用spi机制加载SimpleConfigHandler
            exporters.add(configHandler.export(interfaceClass, ref, urls));//调用SimpleConfigHandler的export方法
            initLocalAppInfo(serviceUrl);
        }
    
        private void afterExport()
        {
            exported.set(true);
            Exporter ep;
            for(Iterator iterator = exporters.iterator(); iterator.hasNext(); existingServices.add(ep.getProvider().getUrl().getIdentity()))
                ep = (Exporter)iterator.next();
    
        }

     再来看一下SimpleConfigHandler的export方法

        public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls) {
    
            String serviceStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
            URL serviceUrl = URL.valueOf(serviceStr);
    
            // export service
            // 利用protocol decorator来增加filter特性
            String protocolName = serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());
            Protocol protocol = new ProtocolFilterDecorator(ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(protocolName));//对于Protoclo对象增强filter
            Provider<T> provider = new DefaultProvider<T>(ref, serviceUrl, interfaceClass);服务的代理提供者,包装ref的服务
            Exporter<T> exporter = protocol.export(provider, serviceUrl);//发布服务,将代理对象provider与具体的serviceUrl关联
    
            // register service
            register(registryUrls, serviceUrl);
    
            return exporter;
        }
    

    3.下面我们来看一下,motan如何对filter进行相应的增强处理

    public class ProtocolFilterDecorator implements Protocol { //实现Protocol的接口,联系到上文中使用此类对实际的Protocol进行包装
    
        private Protocol protocol;
    
        public ProtocolFilterDecorator(Protocol protocol) {
            if (protocol == null) {
                throw new MotanFrameworkException("Protocol is null when construct ProtocolFilterDecorator",
                        MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
            }
            this.protocol = protocol;//给实际的Protocol进行赋值
        }
    
        @Override
        public <T> Exporter<T> export(Provider<T> provider, URL url) {
            return protocol.export(decorateWithFilter(provider, url), url);发布服务时,调用filter增强处理方法
        }
    
        private <T> Provider<T> decorateWithFilter(Provider<T> provider, URL url) {
            List<Filter> filters = getFilters(url, MotanConstants.NODE_TYPE_SERVICE);//获取实际需要增强的filter
            if (filters == null || filters.size() == 0) {
                return provider;
            }
            Provider<T> lastProvider = provider;
            for (Filter filter : filters) {//对于代理对象provider进行包装,包装成一个provider链,返回最后一个provider
                final Filter f = filter;
                final Provider<T> lp = lastProvider;
                lastProvider = new Provider<T>() {
                    @Override
                    public Response call(Request request) {
                        return f.filter(lp, request);//对于后面调用的call方法时,首先调用最外层的filter,最后再调用实际的provider的call方法
                    }
    
                    @Override
                    public String desc() {
                        return lp.desc();
                    }
    
                    @Override
                    public void destroy() {
                        lp.destroy();
                    }
    
                    @Override
                    public Class<T> getInterface() {
                        return lp.getInterface();
                    }
    
                    @Override
                    public URL getUrl() {
                        return lp.getUrl();
                    }
    
                    @Override
                    public void init() {
                        lp.init();
                    }
    
                    @Override
                    public boolean isAvailable() {
                        return lp.isAvailable();
                    }
                };
            }
            return lastProvider;
        }
    
        private List<Filter> getFilters(URL url, String key) {
    
            // load default filters
            List<Filter> filters = new ArrayList<Filter>();
            List<Filter> defaultFilters = ExtensionLoader.getExtensionLoader(Filter.class).getExtensions(key);//使用spi机制初始化filer对象
            if (defaultFilters != null && defaultFilters.size() > 0) {
                filters.addAll(defaultFilters);
            }
    
            // add filters via "filter" config
            String filterStr = url.getParameter(URLParamType.filter.getName());
            if (StringUtils.isNotBlank(filterStr)) {
                String[] filterNames = MotanConstants.COMMA_SPLIT_PATTERN.split(filterStr);
                for (String fn : filterNames) {
                    addIfAbsent(filters, fn);
                }
            }
    
            // add filter via other configs, like accessLog and so on
            boolean accessLog = url.getBooleanParameter(URLParamType.accessLog.getName(), URLParamType.accessLog.getBooleanValue());
            if (accessLog) {
                addIfAbsent(filters, AccessLogFilter.class.getAnnotation(SpiMeta.class).name());
            }
    
            // sort the filters
            Collections.sort(filters, new ActivationComparator<Filter>());
            Collections.reverse(filters);
            return filters;
        }
    }
    

    4.服务发布完成后,需要像注册中心注册此服务

        private void register(List<URL> registryUrls, URL serviceUrl) {
    
            for (URL url : registryUrls) {//循环便利多个注册中心的信息
                // 根据check参数的设置,register失败可能会抛异常,上层应该知晓
                RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(url.getProtocol());//文中使用的是zookeeper
                if (registryFactory == null) {
                    throw new MotanFrameworkException(new MotanErrorMsg(500, MotanErrorMsgConstant.FRAMEWORK_REGISTER_ERROR_CODE,
                            "register error! Could not find extension for registry protocol:" + url.getProtocol()
                                    + ", make sure registry module for " + url.getProtocol() + " is in classpath!"));
                }
                Registry registry = registryFactory.getRegistry(url);//获取registry
                registry.register(serviceUrl);//将服务注册到zookeeper,也就是把节点信息写入到zookeeper中
            }
        }
    

    我们来看一下zookeeper注册中心的工厂类:每个Registry都需要独立维护一个ZkClient与zookeeper的链接

    @SpiMeta(name = "zookeeper")
    public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
    
        @Override
        protected Registry createRegistry(URL registryUrl) {
            try {
                int timeout = registryUrl.getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
                int sessionTimeout =
                        registryUrl.getIntParameter(URLParamType.registrySessionTimeout.getName(),
                                URLParamType.registrySessionTimeout.getIntValue());
                ZkClient zkClient = new ZkClient(registryUrl.getParameter("address"), sessionTimeout, timeout);//创建zookeeper的客户端
                return new ZookeeperRegistry(registryUrl, zkClient);//创建实际的Registry
            } catch (ZkException e) {
                LoggerUtil.error("[ZookeeperRegistry] fail to connect zookeeper, cause: " + e.getMessage());
                throw e;
            }
        }
    }
    

    我们再来分析ZookeeperRegistry中的代码

        public ZookeeperRegistry(URL url, ZkClient client) {
            super(url);
            this.zkClient = client;
            IZkStateListener zkStateListener = new IZkStateListener() {
                @Override
                public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                    // do nothing
                }
    
                @Override
                public void handleNewSession() throws Exception {//响应zkClient的事件
                    LoggerUtil.info("zkRegistry get new session notify.");
                    reconnectService();//重新注册服务
                    reconnectClient();
                }
            };
            zkClient.subscribeStateChanges(zkStateListener);
        }
        private void reconnectService() {
            Collection<URL> allRegisteredServices = getRegisteredServiceUrls();
            if (allRegisteredServices != null && !allRegisteredServices.isEmpty()) {
                try {
                    serverLock.lock();
                    for (URL url : getRegisteredServiceUrls()) {
                        doRegister(url);//注册
                    }
                    LoggerUtil.info("[{}] reconnect: register services {}", registryClassName, allRegisteredServices);
    
                    for (URL url : availableServices) {
                        if (!getRegisteredServiceUrls().contains(url)) {
                            LoggerUtil.warn("reconnect url not register. url:{}", url);
                            continue;
                        }
                        doAvailable(url);//标识服务可以提供服务
                    }
                    LoggerUtil.info("[{}] reconnect: available services {}", registryClassName, availableServices);
                } finally {
                    serverLock.unlock();
                }
            }
        }
        protected void doRegister(URL url) {
            try {
                serverLock.lock();
                // 防止旧节点未正常注销
                removeNode(url, ZkNodeType.AVAILABLE_SERVER);
                removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
                createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
            } catch (Throwable e) {
                throw new MotanFrameworkException(String.format("Failed to register %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
            } finally {
                serverLock.unlock();
            }
        }
        protected void doAvailable(URL url) {
            try{
                serverLock.lock();
                if (url == null) {
                    availableServices.addAll(getRegisteredServiceUrls());
                    for (URL u : getRegisteredServiceUrls()) {
                        removeNode(u, ZkNodeType.AVAILABLE_SERVER);
                        removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
                        createNode(u, ZkNodeType.AVAILABLE_SERVER);
                    }
                } else {
                    availableServices.add(url);
                    removeNode(url, ZkNodeType.AVAILABLE_SERVER);
                    removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
                    createNode(url, ZkNodeType.AVAILABLE_SERVER);
                }
            } finally {
                serverLock.unlock();
            }
        }
        private void createNode(URL url, ZkNodeType nodeType) {
            String nodeTypePath = ZkUtils.toNodeTypePath(url, nodeType);
            if (!zkClient.exists(nodeTypePath)) {
                zkClient.createPersistent(nodeTypePath, true);//对于服务的标识信息,创建持久化节点
            }
            zkClient.createEphemeral(ZkUtils.toNodePath(url, nodeType), url.toFullStr());//对于服务的ip和端口号信息使用临时节点,当服务断了后,zookeeper自动摘除目标服务器
        }
    
    

          本文分析了motan的服务发布及注册到zookeeper的流程相关的源码,主要涉及到的知识点:

    1.利用相关的配置对象进行信息的存储及传递;

    2.利用provider对具体的业务类进行封装代理;

    3.利用filter链的结构,来包装实际的provider,把所有的过滤器都处理完毕后,最后调用实际的业务类,大家可以想象一下aop相关的原理,有些类似;

    4.代码中大量使用jdk的标准spi技术进行类的加载;

    5.支持多个注册中心,也就是同一个服务可以注册到不同的注册中心上,每个registry对应一个具体的zkclient;

    6.利用了zookeeper的临时节点来维护服务器的host和port信息;

    7.支持多个服务发布到同一个端口,在本文中并没分析netty使用相关的代码,后面会分析到。

      

      

  • 相关阅读:
    基于Furion的.NET5 WebApi开发框架
    由react的todolist想到的
    react第三节-基础概念梳理
    uniapp引入字体图标与uniapp传入事件对象与自定义参数
    (react+tsx)函数式组件传参问题
    关于git正确clone指定分支
    关于‘react-app-rewried 不是内部或外部命令’的深度解析
    flex下的多行对齐与预处理器中使用nth-child选择器
    webpack(2)--webapck自身的配置
    webpack解析(1)
  • 原文地址:https://www.cnblogs.com/mantu/p/5872793.html
Copyright © 2011-2022 走看看