zoukankan      html  css  js  c++  java
  • Pigeon源码分析(六) -- 服务下线流程

    当服务提供端下线时,先看正常的流程。一般来说都是通过kill -15 结束jvm进程,此时会执行钩子函数

    public final class ProviderBootStrap {
    
        private static Logger logger = LoggerLoader.getLogger(ServicePublisher.class);
        static Server httpServer = null;
        static volatile Map<String, Server> serversMap = new HashMap<String, Server>();
        static volatile boolean isInitialized = false;
        static Date startTime = new Date();
    
        public static Date getStartTime() {
            return startTime;
        }
    
        public static void init() {
            if (!isInitialized) {
                synchronized (ProviderBootStrap.class) {
                    if (!isInitialized) {
                        ProviderProcessHandlerFactory.init();
                        SerializerFactory.init();
                        ClassUtils.loadClasses("com.dianping.pigeon");
                        Thread shutdownHook = new Thread(new ShutdownHookListener());
                        shutdownHook.setDaemon(true);
                        shutdownHook.setPriority(Thread.MAX_PRIORITY);
                        Runtime.getRuntime().addShutdownHook(shutdownHook);

      ShutdownHookListener 

    public void run() {
            if (logger.isInfoEnabled()) {
                logger.info("shutdown hook begin......");
            }
    
            boolean isRocketShutdown = ConfigManagerLoader.getConfigManager().getBooleanValue("pigeon.invoker.rocketshutdown",false);
            if(isRocketShutdown && ServicePublisher.getAllServiceProviders().size() == 0){
                // rocket shutdown
            } else {
                try {
                    ServiceFactory.unpublishAllServices();
                } catch (Throwable e) {
                    logger.error("error with shutdown hook", e);
                }
                try {
                    InvokerBootStrap.shutdown();
                } catch (Throwable e) {
                    logger.error("error with shutdown hook", e);
                }
                try {
                    ProviderBootStrap.shutdown();
                } catch (Throwable e) {
                    logger.error("error with shutdown hook", e);
                }
            }
            if (logger.isInfoEnabled()) {
                logger.info("shutdown hook end......");
            }
        }

      ServicePublisher.unpublishAllServices();

    public static void unpublishAllServices() throws RegistryException {
            if (logger.isInfoEnabled()) {
                logger.info("unpublish all services");
            }
            ServiceOnlineTask.stop();
            setServerWeight(0);
            try {
                Thread.sleep(UNPUBLISH_WAITTIME);
            } catch (InterruptedException e) {
            }
            for (String url : serviceCache.keySet()) {
                ProviderConfig<?> providerConfig = serviceCache.get(url);
                if (providerConfig != null) {
                    unpublishService(providerConfig);
                }
            }
        }
    if (existingService) {
                List<Server> servers = ProviderBootStrap.getServers(providerConfig);//获取本台机器上所有的服务
                for (Server server : servers) {
                    String serverAddress = configManager.getLocalIp() + ":" + server.getPort();//拿到ip:port
                    String registryUrl = server.getRegistryUrl(providerConfig.getUrl());//注册的url
                    RegistryManager.getInstance().unregisterService(registryUrl,
                            RegistryManager.getInstance().getGroup(url), serverAddress);

    RegistryManager.unregisterService

    public void unregisterService(String serviceName, String group, String serviceAddress) throws RegistryException {
            if (registry != null) {
                registry.unregisterService(serviceName, group, serviceAddress);
                registeredServices.remove(serviceName);
                monitor.logEvent("PigeonService.unregister", serviceName, "group=" + group);
            }
        }

    CuratorRegistry.unregisterPersistentNode

    public void unregisterPersistentNode(String serviceName, String group, String serviceAddress)
                throws RegistryException {
            String servicePath = Utils.getServicePath(serviceName, group);// DP/SERVER/http:^^service.dianping.com^rpcserver^commonServer_1.00
            try {
                if (client.exists(servicePath, false)) {
                    Stat stat = new Stat();
                    String addressValue = client.getWithNodeExistsEx(servicePath, stat);
                    String[] addressArray = addressValue.split(",");
                    List<String> addressList = new ArrayList<String>();
                    for (String addr : addressArray) {
                        addr = addr.trim();
                        if (addr.length() > 0 && !addressList.contains(addr)) {
                            addressList.add(addr);
                        }
                    }
                    if (addressList.contains(serviceAddress)) {
                        addressList.remove(serviceAddress);//把本机的ip地址去掉
                        if (!addressList.isEmpty()) {//如果去掉本机ip后不为空,继续写zk
                            Collections.sort(addressList);
                            client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion());
                        } else {
                            List<String> children = client.getChildren(servicePath, false);//这里估计是兼容写法,因为不一定是写dp/server/节点,其他节点可能存在有子节点的情况
                            if (CollectionUtils.isEmpty(children)) {
                                if (delEmptyNode) {
                                    try {
                                        client.delete(servicePath);
                                    } catch (NoNodeException e) {
                                        logger.warn("Already deleted path:" + servicePath + ":" + e.getMessage());
                                    }
                                } else {
                                    client.set(servicePath, "", stat.getVersion());
                                }
                            } else {
                                logger.warn("Existing children [" + children + "] under path:" + servicePath);
                                client.set(servicePath, "", stat.getVersion());
                            }
                        }
                    }

    总结一下就是,如果是正常的关闭,会走钩子函数,通过写zk的方式把该服务器的ip从服务列表里去掉。

    接下来看客户端,客户端会监听zk

    无论是客户端还是服务端,启动时都会执行 

    ProviderBootStrap.init()

    在这个方法里都会调用 RegistryManager.getInstance()  进行初始化。

    public static RegistryManager getInstance() {
            if (!isInit) {
                synchronized (RegistryManager.class) {
                    if (!isInit) {
                        instance.init();
                        initializeException = null;
                        RegistryEventListener.addListener(new InnerServerInfoListener());
                        isInit = true;
                    }
                }
            }
            return instance;
        }

    最终会调用  CuratorRegistry.init() 

    进而调用 CuratorClient.init()

    private void init() throws Exception {
            if (!initialized) {
                synchronized (this) {
                    if (!initialized) {
                        curatorStateListenerActive = true;
                        newCuratorClient(configManager.getStringValue(KEY_REGISTRY_ADDRESS));
                        initialized = true;
                    }
                }
            }
        }
    private boolean newCuratorClient() throws InterruptedException {
            logger.info("begin to create zookeeper client:" + address);
            // CuratorFramework client = CuratorFrameworkFactory.newClient(address,
            // sessionTimeout, connectionTimeout,
            // new MyRetryPolicy(retries, retryInterval));
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(address)
                    .sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
                    .retryPolicy(new MyRetryPolicy(retries, retryInterval)).build();
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState) {
                    logger.info("zookeeper state changed to " + newState);
                    if (newState == ConnectionState.RECONNECTED) {
                        RegistryEventListener.connectionReconnected();
                    }
                    monitor.logEvent(EVENT_NAME, "zookeeper:" + newState.name().toLowerCase(), "");
                }
            });
            client.getCuratorListenable().addListener(new CuratorEventListener(this), curatorEventListenerThreadPool);

      CuratorEventListener#eventReceived

      

    public void eventReceived(CuratorFramework client, CuratorEvent curatorEvent) throws Exception {
            WatchedEvent event = (curatorEvent == null ? null : curatorEvent.getWatchedEvent());
    
            if (event == null
                    || (event.getType() != EventType.NodeCreated && event.getType() != EventType.NodeDataChanged
                            && event.getType() != EventType.NodeDeleted && event.getType() != EventType.NodeChildrenChanged)) {
                return;
            }
    
            if (logger.isInfoEnabled())
                logEvent(event);
    
            try {
                PathInfo pathInfo = parsePath(event.getPath());
                if (pathInfo == null) {
                    logger.warn("Failed to parse path " + event.getPath());
                    return;
                }
    
                if (pathInfo.type == ADDRESS) {
                    addressChanged(pathInfo);

      最终会调用 DefaultServiceChangeListener # onServiceHostChange

    public void onServiceHostChange(String serviceName, List<String[]> hostList) {
            try {
                Set<HostInfo> newHpSet = parseHostPortList(serviceName, hostList);//收到zk事件之后,最新的服务器ip列表
                Set<HostInfo> oldHpSet = RegistryManager.getInstance().getReferencedServiceAddresses(serviceName);//从缓存里拿出来
                Set<HostInfo> toAddHpSet = Collections.emptySet();
                Set<HostInfo> toRemoveHpSet = Collections.emptySet();
                if (oldHpSet == null) {
                    toAddHpSet = newHpSet;
                } else {
                    toRemoveHpSet = Collections.newSetFromMap(new ConcurrentHashMap<HostInfo, Boolean>());
                    toRemoveHpSet.addAll(oldHpSet);
                    toRemoveHpSet.removeAll(newHpSet);
                    toAddHpSet = Collections.newSetFromMap(new ConcurrentHashMap<HostInfo, Boolean>());
                    toAddHpSet.addAll(newHpSet);
                    toAddHpSet.removeAll(oldHpSet);
                }
                if (logger.isInfoEnabled()) {
                    logger.info("service hosts changed, to added hosts:" + toAddHpSet);
                    logger.info("service hosts changed, to removed hosts:" + toRemoveHpSet);
                }
                for (HostInfo hostPort : toAddHpSet) {
                    RegistryEventListener.providerAdded(serviceName, hostPort.getHost(), hostPort.getPort(),
                            hostPort.getWeight());
                }
                for (HostInfo hostPort : toRemoveHpSet) {
                    RegistryEventListener.providerRemoved(serviceName, hostPort.getHost(), hostPort.getPort());
                }

      重点看 RegistryEventListener.providerRemoved

      ServiceProviderChangeListener的实现类有四种,都是匿名类,每个类要解决的问题都不同

      1 ClientManager中的匿名类是这么实现的

      RegistryManager # removeServiceAddress

    public void removeServiceAddress(String serviceName, HostInfo hostInfo) {
            Set<HostInfo> hostInfos = referencedServiceAddresses.get(serviceName);
            if (hostInfos == null || !hostInfos.contains(hostInfo)) {
                logger.info("address:" + hostInfo + " is not in address list of service " + serviceName);
                return;
            }
            hostInfos.remove(hostInfo);
            logger.info("removed address:" + hostInfo + " from service:" + serviceName);
    
            HostInfo cachedHostInfo = referencedAddresses.get(hostInfo.getConnect());
            if (cachedHostInfo != null) {
                cachedHostInfo.removeService(serviceName);
            }
    
            // If server is not referencd any more, remove from server list
            if (!isAddressReferenced(hostInfo)) {
                referencedAddresses.remove(hostInfo.getConnect());
            }
        }

    上面的代码很好理解,就是把缓存的服务端信息清理掉

    2  ClusterListenerManager # InnerServiceProviderChangeListener

    class InnerServiceProviderChangeListener implements ServiceProviderChangeListener {
            @Override
            public void hostWeightChanged(ServiceProviderChangeEvent event) {
            }
    
            @Override
            public void providerAdded(ServiceProviderChangeEvent event) {
            }
    
            @Override
            public void providerRemoved(ServiceProviderChangeEvent event) {
                // addConnect的逆操作
                String connect = NetUtils.toAddress(event.getHost(), event.getPort());
                if (logger.isInfoEnabled()) {
                    logger.info("[cluster-listener-mgr] remove:" + connect + " from " + event.getServiceName());
                }
                ConnectInfo cmd = connectInfoMap.get(connect);
                if (cmd != null) {
                    cmd.getServiceNames().remove(event.getServiceName());
                    if (cmd.getServiceNames().size() == 0) {
                        connectInfoMap.remove(connect);
                    }
                }
                for (ClusterListener listener : listeners) {
                    listener.doNotUse(event.getServiceName(), event.getHost(), event.getPort());
                }
            }
        }

      DefaultClusterListener # doNotUse

    public void doNotUse(String serviceName, String host, int port) {
            if (logger.isInfoEnabled()) {
                logger.info("[cluster-listener] do not use service provider:" + serviceName + ":" + host + ":" + port);
            }
            List<Client> cs = serviceClients.get(serviceName);
            List<Client> newCS = new CopyOnWriteArrayList<Client>();
            if (cs != null && !cs.isEmpty()) {
                newCS.addAll(cs);
            }
            Client clientFound = null;
            for (Client client : cs) {
                if (client != null && client.getHost() != null && client.getHost().equals(host) && client.getPort() == port) {
                    newCS.remove(client);
                    clientFound = client;
                }
            }
            serviceClients.put(serviceName, newCS);//还是清理缓存
    
            // 一个client可能对应多个serviceName,仅当client不被任何serviceName使用时才关闭
            if (clientFound != null) {
                if (!isClientInUse(clientFound)) {
                    allClients.remove(clientFound.getAddress());
                    RequestQualityManager.INSTANCE.removeClientQualities(clientFound.getAddress());//统计质量信息也就是请求失败次数等,清理掉
                    closeClientInFuture(clientFound);//关闭tcp连接
                }
            }
        }

    上下的两个内名实现类就略过了,一个是空实现,一个是清理权重信息

    其实,除此之外,当客户端启动的时候,会和每一个服务端都建立连接,同时和每个服务端都有心跳检测,如果心跳检测失败,客户端也会执行清理缓存和断开连接的操作。

  • 相关阅读:
    frame框架 超链接
    SQL与ASP日期时间
    J2EE开发环境搭建(3)——保存你搭建好的J2EE开发环境
    td.moveRow方法
    SQL Management Studio Express 安装缺少MSXML6解决
    js tr onmouseover时改变该行背景色
    doc 解决windows系统中名字为eula的文件无法被删除
    js select 隐藏option
    LLBLGen的数据库相对应SQL语句实现方法收藏
    使用 DataAdapter 更新数据源
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14863087.html
Copyright © 2011-2022 走看看