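  • Dubbo source code analysis

    Overall Dubbo architecture diagram.

    1. The spring.handlers and spring.schemas files in dubbo-config-spring-2.6.2.jar let Spring find the handler class for Dubbo's custom tags by their namespace.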

    http://dubbo.apache.org/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler
    http://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler
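
    The lookup above can be sketched with plain java.util.Properties. This is only an illustration, not Spring's own loading code: the class name HandlerMappingSketch and the method resolveHandler are made up for the example. Note that in the real file the colon after "http" is escaped (http\://...), because Properties would otherwise treat it as the key/value separator.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class HandlerMappingSketch {

    // Mimics the contents of META-INF/spring.handlers. The ':' after "http" is
    // escaped because java.util.Properties treats an unescaped ':' as a separator.
    static final String SPRING_HANDLERS =
            "http\\://dubbo.apache.org/schema/dubbo="
          + "com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler\n"
          + "http\\://code.alibabatech.com/schema/dubbo="
          + "com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler\n";

    /** Returns the handler class name mapped to a namespace URI, or null if none. */
    static String resolveHandler(String namespaceUri) {
        Properties mappings = new Properties();
        try {
            mappings.load(new StringReader(SPRING_HANDLERS));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return mappings.getProperty(namespaceUri);
    }

    public static void main(String[] args) {
        System.out.println(resolveHandler("http://dubbo.apache.org/schema/dubbo"));
    }
}
```

    Both namespace URIs map to the same handler class, so configuration files written against the old alibabatech namespace keep working.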

    2. Name initialization

    3. Proxies in Dubbo: dynamic proxies generated with Javassist.

        @Override
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
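
    To make the role of Wrapper more concrete, here is a hand-written stand-in for the kind of class it generates at runtime: a method that dispatches on the method name and calls the target directly instead of going through reflection. DemoService, DemoServiceImpl and DemoServiceWrapper are hypothetical names for this sketch, and the real generated code differs in detail.

```java
class WrapperSketch {

    interface DemoService {
        String sayHello(String name);
    }

    static class DemoServiceImpl implements DemoService {
        @Override
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    /** Hand-written stand-in for a class that would be generated at runtime. */
    static class DemoServiceWrapper {
        Object invokeMethod(Object instance, String methodName,
                            Class<?>[] parameterTypes, Object[] arguments) {
            DemoService target = (DemoService) instance;
            // Dispatch on name and arity, then make a direct (non-reflective) call.
            if ("sayHello".equals(methodName) && parameterTypes.length == 1) {
                return target.sayHello((String) arguments[0]);
            }
            throw new IllegalArgumentException("Method " + methodName + " not found");
        }
    }

    public static void main(String[] args) {
        Object result = new DemoServiceWrapper().invokeMethod(
                new DemoServiceImpl(), "sayHello",
                new Class<?>[] {String.class}, new Object[] {"world"});
        System.out.println(result); // prints "Hello world"
    }
}
```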

    The invoker's invoke method is then called.

        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            try {
                return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
            } catch (InvocationTargetException e) {
                return new RpcResult(e.getTargetException());
            } catch (Throwable e) {
                throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
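
    The two catch branches above behave differently: an exception thrown by the service method itself is carried back inside the result, while any other failure is thrown as a framework error. A minimal sketch of that distinction, using plain reflection and a simplified Result class in place of RpcResult (Result, Calculator and the invoke helper are hypothetical names):

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class InvokeResultSketch {

    /** Simplified stand-in for RpcResult: holds either a value or an exception. */
    static final class Result {
        final Object value;
        final Throwable exception;

        Result(Object value, Throwable exception) {
            this.value = value;
            this.exception = exception;
        }

        boolean hasException() {
            return exception != null;
        }
    }

    public static class Calculator {
        public int divide(int a, int b) {
            return a / b;
        }
    }

    static Result invoke(Object target, String methodName,
                         Class<?>[] parameterTypes, Object[] arguments) {
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            return new Result(method.invoke(target, arguments), null);
        } catch (InvocationTargetException e) {
            // The service method threw: return its exception as part of the result.
            return new Result(null, e.getTargetException());
        } catch (ReflectiveOperationException e) {
            // The call could not be made at all: treat it as a framework failure.
            throw new IllegalStateException("Failed to invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        Class<?>[] types = {int.class, int.class};
        System.out.println(invoke(new Calculator(), "divide", types, new Object[] {6, 3}).value);
        System.out.println(invoke(new Calculator(), "divide", types, new Object[] {1, 0}).exception);
    }
}
```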

    4. The service key in the URL is the interface name: com.alibaba.dubbo.demo.DemoService
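
    As a sketch of how such a key could be assembled, the helper below assumes the form group/interface:version, with group and version optional. That format is my reading of URL.getServiceKey() and should be checked against the Dubbo version in use; serviceKey here is a hypothetical helper, not Dubbo's method. With neither group nor version set, the key is just the interface name, as in the example above.

```java
class ServiceKeySketch {

    /** Builds "group/interface:version", skipping the parts that are absent. */
    static String serviceKey(String group, String interfaceName, String version) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(interfaceName);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        return key.toString();
    }

    public static void main(String[] args) {
        System.out.println(serviceKey(null, "com.alibaba.dubbo.demo.DemoService", null));
        System.out.println(serviceKey("test", "com.alibaba.dubbo.demo.DemoService", "1.0.0"));
    }
}
```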

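    I. Terminology

    1. Invocation: the object that describes a call. On the consumer side it is assembled from the target object and the method being called.

    2. Invoker: the object that performs the call; on the provider side it wraps the service implementation.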

    public interface Invoker<T> extends Node {
    
        /**
         * get service interface.
         *
         * @return service interface.
         */
        Class<T> getInterface();
    
        /**
         * invoke.
         *
         * @param invocation
         * @return result
         * @throws RpcException
         */
        Result invoke(Invocation invocation) throws RpcException;
    
    }
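
    How the consumer side turns a method call into an invocation can be sketched with a JDK dynamic proxy. SimpleInvocation and SimpleInvoker are simplified stand-ins invented for this example, not Dubbo's types, and Dubbo's own consumer proxy is built differently; the point is only that the proxy captures the method name, parameter types and arguments and hands them to an invoker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class InvocationSketch {

    /** Simplified stand-in for Invocation: which method, with which arguments. */
    static final class SimpleInvocation {
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] arguments;

        SimpleInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.arguments = arguments;
        }
    }

    /** Simplified stand-in for Invoker: anything that can execute an invocation. */
    interface SimpleInvoker {
        Object invoke(SimpleInvocation invocation);
    }

    interface DemoService {
        String sayHello(String name);
    }

    /** Creates a proxy that packs every interface call into a SimpleInvocation. */
    static <T> T createProxy(Class<T> type, SimpleInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> invoker.invoke(
                new SimpleInvocation(method.getName(), method.getParameterTypes(), args));
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    public static void main(String[] args) {
        // This invoker just echoes what it was asked to do; a real one would go remote.
        SimpleInvoker echo = inv -> inv.methodName + Arrays.toString(inv.arguments);
        DemoService service = createProxy(DemoService.class, echo);
        System.out.println(service.sayHello("world")); // prints "sayHello[world]"
    }
}
```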

    3. Exporter: returns the invoker, and destroys it on unexport.

    public interface Exporter<T> {
    
        /**
         * get invoker.
         *
         * @return invoker
         */
        Invoker<T> getInvoker();
    
        /**
         * unexport.
         * <p>
         * <code>
         * getInvoker().destroy();
         * </code>
         */
        void unexport();
    
    }

    4. DubboProtocol: (1) creates the DubboExporter; (2) opens the Dubbo socket server.

        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
    
            // export service.
            String key = serviceKey(url);
            // Create the DubboExporter
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);

            //export an stub service for dispatching event
            Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
            Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
            if (isStubSupportEvent && !isCallbackservice) {
                String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
                if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded."));
                    }
                } else {
                    stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
                }
            }

            // Open the socket server
            openServer(url);
            optimizeSerialization(url);
            return exporter;
        }
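
    The bookkeeping in export can be sketched on its own: the exporter is stored in a map under the service key, and unexport removes it again and destroys the invoker. SimpleProtocol, SimpleExporter and SimpleInvoker are hypothetical simplified types; stub events and the server start are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ExporterSketch {

    /** Simplified stand-in for Invoker. */
    interface SimpleInvoker {
        Object invoke(String methodName, Object[] arguments);

        default void destroy() {
            // Nothing to release in this sketch.
        }
    }

    /** Simplified stand-in for DubboExporter: knows its key and the map it lives in. */
    static final class SimpleExporter {
        private final SimpleInvoker invoker;
        private final String key;
        private final Map<String, SimpleExporter> exporterMap;

        SimpleExporter(SimpleInvoker invoker, String key, Map<String, SimpleExporter> exporterMap) {
            this.invoker = invoker;
            this.key = key;
            this.exporterMap = exporterMap;
        }

        SimpleInvoker getInvoker() {
            return invoker;
        }

        void unexport() {
            exporterMap.remove(key);
            invoker.destroy();
        }
    }

    /** Simplified stand-in for the protocol: keeps one exporter per service key. */
    static final class SimpleProtocol {
        final Map<String, SimpleExporter> exporterMap = new ConcurrentHashMap<>();

        SimpleExporter export(String serviceKey, SimpleInvoker invoker) {
            SimpleExporter exporter = new SimpleExporter(invoker, serviceKey, exporterMap);
            exporterMap.put(serviceKey, exporter);
            return exporter;
        }
    }

    public static void main(String[] args) {
        SimpleProtocol protocol = new SimpleProtocol();
        SimpleExporter exporter = protocol.export(
                "com.alibaba.dubbo.demo.DemoService", (method, arguments) -> "Hello " + arguments[0]);
        System.out.println(protocol.exporterMap.containsKey("com.alibaba.dubbo.demo.DemoService"));
        exporter.unexport();
        System.out.println(protocol.exporterMap.isEmpty());
    }
}
```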

    5. HeaderExchangeServer: the heartbeat server. It periodically sends messages over the channels established on the socket.

        private void startHeartbeatTimer() {
            stopHeartbeatTimer();
            if (heartbeat > 0) {
                heartbeatTimer = scheduled.scheduleWithFixedDelay(
                        new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                            @Override
                            public Collection<Channel> getChannels() {
                                return Collections.unmodifiableCollection(
                                        HeaderExchangeServer.this.getChannels());
                            }
                        }, heartbeat, heartbeatTimeout),
                        heartbeat, heartbeat, TimeUnit.MILLISECONDS);
            }
        }
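
    What a periodic heartbeat check might do on each run can be sketched as follows. This is a simplification under my own assumptions, not the actual HeartBeatTask: SimpleChannel is a hypothetical stand-in that tracks a single last-activity timestamp, a channel idle longer than heartbeat gets a heartbeat, and one idle longer than heartbeatTimeout is closed. The start method shows the same scheduleWithFixedDelay pattern as the code above.

```java
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class HeartbeatSketch {

    /** Simplified stand-in for a channel: last activity time plus what was done to it. */
    static final class SimpleChannel {
        volatile long lastActivityMillis;
        volatile boolean closed;
        volatile int heartbeatsSent;

        SimpleChannel(long lastActivityMillis) {
            this.lastActivityMillis = lastActivityMillis;
        }
    }

    /** One pass over all channels at time "now" (all values in milliseconds). */
    static void check(Collection<SimpleChannel> channels, long now,
                      long heartbeat, long heartbeatTimeout) {
        for (SimpleChannel channel : channels) {
            if (channel.closed) {
                continue;
            }
            long idle = now - channel.lastActivityMillis;
            if (idle > heartbeatTimeout) {
                channel.closed = true;        // idle for too long: give up on it
            } else if (idle > heartbeat) {
                channel.heartbeatsSent++;     // quiet for a while: probe it
            }
        }
    }

    /** Runs the check repeatedly, waiting "heartbeat" ms between the end of one run and the next. */
    static ScheduledFuture<?> start(ScheduledExecutorService scheduler,
                                    Collection<SimpleChannel> channels,
                                    long heartbeat, long heartbeatTimeout) {
        return scheduler.scheduleWithFixedDelay(
                () -> check(channels, System.currentTimeMillis(), heartbeat, heartbeatTimeout),
                heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    }
}
```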

    6. NettyServer: opens the socket server.

        @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
            ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
            ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
            bootstrap = new ServerBootstrap(channelFactory);
    
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            channels = nettyHandler.getChannels();
            // https://issues.jboss.org/browse/NETTY-365
            // https://issues.jboss.org/browse/NETTY-379
            // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                @Override
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    /*int idleTimeout = getIdleTimeout();
                    if (idleTimeout > 10000) {
                        pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                    }*/
                    pipeline.addLast("decoder", adapter.getDecoder());
                    pipeline.addLast("encoder", adapter.getEncoder());
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
            // bind
            channel = bootstrap.bind(getBindAddress());
        }

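    7. requestHandler handles the Invocation. The flow: look up the exporter held by the protocol, get the invoker from the exporter, then call the corresponding method of the implementation class through the proxy.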

        private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
            @Override
            public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    Invoker<?> invoker = getInvoker(channel, inv);
                    // need to consider backward-compatibility if it's a callback
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                            hasMethod = inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods = methodsStr.split(",");
                            for (String method : methods) {
                                if (inv.getMethodName().equals(method)) {
                                    hasMethod = true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod) {
                            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                    + " not found in callback service interface ,invoke will be ignored."
                                    + " please update the api interface. url is:"
                                    + invoker.getUrl()) + " ,invocation is :" + inv);
                            return null;
                        }
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
            // ... remaining handler methods omitted
        };
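
    The callback check in reply boils down to asking whether the invoked method name appears in the comma-separated "methods" parameter of the URL. Pulled out into a helper it could look like the sketch below; hasMethod is a name chosen for this example, and the compact form is meant to behave the same as the if/else in the original.

```java
import java.util.Arrays;

class CallbackMethodCheckSketch {

    /** Returns true if methodName is one of the comma-separated names in methodsStr. */
    static boolean hasMethod(String methodsStr, String methodName) {
        if (methodsStr == null) {
            return false;
        }
        // split(",") yields a single element when there is no comma,
        // so one lookup covers both branches of the original check.
        return Arrays.asList(methodsStr.split(",")).contains(methodName);
    }

    public static void main(String[] args) {
        System.out.println(hasMethod("sayHello,sayBye", "sayBye"));
        System.out.println(hasMethod("sayHello", "sayBye"));
    }
}
```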

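    8. MulticastRegistry: implemented with the JDK's MulticastSocket. As the scheduling code below shows, failed operations are retried (broadcast again) every retryPeriod milliseconds, 5 seconds by default.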

            super(url);
            int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
            this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    // Check and connect to the registry
                    try {
                        retry();
                    } catch (Throwable t) { // Defensive fault tolerance
                        logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                    }
                }
            }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
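
    The idea behind the retry task can be sketched like this: a registration that fails is remembered, and each retry pass attempts the remembered ones again. FailbackRetrySketch and its doRegister callback are hypothetical; the real registry also tracks failed unregistrations, subscriptions and notifications, which are left out here.

```java
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class FailbackRetrySketch {

    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    // Hypothetical transport call; it signals failure by throwing a RuntimeException.
    private final Consumer<String> doRegister;

    FailbackRetrySketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    /** Tries to register a URL; on failure the URL is kept for a later retry. */
    void register(String url) {
        try {
            doRegister.accept(url);
            failedRegistered.remove(url);
        } catch (RuntimeException e) {
            failedRegistered.add(url);
        }
    }

    /** One retry pass; in the code above this is what the scheduled task calls. */
    void retry() {
        // Iterate over a copy so register() can freely modify the set.
        for (String url : new ArrayList<>(failedRegistered)) {
            register(url);
        }
    }

    int pendingCount() {
        return failedRegistered.size();
    }
}
```

    Scheduling retry() with scheduleWithFixedDelay, as in the constructor above, means a slow retry pass delays the next one instead of overlapping with it.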
  • Original article: https://www.cnblogs.com/z-test/p/9336096.html