zoukankan      html  css  js  c++  java
  • 分布式任务调度系统 xxl-job

     

    微服务难不难,不难!无非就是一个消费方,一个生产方,一个注册中心,然后就是实现一些微服务,其实微服务的难点在于治理,给你一堆

    微服务,如何来管理?这就有很多方面了,比如容器化,服务间通信,服务上下线发布。我今天要说的是任务调度,如果我们将全部服务中那

    些任务都拿出来统一管理,不在服务内使用Scheduled或者Quartz,是不是很爽呢,而且大神们已经帮我们实现了xxl-job,拿来摩擦一下吧。

    作者原创文章,谢绝一切转载,违者必究!

    本文只发表在"公众号"和"博客园",其他均属复制粘贴!如果觉得排版不清晰,请查看公众号文章。 

    准备:

    Idea2019.03/Gradle6.0.1/Maven3.6.3/JDK11.0.4/Lombok0.28/SpringBoot2.2.4RELEASE/mybatisPlus3.3.0/Soul2.1.2/Dubbo2.7.5/Druid1.2.21/

    Zookeeper3.5.5/Mysql8.0.11/Redis5.0.5/Skywalking7.0.0/XXL-JOB2.2.1

    难度: 新手--战士--老兵--大师

    目标:

    1. 对当天注册成功的用户,发送产品促销邮件。使用BEAN模式实现。
    2. 每5分钟定时扫描订单数据,发现当天30分钟未付款的订单,直接删除。使用HTTP模式实现。

    说明:

    为了遇见各种问题,同时保持时效性,我尽量使用最新的软件版本。源码地址:https://github.com/xiexiaobiao/vehicle-shop-admin

    1 原理

    “调度中心”使用线程池对每个任务隔离运行,并按照Cron表达式将任务分配给具体的“执行器”,其中,调度中心和执行器均可实现HA负载均衡。

    HA架构如下图所示,多调度中心必须使用同一DB源,并确保机器时钟一致

    2 步骤

    2.1 下载运行

    下载xxl-job源码,略!再运行其中的sql初始化脚本。IDE打开,先启动xxl-job-admin项目,打开http://localhost:8080/xxl-job-admin/

    默认账号密码 admin/123456,即可进入调度中心UI。

    2.2 改造vehicle-shop-admin项目

    我这里只改了customer 和 order 模块,先以order为例: resources/config/application-dev.yml 添加的配置:

    ### xxl-job
    xxl:
      job:
        admin:
          #admin address list, such as "http://address" or "http://address01,http://address02"
          addresses: http://127.0.0.1:8080/xxl-job-admin  #调度中心地址
        accessToken: 6eccc15a-5fa2-4737-a606-f74d4f3cee61     #需要与调度中心配对使用
        # 执行器配置信息
        executor:
          appname: vehicle-admin-order-service
          address: #default use address to registry , otherwise use ip:port if address is null
          ip: 127.0.0.1
          port: 9998
          logpath: /data/applogs/xxl-job/jobhandler
          logretentiondays: 30 #日志保留时长

    以上代码解析:accessToken需要与调度中心配对使用,要么都用,要么都不用。addresses调度中心地址可以有多个,直接逗号隔开即可。

     

    com.biao.shop.order.conf.XxlJobConfig

    @Configuration
    public class XxlJobConfig {
        private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    
        @Value("${xxl.job.admin.addresses}")
        private String adminAddresses;
        @Value("${xxl.job.accessToken}")
        private String accessToken;
        @Value("${xxl.job.executor.appname}")
        private String appname;
        @Value("${xxl.job.executor.address}")
        private String address;
        @Value("${xxl.job.executor.ip}")
        private String ip;
        @Value("${xxl.job.executor.port}")
        private int port;
        @Value("${xxl.job.executor.logpath}")
        private String logPath;
        @Value("${xxl.job.executor.logretentiondays}")
        private int logRetentionDays;
    
    
        @Bean
        public XxlJobSpringExecutor xxlJobExecutor() {
            logger.info(">>>>>>>>>>> xxl-job config init.");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            xxlJobSpringExecutor.setAppname(appname);
            xxlJobSpringExecutor.setAddress(address);
            xxlJobSpringExecutor.setIp(ip);
            xxlJobSpringExecutor.setPort(port);
            xxlJobSpringExecutor.setAccessToken(accessToken);
            xxlJobSpringExecutor.setLogPath(logPath);
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    
            return xxlJobSpringExecutor;
        }
    }

    以上代码解析:XxlJobSpringExecutor是具体执行器类,结合配置文件和SpringBean机制,完成Bean自动注入,实现OnLine 机器地址自动注册。

     

    com.biao.shop.order.controller.OrderController

    @RestController
    @RequestMapping("/order")
    public class OrderController { 
        ...
        //  省略部分代码
        ...
        @SoulClient(path = "/vehicle/order/autoCancel", desc = "自动取消未支付订单")
        @GetMapping("/autoCancel")
        public ObjectResponse<Integer> autoCancelOrder(){
            return orderService.autoCancelOrder();
        }
    }

    以上代码解析: @SoulClient为soul网关注解,即网关转发地址,详细可以看我之前的文章,有关Soul网关的。

     

    com.biao.shop.order.impl.OrderServiceImpl

    @Service
    public class OrderServiceImpl extends ServiceImpl<ShopOrderDao, ShopOrderEntity>  implements OrderService {
    
         ...
        //  省略部分代码
        ...
    
        @Override
        public ObjectResponse<Integer> autoCancelOrder() {
            ObjectResponse<Integer> response = new ObjectResponse<>();
            try{
                // 查找当天30分钟内未付款订单
                List<ShopOrderEntity> orderEntityList = shopOrderDao.selectList(new LambdaQueryWrapper<ShopOrderEntity>()
                        .gt(ShopOrderEntity::getGenerateDate, LocalDate.now())
                        .lt(ShopOrderEntity::getGenerateDate,LocalDateTime.now().minusMinutes(30L)));
                if (!Objects.isNull(orderEntityList) && !orderEntityList.isEmpty()){
                    int result = shopOrderDao.deleteBatchIds(orderEntityList);
                    response.setCode(RespStatusEnum.SUCCESS.getCode());
                    response.setMessage(RespStatusEnum.SUCCESS.getMessage());
                    response.setData(result);
                }
                return response;
            }catch (Exception e){
                response.setCode(RespStatusEnum.FAIL.getCode());
                response.setMessage(RespStatusEnum.FAIL.getMessage());
                response.setData(null);
                return response;
            }
        }
    
        /** 
         * 这里为了演示http模式,直接使用参数:
         *      url: http://127.0.0.1:9195/order/vehicle/order/autoCancel
         *      method: get
         *      data: content
         */
        @XxlJob("autoCancelOrderJobHandler")
        public ReturnT<String> autoCancelOrderJob( String param ){
            // param parse
            if (param==null || param.trim().length()==0) {
                XxlJobLogger.log("param["+ param +"] invalid.");
                return ReturnT.FAIL;
            }
            String[] httpParams = param.split("
    ");
            String url = null;
            String method = null;
            String data = null;
            for (String httpParam: httpParams) {
                if (httpParam.startsWith("url:")) {
                    url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
                }
                if (httpParam.startsWith("method:")) {
                    method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
                    System.out.println("method>>>>>>>>"+ method);
                }
                if (httpParam.startsWith("data:")) {
                    data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
                }
            }
    
            // param valid
            if (url==null || url.trim().length()==0) {
                XxlJobLogger.log("url["+ url +"] invalid.");
                return ReturnT.FAIL;
            }
            // 限制只支持 "GET" 和 "POST"
            if (method==null || !Arrays.asList("GET", "POST").contains(method)) {
                XxlJobLogger.log("method["+ method +"] invalid.");
                return ReturnT.FAIL;
            }
    
            // request
            HttpURLConnection connection = null;
            BufferedReader bufferedReader = null;
            try {
                // connection
                URL realUrl = new URL(url);
                connection = (HttpURLConnection) realUrl.openConnection();
    
                // connection setting
                connection.setRequestMethod(method);
                connection.setDoOutput(true);
                connection.setDoInput(true);
                connection.setUseCaches(false);
                connection.setReadTimeout(5 * 1000);
                connection.setConnectTimeout(3 * 1000);
                connection.setRequestProperty("connection", "Keep-Alive");
                connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
                connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
    
                // do connection
                connection.connect();
    
                // data
                if (data!=null && data.trim().length()>0) {
                    DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                    dataOutputStream.write(data.getBytes("UTF-8"));
                    dataOutputStream.flush();
                    dataOutputStream.close();
                }
    
                // valid StatusCode
                int statusCode = connection.getResponseCode();
                if (statusCode != 200) {
                    throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
                }
    
                // result
                bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
                StringBuilder result = new StringBuilder();
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    result.append(line);
                }
                String responseMsg = result.toString();
    
                XxlJobLogger.log(responseMsg);
                return ReturnT.SUCCESS;
            } catch (Exception e) {
                XxlJobLogger.log(e);
                return ReturnT.FAIL;
            } finally {
                try {
                    if (bufferedReader != null) {
                        bufferedReader.close();
                    }
                    if (connection != null) {
                        connection.disconnect();
                    }
                } catch (Exception e2) {
                    XxlJobLogger.log(e2);
                }
            }
        }
    }

    以上代码解析: 1.autoCancelOrder方法直接实现了查找当天30分钟内未付款订单并批量删除的业务需求, 2.autoCancelOrderJob方法

    则是跨平台HTTP任务模式的实现,通过通用的URL连接访问方式实现跨平台,即只需有URL地址即可,外部异构系统都可接入Xxl-Job,

    系统内则实现了解耦的目的。需注意的是代码中限制了"GET"/"POST",其他动词我没测试,其实这两个也够用了。

     

    Customer模块,因需使用邮件功能,我先在com.biao.shop.common.utils.MailUtil实现了邮件发送功能,直接使用了javax.mail包,腾

    讯的SMTP服务,代码明了,非本文目标知识点,不解释:

    public class MailUtil {
    
        @Value("${spring.mail.host}")
        private static String host;
        @Value("${spring.mail.port}")
        private static String port;
        @Value("${spring.mail.sendMail}")
        private static String sendMail;
        @Value("${spring.mail.password}")
        private static String myEmailPassword;
        @Value("${spring.mail.properties.mail.smtp.auth}")
        private static String fallback; // false
        @Value("${spring.mail.properties.mail.smtp.socketFactory.class}")
        private static String socketFactory;
    
        @Resource
        private static JavaMailSender mailSender;
        
        public static boolean sendMailTo(String userName,String receiveMail) throws Exception {
            // JavaMailSender javaMailSender = new JavaMailSenderImpl();
            Properties props = new Properties();
            props.setProperty("mail.transport.protocol", "smtp");
            props.setProperty("mail.smtp.host", host);
            props.setProperty("mail.smtp.port", port);
            props.setProperty("mail.smtp.auth", "true");
            // 如邮箱服务器要求 SMTP 连接需要使用 SSL 安全认证,则需要使用以下配置项
           /*  SMTP 服务器的端口 (非 SSL 连接的端口一般默认为 25, 可以不添加, 如果开启了 SSL 连接,
                              需要改为对应邮箱的 SMTP 服务器的端口, 具体可查看对应邮箱服务的帮助,
                              QQ邮箱的SMTP(SLL)端口为465或587, 其他邮箱自行去查看)*/
            /*final String smtpPort = "465";
            props.setProperty("mail.smtp.port", smtpPort);
            props.setProperty("mail.smtp.socketFactory.class", socketFactory);
            props.setProperty("mail.smtp.socketFactory.fallback", fallback);
            props.setProperty("mail.smtp.socketFactory.port", smtpPort);*/
    
            Session session = Session.getDefaultInstance(props);
            // 设置为debug模式, 可以查看详细的发送 log
            session.setDebug(true);
            MimeMessage message = createMimeMessage(userName,session, sendMail, receiveMail);
            Transport transport = session.getTransport();
            transport.connect(sendMail, myEmailPassword);
            mailSender.send(message);
            // Send the given array of JavaMail MIME messages in batch.
            // void send(MimeMessage... mimeMessages) throws MailException;
            transport.close();
            return true;
        }
    
        static MimeMessage createMimeMessage(String userName,Session session, String sendMail, String receiveMail) throws Exception {
            // MIME邮件类型,还有一种为简单邮件类型
            MimeMessage message = new MimeMessage(session);
            message.setFrom(new InternetAddress(sendMail, "龙岗汽车", "UTF-8"));
            // 可以增加多个收件人、抄送、密送
            message.setRecipient(MimeMessage.RecipientType.TO, new InternetAddress(receiveMail, userName, "UTF-8"));
            // 邮件主题
            message.setSubject("新品信息", "UTF-8");
            // 邮件正文(可以使用html标签)
            message.setContent(userName + ",您好,新品到店,快来体验", "text/html;charset=UTF-8");
            // 设置发件时间
            message.setSentDate(new Date());
            // 保存设置
            message.saveChanges();
            return message;
        }
    }

     

    然后在com.biao.shop.customer.impl.ShopClientServiceImpl 实现找出当天注册的用户,并发送邮件信息的业务需求:

    @org.springframework.stereotype.Service
    @Slf4j
    public class ShopClientServiceImpl extends ServiceImpl<ShopClientDao, ShopClientEntity> implements ShopClientService {
         ...
        //  省略部分代码
        ...
        @Override
        @XxlJob("autoSendPromotionJobHandler")
        public ReturnT<String> autoSendPromotion(String param) {
            try{
                // 找出当天注册的用户
                List<ShopClientEntity> clientEntityList =
                        shopClientDao.selectList(new LambdaQueryWrapper<ShopClientEntity>()
                                .gt(ShopClientEntity::getGenerateDate,LocalDate.now())
                                .lt(ShopClientEntity::getGenerateDate,LocalDate.now().plusDays(1L)));
                // 发送邮件信息
                if (!Objects.isNull(clientEntityList) && !clientEntityList.isEmpty()){
                    // shopClientEntity中需要设计用户邮箱地址,我这里简化为一个固定的邮箱地址
                    clientEntityList.forEach(shopClientEntity -> {
                        try {
                            MailUtil.sendMailTo(shopClientEntity.getClientName(),mailReceiverAddr);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
                }
                return ReturnT.SUCCESS;
            }catch (Exception e){
                return ReturnT.FAIL;
            }
        }
    }
     

    其他,如引入依赖之类的边角料代码我就直接略过了 !请直接看代码吧!

    2.3 添加执行器

    AppName对应resources/config/application-dev.yml中的 xxl.job.executor.appname 配置项,自动注册即通过引入xxl-job-core依赖

    和 XxlJobSpringExecutor自动注册,手动录入,即直接填写URL地址和端口信息。

    再启动应用vehicle-shop-admin项目,建议将mysql/redis作为window服务开机自启动,顺序: MySQL—>souladmin—>soulbootstrap

    —>redis—>authority—>customer—>stock—>order —>business,等待一会,“OnLine 机器地址”就会显示自动注册节点,

    比如下图的customer微服务:

    2.4 添加任务

    WEBUI界面—>任务管理:

    建立发送邮件给新用户的任务,重要参数有:

    • JobHandler对应到com.biao.shop.customer.impl.ShopClientServiceImpl 中的@XxlJob("autoSendPromotionJobHandler");
    • 路由策略,即分配任务给多个执行者时的策略;阻塞处理策略,即某个执行者的一个任务还在执行,而后同类任务又到达该执行者时的处理;
    • 运行模式是指任务源码位置,几个GLUE模式是执行器代码直接在调度中心编辑并保存,BEAN则是类模式,需自定义JobHandler类,但不支持自动扫描任务并注入到执行器容器,需要手动注入,就像下图这样,点“新增”后:

     

    建立“自动取消无效订单”任务,注意“任务参数”,对应于com.biao.shop.order.impl.OrderServiceImpl 中 ReturnT autoCancelOrderJob( String param )的实参,

    方法中对 param 进行解析,注意其中的url是soul网关的地址,因为我将整个项目都做了网关转发。

     

    操作-->执行一次,再到调度日志中查看,订单服务调度日志:

     

    客户服务调度日志:

     

    最后看个首页报表图,总结就是:有UI,就是爽!

     

    留个问题请君思考:自动取消30分钟未付款的订单,你能想到多少种方案呢?

    作者原创文章,谢绝一切转载,违者必究!

    全文完!


    我的其他文章:

    只写原创,敬请关注 

  • 相关阅读:
    RabbitMQ 集群与高可用配置
    ManifoldJS
    Top JavaScript Frameworks, Libraries & Tools and When to Use Them
    AngularJS 的安全Apply
    node js 常用模块
    微软发布了ASP.NET WebHooks预览版
    leaflet 了解
    messagepcak 资料
    fastBinaryJSON
    jQuery的图像裁剪插件Jcrop
  • 原文地址:https://www.cnblogs.com/xxbiao/p/12742838.html
Copyright © 2011-2022 走看看