#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/4/16 10:42 AM
# @Name : peilun
# @File : Fly_book.py
# @Software: PyCharm
"""Send the Jenkins continuous-integration test report notification to Feishu."""
import json
import sys
import time

sys.path.append("..")

import jenkins
import requests

# NOTE(review): `con` and `log` used below are expected to come from these
# star imports -- confirm against comment/readConfig.py and comment/log.py.
from comment.readConfig import *
from comment.log import *

# Location of the exported metrics file, relative to ``con.allure_path``.
_METRICS_FILE = "/allure-report/export/prometheusData.txt"
# Base URL of the Feishu bot webhook; ``con.fly`` is appended to it.
_WEBHOOK_BASE = 'https://open.feishu.cn/open-apis/bot/v2/hook/'
# Seconds to wait for the webhook before giving up (requests has no default).
_REQUEST_TIMEOUT = 10
# Shown in the message when a metric is absent from the metrics file.
_MISSING_VALUE = "N/A"


def _parse_metrics(lines):
    """Parse whitespace-separated ``name value`` lines into a dict.

    Only the first two fields of each line are used. Lines with fewer than
    two fields (e.g. blank lines) are skipped instead of raising
    ``IndexError``. Splitting on any whitespace also drops the trailing
    newline from the value.
    """
    metrics = {}
    for line in lines:
        fields = line.split()
        if len(fields) < 2:
            continue
        metrics[fields[0]] = fields[1]
    return metrics


def _text(text, **extra):
    """Build one ``text`` element of a Feishu rich-text (post) paragraph."""
    element = {"tag": "text"}
    element.update(extra)
    element["text"] = text
    return element


class flyb():
    """Builds and sends the test-report message for the ``autotest`` job."""

    def __init__(self):
        """Resolve the report URL of the job's last build.

        Any failure while talking to Jenkins is logged and stored in
        ``self.errorMsg``; ``self.report_url`` then stays empty.
        """
        self.errorMsg = ""
        self.tile = time.strftime("%Y-%m-%d %H:%M:%S")
        # Defined up front so ly_data() can detect a failed Jenkins lookup
        # instead of hitting an AttributeError.
        self.report_url = ""
        try:
            self.jenkins_url = con.jenkins_url
            self.server = jenkins.Jenkins(
                self.jenkins_url,
                username=con.jenkins_name,
                password=con.jenkins_passwd,
            )

            self.job_name = "job/autotest/"  # job name
            self.job_url = self.jenkins_url + self.job_name  # job URL
            # Number of the most recent build.
            self.job_last_number = self.server.get_info(self.job_name)['lastBuild']['number']
            # Address of the report for that build.
            self.report_url = self.job_url + str(self.job_last_number) + '/allure'
            log.debug("jenkins地址:%s" % self.report_url)
        except Exception as e:
            self.errorMsg = str(e)
            log.info("jenkins连接异常 %s" % e)

    def _build_payload(self, metrics):
        """Return the webhook request body for the given metrics dict."""

        def metric(name):
            # A missing metric is shown as a placeholder rather than
            # aborting the whole notification with a TypeError.
            return metrics.get(name, _MISSING_VALUE) + " "

        paragraph = [
            _text("项目名称: "),
            _text("储能管家 "),
            _text("报告链接: ", un_escape=True),
            {"tag": "a", "text": "链接 ", "href": self.report_url},
            _text("监测分支: "),
            _text("master "),
            _text("运行总数: "),
            _text(metric('launch_retries_run')),
            _text("通过数量: "),
            _text(metric('launch_status_passed')),
            _text("异常数量: "),
            _text(metric('launch_status_broken')),
            _text("不通过数量: "),
            _text(metric('launch_status_failed')),
            _text("运行时间: "),
            _text(self.tile),
        ]
        return {
            "mobiles": "13282148187",
            "msg_type": "post",
            "content": {
                "post": {
                    "zh_cn": {
                        "title": "项目接口测试报告",
                        "content": [paragraph],
                    }
                }
            },
        }

    def ly_data(self):
        """Read the exported metrics and post the report message.

        Raises:
            OSError: if the metrics file cannot be opened or read.

        Errors while building or sending the request are logged and stored
        in ``self.errorMsg``; they are not re-raised.
        """
        # Read the generated txt report data; ``with`` closes the file even
        # if parsing fails.
        with open(con.allure_path + _METRICS_FILE, 'r', encoding="utf-8") as report:
            metrics = _parse_metrics(report)

        if not self.report_url:
            # __init__ could not resolve the report address; nothing useful
            # to link to, so do not send.
            log.info("报告地址为空,取消发送 %s" % self.errorMsg)
            return

        try:
            url = _WEBHOOK_BASE + con.fly
            log.debug("webhook地址: %s" % url)

            headers = {
                'content-type': "application/json"
            }
            payload = self._build_payload(metrics)

            response = requests.request(
                "POST",
                url,
                headers=headers,
                data=json.dumps(payload),
                timeout=_REQUEST_TIMEOUT,
            )
            log.info(response.text)
        except Exception as e:
            self.errorMsg = str(e)
            log.info("接口请求异常 %s" % e)


# Runs on import as well as when executed as a script; kept unguarded so the
# module-level behaviour stays the same.
f = flyb()
f.ly_data()
因公司要求,结合 Jenkins 和飞书发送测试报告通知(与钉钉通知类似)。