zoukankan      html  css  js  c++  java
  • zookeeper初体验之关于解决quartz重复执行任务的一种思路

    前阵子工作中遇到了一个很麻烦的问题。
    本人所在的项目组做了一个机遇quartz集群的任务系统。通俗点讲就是用quartz框架(quartz是一款能跑定时任务的框架支持复杂的时间表达式)
    来执行定时任务。但是这里定时任务的并发数很多,就出现了一个问题,同一个trigger被多个机器重复的触发了,这就造成了执行的任务数目
    比预期的多很多。领导就让我处理这个问题。
    开始我以为是这个框架本身的配置有问题,结果翻了很多资料还是没解决(这里不过多讲这个,有兴趣可以留言)。那么问题出在哪里呢?
    quartz的任务工作的方式是这样的。当任务达到触发条件的时候(当这条任务满足qrtz_cron_triggers表中定义的相关的时间表达式的时候)
    qrtz_triggers表对应的这条记录的状态发生改变,同时下次触发时间根据时间表达式做出改变,同时根据sched_name找到qrtz_job_details
    表中的具体job去执行,下面就是具体的业务了。我这里的问题就出在同一时间内(前后相差几ms)多台机器触发了同一条trigger。然而这个
    我是没有办法解决的或者说不想动quartz的源码(有朋友能从这一步就把问题搞定的可以交流一下),所以我就顺着quartz的工作流程继续往下
    到了job这里,由于多个trigger被触发所以执行了多次job,那么我是否能通过让他只执行一次job来防止重复执行呢。如果有一种方法可以
    让这个job执行一次就可以达到我的要求了(选择在job这一步处理其实还是因为不想动源码,到java这里我就好办了)。正好我对zookeeper
    有一些了解,zookeeper恰好有一种注册机制可以解决这个问题。
    回顾一下zookeeper关于节点注册的用法:
    zookeeper只可以注册一个同名节点如果节点已经存在则返回nodeexits.
    那么运用到我这个场景就是当任务进入job之后用job id(同时触发的这几个job的id是一样的)去向zookeeper完成注册,由于
    id是一样的那么只能有一个注册成功,只要在注册成功的条件下我才允许task。这样就保证了不做重复的运算。
    具体如下:

    public class PlatformQuartzJobBean extends QuartzJobBean {
    	private String path = "/zk_triggerID";
    	private String lock = "/zk_lock";
    	private static ZooKeeper zk = null;
    	static{
    		try {
    			zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
    		} catch (IOException e) {
    			logger.error(e.getMessage(),e);
    		}
    	}
    	//任务执行的具体逻辑
    	protected void executeInternal(JobExecutionContext jobexecutioncontext)
    		throws JobExecutionException {
    		Trigger trigger = jobexecutioncontext.getTrigger();
    		String triggerName = trigger.getKey().getName();//triggername是唯一的
    		boolean createSuccess = false;
    		boolean doTask = false; //不对zookeeper注册执行任务
    		int childrens = 0;
    		List<String> children = null;
    		boolean onDelete = false;//是否获取删除节点的权限
    		try {
    			zk.create(path+"/"+triggerName, triggerName.getBytes(), 
    					Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);//如果注册出现问题说明节点存在是重复的任务
    			createSuccess = true;
    			children = zk.getChildren(path, false);
    			if(children != null){
    				childrens = children.size();
    			}
    			if(childrens>99){//节点个数达到100个就执行删除操作
    				try {
    					zk.create(lock+"/dodelete", "dodelete".getBytes(), 
    							Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    					onDelete = true;
    				} catch (KeeperException e1) {
    					if(e1 instanceof NodeExistsException){
    						logger.info("already on delete!");
    					}else{
    						logger.error(e1.getMessage(),e1);
    					}
    				} catch (InterruptedException e1) {
    					logger.error(e1.getMessage(),e1);
    				}
    				
    			}
    			//执行具体的任务
    			execuTask(trigger,triggerName,jobexecutioncontext,af);
    		} catch (KeeperException e) {
    			if(e instanceof NodeExistsException){
    				logger.info("already on do");
    			}else if(e instanceof ConnectionLossException){
    				logger.info("ConnectionLoss ,do task without registered!!");
    				doTask = true;
    			}else if(e instanceof SessionExpiredException){
    				logger.info("session expired ,do task without registered!!");
    				doTask = true;
    				try {
    					zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
    				} catch (IOException e1) {
    					logger.error(e1.getMessage(),e1);
    				}
    			}else{
    				logger.error(e.getMessage(),e);
    			}
    		} catch (InterruptedException e) {
    			logger.error(e.getMessage(),e);
    		}
    		if(createSuccess && onDelete){//如果创建成功并且root下有执行删除的权利
    			try {
    				for(String str:children){
    					zk.delete(path+"/"+str, -1);
    				}
    			} catch (KeeperException e1) {
    				logger.error(e1.getMessage(),e1);
    			} catch (InterruptedException e1) {
    				logger.error(e1.getMessage(),e1);
    			}finally{
    				if(onDelete){
    					try {
    						zk.delete(lock+"/dodelete", -1);
    					} catch (InterruptedException e) {
    						logger.error(e.getMessage(),e);
    					} catch (KeeperException e) {
    						if(e instanceof ConnectionLossException){
    							logger.info("ConnectionLoss ,reconnect zk!!");
    							try {
    								zk.close();//人为失效,删除dodelete节点
    								zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
    							} catch (InterruptedException e1) {
    								e1.printStackTrace();
    							} catch (IOException e1) {
    								e1.printStackTrace();
    							}
    							
    						}else{
    							logger.error(e.getMessage(),e);
    						}
    						
    					}
    				}
    			}
    			
    		}
    		//如果出现connectloss和sessionexpired 可能是网络有点问题找不到zookeeper就不管重复问题了完成任务为最重要的
    		if(doTask){//如果出现connectloss和sessionexpired 就直接执行任务
    			execuTask(trigger,triggerName,jobexecutioncontext,af);
    		}
    	
    	}
    	
    }

    整个过程就是:当job开始的时候去向zookeeper申请注册,只有当注册成功的时候才执行业务,失败则退出job。同时由于我这里是每天循环的
    定时任务所以当zookeeper下的节点数目达到一定的个数的时候加一个删除锁(就是向zookeeper create一个ondetele节点),同时删除之前
    的triggername节点,这样保证了明天这些任务可以继续完成。至此,任务重复执行的问题就解决了。下一篇博客将简单的介绍一下zookeeper和
    zookeeper的布置,虽然网上这方面东西很多,不过自己写出来(自己实践过可以用的),以后可以直接拿来用。。

  • 相关阅读:
    Lambda表达式 For Android
    RxJava重温基础
    Asp.Net Core 依赖注入默认DI,Autofac注入
    Asp.Net Core2.0 基于QuartzNet任务管理系统
    Asp.Net Core 基于QuartzNet任务管理系统(这是一篇用来水的随笔)
    ADO.NET通用类库
    TripleDES加密解密
    ASP.NET Core的身份认证框架IdentityServer4--(4)添加第三方快捷登录
    ASP.NET Core的身份认证框架IdentityServer4--(3)令牌服务配置访问控制跟UI(可自定义路由)添加
    ASP.NET Core的身份认证框架IdentityServer4--(2)API跟WEB端配置
  • 原文地址:https://www.cnblogs.com/nfsnyy/p/5741593.html
Copyright © 2011-2022 走看看