zoukankan      html  css  js  c++  java
  • dolphin 增加钉钉通知-功能

    dolphin 1.6已经增加了企业微信的功能。

    公司使用的是钉钉,因此增加向钉钉指定群组发送通知的功能,可以把执行失败或者被杀死的任务通知到对应的群组。

    alert模块,增加配置:

    1  enterprise.dingtalk.enable=true
    2  enterprise.dingtalk.secret=钉钉密钥
    3  enterprise.dingtalk.url=钉钉地址
    4 # enterprise.dingtalk.url=https://oapi.dingtalk.com/robot/send?access_token=token

    在Constants类中增加与上述配置项对应的常量:

    // Property keys for the DingTalk alert: the enable switch (read as a boolean), the secret used as the HMAC signing key, and the robot webhook URL the message is POSTed to.
    1     public static final String ENTERPRISE_DINGTALK_ENABLE="enterprise.dingtalk.enable";
    2 
    3     public static final String ENTERPRISE_DINGTALK_SECRET="enterprise.dingtalk.secret";
    4 
    5     public static final String ENTERPRISE_DINGTALK_URL="enterprise.dingtalk.url";

    增加工具类

      1 package org.apache.dolphinscheduler.alert.utils;
      2 
      3 import com.alibaba.fastjson.JSON;
      4 import com.alibaba.fastjson.JSONObject;
      5 import org.apache.dolphinscheduler.common.utils.HttpUtils;
      6 import org.apache.http.HttpEntity;
      7 import org.apache.http.client.ClientProtocolException;
      8 import org.apache.http.client.methods.CloseableHttpResponse;
      9 import org.apache.http.client.methods.HttpPost;
     10 import org.apache.http.entity.StringEntity;
     11 import org.apache.http.impl.client.CloseableHttpClient;
     12 import org.apache.http.impl.client.HttpClients;
     13 import org.apache.http.util.EntityUtils;
     14 import org.slf4j.Logger;
     15 import org.slf4j.LoggerFactory;
     16 
     17 import javax.crypto.Mac;
     18 import javax.crypto.spec.SecretKeySpec;
     19 import java.io.IOException;
     20 import java.net.URLEncoder;
     21 import java.util.Base64;
     22 
     23 public class EnterpriseDingTalkUtils {
     24     public static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatUtils.class);
     25 
     26 //    private static final boolean ENTERPRISE_DINGTALK_ENABLE = PropertyUtils.getBoolean(Constants.ENTERPRISE_DINGTALK_ENABLE);
     27 
     28     private static final String ENTERPRISE_DINGTALK_URL = PropertyUtils.getString(Constants.ENTERPRISE_DINGTALK_URL);
     29 
     30     private static final String ENTERPRISE_DINGTALK_SECRET = PropertyUtils.getString(Constants.ENTERPRISE_DINGTALK_SECRET);
     31 
     32     /**
     33      * get Enterprise Ding Ding is enable
     34      * @return isEnable
     35      */
     36     public static boolean isEnable(){
     37         return PropertyUtils.getBoolean(Constants.ENTERPRISE_DINGTALK_ENABLE);
     38     }
     39 
     40     /**
     41      * get Secret string and time number
     42      */
     43     public static String encodeKey(){
     44         try {
     45             //获取时间戳
     46             Long timestamp = System.currentTimeMillis();
     47             //把时间戳和密钥拼接成字符串,中间加入一个换行符
     48             String stringToSign = timestamp + "
    " + ENTERPRISE_DINGTALK_SECRET;
     49             //声明一个Mac对象,用来操作字符串
     50             Mac mac = null;
     51             mac = Mac.getInstance("HmacSHA256");
     52 
     53             //初始化,设置Mac对象操作的字符串是UTF-8类型,加密方式是SHA256
     54             mac.init(new SecretKeySpec(ENTERPRISE_DINGTALK_SECRET.getBytes("UTF-8"), "HmacSHA256"));
     55             //把字符串转化成字节形式
     56             byte[] signData = mac.doFinal(stringToSign.getBytes("UTF-8"));
     57             //新建一个Base64编码对象
     58             Base64.Encoder encoder = Base64.getEncoder();
     59             //把上面的字符串进行Base64加密后再进行URL编码
     60             String sign = URLEncoder.encode(new String(encoder.encodeToString(signData)),"UTF-8");
     61             System.out.println(timestamp);
     62             System.out.println(sign);
     63 
     64             String result = "&timestamp=" + timestamp + "&sign=" + sign;
     65             return result;
     66         } catch (Exception e) {
     67             e.printStackTrace();
     68             return null;
     69         }
     70     }
     71     public static String getJsonBodyString(String msg){
     72         JSONObject result = new JSONObject();
     73         JSONObject text = new JSONObject();
     74         text.put("content", msg);
     75         result.put("text", text);
     76         result.put("msgtype", "text");
     77         String jsonString = JSON.toJSONString(result);
     78         return jsonString;
     79     }
     80 
     81     public static void sendMessageToDingTalk(String msg){
     82         String enterDingTalkUrl = ENTERPRISE_DINGTALK_URL + encodeKey();
     83         String jsonBodyStr = getJsonBodyString(msg);
     84         CloseableHttpClient httpClient = HttpClients.createDefault();
     85         try {
     86             HttpPost httpPost = new HttpPost(enterDingTalkUrl);
     87             httpPost.setHeader("Content-Type", "application/json;charset=utf8");
     88             httpPost.setEntity(new StringEntity(jsonBodyStr, Constants.UTF_8));
     89             CloseableHttpResponse response = httpClient.execute(httpPost);
     90             String resp;
     91             try {
     92                 HttpEntity entity = response.getEntity();
     93                 resp = EntityUtils.toString(entity, Constants.UTF_8);
     94                 EntityUtils.consume(entity);
     95             } finally {
     96                 response.close();
     97             }
     98             logger.info("Enterprise DingTalk send [{}], param:{}, resp:{}",
     99                     ENTERPRISE_DINGTALK_URL, msg, resp);
    100         } catch (ClientProtocolException e) {
    101             e.printStackTrace();
    102         } catch (IOException e) {
    103             e.printStackTrace();
    104         } finally {
    105             try {
    106                 httpClient.close();
    107             } catch (IOException e) {
    108                 e.printStackTrace();
    109             }
    110         }
    111     }
    112 
    113 }

    在server模块的worker运行逻辑(TaskExecuteThread)加入:

        /**
         * Executes the task described by {@code taskExecutionContext}, reports the
         * outcome to the master, and sends a DingTalk alert when the task ends
         * as FAILURE or KILL and the DingTalk alert is enabled.
         */
        @Override
        public void run() {
            // Fallback alert text, used when the failure happens before the task node is parsed.
            String result = String.format("task-instance-id=%s error", taskExecutionContext.getTaskInstanceId());
            TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
            try {
                logger.info("script path : {}", taskExecutionContext.getExecutePath());
                // task node
                TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
                if (taskNode != null) {
                    // Prepare the alert text up front so failures during resource download,
                    // task creation or init are also reported with the task's name/id/type.
                    result = String.format("task name=%s,task-id=%s,type=%s error", taskNode.getName(), taskNode.getId(), taskNode.getType());
                }

                // copy hdfs/minio file to local
                downloadResource(taskExecutionContext.getExecutePath(),
                        taskExecutionContext.getResources(),
                        logger);

                taskExecutionContext.setTaskParams(taskNode.getParams());
                taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
                taskExecutionContext.setDefinedParams(getGlobalParamsMap());

                // set task timeout
                setTaskTimeout(taskExecutionContext, taskNode);

                taskExecutionContext.setTaskAppId(String.format("%s_%s_%s",
                        taskExecutionContext.getProcessDefineId(),
                        taskExecutionContext.getProcessInstanceId(),
                        taskExecutionContext.getTaskInstanceId()));

                task = TaskManager.newTask(taskExecutionContext, taskLogger);

                // task init
                task.init();
                // task handle
                task.handle();

                // task result process
                task.after();
                responseCommand.setStatus(task.getExitStatus().getCode());
                responseCommand.setEndTime(new Date());
                responseCommand.setProcessId(task.getProcessId());
                responseCommand.setAppIds(task.getAppIds());
                logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
            } catch (Exception e) {
                logger.error("task scheduler failure", e);
                kill();
                responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
                responseCommand.setEndTime(new Date());
                // task has not been assigned yet when the failure happened before
                // TaskManager.newTask returned; dereferencing it here would throw
                // out of the catch block.
                if (task != null) {
                    responseCommand.setProcessId(task.getProcessId());
                    responseCommand.setAppIds(task.getAppIds());
                }
            } finally {
                taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
                taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
                clearTaskExecPath();
                // The alert is best-effort: an unchecked exception from the
                // notification must not escape the worker's finally block.
                try {
                    if (EnterpriseDingTalkUtils.isEnable()) {
                        if (responseCommand.getStatus() == ExecutionStatus.FAILURE.getCode() ||
                                responseCommand.getStatus() == ExecutionStatus.KILL.getCode()) {
                            EnterpriseDingTalkUtils.sendMessageToDingTalk(result);
                        }
                    }
                } catch (RuntimeException e) {
                    logger.error("send DingTalk alert failure", e);
                }
            }
        }

    参考:https://segmentfault.com/a/1190000022077236

    有朝一日同风起,扶摇直上九万里
  • 相关阅读:
    小程序接入第三方ui库(组件库)
    element ui表格的校验和自定义校验规则
    element ui表格 表头的的特殊处理(换行/jsx风格表头)以及上传组件的一点小问题
    MongoDB 配置本地服务
    乙方渗透测试之Fuzz爆破
    SSRF漏洞挖掘经验
    SQL注入绕过技巧总结
    Xss Bypass备忘录
    bilibili存储型xss (绕过长度限制打乱顺序限制)
    XSS攻击常识及常见的XSS攻击脚本汇总
  • 原文地址:https://www.cnblogs.com/wind-man/p/15132971.html
Copyright © 2011-2022 走看看