zoukankan      html  css  js  c++  java
  • ElasticJob简单使用

    ElasticJob单点使用

    任务类

    public class BackupJob implements SimpleJob {
        public void execute(ShardingContext shardingContext) {
                String selectSql = "select * from resume where state='未归档' limit 1";
            List<Map<String, Object>> list =
                    JdbcUtil.executeQuery(selectSql);
            if(list == null || list.size() == 0) {
                return;
            }
            Map<String, Object> stringObjectMap = list.get(0);
            long id = (long) stringObjectMap.get("id");
            String name = (String) stringObjectMap.get("name");
            String education = (String)
                    stringObjectMap.get("education");
    // 打印出这条记录
            System.out.println("======>>>id:" + id + " name:" +
                    name + " education:" + education);
    // 更改状态
            String updateSql = "update resume set state='已归档' where id=?";
            JdbcUtil.executeUpdate(updateSql,id);
    // 归档这条记录
            String insertSql = "insert into resume_bak select * from resume where id=?";
            JdbcUtil.executeUpdate(insertSql,id);
        }
    
    }
    

    主要的任务就是将未归档的数据整理到归档的表中,表结构一样
    执行类

    public class JobMain {
        public static void main(String[] args) {
            //初始化注册中心
            ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181","data-job");
            CoordinatorRegistryCenter coordinatorRegistryCenter= new ZookeeperRegistryCenter(zookeeperConfiguration);
            coordinatorRegistryCenter.init();
            //创建任务
            JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("data-job","*/2 * * * * ?",1).build();
            SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,BackupJob.class.getName());
            //执行任务
            new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build()).init();
        }
    }
    

    这种情况下,启动两个任务类只会有一个在执行任务。但是当一个任务停止之后,另一个任务会立马开始接着执行任务,相当于其他中间件中的主备切换。但是这里的主备切换是依托zk进行的

    多节点分布式任务调度

    修改执行类的代码为

    public class JobMain {
        public static void main(String[] args) {
            //初始化注册中心
            ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181","data-job");
            CoordinatorRegistryCenter coordinatorRegistryCenter= new ZookeeperRegistryCenter(zookeeperConfiguration);
            coordinatorRegistryCenter.init();
            //创建任务
            JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("data-job","*/2 * * * * ?",3).build();
            SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,BackupJob.class.getName());
            //执行任务
            new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build()).init();
        }
    }
    

    除了修改分片数还需要在执行任务的类中执行相应的分片参数,另外需要注意的是仅仅增加分票策略是不生效的,必须同时配置分片参数。另外如果使用同一个job来做执行的话。需要增加overwrite为true
    执行器代码为
    ```
    public class BackupJob implements SimpleJob {
    public void execute(ShardingContext shardingContext) {
    int shardingitem = shardingContext.getShardingItem();
    System.out.println("当前分片"+shardingitem);
    String shardingParamter = shardingContext.getShardingParameter();
    System.out.println(shardingParamter);
    String selectSql = "select * from resume where state='未归档' and name='"+shardingParamter+"' limit 1";
    List<Map<String, Object>> list =
    JdbcUtil.executeQuery(selectSql);
    if(list == null || list.size() == 0) {
    return;
    }
    Map<String, Object> stringObjectMap = list.get(0);
    long id = (long) stringObjectMap.get("id");
    String name = (String) stringObjectMap.get("name");
    String education = (String)
    stringObjectMap.get("education");
    // 打印出这条记录
    System.out.println("======>>>id:" + id + " name:" +
    name + " education:" + education);
    // 更改状态
    String updateSql = "update resume set state='已归档' where id=?";
    JdbcUtil.executeUpdate(updateSql,id);
    // 归档这条记录
    String insertSql = "insert into resume_bak select * from resume where id=?";
    JdbcUtil.executeUpdate(insertSql,id);
    }

    }
    ```
    测试结果为,当执行器未全部启动时,所有分片在一个执行器上运行。当三个执行器都启动时,会平均分配到三个执行器。

    demo代码地址为https://github.com/zhendiao/deme-code/tree/main/schedule_job

    欢迎搜索关注本人与朋友共同开发的微信面经小程序【大厂面试助手】和公众号【微瞰技术】,以及总结的分类面试题https://github.com/zhendiao/JavaInterview

    file
    file

  • 相关阅读:
    伪类样式
    div 文字超出边框后,省略号显示
    关于常用的 meta
    js数组去重
    异步二进制文件下载
    JJWT现前后端分离身份验证
    ApachePOI快速入门
    axios兼容ie7
    vue解决跨域问题
    log4j模板
  • 原文地址:https://www.cnblogs.com/zhendiao/p/14920918.html
Copyright © 2011-2022 走看看