zoukankan      html  css  js  c++  java
  • Elastic-Job

    一:简介

      分布式任务调度框架,结合zookeeper技术解决quartz框架在分布式系统中重复的定时任务导致的不可预见的错误

    二:示例

    pom

     <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid-spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.github.kuhn-he</groupId>
                <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
                <version>2.1.5</version>
            </dependency>
        </dependencies>

    application.yml

    # zk配置
    elaticjob:
      zookeeper:
        server-lists: localhost:2181
        namespace: elastic-job-demo

    # 数据源配置
    省略

    SimpleJob

    @ElasticSimpleJob(cron = "0/10 * * * * ?", jobName = "test123",
            shardingTotalCount = 3, jobParameter = "测试参数",
            shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou")
    @Component
    public class MySimpleJob implements SimpleJob {
    
        private static final Logger logger = LoggerFactory.getLogger(MySimpleJob.class);
    
        @Override
        public void execute(ShardingContext shardingContext) {
            logger.info(String.format("------Thread ID: %s, 任务总片数: %s, " +
                            "当前分片项: %s,当前参数: %s," +
                            "当前任务名称: %s,当前任务参数: %s,"+
                            "当前任务的id: %s",
                    //获取当前线程的id
                    Thread.currentThread().getId(),
                    //获取任务总片数
                    shardingContext.getShardingTotalCount(),
                    //获取当前分片项
                    shardingContext.getShardingItem(),
                    //获取当前的参数
                    shardingContext.getShardingParameter(),
                    //获取当前的任务名称
                    shardingContext.getJobName(),
                    //获取当前任务参数
                    shardingContext.getJobParameter(),
                    //获取任务的id
                    shardingContext.getTaskId()
            ));
        }
    }

    三: 分片策略

    默认使用:基于平均分配算法的分片策略

    /*
     * Copyright 1999-2015 dangdang.com.
     * <p>
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     * </p>
     */
    
    package io.elasticjob.lite.api.strategy.impl;
    
    import io.elasticjob.lite.api.strategy.JobInstance;
    import io.elasticjob.lite.api.strategy.JobShardingStrategy;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * 基于平均分配算法的分片策略.
     * 
     * <p>
     * 如果分片不能整除, 则不能整除的多余分片将依次追加到序号小的服务器.
     * 如: 
     * 1. 如果有3台服务器, 分成9片, 则每台服务器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
     * 2. 如果有3台服务器, 分成8片, 则每台服务器分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
     * 3. 如果有3台服务器, 分成10片, 则每台服务器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].
     * </p>
     * 
     * @author zhangliang
     */
    public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
        
        @Override
        public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
            if (jobInstances.isEmpty()) {
                return Collections.emptyMap();
            }
            Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
            addAliquant(jobInstances, shardingTotalCount, result);
            return result;
        }
        
        private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
            Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
            int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
            int count = 0;
            for (JobInstance each : shardingUnits) {
                List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
                for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                    shardingItems.add(i);
                }
                result.put(each, shardingItems);
                count++;
            }
            return result;
        }
        
        private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
            int aliquant = shardingTotalCount % shardingUnits.size();
            int count = 0;
            for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
                if (count < aliquant) {
                    entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
                }
                count++;
            }
        }
    }

    四:运维平台

    第一步:去下载包
        https://github.com/miguangying/elastic-job-lite-console#elastic-job-lite-console

    第二步:解压缩
        解压缩elastic-job-lite-console-${version}.tar.gz并执行binstart.sh,windows平台执行start.bat。打开浏览器访问http://localhost:8899/即可访问控制台。

        8899为默认端口号,可通过启动脚本输入-p自定义端口号。elastic-job-lite-console-${version}.tar.gz可通过mvn install编译获取。

    第三步:登录
        提供两种账户,管理员及访客,管理员拥有全部操作权限,访客仅拥有察看权限。默认管理员用户名和密码是root/root,访客用户名和密码是guest/guest,可通过confauth.properties修改管理员及访客用户名及密码。

  • 相关阅读:
    OOP、DI、IOC的情爱恩仇录
    NuGet:添加EntityFramework
    DataGrid之DataGridComboBoxColumn,DataGridCheckBoxColumn,DataGridHyperlinkColumn,DataGridTextColumn
    JohnSon:动态创建模块选项卡
    maf实例
    MVVM理解之逐步重构成为MVVM模式,比MVC的独到之处
    我学Unity系列1:Unity和Mef的比较
    微软一站式代码资料
    匿名方法,泛型委托,Lambda表达式
    关于JS表单验证(转)
  • 原文地址:https://www.cnblogs.com/fdzfd/p/10311172.html
Copyright © 2011-2022 走看看