zoukankan      html  css  js  c++  java
  • Pigeon源码分析(一) -- 服务发布源码分析

    @Configuration
    public class WaybillQueryFacadeConfiguration {
    
        @Autowired
        private WaybillQueryFacade waybillQueryFacade;
    
        @Bean
        public boolean waybillQueryFacade() throws Exception {
            ServiceFactory.addService("http://service.ymm.com/trade/om/waybillQueryFacade_1.0.0",
                    WaybillQueryFacade.class, waybillQueryFacade);
            ServiceFactory.publishService("http://service.ymm.com/trade/om/waybillQueryFacade_1.0.0");
            return true;
        }
    }

      上面是发布一个rpc服务端的代码,看得出来发布的入口就是ServiceFactory。ServiceFactory的初始化方式是通过静态代码块完成。

      先看ServiceFactory的静态代码块 ServiceFactory # 

    static {
            try {
                ProviderBootStrap.init();
                String appname = ConfigManagerLoader.getConfigManager().getAppName();
                if (StringUtils.isBlank(appname) || "NULL".equalsIgnoreCase(appname)) {
                    throw new RuntimeException("appname is not assigned");
                }
            } catch (Throwable t) {
                t.printStackTrace();
                logger.error("error while initializing service factory:", t);
                System.exit(1);
            }
        }

      再看 ProviderBootStrap.init(),该方法也是一个static方法,看来作者还真是喜欢static方法。主要就是干了三件事,初始化各种handler,初始化序列化器,初始化写zk的客户端

    public static void init() {
            if (!isInitialized) {
                LoggerLoader.init();
                ConfigManager configManager = ConfigManagerLoader.getConfigManager();
                RegistryConfigLoader.init();
                ProviderProcessHandlerFactory.init();//各种和业务相关的handler
                SerializerFactory.init();//初始化各种序列化器
                ClassUtils.loadClasses("com.dianping.pigeon");
                Monitor monitor = MonitorLoader.getMonitor();
                if (monitor != null) {
                    monitor.init();
                }
                Thread shutdownHook = new Thread(new ShutdownHookListener());
                shutdownHook.setDaemon(true);
                shutdownHook.setPriority(Thread.MAX_PRIORITY);
                Runtime.getRuntime().addShutdownHook(shutdownHook);
                ServerConfig config = new ServerConfig();
                config.setProtocol(Constants.PROTOCOL_HTTP);
                String poolStrategy = ConfigManagerLoader.getConfigManager().getStringValue(
                        "pigeon.provider.pool.strategy", "shared");
                if ("server".equals(poolStrategy)) {
                    int corePoolSize = configManager.getIntValue("pigeon.provider.http.corePoolSize", 5);
                    int maxPoolSize = configManager.getIntValue("pigeon.provider.http.maxPoolSize", 300);
                    int workQueueSize = configManager.getIntValue("pigeon.provider.http.workQueueSize", 300);
                    config.setCorePoolSize(corePoolSize);
                    config.setMaxPoolSize(maxPoolSize);
                    config.setWorkQueueSize(workQueueSize);
                }
                RegistryManager.getInstance();//写zk的工具类
                List<Server> servers = ExtensionLoader.getExtensionList(Server.class);
                for (Server server : servers) {
                    if (!server.isStarted()) {
                        if (server.support(config)) {
                            server.start(config);
                            httpServer = server;
                            serversMap.put(server.getProtocol() + server.getPort(), server);
                            logger.warn("pigeon " + server + "[version:" + VersionUtils.VERSION
                                    + "] has been started");
                        }
                    }
                }
                //spring 正常启动后回调信息
                SpringEventBinder.regOnSpringLoaded(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            ServiceStatusChangeTask.start("publish");
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                });
                isInitialized = true;
            }
        }

    注意下 ,所以在公司里windows电脑,在工程代码的同磁盘下这个路径  /data/webapps/appenv 其中 appenv就是文件名 没有后缀

    public class RegistryConfigLoader {
    
        private static final Logger logger = LoggerLoader.getLogger(RegistryConfigLoader.class);
    
        private static final String ENV_FILE = "/data/webapps/appenv";
    
        static volatile boolean isInitialized = false;

    上述代码首先分析了 ServiceFactory的静态代码块,初始化工作主要包括三个

    1 业务handlers

    2 序列化器

    3 注册客户端,比如curator

    继续分析 ServiceFactory.addService

    public static <T> void addService(String url, Class<T> serviceInterface, T service, int port) throws RpcException {
            ProviderConfig<T> providerConfig = new ProviderConfig<T>(serviceInterface, service);
            providerConfig.setUrl(url);
            providerConfig.getServerConfig().setPort(port);
            addService(providerConfig);
        }

    一个rpc服务端最重要的三个参数 1 接口 2 实现类对象 3 url

    这三个参数会构造成ProviderConfig

    public static <T> void addService(ProviderConfig<T> providerConfig) throws RpcException {
            if (StringUtils.isBlank(providerConfig.getUrl())) {
                providerConfig.setUrl(getServiceUrl(providerConfig));
            }
            try {
                ServicePublisher.addService(providerConfig);
                ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig);
                providerConfig.setServerConfig(serverConfig);
                ServicePublisher.publishService(providerConfig, false);
            } catch (RegistryException t) {
                throw new RpcException("error while adding service:" + providerConfig, t);
            } catch (Throwable t) {
                throw new RpcException("error while adding service:" + providerConfig, t);
            }
        }

    ServicePublisher#addService

    public static <T> void addService(ProviderConfig<T> providerConfig) throws Exception {
            if (logger.isInfoEnabled()) {
                logger.info("add service:" + providerConfig);
            }
            String version = providerConfig.getVersion();
            String url = providerConfig.getUrl();
            if (StringUtils.isBlank(version)) {// default version 我们一般都不会加version的
                serviceCache.put(url, providerConfig);
            } else {
                String urlWithVersion = getServiceUrlWithVersion(url, version);
                if (serviceCache.containsKey(url)) {
                    serviceCache.put(urlWithVersion, providerConfig);
                    ProviderConfig<?> providerConfigDefault = serviceCache.get(url);
                    String defaultVersion = providerConfigDefault.getVersion();
                    if (!StringUtils.isBlank(defaultVersion)) {
                        if (VersionUtils.compareVersion(defaultVersion, providerConfig.getVersion()) < 0) {
                            // replace existing service with this newer service as
                            // the default provider
                            serviceCache.put(url, providerConfig);
                        }
                    }
                } else {
                    serviceCache.put(urlWithVersion, providerConfig);
                    // use this service as the default provider
                    serviceCache.put(url, providerConfig);
                }
            }
            T service = providerConfig.getService();
            if (service instanceof InitializingService) {
                ((InitializingService) service).initialize();
            }
            ServiceMethodFactory.init(url);//如果该url是第一次注册,会建立url和method的本地缓存
        }

    其实就是建立各种缓存

    ProviderBootStrap# startup(providerConfig);

    public static ServerConfig startup(ProviderConfig<?> providerConfig) {
            ServerConfig serverConfig = providerConfig.getServerConfig();
            if (serverConfig == null) {
                throw new IllegalArgumentException("server config is required");
            }
            Server server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort());//找tcp服务器
            if (server != null) {
                server.addService(providerConfig);//继续调用server的addService
                return server.getServerConfig();
            } else {
                synchronized (ProviderBootStrap.class) {
                    List<Server> servers = ExtensionLoader.newExtensionList(Server.class);//通过classloader的方式实例化
                    for (Server s : servers) {
                        if (!s.isStarted()) {
                            if (s.support(serverConfig)) {
                                s.start(serverConfig);//启动netty
                                s.addService(providerConfig);
                                serversMap.put(s.getProtocol() + serverConfig.getPort(), s);
                                logger.warn("pigeon " + s + "[version:" + VersionUtils.VERSION
                                        + "] has been started");
                                break;
                            }
                        }
                    }
                    server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort());
                    if (server != null) {
                        return server.getServerConfig();
                    }
                    return null;
                }
            }
        }
    AbstractServer#addService
    public <T> void addService(ProviderConfig<T> providerConfig) {
            requestProcessor.addService(providerConfig);//下面的代码不重要,重点就是这个
            doAddService(providerConfig);
            List<ServiceChangeListener> listeners = ServiceChangeListenerContainer.getListeners();
            for (ServiceChangeListener listener : listeners) {
                listener.notifyServiceAdded(providerConfig);
            }
        }

    RequestThreadPoolProcessor # addService

     看代码其实就是根据配置,给方法配置执行的线程池

    @Override
        public synchronized <T> void addService(ProviderConfig<T> providerConfig) {
            String url = providerConfig.getUrl();
            Map<String, ProviderMethodConfig> methodConfigs = providerConfig.getMethods();
            ServiceMethodCache methodCache = ServiceMethodFactory.getServiceMethodCache(url);
            Set<String> methodNames = methodCache.getMethodMap().keySet();
            if (needStandalonePool(providerConfig)) {
                if (methodThreadPools == null) {
                    methodThreadPools = new ConcurrentHashMap<String, DynamicThreadPool>();
                }
                if (serviceThreadPools == null) {
                    serviceThreadPools = new ConcurrentHashMap<String, DynamicThreadPool>();
                }
                if (providerConfig.getActives() > 0 && CollectionUtils.isEmpty(methodConfigs)) {
                    String key = url;
                    DynamicThreadPool pool = serviceThreadPools.get(key);
                    if (pool == null) {
                        int actives = providerConfig.getActives();
                        int coreSize = (int) (actives / DEFAULT_POOL_RATIO_CORE) > 0 ? (int) (actives / DEFAULT_POOL_RATIO_CORE)
                                : actives;
                        int maxSize = actives;
                        int queueSize = actives;
                        pool = new DynamicThreadPool("Pigeon-Server-Request-Processor-service", coreSize, maxSize,queueSize);
                        serviceThreadPools.putIfAbsent(key, pool);
                    }
                }
                if (!CollectionUtils.isEmpty(methodConfigs)) {
                    for (String name : methodNames) {
                        if (!methodConfigs.containsKey(name)) {
                            continue;
                        }
                        String key = url + "#" + name;
                        DynamicThreadPool pool = methodThreadPools.get(key);
                        if (pool == null) {
                            int actives = DEFAULT_POOL_ACTIVES;
                            ProviderMethodConfig methodConfig = methodConfigs.get(name);
                            if (methodConfig != null && methodConfig.getActives() > 0) {
                                actives = methodConfig.getActives();
                            }
                            int coreSize = (int) (actives / DEFAULT_POOL_RATIO_CORE) > 0 ? (int) (actives / DEFAULT_POOL_RATIO_CORE)
                                    : actives;
                            int maxSize = actives;
                            int queueSize = actives;
                            pool = new DynamicThreadPool("Pigeon-Server-Request-Processor-method", coreSize, maxSize, queueSize);
                            methodThreadPools.putIfAbsent(key, pool);
                        }
                    }
                }
            }
        }

      最后到了 ServicePublisher.publishService(providerConfig, false);

      就是这个方法进行写zk

      核心代码在这里

    void registerPersistentNode(String serviceName, String group, String serviceAddress, int weight)
                throws RegistryException {
            String weightPath = Utils.getWeightPath(serviceAddress);/DP/WEIGHT/ip:port
            String servicePath = Utils.getServicePath(serviceName, group); 
            try {
                if (weight > 0) {
                    client.set(weightPath, "" + weight);
                }
                if (client.exists(servicePath, false)) {
                    Stat stat = new Stat();
                    String addressValue = client.get(servicePath, stat);
                    String[] addressArray = addressValue.split(",");
                    List<String> addressList = new ArrayList<String>();
                    for (String addr : addressArray) {
                        addr = addr.trim();
                        if (addr.length() > 0 && !addressList.contains(addr)) {
                            addressList.add(addr.trim());
                        }
                    }
                    if (!addressList.contains(serviceAddress)) {
                        addressList.add(serviceAddress);
                        Collections.sort(addressList);
                        client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion());
                    }
                } else {
                    client.create(servicePath, serviceAddress);
                }
                if (logger.isInfoEnabled()) {
                    logger.info("registered service to persistent node: " + servicePath);
                }
            } catch (Throwable e) {
                if(e instanceof BadVersionException || e instanceof NodeExistsException) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException ie) {
                        //ignore
                    }
                    registerPersistentNode(serviceName, group, serviceAddress, weight);
                } else {
                    logger.error("failed to register service to " + servicePath, e);
                    throw new RegistryException(e);
                }
    
            }
        }

    [zk: localhost:2181(CONNECTED) 1] ls /DP/SERVER
    [@HTTP@http:^^service.dianping.com^rpcserver^commonService_1.0.0, http:^^service.dianping.com^rpcserver^commonService_1.0.0]

    dp/server/中实际的数据格式如下

    [zk: localhost:2181(CONNECTED) 2] get /DP/SERVER/http:^^service.dianping.com^rpcserver^commonService_1.0.0
    10.190.38.63:6088 

    /DP/WEIGHT/ 中实际的值如下

    [zk: localhost:2181(CONNECTED) 5] ls /DP/WEIGHT
    [10.190.38.63:4080, 10.190.38.63:4081, 10.190.38.63:6088]
    [zk: localhost:2181(CONNECTED) 6] get /DP/WEIGHT/10.190.38.63:6088
    10

  • 相关阅读:
    Asp.Net中virtual、override理解
    SQL 知道字段名 全表搜索此字段属于哪个表
    C#中(int)a和Convert.ToInt32(a)的区别
    sql 查询表共多少列
    理解字节和字符
    C# IIS7.0+ Web.Config 配置Session过期时间
    Java版的扫雷游戏源码
    Activiti6.0教程 28张表解析 (三)
    Activiti6.0教程 Service用途剖析 (二)
    Activiti6.0教程 Eclipse安装Activiti Diagram插件(一)
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14832970.html
Copyright © 2011-2022 走看看