zoukankan      html  css  js  c++  java
  • Python自定义钉钉机器人发送自动化结果报告

    环境python3.5+jenkins

    # coding:utf-8
    import urllib.request
    import json
    import sys
    import time
    import re
    
    
    class dingHook(object):
        def __init__(self,sys):
            self.msg = sys.argv[1:]
            self.url = "https://oapi.dingtalk.com/robot/send?access_token=30eee8d20287b0051d7c25cc6e468dcdd01XXXX"
    
        def request(self, url, method, data=None, head={}):
            request = urllib.request.Request(url=url, headers=head)
            request.get_method = lambda: method
            httpRes = urllib.request.urlopen(request, data)
            content = httpRes.read()
            httpRes.close()
            return content
    
        def post_start(self):
            data = {}
            data["msgtype"] = "text"
            data["text"] = {}
            data["text"]["content"] = " jenkins:开始执行{}自动化......".format(self.msg[0])
            data = json.dumps(data).encode(encoding='UTF8')
            head = {"Content-Type": "application/json"}
            content = self.request(self.url, "POST", data, head)
            print(content)
            return content
    
        def post_link(self, text, title, picurl="", messageurl=""):
            data = {}
            data["msgtype"] = "link"
            data["link"] = {}
            data["link"]["text"] = text
            data["link"]["title"] = title
            data["link"]["picUrl"] = picurl
            data["link"]["messageUrl"] = messageurl
            data = json.dumps(data).encode(encoding='UTF8')
            head = {"Content-Type": "application/json"}
            content = self.request(self.url, "POST", data, head)
            print(content)
            return content
    
        def post_result(self):
            i = 0
            result = "FAILURE"
            # 获取当前jenkins构建版本号
            get_buildnum = r"C:UsersASUS.jenkinsjobs{}
    extBuildNumber".format(self.msg[0])
            with open(get_buildnum, encoding='utf-8')  as fd:
                 num = fd.read()
            buildNumber = str(int(num)-1)
            print(buildNumber)
            # 获取jenkins自动化执行结果
            get_buildresult = r"C:UsersASUS.jenkinsjobs{}uilds{}log".format(self.msg[0], buildNumber)
            # with open(get_buildresult)  as output_result:
            #      print("result:",output_result.name)
            #      build_result = output_result.read()
            #获取结果报告中的返回结果
            name_re = r'\Report\report\(.*?.html)'
            with open(get_buildresult )  as fd:
                 log_content = fd.read()
            html_name = re.findall(name_re, log_content)
            get_RF_result = r'C:UsersASUS.jenkinsjobs{}htmlreportsHTML_Report{}'.format(self.msg[0], html_name[0])
            print(get_RF_result)
            # jenkins自动化结果报告地址
            messageurl = "http://localhost:8080/job/{}/HTML_Report" .format(self.msg[0])
            # 钉钉结果报告消息图片
            picurl = "D:/work/ClTest/pased.jpg"
            picurl = "http://localhost:8080/job/{}/HTML_Report/pased.jpg".format(self.msg[0])
             # 钉钉结果报告标题
            title = "【%s】执行结果" % self.msg[0]
            print(title)
            try:
                with open(get_buildresult )  as fd:
                    build_result = fd.read()
                    #防止生成报告没这么快,报错时先等待5s
                    if "No such file or directory" in build_result:
                        time.sleep(5)
    
                    #判断jenkins自动化是否执行结束
                    if  "Finished" in build_result:
                        with open(get_RF_result,encoding='UTF-8' )  as fd:
                                output_rf_result = fd.read()
                        result = r'<h2>Results</h2>'
                        failure_index = output_rf_result.find(result)
                        result_content = output_rf_result[failure_index:]
                        print(result_content)
                        failure_api = re.findall(r'<td[^>]*>(.*?)</td>', result_content,re.I|re.M)
                        print(failure_api)
                        failure_api = ["- " + a + '
    
    ' for a in failure_api]
    
                        for a in failure_api:
                            if 'Failed' in a:
                                 result = " {}自动化执行失败!
    ".format(self.msg[0])
                                 print(a)
                                 content = self.post_link(text=result, title=title, messageurl=messageurl, picurl=picurl)
                                 return content
                        result = "{}自动化执行成功!
    ".format(self.msg[0])
                        content = self.post_link(text=result, title=title, messageurl=messageurl, picurl=picurl)
                        return content
                        # fail_case = int(case_count_list[1])
                        # suc_case = int(case_count_list[3])
                        # if fail_case == 0:
                        #     result = "SUCCESS-成功:%d-失败:%d" % (suc_case,fail_case)
                        # else:
                        #     result = "FAILURE-成功:%d-失败:%d" % (suc_case,fail_case)
            except Exception as e:
                print(e)
                time.sleep(10)
    
    
        def ding_hook(self, itype="start"):
            if itype == "start":
                self.post_start()
            elif itype == "end":
                self.post_result()
    
    
    if __name__ == '__main__':
        dh = dingHook(sys)
        print(dh.ding_hook(sys.argv[2]))
    

      运行命令:python urllib-test.py 工程名 end

    问题:TypeError: POST data should be bytes or an iterable of bytes. It cannot be of type str.

    data = json.dumps(data)这行后加个.encode(encoding='UTF8')

  • 相关阅读:
    mysql复合索引的优点和注意事项
    linux服务器时间更新
    mysql慢日志
    Proftpd linux服务器FTP安装配置
    js 截取字符串
    在工信部注销网站备案
    史玉柱传奇
    css white-space属性
    获取微信授权
    禁用ipv6
  • 原文地址:https://www.cnblogs.com/dieyaxianju/p/8321452.html
Copyright © 2011-2022 走看看