zoukankan      html  css  js  c++  java
  • Dubbo 浅读

    最近公司一直在用阿里的开源框架Dubbo,正好上一篇文章也是讲到了RPC的概念,Dubbo听过的兄弟都知道在业界好评还是很高的,不光是设计优雅,文档也很齐全,这次就简单的分享下LZ的解读成果,当然本文章只是浅层次的,着重分析的是Dubbo核心层如何去高效的执行调用远程RPC服务的。

    这里要简单跟兄弟们区分下概念,最常见最具代表性也是比较简单的HTTP协议(短连接)与Socket编程(长连接)的区别,这里不再多讲前者,这次主要最涉及到后者。

    知识点储备前提:

     JAVA 动态代理(网上很多,这里推荐比较全面的文章:http://www.cnblogs.com/flyoung2008/archive/2013/08/11/3251148.html)

    JAVA NIO(http://www.cnblogs.com/flyoung2008/p/3251826.html)

    这里LZ依然推荐的是带着问题去学习,依旧是Scrum三要素:服务端怎么暴露服务,怎么引用服务,两者怎么实现通讯。

    1:服务端怎么暴露服务

    ServiceConfig类里export方法有一段

            if (delay != null && delay > 0) {
                Thread thread = new Thread(new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep(delay);
                        } catch (Throwable e) {
                        }
                        doExport();
                    }
                });
                thread.setDaemon(true);
                thread.setName("DelayExportServiceThread");
                thread.start();
            } else {
                doExport();
            }

    看上面代码,如果没有设置延迟暴露服务,直接运行doExport();

    一路跟踪代码,校验HOST,类型转换等跳过,最终看到下面代码。

    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    
    Exporter<?> exporter = protocol.export(invoker);
    exporters.add(exporter);

    光看着几行依旧不太明了,首先关注Invoker类,最终又会看到URL类。

    关注下URL类会看到一些连接需要的字段信息,大概猜测即为连接所用的信息实体类。

    继续往下看proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper类不能正确处理带$的类名
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName, 
                                          Class<?>[] parameterTypes, 
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }

    可以看到此方法根据参数,返回一个Invoker实体。再看protocol.export(invoker);

    往下追会看到如下:

         try {
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }

    再往下追,看到

    return getTransporter().bind(url, handler);

    发现Transporter有三个实现类:GrizzlyTransporter,MinaTransporter,NettyTransporter 。

    看到这些名字也许脑海里会有点概念,哦~这里就开始用Grizzly,Mina,Netty这里开源框架去实现通讯啦~

    但是还是不是很彻底,没关系,接着第二个问题

    2:怎么引用服务?

    ReferenceConfig类init();方法里有段

    ref = createProxy(map);我们从这可以看得出有Proxy字样,这就可以确定我们的方向没有走错,继续往下看。

    我们又可以看到下面代码:

    if (urls.size() == 1) {
        invoker = refprotocol.refer(interfaceClass, urls.get(0));
    } else {
        List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
        URL registryURL = null;
        for (URL url : urls) {
            invokers.add(refprotocol.refer(interfaceClass, url));
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                registryURL = url; // 用了最后一个registry url
            }
        }
        if (registryURL != null) { // 有 注册中心协议的URL
            // 对有注册中心的Cluster 只用 AvailableCluster
            URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
            invoker = cluster.join(new StaticDirectory(u, invokers));
        }  else { // 不是 注册中心的URL
            invoker = cluster.join(new StaticDirectory(invokers));
        }
    }

    这里的代码回顾之前的暴露服务时的Invoker有没有熟悉?是的,这里就是根据URL来得到Invoker实体,

    最后有个 getProxy(invoker, interfaces);

        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }

    看到这里就看熟悉了啦~再看InvokerInvocationHandler类,实现InvocationHandler接口,

    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }

    其实最终还是认出了它的本质。

    那么问题来了......

    3:两者怎么实现通讯

    回到刚刚暴露服务那里,Transporter有三个实现类GrizzlyTransporter,MinaTransporter,NettyTransporter 。

    我们以NettyTransporter为例:

        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }
    
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }

    我们跟踪NettyServer看到doOpen()里:

    channel = bootstrap.bind(getBindAddress());

    这个channel也就是Netty里的channel,终于交给Netty啦。

    我们再跟踪NettyClient的时候貌似不是简单的连接,往后跟踪发现

    cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));

    这一段就可以看到利用ExecutorService去分发连接。

    那么可能看到以上分析还是有点晕乎乎,没有连续感有木有?

    是的,其实光分析以上的也不会有太直观的感受,LZ根据这些原理写了个最直观最简单的Demo:

     这是简化过的核心代理层:

    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * Created by luyichen718 on 14-12-31.
     */
    public class RpcDemo {
    
        /**
         * 暴露服务
         *
         * @param service 服务实现
         * @param port    服务端口
         * @throws Exception
         */
        public static void export(final Object service, int port) throws Exception {
            try {
                ServerSocket server = new ServerSocket(port);
                //该处采用不间断阻塞式监听端口
                //TODO:此处可以用NIO来优化实现
                while (true){
                    final Socket socket = server.accept();
                    try {
                        try {
                            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                            try {
                                //也可以加服务注册方式管理服务
                                String methodName = input.readUTF();
                                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                                Object[] arguments = (Object[]) input.readObject();
                                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                                //TODO:前后可以做监控
                                try {
                                    Method method = service.getClass().getMethod(methodName, parameterTypes);
                                    Object result = method.invoke(service, arguments);
                                    output.writeObject(result);
                                } catch (Throwable t) {
                                    output.writeObject(t);
                                } finally {
                                    output.close();
                                }
                            } finally {
                                input.close();
                            }
                        } finally {
                            socket.close();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
        /**
         * 引用服务
         *
         * @param <T>            接口泛型
         * @param interfaceClass 接口类型
         * @param host           服务器主机名
         * @param port           服务器端口
         * @return 远程服务
         * @throws Exception
         */
        @SuppressWarnings("unchecked")
        public static <T> T reference(final Class<T> interfaceClass, final String host, final int port) throws Exception {
            return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
                public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                    Socket socket = new Socket(host, port);
                    try {
                        ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                        try {
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(arguments);
                            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                            try {
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable) result;
                                }
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });
        }
    }

    服务生产者:

        public static void main(String[] args) throws Exception {
            BizService service = new BizServiceImpl();
            RpcDemo.export(service, 6543);
        }

    服务消费者:

        public static void main(String[] args) throws Exception {
            BizService service = RpcDemo.reference(BizService.class, "127.0.0.1", 6543);
            service.hello();
        }

    该处的BizService就是一个最简单的业务接口,里面可以实现具体的业务,接口一般单独生产jar包给消费方使用。

    可能对于一般读者也是最有价值的了吧,以上晕乎乎的可以先看Demo然后再下载了源码看看分析^_^

    这个Demo也算是对一般长连接的最简易实现,当然Dubbo加了很多扩展及监控,还运用了很多Decorator模式,接口设计也很优雅,光看官网的一些接口图都晕乎乎了,所以还是要静下心来慢慢研读。

    此处也算是对长连接RPC的实现做了一个小结,欢迎各兄弟交流吐槽~

    不能做SRE的Coding不是一个好的架构师
  • 相关阅读:
    android 4.0 support sf mkv parser on GB.
    Android关于OutOfMemoryError的一些思考
    android devices offline
    Android 查看内存使用情况
    android模块编译,mm,mmm
    eclipse老是building workspace及自动更新问题,eclipse加速
    如何在android native编程中使用logCat
    ANDROID 静音与振动
    android中sim卡相关操作
    真机缺少com.google.android.maps.jar解决方法:
  • 原文地址:https://www.cnblogs.com/Vincentlu/p/4196175.html
Copyright © 2011-2022 走看看