需求
监测dolphinscheduler调度系统,任务执行异常情况。如有异常,则发送邮件通知。
处理思路
因DS本身自带的邮件发送功能,不能正常发送邮件。
故而,通过查询DS源数据表,获取当前任务执行情况。将获取结果,使用python发送邮件。
通过shell脚本获取执行情况,并将结果保存到本地。
执行代码
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
mysql -hip -P3306 -u用户 -p密码 -e" use dolphinscheduler; select a.name, a.state, a.submit_time, a.start_time, a.end_time, CONCAT(TIMESTAMPDIFF(SECOND, a.start_time,a.end_time),'(s)') dur from t_ds_task_instance a, ( select name, max(submit_time) submit_time from t_ds_task_instance group by name ) b where a.name=b.name and a.submit_time=b.submit_time and substr(a.start_time,1,10)='${deal_day}' and a.state not in(0,1,7) union all select a.name, a.state, a.submit_time, a.start_time, '-' end_time, CONCAT(cast(TIMESTAMPDIFF(SECOND, a.start_time,current_timestamp)/60/60 as decimal(10,2)),'(h)') dur from t_ds_task_instance a, ( select name, max(submit_time) submit_time from t_ds_task_instance group by name ) b where a.name=b.name and a.submit_time=b.submit_time and substr(a.submit_time,1,10)='${deal_day}' and a.state in(1,10,11) and TIMESTAMPDIFF(SECOND, a.start_time,current_timestamp)>7200; " > rwjk.txt
python处理导出结果,并发送邮件
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
# !/user/bin/env python # -*- coding: utf-8 -*- import smtplib from email.mime.text import MIMEText from email.header import Header def send_mail(): # 第三方 SMTP 服务 mail_host = "" # 设置服务器 mail_user = "" # 用户名 mail_pass = "" # 口令 sender = '' # 发送邮件用户 receivers = ['收件人1', '收件人2', '收件人3' ] msg_text = '邮件正文发送内容' mail_msg_list=['<p>你好,DS调度异常任务如下:</p>', '<table border="1" cellpadding="0" cellspacing="0" width="1000" style="border-collapse: collapse; style="text-align:center"">', '<tbody>', '<tr><th>任务</th><th>状态</th><th>提交时间</th><th>开始时间</th><th>结束时间</th><th>执行时长(s)</th></tr>' ] with open('/zhjs/work01/lizhenhua/rwjk/rwjk.txt', 'r') as f: s = f.readlines() for k,v in enumerate(s): s[k]=v.replace(' ','').split(' ') # 替换掉换行符、并转换为列表 for i in range(len(s)): mail_msg_list.append('<tr>') for j in range(len(s[i])): if i>=1: s[i][1] = s[i][1].replace('0','提交成功') s[i][1] = s[i][1].replace('1', '正在运行') s[i][1] = s[i][1].replace('2', '准备暂停') s[i][1] = s[i][1].replace('3', '暂停') s[i][1] = s[i][1].replace('4', '准备停止') s[i][1] = s[i][1].replace('5', '停止') s[i][1] = s[i][1].replace('6', '失败') s[i][1] = s[i][1].replace('7', '成功') s[i][1] = s[i][1].replace('8', '需要容错') s[i][1] = s[i][1].replace('9', 'kill') s[i][1] = s[i][1].replace('10', '等待线程') s[i][1] = s[i][1].replace('11', '等待依赖完成') mail_msg_list.append('<td>{}</td>'.format(s[i][j])) mail_msg_list.append('</tr>') # 结尾标识 mail_msg_list.append('</tbody>') mail_msg_list.append('</table>') mail_msg=''.join(mail_msg_list) # 列表转为字符串HTML message = MIMEText(mail_msg, 'html', 'utf-8') message['From'] = Header("任务监测", 'utf-8') message['To'] = ','.join(receivers) subject = 'DS调度异常任务监控' message['Subject'] = Header(subject, 'utf-8') try: smtpObj = smtplib.SMTP() smtpObj.connect(mail_host, 25) # 25 为 SMTP 端口号 smtpObj.login(mail_user, mail_pass) # 登陆 smtpObj.sendmail(sender, receivers, message.as_string()) # 发送 print('邮件发送成功') except smtplib.SMTPException: print('Error: 无法发送邮件') if __name__ == '__main__': send_mail()
整合以上两个为一个shell脚本
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
#!/bin/bash if [ ${#} -ne 1 ] then deal_day=$(date +"%Y-%m-%d") else deal_day=$1 fi echo ${day} mysql -hip -P3306 -u用户 -p密码 -e" use dolphinscheduler; select a.name, a.state, a.submit_time, a.start_time, a.end_time, CONCAT(TIMESTAMPDIFF(SECOND, a.start_time,a.end_time),'(s)') dur from t_ds_task_instance a, ( select name, max(submit_time) submit_time from t_ds_task_instance group by name ) b where a.name=b.name and a.submit_time=b.submit_time and substr(a.start_time,1,10)='${deal_day}' and a.state not in(0,1,7) union all select a.name, a.state, a.submit_time, a.start_time, '-' end_time, CONCAT(cast(TIMESTAMPDIFF(SECOND, a.start_time,current_timestamp)/60/60 as decimal(10,2)),'(h)') dur from t_ds_task_instance a, ( select name, max(submit_time) submit_time from t_ds_task_instance group by name ) b where a.name=b.name and a.submit_time=b.submit_time and substr(a.submit_time,1,10)='${deal_day}' and a.state in(1,10,11) and TIMESTAMPDIFF(SECOND, a.start_time,current_timestamp)>7200; " > rwjk.txt # 发送邮件 n=`cat rwjk.txt|wc -l` if [ $n -ge 2 ] then python rwjk_mail.py # 执行python,发送邮件 else echo "无异常任务" fi