zoukankan      html  css  js  c++  java
  • dubbo provider如何对invoker进行export

    把provider的invoker export出去分为三步:1)为原始对象加wrapper,生成invoker;2)给invoker加各种filter,启动监听服务;3)向注册中心注册服务地址。

    以HelloService为例:

    /** Example service interface used throughout this walkthrough. */
    public interface HelloService {
        /** The single example method: takes no arguments and returns a String. */
        String sayHello();
    }
    <dubbo:service interface="com.zhang.HelloService" ref="helloService" />

    ServiceConfig和<dubbo:service>基本对应上了。

    /** Callback interface with a single method that receives an ApplicationEvent. */
    public interface ApplicationListener {
        /** Invoked with the event being delivered to this listener. */
        public void onApplicationEvent(ApplicationEvent event);
    }
    
    /**
     * ServiceConfig subclass that starts the export from the application event callback.
     */
    public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, ApplicationContextAware, ApplicationListener, BeanNameAware {
        /**
         * Calls {@code export()} when the event is a {@code ContextRefreshedEvent} and
         * {@code isDelay()} returns true; every other event is ignored.
         * The event type is matched by comparing class names, not with {@code instanceof}.
         */
        public void onApplicationEvent(ApplicationEvent event) {
            String refreshedEventName = ContextRefreshedEvent.class.getName();
            boolean contextRefreshed = refreshedEventName.equals(event.getClass().getName());
            // isDelay() is only consulted for a context-refreshed event.
            if (!contextRefreshed || !isDelay()) {
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }
    
    public class ServiceConfig<T> extends AbstractServiceConfig {
        private ProviderConfig provider;
        // 是否暴露
        protected Boolean             export;
        
        private String              interfaceName;
        private Class<?>            interfaceClass;
        //即<dubbo:service>的ref对象
        private T                   ref;
        
        /**
         * Entry point for exporting the service.
         *
         * <p>Unset {@code export} / {@code delay} values are inherited from the provider config.
         * Nothing happens when {@code export} is explicitly false. With a positive {@code delay}
         * the real work runs on a daemon thread after sleeping for {@code delay}; otherwise
         * {@code doExport()} runs on the calling thread.
         */
        public synchronized void export() {
            if (provider != null) {
                if (export == null) {
                    export = provider.getExport();
                }
                if (delay == null) {
                    delay = provider.getDelay();
                }
            }

            boolean exportDisabled = export != null && !export.booleanValue();
            if (exportDisabled) {
                return;
            }

            boolean delayed = delay != null && delay > 0;
            if (!delayed) {
                // Original author's note: the export/delay settings seem to be rarely used,
                // so this direct path is the one normally taken.
                doExport();
                return;
            }

            Runnable delayedExport = new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(delay);
                    } catch (Throwable ignored) {
                        // Deliberately ignored: the export is still attempted below.
                    }
                    doExport();
                }
            };
            Thread exportThread = new Thread(delayedExport);
            exportThread.setName("DelayExportServiceThread");
            exportThread.setDaemon(true);
            exportThread.start();
        }
        
        protected synchronized void doExport() {
            if (unexported) {
                throw new IllegalStateException("Already unexported!");
            }
            if (exported) {
                return;
            }
            if (interfaceName == null || interfaceName.length() == 0) {
                throw new IllegalStateException("<dubbo:service interface="" /> interface not allow null!");
            }
            checkDefault();
            if (provider != null) {
                if (application == null) {
                    application = provider.getApplication();
                }
                if (registries == null) {
                    registries = provider.getRegistries();
                }
                if (monitor == null) {
                    monitor = provider.getMonitor();
                }
                if (protocols == null) {
                    protocols = provider.getProtocols();
                }
            }
            if (application != null) {
                if (registries == null) {
                    registries = application.getRegistries();
                }
                if (monitor == null) {
                    monitor = application.getMonitor();
                }
            }
            if (ref instanceof GenericService) {
                interfaceClass = GenericService.class;
                generic = true;
            } else {
                try {
                    interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                            .getContextClassLoader());
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e.getMessage(), e);
                }
                checkInterfaceAndMethods(interfaceClass, methods);
                checkRef();
                generic = false;
            }
            if(local !=null){
                if(local=="true"){
                    local=interfaceName+"Local";
                }
                Class<?> localClass;
                try {
                    localClass = Class.forName(local);
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e.getMessage(), e);
                }
                if(!interfaceClass.isAssignableFrom(localClass)){
                    throw new IllegalStateException("The local implemention class " + localClass.getName() + " not implement interface " + interfaceName);
                }
            }
            if(stub !=null){
                if(stub=="true"){
                    stub=interfaceName+"Stub";
                }
                Class<?> stubClass;
                try {
                    stubClass = Class.forName(stub);
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e.getMessage(), e);
                }
                if(!interfaceClass.isAssignableFrom(stubClass)){
                    throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + interfaceName);
                }
            }
            checkApplication();
            checkRegistry();
            checkProtocol();
            checkStubAndMock(interfaceClass);
            if (path == null || path.length() == 0) {
                path = interfaceName;
            }
            doExportUrls();
            exported = true;
        }
    }

    一、生成invoker

    ServiceConfig.doExportUrlsFor1Protocol 代码片段:

    // ref就是原始对象
    // Build the invoker around ref, the original service object. The URL passed in is the
    // registry URL with the service URL attached (encoded) under Constants.EXPORT_KEY.
    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass,
                                registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    // protocol is Protocol$Adpative
    Exporter<?> exporter = protocol.export(invoker);
    exporters.add(exporter);

    以javassist为例,默认从JavassistProxyFactory获取invoker,其实是生成wrapper。

    // JavassistProxyFactory
    /**
     * Creates an Invoker for the given service object; every call is forwarded to
     * {@code Wrapper.invokeMethod} on a Wrapper obtained for the target class.
     *
     * @param proxy the original service object (not a generated proxy)
     * @param type  the service interface
     * @param url   the URL handed to the created invoker
     */
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle class names containing '$' correctly, so the interface
        // type is wrapped instead of the concrete class in that case.
        final Class<?> targetClass = proxy.getClass();
        final boolean plainClassName = targetClass.getName().indexOf('$') < 0;
        final Wrapper wrapper = Wrapper.getWrapper(plainClassName ? targetClass : type);

        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

    com.alibaba.dubbo.common.bytecode.Wrapper.makeWrapper(Class<?> c) 方法中生成wrapper类,代码格式化后如下:

    public class com.alibaba.dubbo.common.bytecode.Wrapper1 extends Wrapper {
        public static String[] pns = null;
        public static Map pts = null;
        public static String[] mns = { "sayHello" };
        public static String[] dmns = { "sayHello" };
        public static Class[] mts0 = null;
    
        public String[] getPropertyNames() {
            return pns;
        }
    
        public boolean hasProperty(String n) {
            return pts.containsKey($1);
        }
    
        public Class getPropertyType(String n) {
            return (Class) pts.get($1);
        }
    
        public String[] getMethodNames() {
            return mns;
        }
    
        public String[] getDeclaredMethodNames() {
            return dmns;
        }
    
        public void setPropertyValue(Object o, String n, Object v) {
            com.zhang.HelloServiceImpl w;
            try {
                w = ((com.zhang.HelloServiceImpl) $1);
            } catch (Throwable e) {
                throw new IllegalArgumentException(e);
            }
            throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException(
                    "Not found property ""
                            + $2
                            + "" filed or setter method in class com.zhang.HelloServiceImpl.");
        }
    
        public Object getPropertyValue(Object o, String n) {
            com.zhang.HelloServiceImpl w;
            try {
                w = ((com.zhang.HelloServiceImpl) $1);
            } catch (Throwable e) {
                throw new IllegalArgumentException(e);
            }
            throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException(
                    "Not found property ""
                            + $2
                            + "" filed or setter method in class com.zhang.HelloServiceImpl.");
        }
    
        public Object invokeMethod(Object o, String n, Class[] p, Object[] v)
                throws java.lang.reflect.InvocationTargetException {
            com.zhang.HelloServiceImpl w;
            try {
                w = ((com.zhang.HelloServiceImpl) $1);
            } catch (Throwable e) {
                throw new IllegalArgumentException(e);
            }
            try {
                if ("sayHello".equals($2) && $3.length == 0) {
                    return ($w) w.sayHello();
                }
            } catch (Throwable e) {
                throw new java.lang.reflect.InvocationTargetException(e);
            }
            throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException(
                    "Not found method "" + $2
                            + "" in class com.zhang.HelloServiceImpl.");
        }
    }

    二、开始export。贴出Protocol$Adpative的代码,重点关注export方法:

    package com.alibaba.dubbo.rpc;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    /**
     * Generated adaptive Protocol: refer and export look up the concrete Protocol extension
     * named by the URL's protocol ("dubbo" when the URL has none) and delegate to it.
     * destroy and getDefaultPort are not adaptive and always throw.
     */
    public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
        public void destroy() {
            throw new UnsupportedOperationException(
                    "method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public int getDefaultPort() {
            throw new UnsupportedOperationException(
                    "method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        /**
         * Delegates to the Protocol extension selected by arg1's protocol.
         * The throws clause names RpcException; the generated text listed the parameter
         * type (java.lang.Class) there, which is not a Throwable.
         */
        public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0,
                com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
            if (arg1 == null)
                throw new IllegalArgumentException("url == null");
            com.alibaba.dubbo.common.URL url = arg1;
            String extName = (url.getProtocol() == null ? "dubbo" : url
                    .getProtocol());
            if (extName == null)
                throw new IllegalStateException(
                        "Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("
                                + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader
                    .getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
                    .getExtension(extName);
            return extension.refer(arg0, arg1);
        }
    
        /**
         * Delegates to the Protocol extension selected by the invoker URL's protocol.
         * The throws clause names RpcException; the generated text listed the parameter
         * type (Invoker) there, which is not a Throwable.
         */
        public com.alibaba.dubbo.rpc.Exporter export(
                com.alibaba.dubbo.rpc.Invoker arg0)
                throws com.alibaba.dubbo.rpc.RpcException {
            if (arg0 == null)
                throw new IllegalArgumentException(
                        "com.alibaba.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null)
                throw new IllegalArgumentException(
                        "com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
            com.alibaba.dubbo.common.URL url = arg0.getUrl();
            // Derive the extension name from the invoker's url.
            String extName = (url.getProtocol() == null ? "dubbo" : url
                    .getProtocol());
            if (extName == null)
                throw new IllegalStateException(
                        "Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("
                                + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader
                    .getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
                    .getExtension(extName);
            return extension.export(arg0);
        }
    }
    
    package com.alibaba.dubbo.rpc.cluster;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    /**
     * Generated adaptive Cluster: join looks up the Cluster extension named by the
     * directory URL's "cluster" parameter ("failover" when absent) and delegates to it.
     */
    public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
        /**
         * Delegates to the selected Cluster extension.
         * The throws clause names RpcException; the generated text listed the parameter
         * type (Directory) there, which is not a Throwable.
         */
        public com.alibaba.dubbo.rpc.Invoker join(
                com.alibaba.dubbo.rpc.cluster.Directory arg0)
                throws com.alibaba.dubbo.rpc.RpcException {
            if (arg0 == null)
                throw new IllegalArgumentException(
                        "com.alibaba.dubbo.rpc.cluster.Directory argument == null");
            if (arg0.getUrl() == null)
                throw new IllegalArgumentException(
                        "com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
            com.alibaba.dubbo.common.URL url = arg0.getUrl();
            String extName = url.getParameter("cluster", "failover");
            if (extName == null)
                throw new IllegalStateException(
                        "Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url("
                                + url.toString() + ") use keys([cluster])");
            com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader
                    .getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class)
                    .getExtension(extName);
            // Original author's note: the extension obtained here is MockClusterWrapper.
            return extension.join(arg0);
        }
    }

    从调用栈中能看出,Protocol$Adpative.export调用了两次,在Protocol$Adpative的export函数内部,
    第一次获取的protocol是 ProtocolListenerWrapper/ProtocolFilterWrapper/RegistryProtocol,
    第二次获取的protocol是 ProtocolListenerWrapper/ProtocolFilterWrapper/DubboProtocol。

    第一次:ServiceConfig.doExportUrlsFor1Protocol代码片段:

    // With at least one registry URL and "register" not disabled (defaults to true), export
    // once per registry; otherwise export the service URL directly.
    if (registryURLs != null && registryURLs.size() > 0
            && url.getParameter("register", true)) {
        for (URL registryURL : registryURLs) {
            // url is reassigned on each pass, so parameters added here carry over to later registries.
            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
            URL monitorUrl = loadMonitor(registryURL);
            if (monitorUrl != null) {
                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
            }
            // Obtain the invoker; the service url travels inside the registry url under EXPORT_KEY.
            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
            // protocol is Protocol$Adpative
            // the invoker's url looks like: registry://127.0.0.1:2181/xxxxx
            Exporter<?> exporter = protocol.export(invoker);
            exporters.add(exporter);
        }
    } else {
        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
    
        Exporter<?> exporter = protocol.export(invoker);
        exporters.add(exporter);
    }

    第二次:RegistryProtocol.doLocalExport 代码片段:

    // RegistryProtocol
    private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = 
    new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); //protocol是com.alibaba.dubbo.rpc.Protocol$Adpative // invoker的url:dubbo://192.168.233.6:20880 exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return (ExporterChangeableWrapper<T>) exporter; }

    暴露服务主要的步骤可能在这儿了:

    // RegistryProtocol
    /**
     * Exports the invoker locally, registers the provider URL with the registry, subscribes
     * to override data, and returns a fresh Exporter whose unexport() undoes all three steps.
     */
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // export invoker: start the service
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        // registry provider: register the service
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // Subscribe to override data.
        // FIXME Provider-side subscription affects the case where one JVM both exports and references the same service: subscribed uses the service name as its cache key, so subscription info gets overwritten.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        // Make sure every export call returns a new exporter instance.
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
                // Each step is attempted independently; a failure is logged and the next step still runs.
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

    在DubboProtocol中export,主要是启动服务:

    // DubboProtocol
    /**
     * Registers a DubboExporter for the invoker in {@code exporterMap}, records stub event
     * methods when the URL asks for them, and calls {@code openServer} for the invoker's URL.
     *
     * @param invoker the invoker to export (a RegistryProtocol$InvokerDelegete in this walkthrough)
     * @return the exporter stored in {@code exporterMap}
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        final URL url = invoker.getUrl();

        // Store the exporter under the service key, e.g.
        //   key:   a/com.zhang.HelloService:20880
        //   value: the DubboExporter wrapping the invoker
        final String key = serviceKey(url);
        final DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        // Export a stub service for dispatching events.
        Boolean stubSupportsEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
        Boolean callbackService = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (stubSupportsEvent && !callbackService) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            boolean hasStubMethods = stubServiceMethods != null && stubServiceMethods.length() != 0;
            if (hasStubMethods) {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            } else if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        }

        openServer(url);

        return exporter;
    }
    
    /**
     * Ensures a server exists for the URL's address: creates and caches one in
     * {@code serverMap} when none is present, otherwise resets the existing one with the URL.
     * Does nothing when the URL's IS_SERVER_KEY parameter is false (it defaults to true).
     */
    private void openServer(URL url) {
        // Servers are looked up by address.
        final String address = url.getAddress();
        // A client can also export a service that only the server may call.
        final boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
        if (!isServer) {
            return;
        }
        final ExchangeServer existing = serverMap.get(address);
        if (existing != null) {
            // The server supports reset; used together with the override feature.
            existing.reset(url);
            return;
        }
        serverMap.put(address, createServer(url));
    }
    /**
     * Binds a new ExchangeServer for the URL after filling in default parameters.
     *
     * @throws RpcException if the configured server or client transporter type is not a known
     *         Transporter extension, or if binding the server fails
     */
    private ExchangeServer createServer(URL url) {
        // By default, send a readonly event when the server is closed.
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // Heartbeat is enabled by default.
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        String serverType = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
        boolean serverTypeGiven = serverType != null && serverType.length() > 0;
        if (serverTypeGiven && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(serverType)) {
            throw new RpcException("Unsupported server type: " + serverType + ", url: " + url);
        }

        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);

        ExchangeServer server;
        try {
            // requestHandler is an inner class of DubboProtocol (an ExchangeHandlerAdapter).
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        // The client type is validated only after the server has been bound.
        String clientType = url.getParameter(Constants.CLIENT_KEY);
        if (clientType != null && clientType.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(clientType)) {
                throw new RpcException("Unsupported client type: " + clientType);
            }
        }
        return server;
    }

    绑定requestHandler

    // Exchangers
    /**
     * Binds an ExchangeServer for the URL through the Exchanger selected for that URL.
     * The codec parameter is set to "exchange" when the URL does not already carry one.
     *
     * @throws IllegalArgumentException if {@code url} or {@code handler} is null
     */
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        final URL codecUrl = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(codecUrl).bind(codecUrl, handler);
    }
    
    // HeaderExchanger
    /**
     * Wraps the handler as DecodeHandler(HeaderExchangeHandler(handler)), binds it through
     * Transporters, and wraps the result in a HeaderExchangeServer.
     */
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        HeaderExchangeHandler exchangeHandler = new HeaderExchangeHandler(handler);
        DecodeHandler decodeHandler = new DecodeHandler(exchangeHandler);
        return new HeaderExchangeServer(Transporters.bind(url, decodeHandler));
    }
  • 相关阅读:
    Entity Framework Tips: IN关键字的支持
    (转载)用IT网络和安全专业人士视角来裁剪云的定义
    解决数据库查询中的锁冲突
    2010年计划
    MergeOption 枚举实测
    习惯的力量
    Entity Framewok中获取实体对象的部分属性
    JQuery下拉框联动本地数据
    Json学习整理
    Hadoop:mapreduce的splitsize和blocksize
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8418091.html
Copyright © 2011-2022 走看看