zoukankan      html  css  js  c++  java
  • Pigeon源码分析(一)-- 服务注册

    跟代码过程中发现的问题记录下

     ServiceFactory是一切的基础,它是通过静态代码块初始化的

    static {
            try {
                ProviderBootStrap.init();
                String appname = ConfigManagerLoader.getConfigManager().getAppName();
                if (StringUtils.isBlank(appname) || "NULL".equalsIgnoreCase(appname)) {
                    throw new RuntimeException("appname is not assigned");
                }
            } catch (Throwable var1) {
                logger.error("error while initializing service factory:", var1);
                System.exit(1);
            }
    
        }

      实际调用的是  ProviderBootStrap

    ServiceFactory ServiceFactory.addService(ProviderConfig<T> providerConfig) 
    public static <T> void addService(ProviderConfig<T> providerConfig) throws RpcException {
            if (StringUtils.isBlank(providerConfig.getUrl())) {
                providerConfig.setUrl(getServiceUrl(providerConfig));
            }
    
            try {
                ServicePublisher.addService(providerConfig);
                ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig);
                providerConfig.setServerConfig(serverConfig);
                ServicePublisher.publishService(providerConfig, false);
            } catch (RegistryException var2) {
                throw new RpcException("error while adding service:" + providerConfig, var2);
            } catch (Throwable var3) {
                throw new RpcException("error while adding service:" + providerConfig, var3);
            }
        }

      

    public static ServerConfig startup(ProviderConfig<?> providerConfig) {
        ServerConfig serverConfig = providerConfig.getServerConfig();
        if (serverConfig == null) {
            throw new IllegalArgumentException("server config is required");
        }
        // 根据protocol+port判断当前服务器列表中是否存在相关的服务器实例,这里的server就是netty或者jetty
        Server server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort());
        if (server != null) {
            // 存在直接返回
            server.addService(providerConfig);
            return server.getServerConfig();
        } else {
            // 不存在先创建一个
            synchronized (ProviderBootStrap.class) {
                List<Server> servers = ExtensionLoader.newExtensionList(Server.class);
                for (Server s : servers) {
                    if (!s.isStarted()) {//这里说明只能支持一种port 2021-09-09
                        if (s.support(serverConfig)) {
                            s.start(serverConfig);//这里就是启动netty服务端
                            // 添加服务
                            s.addService(providerConfig);
                            serversMap.put(s.getProtocol() + serverConfig.getPort(), s);
                            logger.warn("pigeon " + s + "[version:" + VersionUtils.VERSION + "] has been started");
                            break;
                        }
                    }
                }
                server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort());
                // 预启动内部的请求处理器核心线程
                if (server != null) {
                    server.getRequestProcessor().getRequestProcessThreadPool().prestartAllCoreThreads();
                    return server.getServerConfig();
                }
                return null;
            }
        }
    }
    

      

    public <T> void addService(ProviderConfig<T> providerConfig) {
            this.requestProcessor.addService(providerConfig);
            this.doAddService(providerConfig);
            List<ServiceChangeListener> listeners = ServiceChangeListenerContainer.getListeners();
            Iterator i$ = listeners.iterator();
    
            while(i$.hasNext()) {
                ServiceChangeListener listener = (ServiceChangeListener)i$.next();
                listener.notifyServiceAdded(providerConfig);
            }
    
        }

      RequestThreadPoolProcessor.addService(ProviderConfig<T> providerConfig) 负责创建线程池并缓存起来

    public synchronized <T> void addService(ProviderConfig<T> providerConfig) {
            String url = providerConfig.getUrl();
            Map<String, ProviderMethodConfig> methodConfigs = providerConfig.getMethods();
            ServiceMethodCache methodCache = ServiceMethodFactory.getServiceMethodCache(url);
            Set<String> methodNames = methodCache.getMethodMap().keySet();
            if (this.needStandalonePool(providerConfig)) {
                if (this.methodThreadPools == null) {
                    this.methodThreadPools = new ConcurrentHashMap();
                }
    
                if (this.serviceThreadPools == null) {
                    this.serviceThreadPools = new ConcurrentHashMap();
                }
    
                if (providerConfig.getActives() > 0 && CollectionUtils.isEmpty(methodConfigs)) {
                    DynamicThreadPool pool = (DynamicThreadPool)this.serviceThreadPools.get(url);
                    if (pool == null) {
                        int actives = providerConfig.getActives();
                        int coreSize = (int)((float)actives / DEFAULT_POOL_RATIO_CORE) > 0 ? (int)((float)actives / DEFAULT_POOL_RATIO_CORE) : actives;
                        pool = new DynamicThreadPool("Pigeon-Server-Request-Processor-service", coreSize, actives, actives);
                        this.serviceThreadPools.putIfAbsent(url, pool);
                    }
                }
    
                if (!CollectionUtils.isEmpty(methodConfigs)) {
                    Iterator i$ = methodNames.iterator();
    
                    while(i$.hasNext()) {
                        String name = (String)i$.next();
                        if (methodConfigs.containsKey(name)) {
                            String key = url + "#" + name;
                            DynamicThreadPool pool = (DynamicThreadPool)this.methodThreadPools.get(key);
                            if (pool == null) {
                                int actives = DEFAULT_POOL_ACTIVES;
                                ProviderMethodConfig methodConfig = (ProviderMethodConfig)methodConfigs.get(name);
                                if (methodConfig != null && methodConfig.getActives() > 0) {
                                    actives = methodConfig.getActives();
                                }
    
                                int coreSize = (int)((float)actives / DEFAULT_POOL_RATIO_CORE) > 0 ? (int)((float)actives / DEFAULT_POOL_RATIO_CORE) : actives;
                                pool = new DynamicThreadPool("Pigeon-Server-Request-Processor-method", coreSize, actives, actives);
                                this.methodThreadPools.putIfAbsent(key, pool);
                            }
                        }
                    }
                }
            }
    
        }

    二 服务的发布

      

    1 调用链路

    ServiceFactory.addService(ProviderConfig<T> providerConfig)

    -> ServicePublisher.addService(providerConfig); 这里调用主要是缓存服务信息

    -> RequestThreadPoolProcessor.addService(ProviderConfig<T> providerConfig) 不知道怎么就走到这里了,以后还得多跟代码

    -> AbstractServer.doAddService

    AbstractServer的实现类默认的是NettyServer

    -> ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig);

    -> ServicePublisher.publishService

    String url, String registryUrl, int port, String group

    url = http://service.ymm.com/cargo-detail/DriverVisitDetailFacade_1.0.0

    registryUrl = @HTTP@http://service.ymm.com/cargo-detail/DriverVisitDetailFacade_1.0.0

    port = 4080

    group = hfd33

    2 写zk节点

    RegistryManager.getInstance().registerSupportNewProtocol(serverAddress, registryUrl, false);

    serverAddress = 10.190.20.66:4080

    registryUrl = @HTTP@http://service.ymm.com/cargo-detail/DriverVisitDetailFacade_1.0.0

    通过跟代码解析出来的zk地址是 10.13.65.186:2181,10.13.65.187:2181,10.13.65.188:2181

    setSupportNewProtocol的意义是是否支持新协议,什么是新协议呢,比如thrift,false没关系的

    CuratorRegistry.setSupportNewProtocol
    public void setSupportNewProtocol(String serviceAddress, String serviceName, boolean support) throws RegistryException {
          try {
              String protocolPath = Utils.getProtocolPath(serviceAddress);//       /DP/PROTOCOL/10.190.20.66:4080
              Stat stat = new Stat();
               
              String info = this.client.get(protocolPath, stat);
              // zk中的一个节点 {"@HTTP@http://service.ymm.com/cargo-detail/DriverVisitDetailFacade_1.0.0":false,"@HTTP@http://service.ymm.com/cargo-detail/cargoDetailFacade_1.0.0":false,"@HTTP@http://service.ymm.com/cargo-detail/cargoStatusFacade_1.0.0":false}
              if (info != null) {
                  Map<String, Boolean> infoMap = Utils.getProtocolInfoMap(info);
                  infoMap.put(serviceName, support);
                  // infoMap的内容
                  @HTTP@http://service.ymm.com/cargo-detail/DriverVisitDetailFacade_1.0.0 -> {Boolean@17173} false
    @HTTP@http://service.ymm.com/cargo-detail/cargoDetailFacade_1.0.0 -> {Boolean@17173} false
    @HTTP@http://service.ymm.com/cargo-detail/cargoStatusFacade_1.0.0 -> {Boolean@17173} false 不知道为啥 这三个都是false是不是因为这个导致了 服务不对外提供呢
                  this.client.set(protocolPath, Utils.getProtocolInfo(infoMap), stat.getVersion());
                  //这个就是写zk
              } else {
                  Map<String, Boolean> infoMap = ImmutableMap.of(serviceName, support);
                  this.client.create(protocolPath, Utils.getProtocolInfo(infoMap));
              }

    上面的是protocol在zk上的格式,还有另一个更重要的zk节点

    registerPersistentNode

    该方法不止会写一个zk节点,分别如下

    1 this.client.set(weightPath, "" + weight);

    /DP/WEIGHT/10.190.20.66:4080 1

    2 this.client.create(servicePath, serviceAddress);

    /DP/SERVER/@HTTP@http:^^service.ymm.com^cargo-detail^DriverVisitDetailFacade_1.0.0/hfd33

    10.190.20.66:4080

    3

    public void setServerApp(String serverAddress, String app) {
      String path = Utils.getAppPath(serverAddress);// /DP/APP/10.190.20.66:4080
      if (StringUtils.isNotBlank(app)) {
          try {
              this.client.set(path, app);// app = cargo-detail
          } catch (Throwable var5) {
              logger.error("failed to set app of " + serverAddress + " to " + app);
          }
      }

    }

    4 RegistryManager.getInstance().setServerVersion(serverAddress, "2.7.8"); // serverAddress = 10.190.20.66:4080

    最后写入zk的path是 /DP/VERSION/10.190.20.66:4080

  • 相关阅读:
    docker学习
    io性能调优之page cache
    ll命令执行后结果分析
    Angular2+ ViewChild & ViewChildren解析
    <router-outlet> 干什么用的?
    npm基本命令
    什么情况下会出现undefined
    关于VUE调用父实例($parent) 根实例 中的数据和方法
    vue中的this指向问题
    对 Foreach 的理解
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14501037.html
Copyright © 2011-2022 走看看