zoukankan      html  css  js  c++  java
  • xxl-job执行器的注册

    一、执行器注册流程

    二、具体流程

    1.注册监控线程

    //类:JobRegistryHelper.java;方法:public void start()
    // Registry monitor thread. While toStop is false, each pass:
    //   1. loads the executor groups returned by findByAddressType(0),
    //   2. deletes the registry rows reported by findDead(...),
    //   3. rebuilds an appname -> address list view from the rows returned by findAll(...),
    //   4. writes the comma-joined, sorted address list (or null) back to every loaded group,
    // and then sleeps RegistryConfig.BEAT_TIMEOUT seconds before the next pass.
    registryMonitorThread = new Thread(new Runnable() {
    			@Override
    			public void run() {
    				while (!toStop) {
    					try {
    						// Load the executor groups with address type 0 (the auto-registered ones, per the original note).
    						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
    						if (groupList!=null && !groupList.isEmpty()) {
    
    							// Remove registry rows that findDead reports for DEAD_TIMEOUT relative to now.
    							List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
    							if (ids!=null && ids.size()>0) {
    								XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
    							}
    
    							// Collect the remaining rows into a map of appname -> distinct registered addresses.
    							HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
    							List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
    							if (list != null) {
    								for (XxlJobRegistry item: list) {
    									// Only rows whose registry group is EXECUTOR contribute addresses.
    									if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
    										List<String> registryList = appAddressMap.computeIfAbsent(item.getRegistryKey(), key -> new ArrayList<String>());
    										if (!registryList.contains(item.getRegistryValue())) {
    											registryList.add(item.getRegistryValue());
    										}
    									}
    								}
    							}
    
    							// Refresh every group's address list.
    							for (XxlJobGroup group: groupList) {
    								List<String> registryList = appAddressMap.get(group.getAppname());
    								// Stays null when no address was collected for this appname, which clears the stored list.
    								String addressListStr = null;
    								if (registryList!=null && !registryList.isEmpty()) {
    									// Sort so the stored string does not depend on row order.
    									Collections.sort(registryList);
    									addressListStr = String.join(",", registryList);
    								}
    								group.setAddressList(addressListStr);
    								group.setUpdateTime(new Date());
    
    								XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
    							}
    						}
    					} catch (Exception e) {
    						if (!toStop) {
    							// Pass the message for the {} placeholder and the exception itself for the stack trace.
    							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e.getMessage(), e);
    						}
    					}
    					try {
    						// Sleep BEAT_TIMEOUT seconds, then continue monitoring the auto-registered groups.
    						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
    					} catch (InterruptedException e) {
    						// NOTE(review): the interrupt flag is not restored here; presumably shutdown is
    						// signalled through toStop and the loop condition picks it up. Confirm against the stop path.
    						if (!toStop) {
    							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e.getMessage(), e);
    						}
    					}
    				}
    				logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
    			}
    		});
    		registryMonitorThread.setDaemon(true);
    		registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
    		registryMonitorThread.start();
    

    2.注册过程

    1 初始化执行器

    //XxlJobExecutor.java
    /**
     * Builds one {@code AdminBizClient} per non-blank entry of a comma-separated address string
     * and appends each client to {@code adminBizList}.
     *
     * <p>{@code adminBizList} is created on first use, so it is left untouched when the input
     * contains no usable address.
     *
     * @param adminAddresses comma-separated admin addresses; {@code null} or blank input is a no-op
     * @param accessToken    token handed to every {@code AdminBizClient} that is created
     * @throws Exception declared by the original signature
     */
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        // Nothing configured: leave adminBizList as it is.
        if (adminAddresses == null || adminAddresses.trim().length() == 0) {
            return;
        }

        String[] candidates = adminAddresses.trim().split(",");
        for (String candidate : candidates) {
            // Skip empty segments such as the one produced by "a,,b".
            if (candidate == null || candidate.trim().length() == 0) {
                continue;
            }

            AdminBiz client = new AdminBizClient(candidate.trim(), accessToken);

            // Create the list only once there is a client to store.
            if (adminBizList == null) {
                adminBizList = new ArrayList<AdminBiz>();
            }
            adminBizList.add(client);
        }
    }
    

    2 执行器端注册

        /**
         * Starts the daemon thread that repeatedly registers this executor with the admin side.
         *
         * <p>Until {@code toStop} is set, each round builds a {@code RegistryParam} for the
         * {@code EXECUTOR} registry type and offers it, in order, to every client returned by
         * {@code XxlJobExecutor.getAdminBizList()}, stopping at the first client that answers with
         * {@code ReturnT.SUCCESS_CODE}. The thread then sleeps {@code RegistryConfig.BEAT_TIMEOUT}
         * seconds before the next round.
         *
         * @param appname application name passed to the {@code RegistryParam} constructor
         * @param address executor address passed to the {@code RegistryParam} constructor
         */
        public void start(final String appname, final String address){

            // (part of the original method is omitted here)

            Runnable heartbeatLoop = new Runnable() {
                @Override
                public void run() {

                    // registry
                    while (!toStop) {
                        try {
                            RegistryParam param = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                            for (AdminBiz admin : XxlJobExecutor.getAdminBizList()) {
                                try {
                                    // Send the registration request to this admin client.
                                    ReturnT<String> outcome = admin.registry(param);
                                    boolean accepted = outcome != null && outcome.getCode() == ReturnT.SUCCESS_CODE;
                                    if (!accepted) {
                                        // Rejected or empty answer: log it and try the next admin client.
                                        logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", param, outcome);
                                        continue;
                                    }

                                    // One successful registration is enough for this round.
                                    outcome = ReturnT.SUCCESS;
                                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", param, outcome);
                                    break;
                                } catch (Exception e) {
                                    // A failing admin client must not stop the remaining ones from being tried.
                                    logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", param, e);
                                }
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }

                        try {
                            // Wait BEAT_TIMEOUT seconds before registering again, unless a stop was requested.
                            if (!toStop) {
                                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                            }
                        } catch (InterruptedException e) {
                            if (!toStop) {
                                logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                            }
                        }
                    }

                    // (the registry-removal part of the original method is omitted here)
                }
            };

            registryThread = new Thread(heartbeatLoop);
            registryThread.setDaemon(true);
            registryThread.setName("xxl-job, executor ExecutorRegistryThread");
            registryThread.start();
        }
    

    3 调度中心执行注册

    //JobRegistryHelper.java
    /**
     * Validates a registration request and hands the persistence work to
     * {@code registryOrRemoveThreadPool}.
     *
     * <p>The submitted task first calls {@code registryUpdate} for the given group/key/value.
     * Only when that reports fewer than one affected row does it call {@code registrySave} and
     * then {@code freshGroupRegistryInfo}.
     *
     * @param registryParam request carrying registry group, key and value
     * @return a {@code ReturnT.FAIL_CODE} result with message "Illegal Argument." when any of the
     *         three fields fails {@code StringUtils.hasText}; otherwise {@code ReturnT.SUCCESS},
     *         returned right after the task is submitted
     */
    public ReturnT<String> registry(RegistryParam registryParam) {
    
    		// valid
    		boolean complete = StringUtils.hasText(registryParam.getRegistryGroup())
    				&& StringUtils.hasText(registryParam.getRegistryKey())
    				&& StringUtils.hasText(registryParam.getRegistryValue());
    		if (!complete) {
    			return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
    		}
    
    		// The database work is done by the pool; this method only submits it.
    		Runnable persistTask = new Runnable() {
    			@Override
    			public void run() {
    				// Try to update an existing registry row first.
    				int touchedRows = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
    				if (touchedRows >= 1) {
    					return;
    				}
    
    				// No row was updated: insert a new registry row instead.
    				XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
    
    				// fresh
    				freshGroupRegistryInfo(registryParam);
    			}
    		};
    		registryOrRemoveThreadPool.execute(persistTask);
    
    		return ReturnT.SUCCESS;
    	}
    
  • 相关阅读:
    Facebook主页照片和封面照片的尺寸要求
    NopCommerce源码架构详解
    Razor语法大全
    IIS 8 上传图片 上传文件报413错误及仅Https下报413问题,IIS高版本的配置方案及Web.config配置全解
    (一) MongoDB安装与配置
    Net Core 导出PDF
    ASP.NET Core AutoWrapper 自定义响应输出
    浅谈Docker之Docker数据持久化Bind Mount和Volume(转)
    搭建mysql集群
    MySQL错误:Can't connect to MySQL server (10060) 解决方案
  • 原文地址:https://www.cnblogs.com/huanongying/p/14900890.html
Copyright © 2011-2022 走看看