zoukankan      html  css  js  c++  java
  • dubbo框架揭秘之服务引用

            // Application-level configuration for this consumer.
            ApplicationConfig application = new ApplicationConfig("hello-worldp");

            // Registry configuration: address 127.0.0.1:2181, protocol "zookeeper".
            RegistryConfig registry = new RegistryConfig("127.0.0.1:2181");
            registry.setProtocol("zookeeper");

            // Reference to DemoService, version "1.0", wired to the application and registry above.
            ReferenceConfig<DemoService> reference = new ReferenceConfig<DemoService>();
            reference.setApplication(application);
            reference.setRegistry(registry);
            reference.setInterface(DemoService.class);
            reference.setVersion("1.0");

            // get() hands back the object used to call the service; invoke it once and print the result.
            DemoService demoService = reference.get();
            System.out.println(demoService.sayHello("selrain"));

            // Park the current thread so the demo process does not exit right away.
            Thread.sleep(Integer.MAX_VALUE);

    和服务发布一样,这里不采用Spring配置的方式,而是手动编写消费者代码。其中,ReferenceConfig的get()方法用于获取服务接口的代理对象。

    /**
     * Returns the cached reference, running {@code init()} first when {@code ref} is still
     * {@code null}. The method is {@code synchronized}, so concurrent callers on the same
     * instance execute it one at a time.
     *
     * @return the value of {@code ref} after any required initialization
     */
    public synchronized T get() {
            if (ref != null) {
                // Already initialized: hand back the cached reference.
                return ref;
            }
            init();
            return ref;
        }
        
    /**
     * Builds {@code invoker} for the referenced interface and then stores the proxy created by
     * {@code createProxy(map)} in {@code ref}.
     *
     * <p>Three paths are visible here:
     * <ul>
     *   <li>in-JVM reference ({@code isJvmRefer}): refer a local-protocol URL directly;</li>
     *   <li>user-specified {@code url}: split on semicolons and refer each entry;</li>
     *   <li>otherwise: build the URLs from the configured registries.</li>
     * </ul>
     * When more than one URL results, the individual invokers are joined through {@code cluster}.
     *
     * <p>The leading {@code ...} is an elision made by the article this excerpt comes from; the
     * code that populates {@code map} and {@code isJvmRefer} is not shown.
     *
     * @throws IllegalStateException if no user URL is given and no registry URL could be loaded
     */
    private void init() {
            ...        
            if (isJvmRefer) {
                URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
                invoker = refprotocol.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else {
                if (url != null && url.length() > 0) { // URL specified by the user: either a point-to-point direct address or a registry URL
                    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            // This local deliberately shadows the String field `url` tested above.
                            URL url = URL.valueOf(u);
                            if (url.getPath() == null || url.getPath().length() == 0) {
                                // No path given: fall back to the interface name.
                                url = url.setPath(interfaceName);
                            }
                            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                // Registry URL: carry the reference parameters as an encoded "refer" parameter.
                                urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                // Direct URL: merge the reference parameters into it.
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // assemble the URLs from the registry configuration
                    List<URL> us = loadRegistries(false);
                    if (us != null && us.size() > 0) {
                        for (URL u : us) {
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                // Written into the shared map, so the entry stays in place for later registries too.
                                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls == null || urls.size() == 0) {
                        // The inner double quotes must be escaped; unescaped they terminate the string literal early.
                        throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
    
                if (urls.size() == 1) {
                    // Single URL: refer it directly (original note: the consumer address is registered with the registry here).
                    invoker = refprotocol.refer(interfaceClass, urls.get(0));
                } else {
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        invokers.add(refprotocol.refer(interfaceClass, url));
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // the last registry URL wins
                        }
                    }
                    if (registryURL != null) { // at least one URL uses the registry protocol
                        // When registries are involved, only AvailableCluster is used for the join.
                        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                        invoker = cluster.join(new StaticDirectory(u, invokers));
                    }  else { // none of the URLs is a registry URL
                        invoker = cluster.join(new StaticDirectory(invokers));
                    }
                }
            }
            ref = createProxy(map);
        }    
    /**
     * Creates the service proxy that wraps the current {@code invoker}.
     *
     * <p>The {@code ...} is an elision made by the article this excerpt comes from; only the
     * final proxy-creation step is shown, so how {@code map} is used is not visible here.
     *
     * @param map reference parameters (usage elided in this excerpt)
     * @return the object returned by {@code proxyFactory.getProxy(invoker)}, cast to {@code T}
     */
    private T createProxy(Map<String, String> map) {
            ...
            // Create the service proxy. The cast to T is unchecked.
            return (T) proxyFactory.getProxy(invoker);
        }    

    proxyFactory通过ExtensionLoader的getAdaptiveExtension方法完成初始化,实际使用的实现是StubProxyFactoryWrapper。

    // Shared ProxyFactory, obtained once as the adaptive extension of ProxyFactory from ExtensionLoader.
    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

    看看StubProxyFactoryWrapper的getProxy方法,其调用了内部的proxyFactory的getProxy方法,而StubProxyFactoryWrapper在初始化的时候proxyFactory也一并初始化了,其值为JavassistProxyFactory。

     /**
      * Obtains a proxy for {@code invoker} from the wrapped {@code proxyFactory} and returns it.
      *
      * <p>The {@code ...} is an elision made by the article this excerpt comes from; whatever
      * happens between obtaining the proxy and returning it is not shown here.
      *
      * @param invoker the invoker to create a proxy for
      * @return the proxy held in the local {@code proxy} variable
      * @throws RpcException declared by the signature; the throwing code is not visible in this excerpt
      */
     public <T> T getProxy(Invoker<T> invoker) throws RpcException {
            // Delegate proxy creation to the wrapped factory.
            T proxy = proxyFactory.getProxy(invoker);
            ...
            return proxy;
        }

    看看JavassistProxyFactory的getProxy方法:

    /**
     * Creates a proxy instance implementing {@code interfaces} whose calls are handled by an
     * {@code InvokerInvocationHandler} wrapping {@code invoker}.
     *
     * @param invoker    the invoker the handler is built around
     * @param interfaces the interfaces passed to {@code Proxy.getProxy}
     * @return the new proxy instance, cast (unchecked) to {@code T}
     */
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            // Resolve the proxy factory first, then build the handler, matching the original evaluation order.
            Proxy proxyFactoryForInterfaces = Proxy.getProxy(interfaces);
            InvokerInvocationHandler handler = new InvokerInvocationHandler(invoker);
            return (T) proxyFactoryForInterfaces.newInstance(handler);
        }

    Proxy: 创建代理类

    /**
     * Returns a {@code Proxy} for the given interfaces, reusing a cached one when available and
     * otherwise generating two classes: a proxy-instance class that implements the interfaces and
     * forwards every method to an {@code InvocationHandler}, and a {@code Proxy} subclass whose
     * {@code newInstance} creates instances of that class.
     *
     * <p>The {@code ...} near the end is an elision made by the article this excerpt comes from;
     * the code between the catch blocks and the final return is not shown.
     *
     * @param cl  class loader used for the visibility check, as the cache key, and for class generation
     * @param ics interfaces the generated proxy must implement
     * @return a cached or newly generated {@code Proxy}
     * @throws IllegalArgumentException if more than 65535 interfaces are given, an interface is not
     *         visible from {@code cl}, or non-public interfaces come from different packages
     * @throws RuntimeException if an element of {@code ics} is not an interface, or wrapping any
     *         checked exception raised during class generation
     */
    public static Proxy getProxy(ClassLoader cl, Class<?>... ics)
        {
            if( ics.length > 65535 )
                throw new IllegalArgumentException("interface limit exceeded");
            
            // Validate every interface and build the cache key from the interface names, each followed by ';'.
            StringBuilder sb = new StringBuilder();
            for(int i=0;i<ics.length;i++)
            {
                String itf = ics[i].getName();
                if( !ics[i].isInterface() )
                    throw new RuntimeException(itf + " is not a interface.");
    
                Class<?> tmp = null;
                // A ClassNotFoundException is ignored on purpose: tmp stays null and the
                // identity check below then reports the interface as not visible.
                try
                {
                    tmp = Class.forName(itf, false, cl);
                }
                catch(ClassNotFoundException e)
                {}
    
                // The class loaded by name through cl must be the very same Class object.
                if( tmp != ics[i] )
                    throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
    
                sb.append(itf).append(';');
            }
    
            // use interface class name list as key.
            String key = sb.toString();
    
            // get cache by class loader.
            Map<String, Object> cache;
            synchronized( ProxyCacheMap )
            {
                cache = ProxyCacheMap.get(cl);
                if( cache == null )
                {
                    cache = new HashMap<String, Object>();
                    ProxyCacheMap.put(cl, cache);
                }
            }
    
            Proxy proxy = null;
            // Under the cache lock: return a live cached proxy, wait while the entry holds the
            // pending marker, or install the marker and leave the loop to generate the proxy.
            synchronized( cache )
            {
                do
                {
                    Object value = cache.get(key);
                    if( value instanceof Reference<?> )
                    {
                        // The Reference may have been cleared; only a non-null referent is returned.
                        proxy = (Proxy)((Reference<?>)value).get();
                        if( proxy != null )
                            return proxy;
                    }
    
                    if( value == PendingGenerationMarker )
                    {
                        // NOTE(review): InterruptedException is swallowed and the interrupt status is
                        // not restored; the loop simply re-checks the cache entry.
                        try{ cache.wait(); }catch(InterruptedException e){}
                    }
                    else
                    {
                        cache.put(key, PendingGenerationMarker);
                        break;
                    }
                }
                while( true );
            }
    
            // Counter value used as the numeric suffix of both generated class names.
            long id = PROXY_CLASS_COUNTER.getAndIncrement();
            String pkg = null;
            ClassGenerator ccp = null, ccm = null;
            try
            {
                ccp = ClassGenerator.newInstance(cl);
    
                // worked: method descriptors already handled; methods: the Method objects in generation order.
                Set<String> worked = new HashSet<String>();
                List<Method> methods = new ArrayList<Method>();
    
                for(int i=0;i<ics.length;i++)
                {
                    // Non-public interfaces must all share one package; that package is later
                    // used for the generated proxy-instance class name.
                    if( !Modifier.isPublic(ics[i].getModifiers()) )
                    {
                        String npkg = ics[i].getPackage().getName();
                        if( pkg == null )
                        {
                            pkg = npkg;
                        }
                        else
                        {
                            if( !pkg.equals(npkg)  )
                                throw new IllegalArgumentException("non-public interfaces from different packages");
                        }
                    }
                    ccp.addInterface(ics[i]);
    
                    for( Method method : ics[i].getMethods() )
                    {
                        // Skip methods whose descriptor was already generated.
                        String desc = ReflectUtils.getDesc(method);
                        if( worked.contains(desc) )
                            continue;
                        worked.add(desc);
    
                        // ix is the index this method will have in the generated static methods array.
                        int ix = methods.size();
                        Class<?> rt = method.getReturnType();
                        Class<?>[] pts = method.getParameterTypes();
                        // Add the method to the proxy class. When a caller invokes e.g. sayHello(), the code
                        // generated here runs: it packs the arguments into an Object[] and calls handler.invoke().
                        // NOTE(review): "($w)" and "$1..$n" look like Javassist source-level placeholders
                        // (boxing cast / positional parameters) — confirm against ClassGenerator.
                        StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                        for(int j=0;j<pts.length;j++)
                            code.append(" args[").append(j).append("] = ($w)$").append(j+1).append(";");
                        code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
                        // Non-void methods return the handler result converted by asArgument.
                        if( !Void.TYPE.equals(rt) )
                            code.append(" return ").append(asArgument(rt, "ret")).append(";");
    
                        methods.add(method);        
                        ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
                    }
                }
    
                // Only public interfaces were seen: fall back to the default package name.
                if( pkg == null )
                    pkg = PACKAGE_NAME;
    
                // create ProxyInstance class. 
                String pcn = pkg + ".proxy" + id;
                ccp.setClassName(pcn);
                ccp.addField("public static java.lang.reflect.Method[] methods;");
                ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
                ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{ InvocationHandler.class }, new Class<?>[0], "handler=$1;");
                ccp.addDefaultConstructor();
                Class<?> clazz = ccp.toClass();
                // Populate the generated static "methods" field that the generated method bodies index into.
                clazz.getField("methods").set(null, methods.toArray(new Method[0]));
    
                // create Proxy class. 
                String fcn = Proxy.class.getName() + id;
                ccm = ClassGenerator.newInstance(cl);
                ccm.setClassName(fcn);
                ccm.addDefaultConstructor();
                ccm.setSuperClass(Proxy.class);            
                // newInstance(handler) of the generated Proxy subclass instantiates the proxy-instance class above.
                ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
                Class<?> pc = ccm.toClass();
                proxy = (Proxy)pc.newInstance();
            }
            // Runtime exceptions propagate unchanged; any other exception is wrapped in a RuntimeException.
            catch(RuntimeException e)
            {
                throw e;
            }
            catch(Exception e)
            {
                throw new RuntimeException(e.getMessage(), e);
            }
            ...
            return proxy;
        }

    最终会调用DubboInvoker的doInvoke(Invocation)方法,与服务端进行通信:

    /**
     * Sends {@code invocation} through one of this invoker's exchange clients.
     *
     * <p>The path and version are attached to the invocation first. Depending on the one-way and
     * async flags resolved from the URL, the call is then sent without waiting for a reply,
     * issued with its future published on the {@code RpcContext}, or issued and waited on.
     *
     * @param invocation the call to perform; it is cast to {@code RpcInvocation}
     * @return an empty {@code RpcResult} for one-way and async calls, otherwise the result
     *         obtained from the response future
     * @throws RpcException with {@code TIMEOUT_EXCEPTION} when a {@code TimeoutException} is
     *         caught, or {@code NETWORK_EXCEPTION} when a {@code RemotingException} is caught
     * @throws Throwable declared by the signature
     */
    protected Result doInvoke(final Invocation invocation) throws Throwable {
            final RpcInvocation rpcInvocation = (RpcInvocation) invocation;
            final String invokedMethod = RpcUtils.getMethodName(invocation);
            rpcInvocation.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            rpcInvocation.setAttachment(Constants.VERSION_KEY, version);

            // With a single client use it directly; otherwise rotate through the array using the shared counter.
            final ExchangeClient client = (clients.length == 1)
                    ? clients[0]
                    : clients[index.getAndIncrement() % clients.length];

            try {
                final boolean async = RpcUtils.isAsync(getUrl(), invocation);
                final boolean oneway = RpcUtils.isOneway(getUrl(), invocation);
                final int requestTimeout = getUrl().getMethodParameter(invokedMethod, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);

                if (oneway) {
                    // One-way: send only, clear any future on the context, return an empty result.
                    final boolean sentFlag = getUrl().getMethodParameter(invokedMethod, Constants.SENT_KEY, false);
                    client.send(rpcInvocation, sentFlag);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                }

                if (async) {
                    // Async: publish the pending response on the context and return an empty result.
                    final ResponseFuture pending = client.request(rpcInvocation, requestTimeout);
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(pending));
                    return new RpcResult();
                }

                // Synchronous: clear any future on the context and wait for the response.
                RpcContext.getContext().setFuture(null);
                return (Result) client.request(rpcInvocation, requestTimeout).get();
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }

     参考:

         你应该知道的RPC原理

  • 相关阅读:
    Valid Number
    ZigZag Conversion
    KMP
    [OJ#40]后宫佳丽
    [OJ#39]左手右手
    [COJ0968]WZJ的数据结构(负三十二)
    [COJ0970]WZJ的数据结构(负三十)
    [BZOJ2815][ZJOI2012]灾难
    [BZOJ1923][Sdoi2010]外星千足虫
    [BZOJ4034][HAOI2015]树上操作
  • 原文地址:https://www.cnblogs.com/Non-Tecnology/p/6288197.html
Copyright © 2011-2022 走看看