zoukankan      html  css  js  c++  java
  • zookeeper实战:SingleWorker代码样例

    们需要一个“单点worker”系统,此系统来确保系统中定时任务在分布式环境中,任意时刻只有一个实例处于活跃;比如,生产环境中,有6台机器支撑一个应用,但是一个应用中有30个定时任务,这些任务有些必须被在“单线程”环境中运行(例如“数据统计”任务),避免并发的原因不是在java层面,可能是在操作db数据时,或者是在消息消费时,或者是信息推送时等。某个指标的“数据统计”任务,每天只需要执行一次,即使执行多次也是妄费,因为这种类型的定时任务,需要被“单点”。同时,如果一个任务在没有报告结果的情况下异常推出,我们仍然期望集群中其他实例能够主动“接管”它。在实现不良好的架构中,可能有些开发者使用手动触发特定脚本的方式执行,有些web项目可能是通过配置特定host的方式开启任务。对于某些定时任务,可能会采用quartz-cluster中的某些实现,但是他需要数据库的额外支持。
         此时,我们将使用zookeeper来实现此功能。本实例提供了如下功能展示:
        1) 提供了单点worker功能
        2) 提供了worker均衡能力(30个worker相对均匀的分配到6台机器上)
        3) 提供了worker失效接管能力。

         但是仍有很多亟待解决的问题:
        1) 无法确保任务的接管是及时的,即一个任务执行者失效,将会在一定的过期时间后,才会被其他sid接管
        2) 在极少的情况下,仍然会有一个任务同时被2个sid执行。
        3) 在极少的情况下,会有极短的时间内,一个任务不会被任何sid接管,处于“孤立”状态
        尽管zk提供了watch机制,但是上述问题,不仅不能完全避免,还会额外增加代码的复杂度。最终我个人放弃了对在此类中使用watch的想法。。
        注意:zk中exist和create/delete等操作并非原子,可能在exist返回false的情况下,去create此节点,也有可能抛出NoExistsException;你应该能够想到“并发”环境造成此问题的时机(其他zk客户端也有类似的操作,并发)。
        注意:在zk中删除父节点,将会导致子节点一并删除;同理,如果创建一个节点,那么它的各级父节点必须已经存在,且节点的层级越深,对zk底层存储而言数据结构越冗杂。
     
        数据结构与设计思路:
        1) serverType为当前应用标识,我们期望每个应用都有各自的serverType,方便数据分类; jobType为任务类型或者任务名称;如下全节点表示某个serverType的jobType下有sid1,sid2,sid3共三个实例(例如tomcat实例,或者物理机器标识)参与了此任务。zk节点路径格式:
            /severType/jobType/register/sid1 
            ............................../sid2
            ............................../sid3
        2) 表示此jobType,被sid1运行。zk节点路径格式
            /severType/jobType/alive/sid1 挂载数据:null
        3) /serverType/jobType 挂载数据:cronException;将任务的“cron表达式”作为数据挂载
        4) {todu} 表示serverType下每个sid运行的任务个数,我们可以用来“均衡”任务,将新任务分配给任务较少的sid上。
            /serverType/sid1 挂载数据:任务个数.

    如下是本人的代码样例,实际生产环境中代码与样例有区别,此处仅供参考,本实例基于zookeeper + quartz 2.1,如有错误之处,请不吝赐教:

    1) TestMain.java :测试引导类

    2) PrintNumberJob.java:一个简单的任务,打印一个随即数字。

    3) PrintTimeJob.java:一个简单的任务,打印当前时间。

    4) SingleWorkerManager.java:核心类,用于处理调用者提交的任务,并确保结果符合预期。此类有2个内部工作线程组成,分别处理zk数据同步和用户任务交付等工作。

    很遗憾,源代码非常的长,尽管我已经足够细心的整理格式,但还是不够悦目,建议参阅者下载代码阅读,谢谢

    SingleWorkerManager.java

    1. package com.sample.zk.singleWorker;  
    2.   
    3. /**  
    4.  *  
    5.  * @author qing 
    6.  * 
    7.  */  
    8. public class SingleWorkerManager {  
    9.   
    10.     private static final String GROUP = "single-worker";  
    11.     private Scheduler scheduler;  
    12.     private ZooKeeper zkClient;  
    13.     private String serverType = "_-default-_";//默认serverType类型,我祈祷不会有人估计它和一样  
    14.     private static final String REGISTER = "/register";  
    15.     private static final String ALIVE = "/alive";  
    16.       
    17.     private Watcher dw = new InnerZK();//default watcher;  
    18.       
    19.     private boolean isAlive = false;//是否可用  
    20.       
    21.     private Object tag = new Object();  
    22.       
    23.     private ReentrantLock lock = new ReentrantLock();  
    24.       
    25.     private String sid;//当前server标记,可以是IP等,主要用来表达如下描述:某sid上运行**任务;将**任务分配给某sid;  
    26.     //真实场景下,可以为IP为192.168.197.2下tomcat运行printNumber任务;  
    27.     //任务在哪个server上运行,需要有明确的信息才行,所以sid的设计需要很直观。  
    28.     //  
    29.     //很多时候,我们都是以ip地址来标记任务被运行的环境地址,不过在有些比较“穷苦的公司”,  
    30.     //可能一个物理server下运行多个对等的tomcat实例  
    31.     //或许这种方式下,使用ip作为标记,就一些麻烦了。  
    32.       
    33.     //已经在本地提交,但尚未提交给zk的任务,直到zk接受任务之后,提交任务者才返回  
    34.     //+++++++++++++++++++++++++++++++++++++++++++  
    35.     //如何设计任务提交,可能面临一个奇怪的选择,如下是2种队列:  
    36.     //LinkedBlockingQueue提供了阻塞与非阻塞两种方式,非阻塞的方式允许任务提交这立即返回,但是此任务此后是否能够被zk  
    37.     //正确接受将存在风险,有可能zk的故障,导致此任务无法正常运行。  
    38.     //private BlockingQueue<Worker> outgoingWorker  = new LinkedBlockingQueue<Worker>();  
    39.     //++++++++++++++++++++++++++++++++++++++++++++  
    40.     //同步队列,是一个“单工”队列,如果任务任务被zk正确接受之后,任务提交者才返回,这是一个理想的情况。任务一旦开始被处理,任务提交者就可以返回了。  
    41.     //如果任务提交时,刚好zk环境故障,那么此任务将会被重试多次,如果还未能成功,则失败。  
    42.     //++++++++++++++++++++++++  
    43.     //无论如何,你都需要做出一个选择,我选择了最直观的答案:SynchronousQueue + 同步  
    44.     //++++++++++++++++++++++++  
    45.     private SynchronousQueue<Worker> outgoingWorker = new SynchronousQueue<Worker>();  
    46.       
    47.     //当前serverType下所有的任务  
    48.     private Map<String,Worker> allWorkers = new HashMap<String,Worker>();  
    49.       
    50.     //当前实例上所运行的任务,它是allWorkers的子集  
    51.     private Map<String,Worker> selfWorkers = new HashMap<String,Worker>();  
    52.       
    53.     //用来间歇性的与zk进行同步,用来检测job的冲突或者新job的分配  
    54.     private Thread syncThread;  
    55.     //用于向zk提交任务数据的线程,将和SynchronousQueue协同工作  
    56.     private Thread workerThread;  
    57.       
    58.     /** 
    59.      * 创建zk实例 
    60.      */  
    61.     public SingleWorkerManager(String sid){  
    62.         this(sid,null);  
    63.           
    64.     }  
    65.       
    66.     public SingleWorkerManager(String sid,String sType){  
    67.         if(sType != null){  
    68.             this.serverType = sType;  
    69.         }  
    70.         try{  
    71.             zkClient = new ZooKeeper(Constants.connectString, 3000, dw,false);  
    72.         }catch(Exception e){  
    73.             e.printStackTrace();  
    74.             throw new RuntimeException(e);  
    75.         }  
    76.         this.sid = sid;  
    77.         syncThread = new Thread(new SyncHandler());  
    78.         syncThread.setDaemon(true);  
    79.         syncThread.start();  
    80.     }  
    81.       
    82.     /** 
    83.      * 开启任务调度器 
    84.      */  
    85.     public void start(){  
    86.         try{  
    87.             scheduler = StdSchedulerFactory.getDefaultScheduler();  
    88.             scheduler.start();  
    89.             workerThread = new Thread(new WorkerHandler());  
    90.             workerThread.setDaemon(true);  
    91.             workerThread.start();  
    92.             isAlive = true;  
    93.             synchronized (tag) {  
    94.                 tag.notifyAll();  
    95.             }  
    96.               
    97.             //首次同步  
    98.             sync();  
    99.         }catch(Exception e){  
    100.             e.printStackTrace();  
    101.             throw new RuntimeException(e);//异常退出  
    102.         }  
    103.     }  
    104.       
    105.     /** 
    106.      * 关闭任务调度器,关闭zookeeper链接 
    107.      * 此后将导致任务被立即取消,singleWorkerManager实例将无法被重用 
    108.      */  
    109.     public void close(){  
    110.         lock.lock();  
    111.         try{  
    112.             isAlive = false;  
    113.             scheduler.shutdown();  
    114.             if (syncThread.isAlive()) {  
    115.                 syncThread.interrupt();  
    116.             }  
    117.             if(workerThread.isAlive()){  
    118.                 workerThread.interrupt();  
    119.             }  
    120.             if(zkClient != null){  
    121.                 zkClient.close();  
    122.             }  
    123.         }catch(Exception e){  
    124.             e.printStackTrace();  
    125.         }finally{  
    126.             lock.unlock();  
    127.         }  
    128.     }  
    129.       
    130.     /** 
    131.      * 取消job,将触发zk服务也“取消”此任务 
    132.      * @param jobName 
    133.      */  
    134.     public void unschedule(String jobName){  
    135.         try{  
    136.             //here,zk  
    137.             lock.unlock();  
    138.             try{  
    139.                 String jobPath = "/" + serverType + "/" + jobName;  
    140.                 Stat stat = zkClient.exists(jobPath, false);  
    141.                 if(stat != null){  
    142.                     zkClient.delete(jobPath, stat.getVersion());  
    143.                 }  
    144.             }catch(NoNodeException e){  
    145.                 //ignore;  
    146.             }catch(Exception e){  
    147.                 e.printStackTrace();  
    148.             }  
    149.             //有syncHandler来取消本地任务  
    150. //          //here,local scheduler  
    151. //          //无论如何,本地都要取消  
    152. //          TriggerKey key = new TriggerKey(jobName, GROUP);  
    153. //          if(scheduler.checkExists(key)){  
    154. //              scheduler.unscheduleJob(key);  
    155. //          }  
    156.         }catch(Exception e){  
    157.             e.printStackTrace();  
    158.         }  
    159.     }  
    160.       
    161.     /** 
    162.      * 提交任务,如果提交失败,将抛出异常 
    163.      * @param jobClass 
    164.      * @param cronExpression 
    165.      * @return true任务提交成功,false任务提交失败 
    166.      */  
    167.     public boolean schedule(Class<? extends Job> jobClass,String cronExpression){  
    168.         if(!isAlive){  
    169.             throw new IllegalStateException("worker has been closed!");  
    170.         }  
    171.         try{  
    172.             Worker worker = this.build(jobClass, cronExpression);  
    173.             return outgoingWorker.offer(worker,15,TimeUnit.SECONDS);//waiting here,最多15妙  
    174.         }catch(Exception e){  
    175.             e.printStackTrace();  
    176.             throw new RuntimeException(e);  
    177.         }  
    178.     }  
    179.       
    180.     private Worker build(Class<? extends Job> jobClass,String cronExpression){  
    181.         String name = jobClass.getName();//全路径name  
    182.         JobDetail job = JobBuilder.newJob(jobClass).withIdentity(name,GROUP).build();  
    183.         CronScheduleBuilder sb = CronScheduleBuilder.cronSchedule(cronExpression);//每两秒执行一次:"*/2 * * * * ?"  
    184.         Trigger trigger = TriggerBuilder.newTrigger().withIdentity(name, GROUP).withSchedule(sb).build();  
    185.         return new Worker(job, trigger,cronExpression);  
    186.     }  
    187.       
    188.       
    189.     ///////////////////////////////////////////////////////inner worker//////////////////////////////  
    190.       
    191.     /** 
    192.      * 当前实例的zkClient是否链接正常,scheduler是否处于可用状态 
    193.      * @return 
    194.      */  
    195.     private boolean isReady(){  
    196.         if(!isAlive){  
    197.             return false;  
    198.         }  
    199.         if(scheduler == null || zkClient == null){  
    200.             return false;  
    201.         }  
    202.         try{  
    203.             if(scheduler.isShutdown() || !scheduler.isStarted()){  
    204.                 return false;  
    205.             }  
    206.         }catch(Exception e){  
    207.             e.printStackTrace();  
    208.             return false;  
    209.         }  
    210.         if(zkClient.getState().isConnected()){  
    211.             return true;  
    212.         }  
    213.         return false;  
    214.     }  
    215.       
    216.     /** 
    217.      * 同步selfWorkers列表,和zk环境中的列表进行比较,查看是否有任务冲突 
    218.      */  
    219.     private void syncSelfWorker(){  
    220.         lock.lock();  
    221.         try{  
    222.             if(!isReady()){  
    223.                 throw new RuntimeException("Scheduler error..");//以异常的方式中断  
    224.             }  
    225.             //首先检测自己持有的任务列表,是否和zk一致,首次同步,selfWorkers肯定是空,需要sync后续去做调度。  
    226.             for(String job : selfWorkers.keySet()){  
    227.                 String jobPath = "/" + serverType + "/" + job;  
    228.                 //如果此任务已经被远程取消,则取消本地job执行  
    229.                 //所有的实例都会做同样的事情,一定会把那些“取消的任务”取消  
    230.                 if(zkClient.exists(jobPath, false) == null){  
    231.                     allWorkers.remove(job);  
    232.                     Worker cw = selfWorkers.remove(job);  
    233.                     if(cw != null){  
    234.                         if(scheduler.checkExists(cw.getJob().getKey())){  
    235.                             scheduler.unscheduleJob(cw.getTrigger().getKey());  
    236.                         }  
    237.                     }  
    238.                     continue;  
    239.                 }  
    240.                 String alive = "/" + serverType + "/" + job + ALIVE;  
    241.                 //查看是否有子节点冲突,比如一个job被多个server运行  
    242.                 List<String> alives = zkClient.getChildren(alive, false);  
    243.                 if(alives == null || alives.isEmpty()){  
    244.                     //如果此任务尚未分配,则交付给workerHandler  
    245.                     continue;  
    246.                 }  
    247.                 if(alives.size() == 1){  
    248.                     String holder = alives.get(0);  
    249.                     //如果已分配且接管者是自己,更新时间  
    250.                     if(holder.equalsIgnoreCase(sid)){  
    251.                         byte[] data = String.valueOf(System.currentTimeMillis()).getBytes();  
    252.                         zkClient.setData(alive + "/" + sid, data, -1);//ignore version  
    253.                         continue;//如果是自己  
    254.                     }  
    255.                 }  
    256.                 //对于其他情况,当前sid只能让步(有可能会存在所有的sid都让步,导致任务在极短时间内无法运行,  
    257.                 //后台“补救”线程会做工作)  
    258.                 if(zkClient.exists(alive + "/" + sid, false) != null){  
    259.                     try{  
    260.                         zkClient.delete(alive + "/" + sid, -1);  
    261.                         scheduler.unscheduleJob(new TriggerKey(job, GROUP));  
    262.                         selfWorkers.remove(job);  
    263.                     }catch(NoNodeException e){  
    264.                         //ignore:  
    265.                     }catch (Exception e) {  
    266.                         e.printStackTrace();  
    267.                     }  
    268.                 }  
    269.             }  
    270.         }catch(Exception e){  
    271.             e.printStackTrace();  
    272.         }finally{  
    273.             lock.unlock();  
    274.         }  
    275.     }  
    276.       
    277.   
    278.     /** 
    279.      * 同步任务信息,将当前实例中scheduler运行的任务和zk进行比较,进行冲突检测。 
    280.      * 1) 检测自己正在运行的任务,是否和zk中心中分配给自己的任务列表一致。 
    281.      * 2) 获得当前serverType下所有的任务列表 
    282.      *  
    283.      */  
    284.     private void sync(){  
    285.         lock.lock();  
    286.         try{  
    287.             if(!isReady()){  
    288.                 throw new RuntimeException("Scheduler error..");  
    289.             }  
    290.             //检测一级节点  
    291.             Stat tstat = zkClient.exists("/" + serverType,false);  
    292.             if(tstat == null){  
    293.                 try{  
    294.                     zkClient.create("/" + serverType, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
    295.                 }catch(NodeExistsException e){  
    296.                     //ignore  
    297.                 }  
    298.             }  
    299.             //+++++++++++++++++++  
    300.             syncSelfWorker();  
    301.             //+++++++++++++++++++  
    302.               
    303.             //获得所有任务列表  
    304.             List<String> allJobs = zkClient.getChildren("/" + serverType, false);  
    305.             if(allJobs == null){  
    306.                 throw new RuntimeException("NO jobs, error..");//以异常的方式,终端方法调用,没有别的意思。  
    307.             }  
    308.             allWorkers.clear();//reload all  
    309.             for(String job : allJobs){  
    310.                 try{  
    311.                     //job为类的全名,节点下挂载的数据为cronException  
    312.                     byte[] data = zkClient.getData("/" + serverType + "/" + job, false, null);  
    313.                     if(data == null || data.length == 0){  
    314.                         continue;  
    315.                     }  
    316.                       
    317.                     //简单考虑吧,不过作为一名合格的程序员,此处可能需要太多的校验。  
    318.                     Class<? extends Job> jobClass = (Class<? extends Job>)ClassLoader.getSystemClassLoader().loadClass(job);  
    319.                     Worker worker = build(jobClass, new String(data));  
    320.                     allWorkers.put(job,worker);  
    321.                     //自己检测到任务后,注册自己  
    322.                     String registerPath = "/" + serverType + "/" + job + REGISTER + "/" + sid;  
    323.                     //如果不存在  
    324.                     if(zkClient.exists(registerPath, false) == null){  
    325.                         try{  
    326.                             zkClient.create(registerPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
    327.                         }catch(NodeExistsException ex){  
    328.                             //ignore;如果自己已经注册过,则忽略  
    329.                         }  
    330.                     }  
    331.                     //检测此worker是否为自己所持有  
    332.                     String alivePath = "/" + serverType + "/" + job + ALIVE +"/" + sid;  
    333.                     //如果此任务不属于自己运行,则继续  
    334.                     if(zkClient.exists(alivePath, false) == null){  
    335.                         continue;  
    336.                     }  
    337.                     //如果属于自己运行,则开启任务,本地是否开启任务,完全取决于zk的数据状态  
    338.                     try{  
    339.                         boolean exists = scheduler.checkExists(worker.getJob().getKey());  
    340.                         if(!exists){  
    341.                             //如果尚未在当前实例中调度,则立即调度  
    342.                             scheduler.scheduleJob(worker.getJob(),worker.getTrigger());  
    343.                             selfWorkers.put(job,worker);  
    344.                         }  
    345.                     }catch(Exception e){  
    346.                         e.printStackTrace();  
    347.                         zkClient.delete(alivePath, -1);//ignore version;  
    348.                         //再次校验  
    349.                         selfWorkers.remove(job);  
    350.                     }  
    351.                 }catch(ClassNotFoundException e){  
    352.                     e.printStackTrace();  
    353.                     throw new RuntimeException(e);  
    354.                 }  
    355.             }  
    356.               
    357.         }catch(Exception e){  
    358.             e.printStackTrace();  
    359.         }finally{  
    360.             lock.unlock();  
    361.         }  
    362.     }  
    363.       
    364.       
    365.     class InnerZK implements Watcher {  
    366.   
    367.         public void process(WatchedEvent event) {  
    368.             // 如果是“数据变更”事件  
    369.             if (event.getType() != EventType.None) {  
    370.                 //processExt(event);  
    371.                 return;  
    372.             }  
    373.               
    374.             // 如果是链接状态迁移  
    375.             // 参见keeperState  
    376.             switch (event.getState()) {  
    377.             case SyncConnected:  
    378.                 System.out.println("Connected...");  
    379.                 // 链接状态迁移时,检测worker信息  
    380.                 sync();  
    381.                 break;  
    382.             case Expired:  
    383.                 System.out.println("Expired...");  
    384.                 break;  
    385.             // session过期  
    386.             case Disconnected:  
    387.                 // 链接断开,或session迁移  
    388.                 System.out.println("Connecting....");  
    389.                 break;  
    390.             case AuthFailed:  
    391.                 close();  
    392.                 throw new RuntimeException("ZK Connection auth failed...");  
    393.             default:  
    394.                 break;  
    395.             }  
    396.         }  
    397.           
    398.     }  
    399.       
    400.     /** 
    401.      * 分配任务,在所有的worker信息都同步结束后,然后在逐个检测任务状态,对于没有 
    402.      * 被执行的新任务,或者已经失去托管的任务,交付给其他sid。 
    403.      *  
    404.      * 任务分配,没有采取“严格均衡”的方式,我们使用了一个随即方式。 
    405.      */  
    406.     private void scheduler(){  
    407.         lock.lock();  
    408.         for(String job : allWorkers.keySet()){  
    409.             try{  
    410.                 //如果没有,则创建一个持久节点,挂载数据为,系统时间戳,你可以为此节点加上ACL控制,但会带来复杂度  
    411.                 //这里可以创建为临时节点,那么你需要对此节点注册watch,当watch触发时(比如其他sid的session失效等)做job的接管  
    412.                 //考虑到如果大量的job,大量的watch,在网络复杂的情况下,再加上对zk的并发操作,数据一致性是个问题。  
    413.                 //此处,我们采取挂载“时间戳”的方式,在SyncHandler线程中,间歇性的去检测,惰性的非实时的分配和协调任务  
    414.                 //此处就要求,你的应用服务器的时间,应该几乎非常一致,如果你无法做到,请在此处增加一个操作分支,从一个统一的地方获得时间:比如DB中等  
    415.                 String alivePath = "/" + serverType + "/" + job + ALIVE;  
    416.                 List<String> children = zkClient.getChildren(alivePath, false);//如果节点不存在,则在下一次sync时被补救  
    417.                 if(children == null || children.isEmpty()){  
    418.                     //此job尚未分配  
    419.                     String registerPath = "/" + serverType + "/" + job + REGISTER;  
    420.                     List<String> rc = zkClient.getChildren(registerPath, false);  
    421.                     //等待下一次sync时准备节点数据  
    422.                     if(rc == null || rc.isEmpty()){  
    423.                         continue;  
    424.                     }  
    425.                     Collections.shuffle(rc);//打乱顺序,随即,取出第一个,其实你可以有很多更好的手段来实现“任务均衡”,此处仅为参考  
    426.                     String tsid = rc.get(0);  
    427.                     try{  
    428.                         byte[] data = String.valueOf(System.currentTimeMillis()).getBytes();  
    429.                         zkClient.create(alivePath + "/" + tsid, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
    430.                         //tsid对应的syncHandler此后将会检测并补救。此处只是分配给他。  
    431.                         //如果tsid也是失去托管的,那么下一次sync检测将会发现并移除,此处不再做多余的校验;  
    432.                         //在极端情况下,比如你的“任务托管过期时间”过短,或者你的系统发布过程很长,但是所有的任务都失去托管  
    433.                         //那么最终将会有一台机器接管大部分job,如果job个数很多,将会出现“雪崩效应”;  
    434.                         //如果你不能容忍这些事情的发生,请在此处增加有效的barrier操作(如果接管任务个数达到一定个数,将接受但不执行任务)  
    435.                         //或者refuse操作(既不接管也不执行任务)。  
    436.                         System.out.println("Job switch,SID:" + tsid + ",JOB :" + job);  
    437.                     }catch(NodeExistsException e){  
    438.                         //ignore;  
    439.                     }  
    440.                     continue;  
    441.                 }  
    442.                 //如果job已经被其他sid接管,那么检测接管者,是否处于活跃,如果存在多个子节点,其实是  
    443.                 //一种异常情况,此处我们只做校验,冲突有sync解决  
    444.                 for(String id : children){  
    445.                     String tpath = alivePath + "/" + id;  
    446.                     Stat stat = new Stat();  
    447.                     byte[] data = zkClient.getData(tpath, false,stat);  
    448.                     long time = Long.valueOf(new String(data));  
    449.                     long current = System.currentTimeMillis();  
    450.                     //如果一个任务,它的执行者在2分钟内都没有和zk交互(synSelfWorker方法中会更新time)  
    451.                     //表明已经过期  
    452.                     //为了便于测试,此处为15秒  
    453.                     if(time + 1500 < current){  
    454.                         try{  
    455.                             zkClient.delete(tpath, stat.getVersion());  
    456.                         }catch(BadVersionException e){  
    457.                             //ignore  
    458.                         }catch(NoNodeException e){  
    459.                             //ignore;  
    460.                         }  
    461.                     }else{  
    462.                         System.out.println(id + " :" + job);  
    463.                     }  
    464.                 }  
    465.             }catch(Exception e){  
    466.                 e.printStackTrace();  
    467.             }  
    468.         }  
    469.         lock.unlock();  
    470.     }  
    471.     /** 
    472.      * 任务同步线程,间歇性的检测zk持有的任务和本地任务是否一致 
    473.      * 并负责分配任务 
    474.      * @author qing 
    475.      * 
    476.      */  
    477.     class SyncHandler implements Runnable {  
    478.   
    479.         public void run() {  
    480.             try {  
    481.                 int i = 0;  
    482.                 int l = 10;  
    483.                 while (true) {  
    484.                     synchronized (tag) {  
    485.                         try{  
    486.                             while(!scheduler.isStarted()){  
    487.                                 tag.wait();  
    488.                             }  
    489.                         }catch(Exception e){  
    490.                             //  
    491.                         }  
    492.                     }  
    493.                     System.out.println("Sync handler,running...tid: " + Thread.currentThread().getId());  
    494.                     if (zkClient == null || (zkClient.getState() == States.NOT_CONNECTED || zkClient.getState() == States.CLOSED)) {  
    495.                         lock.lock();  
    496.                         try {  
    497.                             // 回话重建等异常行为  
    498.                             zkClient = new ZooKeeper(Constants.connectString, 3000, dw, false);  
    499.                             System.out.println("Reconnected success!...");  
    500.                         } catch (Exception e) {  
    501.                             e.printStackTrace();  
    502.                             i++;  
    503.                             Thread.sleep(3000 + i * l);// 在zk环境异常情况下,每3秒重试一次  
    504.                         } finally {  
    505.                             lock.unlock();  
    506.                         }  
    507.                         continue;  
    508.                     }  
    509.                     if (zkClient.getState().isConnected()) {  
    510.                         sync();//同步任务  
    511.                         scheduler();//任务分配和过期检测  
    512.                         Thread.sleep(3000);// 如果被“中断”,直接退出  
    513.                         i = 0;  
    514.                     }else{  
    515.                         Thread.sleep(3000);  
    516.                     }  
    517.                 }  
    518.             } catch (InterruptedException e) {  
    519.                 System.out.println("SID:" + sid + ",SyncHandler Exit...");  
    520.                 close();  
    521.             }  
    522.   
    523.         }  
    524.     }  
    525.       
    526.     /** 
    527.      * 调用者提交的任务,将会被同步的方式交付给zk。此线程就是负责从queue中获取调用者 
    528.      * 提交的job,然后依次在zk环境中生成节点数据。 
    529.      * @author qing 
    530.      * 
    531.      */  
    532.     class WorkerHandler implements Runnable{  
    533.         private Set<Worker> pending = new HashSet<Worker>();  
    534.         private int count = 0;//max = 20;  
    535.           
    536.         /** 
    537.          * 将worker信息生成zk节点数据 
    538.          * @param worker 
    539.          * @return 
    540.          */  
    541.         private boolean register(Worker worker){  
    542.             lock.lock();  
    543.             //逐级创建其父节点  
    544.             String jobName = worker.getJob().getKey().getName();  
    545.             try{  
    546.                 Transaction tx = zkClient.transaction();//使用事务的方式  
    547.                 String jobPath = "/" + serverType + "/" + jobName;  
    548.                 if(zkClient.exists(jobPath, false) == null){  
    549.                     tx.create(jobPath, worker.getCronExpression().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
    550.                 }  
    551.                 String registerPath = "/" + serverType + "/" + jobName+ REGISTER;  
    552.                 if(zkClient.exists(registerPath, false) == null){  
    553.                     tx.create(registerPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
    554.                 }  
    555.                 String alivePath = "/" + serverType + "/" + jobName+ ALIVE;  
    556.                 if(zkClient.exists(alivePath, false) == null){  
    557.                     tx.create(alivePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
    558.                 }  
    559.                 tx.create(registerPath + "/" + sid, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
    560.                 tx.commit();  
    561.             }catch(NodeExistsException e){  
    562.                 //ignore  
    563.             }catch(Exception e){  
    564.                 e.printStackTrace();  
    565.                 pending.add(worker);  
    566.                 //对于异常数据,添加到一个补充操作队列,如果在操作中出现异常,那么将会在  
    567.                 //补充操作中得到再次校验  
    568.             }  
    569.             lock.unlock();  
    570.             return true;  
    571.         }  
    572.           
    573.         public void run(){  
    574.             try{  
    575.                 while(true){  
    576.                     synchronized (tag) {  
    577.                         try{  
    578.                             while(!scheduler.isStarted()){  
    579.                                 tag.wait();  
    580.                             }  
    581.                         }catch(Exception e){  
    582.                             //  
    583.                         }  
    584.                     }  
    585.                     System.out.println("Worker handler,running...");  
    586.                     if(zkClient != null && zkClient.getState().isConnected()){  
    587.                         System.out.println("Register...");  
    588.                         Worker worker = outgoingWorker.take();  
    589.                         register(worker);  
    590.                         if(!pending.isEmpty()){  
    591.                             Thread.sleep(500);  
    592.                             Iterator<Worker> it = pending.iterator();  
    593.                             while(it.hasNext()){  
    594.                                 boolean isOk = register(it.next());  
    595.                                 if(!isOk){  
    596.                                     count++;  
    597.                                     Thread.sleep(1000);  
    598.                                 }else{  
    599.                                     count = 0;  
    600.                                     it.remove();  
    601.                                 }  
    602.                                 //如果重试20次,仍无法成功,直接抛弃,非常遗憾  
    603.                                 if(count > 20){  
    604.                                     pending.clear();  
    605.                                 }  
    606.                             }  
    607.                         }  
    608.                           
    609.                     }else{  
    610.                         Thread.sleep(1000);  
    611.                     }  
    612.                 }  
    613.                   
    614.             }catch(InterruptedException e){  
    615.                 System.out.println("SID:" + sid + ",WorkerHandler Exit...");  
    616.                 close();  
    617.             }  
    618.         }  
    619.     }  
    620.       
    621.     /** 
    622.      * 全部删除当前serverType下所有的任务 
    623.      */  
    624.     public void clear(){  
    625.         lock.lock();  
    626.         try{  
    627.             if(zkClient != null && zkClient.getState().isConnected()){  
    628.                 zkClient.delete("/" + serverType, -1);  
    629.             }  
    630. //          if(scheduler != null && scheduler.isStarted()){  
    631. //              for(Worker worker : selfWorkers.values()){  
    632. //                  scheduler.unscheduleJob(worker.getTrigger().getKey());  
    633. //              }  
    634. //          }  
    635. //          allWorkers.clear();  
    636.         }catch(Exception e){  
    637.             e.printStackTrace();  
    638.         }finally{  
    639.             lock.unlock();  
    640.         }  
    641.     }  
    642.   
    643. }  

    其他辅助类,请参考附件中的源码,谢谢。

  • 相关阅读:
    ReentrantLock重入锁
    Java对象序列化和反序列
    echarts踩坑笔记
    金融风控之贷款违约预测笔记
    go安装模块
    vasp计算轨道吸附
    html
    css/js 小技巧
    python 调用父类方法:super && 直接使用父类名
    python 多线程
  • 原文地址:https://www.cnblogs.com/scwanglijun/p/4268730.html
Copyright © 2011-2022 走看看