/**
 * Spring configuration that registers the injected {@link WaybillQueryFacade} with
 * {@code ServiceFactory} and then publishes it.
 */
@Configuration
public class WaybillQueryFacadeConfiguration {

    /** URL under which the facade is both registered and published. */
    private static final String SERVICE_URL =
            "http://service.ymm.com/trade/om/waybillQueryFacade_1.0.0";

    @Autowired
    private WaybillQueryFacade waybillQueryFacade;

    /**
     * Adds the facade to {@code ServiceFactory} under {@code SERVICE_URL} and publishes that URL.
     *
     * @return always {@code true}
     * @throws Exception if adding or publishing the service fails
     */
    @Bean
    public boolean waybillQueryFacade() throws Exception {
        ServiceFactory.addService(SERVICE_URL, WaybillQueryFacade.class, waybillQueryFacade);
        ServiceFactory.publishService(SERVICE_URL);
        return true;
    }
}
上面是发布一个rpc服务端的代码,看得出来发布的入口就是ServiceFactory。ServiceFactory的初始化方式是通过静态代码块完成。
先看ServiceFactory的静态代码块 ServiceFactory #
static { try { ProviderBootStrap.init(); String appname = ConfigManagerLoader.getConfigManager().getAppName(); if (StringUtils.isBlank(appname) || "NULL".equalsIgnoreCase(appname)) { throw new RuntimeException("appname is not assigned"); } } catch (Throwable t) { t.printStackTrace(); logger.error("error while initializing service factory:", t); System.exit(1); } }
再看 ProviderBootStrap.init(),该方法也是一个static方法,看来作者还真是喜欢static方法。主要就是干了三件事,初始化各种handler,初始化序列化器,初始化写zk的客户端
public static void init() { if (!isInitialized) { LoggerLoader.init(); ConfigManager configManager = ConfigManagerLoader.getConfigManager(); RegistryConfigLoader.init(); ProviderProcessHandlerFactory.init();//各种和业务相关的handler SerializerFactory.init();//初始化各种序列化器 ClassUtils.loadClasses("com.dianping.pigeon"); Monitor monitor = MonitorLoader.getMonitor(); if (monitor != null) { monitor.init(); } Thread shutdownHook = new Thread(new ShutdownHookListener()); shutdownHook.setDaemon(true); shutdownHook.setPriority(Thread.MAX_PRIORITY); Runtime.getRuntime().addShutdownHook(shutdownHook); ServerConfig config = new ServerConfig(); config.setProtocol(Constants.PROTOCOL_HTTP); String poolStrategy = ConfigManagerLoader.getConfigManager().getStringValue( "pigeon.provider.pool.strategy", "shared"); if ("server".equals(poolStrategy)) { int corePoolSize = configManager.getIntValue("pigeon.provider.http.corePoolSize", 5); int maxPoolSize = configManager.getIntValue("pigeon.provider.http.maxPoolSize", 300); int workQueueSize = configManager.getIntValue("pigeon.provider.http.workQueueSize", 300); config.setCorePoolSize(corePoolSize); config.setMaxPoolSize(maxPoolSize); config.setWorkQueueSize(workQueueSize); } RegistryManager.getInstance();//写zk的工具类 List<Server> servers = ExtensionLoader.getExtensionList(Server.class); for (Server server : servers) { if (!server.isStarted()) { if (server.support(config)) { server.start(config); httpServer = server; serversMap.put(server.getProtocol() + server.getPort(), server); logger.warn("pigeon " + server + "[version:" + VersionUtils.VERSION + "] has been started"); } } } //spring 正常启动后回调信息 SpringEventBinder.regOnSpringLoaded(new Runnable() { @Override public void run() { try { ServiceStatusChangeTask.start("publish"); } catch (Exception e) { throw new RuntimeException(e); } } }); isInitialized = true; } }
注意下 RegistryConfigLoader 里的 ENV_FILE 常量:环境文件的路径固定为 /data/webapps/appenv。所以在公司里的 Windows 电脑上,需要在工程代码所在磁盘的根目录下准备这个路径,其中 appenv 就是文件名,没有后缀。
// Fragment of RegistryConfigLoader: only the class header and its first fields are shown here.
public class RegistryConfigLoader {
    // Logger obtained through LoggerLoader for this class.
    private static final Logger logger = LoggerLoader.getLogger(RegistryConfigLoader.class);
    // Fixed absolute path of the environment file; "appenv" is the file name and has no extension.
    private static final String ENV_FILE = "/data/webapps/appenv";
    // Initialization flag, false at class load. NOTE(review): presumably set by init() — confirm.
    static volatile boolean isInitialized = false;
上面首先分析了 ServiceFactory 的静态代码块,初始化工作主要包括三项:
1 业务handlers
2 序列化器
3 注册客户端,比如curator
继续分析 ServiceFactory.addService
/**
 * Registers a service implementation under {@code url} on the given port.
 *
 * <p>Wraps the interface and implementation in a {@code ProviderConfig}, applies the url and the
 * port, and delegates to {@code addService(ProviderConfig)}.
 *
 * @param url              service url
 * @param serviceInterface interface the service is exposed as
 * @param service          implementation object
 * @param port             port set on the provider's server config
 * @throws RpcException propagated from {@code addService(ProviderConfig)}
 */
public static <T> void addService(String url, Class<T> serviceInterface, T service, int port)
        throws RpcException {
    final ProviderConfig<T> config = new ProviderConfig<T>(serviceInterface, service);
    config.setUrl(url);
    config.getServerConfig().setPort(port);
    addService(config);
}
一个rpc服务端最重要的三个参数 1 接口 2 实现类对象 3 url
这三个参数会构造成ProviderConfig
/**
 * Adds, starts and publishes the service described by {@code providerConfig}.
 *
 * <p>A blank url is first replaced by {@code getServiceUrl(providerConfig)}. The config is then
 * added to {@code ServicePublisher}, handed to {@code ProviderBootStrap.startup}, updated with the
 * returned server config, and finally published.
 *
 * @param providerConfig provider description; its url and server config may be overwritten
 * @throws RpcException wrapping any failure raised by the steps above
 */
public static <T> void addService(ProviderConfig<T> providerConfig) throws RpcException {
    final boolean urlMissing = StringUtils.isBlank(providerConfig.getUrl());
    if (urlMissing) {
        providerConfig.setUrl(getServiceUrl(providerConfig));
    }

    try {
        ServicePublisher.addService(providerConfig);
        providerConfig.setServerConfig(ProviderBootStrap.startup(providerConfig));
        ServicePublisher.publishService(providerConfig, false);
    } catch (Throwable cause) {
        // RegistryException and every other failure are wrapped the same way.
        throw new RpcException("error while adding service:" + providerConfig, cause);
    }
}
ServicePublisher#addService
public static <T> void addService(ProviderConfig<T> providerConfig) throws Exception { if (logger.isInfoEnabled()) { logger.info("add service:" + providerConfig); } String version = providerConfig.getVersion(); String url = providerConfig.getUrl(); if (StringUtils.isBlank(version)) {// default version 我们一般都不会加version的 serviceCache.put(url, providerConfig); } else { String urlWithVersion = getServiceUrlWithVersion(url, version); if (serviceCache.containsKey(url)) { serviceCache.put(urlWithVersion, providerConfig); ProviderConfig<?> providerConfigDefault = serviceCache.get(url); String defaultVersion = providerConfigDefault.getVersion(); if (!StringUtils.isBlank(defaultVersion)) { if (VersionUtils.compareVersion(defaultVersion, providerConfig.getVersion()) < 0) { // replace existing service with this newer service as // the default provider serviceCache.put(url, providerConfig); } } } else { serviceCache.put(urlWithVersion, providerConfig); // use this service as the default provider serviceCache.put(url, providerConfig); } } T service = providerConfig.getService(); if (service instanceof InitializingService) { ((InitializingService) service).initialize(); } ServiceMethodFactory.init(url);//如果该url是第一次注册,会建立url和method的本地缓存 }
其实就是建立各种缓存
ProviderBootStrap# startup(providerConfig);
public static ServerConfig startup(ProviderConfig<?> providerConfig) { ServerConfig serverConfig = providerConfig.getServerConfig(); if (serverConfig == null) { throw new IllegalArgumentException("server config is required"); } Server server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort());//找tcp服务器 if (server != null) { server.addService(providerConfig);//继续调用server的addService return server.getServerConfig(); } else { synchronized (ProviderBootStrap.class) { List<Server> servers = ExtensionLoader.newExtensionList(Server.class);//通过classloader的方式实例化 for (Server s : servers) { if (!s.isStarted()) { if (s.support(serverConfig)) { s.start(serverConfig);//启动netty s.addService(providerConfig); serversMap.put(s.getProtocol() + serverConfig.getPort(), s); logger.warn("pigeon " + s + "[version:" + VersionUtils.VERSION + "] has been started"); break; } } } server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort()); if (server != null) { return server.getServerConfig(); } return null; } } }
AbstractServer#addService
public <T> void addService(ProviderConfig<T> providerConfig) { requestProcessor.addService(providerConfig);//下面的代码不重要,重点就是这个 doAddService(providerConfig); List<ServiceChangeListener> listeners = ServiceChangeListenerContainer.getListeners(); for (ServiceChangeListener listener : listeners) { listener.notifyServiceAdded(providerConfig); } }
RequestThreadPoolProcessor # addService
看代码其实就是根据配置,给方法配置执行的线程池
/**
 * Creates dedicated request thread pools for a provider that needs a standalone pool.
 *
 * <p>When the provider has no per-method configs and a positive {@code actives}, one pool is
 * registered per service url. When per-method configs exist, one pool is registered per
 * configured method under the key {@code url + "#" + methodName}. Existing pools are kept.
 *
 * @param providerConfig provider whose pools should be prepared
 */
@Override
public synchronized <T> void addService(ProviderConfig<T> providerConfig) {
    String url = providerConfig.getUrl();
    Map<String, ProviderMethodConfig> methodConfigs = providerConfig.getMethods();
    ServiceMethodCache methodCache = ServiceMethodFactory.getServiceMethodCache(url);
    Set<String> methodNames = methodCache.getMethodMap().keySet();

    if (!needStandalonePool(providerConfig)) {
        return;
    }

    if (methodThreadPools == null) {
        methodThreadPools = new ConcurrentHashMap<String, DynamicThreadPool>();
    }
    if (serviceThreadPools == null) {
        serviceThreadPools = new ConcurrentHashMap<String, DynamicThreadPool>();
    }

    if (CollectionUtils.isEmpty(methodConfigs)) {
        // Service-level pool: only when the provider declares a positive actives value.
        if (providerConfig.getActives() > 0 && serviceThreadPools.get(url) == null) {
            serviceThreadPools.putIfAbsent(url,
                    newRequestPool("Pigeon-Server-Request-Processor-service", providerConfig.getActives()));
        }
        return;
    }

    // Method-level pools: one per method that has an explicit config entry.
    for (String name : methodNames) {
        if (!methodConfigs.containsKey(name)) {
            continue;
        }
        String key = url + "#" + name;
        if (methodThreadPools.get(key) != null) {
            continue;
        }
        ProviderMethodConfig methodConfig = methodConfigs.get(name);
        int actives = DEFAULT_POOL_ACTIVES;
        if (methodConfig != null && methodConfig.getActives() > 0) {
            actives = methodConfig.getActives();
        }
        methodThreadPools.putIfAbsent(key,
                newRequestPool("Pigeon-Server-Request-Processor-method", actives));
    }
}

/**
 * Builds a pool whose max size and queue size equal {@code actives}; the core size is
 * {@code actives / DEFAULT_POOL_RATIO_CORE} truncated to int, or {@code actives} itself when that
 * quotient is not positive.
 */
private DynamicThreadPool newRequestPool(String poolName, int actives) {
    int coreSize = (int) (actives / DEFAULT_POOL_RATIO_CORE) > 0
            ? (int) (actives / DEFAULT_POOL_RATIO_CORE)
            : actives;
    return new DynamicThreadPool(poolName, coreSize, actives, actives);
}
最后到了 ServicePublisher.publishService(providerConfig, false);
就是这个方法进行写zk
核心代码在这里
void registerPersistentNode(String serviceName, String group, String serviceAddress, int weight) throws RegistryException { String weightPath = Utils.getWeightPath(serviceAddress);/DP/WEIGHT/ip:port String servicePath = Utils.getServicePath(serviceName, group); try { if (weight > 0) { client.set(weightPath, "" + weight); } if (client.exists(servicePath, false)) { Stat stat = new Stat(); String addressValue = client.get(servicePath, stat); String[] addressArray = addressValue.split(","); List<String> addressList = new ArrayList<String>(); for (String addr : addressArray) { addr = addr.trim(); if (addr.length() > 0 && !addressList.contains(addr)) { addressList.add(addr.trim()); } } if (!addressList.contains(serviceAddress)) { addressList.add(serviceAddress); Collections.sort(addressList); client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion()); } } else { client.create(servicePath, serviceAddress); } if (logger.isInfoEnabled()) { logger.info("registered service to persistent node: " + servicePath); } } catch (Throwable e) { if(e instanceof BadVersionException || e instanceof NodeExistsException) { try { Thread.sleep(500); } catch (InterruptedException ie) { //ignore } registerPersistentNode(serviceName, group, serviceAddress, weight); } else { logger.error("failed to register service to " + servicePath, e); throw new RegistryException(e); } } }
[zk: localhost:2181(CONNECTED) 1] ls /DP/SERVER
[@HTTP@http:^^service.dianping.com^rpcserver^commonService_1.0.0, http:^^service.dianping.com^rpcserver^commonService_1.0.0]
/DP/SERVER/ 中实际的数据格式如下
[zk: localhost:2181(CONNECTED) 2] get /DP/SERVER/http:^^service.dianping.com^rpcserver^commonService_1.0.0
10.190.38.63:6088
/DP/WEIGHT/ 中实际的值如下
[zk: localhost:2181(CONNECTED) 5] ls /DP/WEIGHT
[10.190.38.63:4080, 10.190.38.63:4081, 10.190.38.63:6088]
[zk: localhost:2181(CONNECTED) 6] get /DP/WEIGHT/10.190.38.63:6088
10