1、Jira单邮件提醒
代码示例
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# coding:utf8
import requests
import datetime
import smtplib
from email.mime.text import MIMEText
from email.header import Header
class JiraTest(object):
    """Empty holder class; the only lines that clearly belong to it are a commented-out constructor.

    NOTE(review): indentation was lost in this listing. The functions that
    follow are invoked by bare name (e.g. ``query_wf_project(0)``), never
    through an instance, so they read as plain functions rather than
    methods -- confirm against the original script.
    """
    # def __init__(self):
    #     pass
# Fetch the list of open requirement issues.
def query_wf_project(param):
    """Query the Jira search endpoint for unfinished project-type requirements.

    Sends a single GET request with a fixed JQL filter and returns the
    ``issues`` array of the JSON response.

    NOTE: only the first page of results is requested (``startAt`` is 0 and
    no page size is given), so the server's default page size caps the
    number of issues returned.

    Args:
        param: Unused. Kept so existing calls such as ``query_wf_project(0)``
            keep working.

    Returns:
        The list stored under ``"issues"`` in the response JSON.

    Raises:
        requests.HTTPError: If the server answers with a non-2xx status.
        requests.RequestException: On connection problems or timeout.
    """
    payload = {
        'jql': 'issuetype in (业务需求, 技术需求) AND 需求类型 = 项目 AND status not in (发布上线, Done, 不解决, 废弃) ORDER BY createdDate ASC',
        'startAt': 0,
        'expand': 'changelog',
    }
    apiUrl = 'http://wf.vdian.net/rest/api/2/search'
    # A timeout keeps the script from hanging forever on an unresponsive server.
    r = requests.get(apiUrl, params=payload, timeout=30)
    print(r.url)
    # Fail loudly on HTTP errors instead of a confusing KeyError/JSON error below.
    r.raise_for_status()
    return r.json()["issues"]
def _overdue_days(planned, today):
    """Return how many calendar days *today* lies after the 'YYYY-MM-DD' date *planned*."""
    planned_date = datetime.datetime.strptime(planned, '%Y-%m-%d').date()
    return (today - planned_date).days


def publish_delay(publistTime, testTime, created, now=None):
    """Describe schedule problems of a project as a short status string.

    The checks are made on calendar dates, so a milestone that is due today
    is not reported as overdue (previously it produced a "0 days late"
    message because the current time of day was compared against midnight
    of the planned date).

    Args:
        publistTime: Planned release date as 'YYYY-MM-DD', or a falsy value
            when it has not been filled in.
        testTime: Planned hand-over-to-test date as 'YYYY-MM-DD', or a falsy
            value when it has not been filled in.
        created: Creation date of the project as 'YYYY-MM-DD'.
        now: Optional ``datetime.datetime`` used as the current moment;
            defaults to ``datetime.datetime.now()``. Read once so that all
            checks use the same reference point.

    Returns:
        The concatenated problem notes, or "项目进度正常" when there are none.

    Raises:
        ValueError: If a supplied date string is not in 'YYYY-MM-DD' format.
    """
    if now is None:
        now = datetime.datetime.now()
    today = now.date()

    notes = []

    # Hand-over-to-test deadline.
    if testTime:
        late = _overdue_days(testTime, today)
        if late > 0:
            notes.append("提测延期" + str(late) + "天;")
    else:
        notes.append("计划提测时间未填写;")

    # Release deadline.
    if publistTime:
        late = _overdue_days(publistTime, today)
        if late > 0:
            notes.append("发布延期" + str(late) + "天;")
    else:
        notes.append("计划发布时间未填写;")

    # Projects that have been open for more than 60 days are flagged as well.
    if _overdue_days(created, today) > 60:
        notes.append("项目创建时间超过2个月;")

    if not notes:
        return "项目进度正常"
    return "".join(notes)
# Send the notification e-mail.
def send_notify(email, key, title, res, subject):
    """Send an HTML reminder e-mail about one issue through the 126.com SMTP server.

    Args:
        email: Recipient address, or an iterable of addresses. (Previously
            this argument was ignored and a hard-coded recipient was used.)
        key: Issue key shown in the mail body.
        title: Issue title shown in the mail body.
        res: Status/description text shown in the mail body.
        subject: Subject line of the e-mail.

    Raises:
        smtplib.SMTPException: If connecting, logging in or sending fails.
    """
    import html  # local import: only needed to escape the values put into the HTML body

    # Third-party SMTP service settings.
    # NOTE(review): these credentials are hard-coded placeholders; load real
    # ones from configuration or the environment rather than committing them.
    mail_host = 'smtp.126.com'  # SMTP server
    mail_user = 'XX@126.com'  # login name
    mail_pass = '126password'  # password/authorization code of the 126 mailbox
    sender = 'XX@126.com'

    recipients = [email] if isinstance(email, str) else list(email)

    # NOTE(review): the template in the original listing was an empty string,
    # so the mail body was always empty. This minimal template stands in for
    # it; swap in the real layout if one exists.
    mail_msg = '<h3>{key} {title}</h3><p>{res}</p>'.format(
        key=html.escape(str(key)),
        title=html.escape(str(title)),
        res=html.escape(str(res)),
    )

    message = MIMEText(mail_msg, 'html', 'utf-8')
    message['From'] = sender
    message['To'] = ', '.join(recipients)
    message['Subject'] = Header(subject, 'utf-8')

    # 25 is the plain SMTP port. The context manager sends QUIT and closes the
    # socket even when login or sending raises.
    with smtplib.SMTP(mail_host, 25, timeout=30) as smtpObj:
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, recipients, message.as_string())
    print('邮件发送成功')
# Print the schedule status of every open requirement, then send the reminder e-mail.
issues = query_wf_project(0)
for issue in issues:
    fields = issue["fields"]
    # Browse URL of the issue
    url = "http://wf.vdian.net/browse/" + issue["key"]
    # Title
    title = fields["summary"]
    # Reporter
    reporter = fields["reporter"]["displayName"]
    # Creation date (first 10 characters, i.e. the YYYY-MM-DD part)
    created = fields["created"][0:10]
    # Product
    project = fields["project"]["name"]
    # Product line; the key may be absent when the project has no category
    category = fields["project"].get("projectCategory")
    if category:
        projectCategory = category["name"]
    else:
        projectCategory = "无产品线"
    # Planned hand-over-to-test date
    testPlanTime = fields["customfield_10200"]
    # Planned release date
    publistPlanTime = fields["customfield_10109"]
    # Progress
    status = fields["status"]["name"]
    # Assignee; Jira reports unassigned issues with a null assignee
    if fields["assignee"]:
        assignee = fields["assignee"]["displayName"]
    else:
        assignee = "无经办人"
    # Testers
    if fields["customfield_10108"]:
        tester = fields["customfield_10108"][0]["displayName"]
    else:
        tester = "无测试人员"
    delay = publish_delay(publistPlanTime, testPlanTime, created)
    print('延期信息打印', delay)

# NOTE(review): indentation was lost in the original listing. The call below
# uses fixed sample arguments that do not depend on the loop variable, so it
# is placed after the loop (one mail per run) -- confirm the intended placement.
send_notify('XX@qq.com', 'WDAPP-8503', 'title', 'desc', 'subject')
2、数据库连接
代码示例
import mysql.connector
import requests
# Open the MySQL connection used by the rest of this script.
# All connection values are placeholders.
mydb = mysql.connector.connect(
    database="xx",
    host="xx.xx.xx.xx",  # database host address
    user="xx",  # database user name
    passwd="xx",  # database password
)
# Module-level cursor; insertDb() executes its INSERT through it.
mycursor = mydb.cursor()
def query_data_job(date):
    """Fetch the baseline job records of one business date from the dw-meta API.

    Args:
        date: Business date passed to the API as ``bizDate``.

    Returns:
        The value stored under ``"data"`` in the response JSON.

    Raises:
        requests.HTTPError: If the server answers with a non-2xx status.
        requests.RequestException: On connection problems or timeout.
    """
    payload = {
        'bizDate': date,
        'business': 'xxx',
    }
    apiUrl = 'http://dw-meta.vdian.net/api/baseline/getAllOnlineItem'
    # SECURITY NOTE(review): these are session cookies copied from a browser and
    # hard-coded in source; they expire and should not be committed. Load them
    # from configuration instead.
    raw_cookie = 'UM_distinctid=16425b64d696ca-063cc6e7c416b-17356953-384000-16425b64d6a62; __spider__visitorid=c00d2f6b6aeb465c; gr_user_id=f01d0c69-8913-4de1-8278-9acb25f36884; grwng_uid=ffd5e645-aa92-4b4d-ad94-0d3743a20f79; _ga=GA1.2.797647807.1519787175; sso-ticket=9ec76cd5abaa71c72af5933003c60723-7596ea916f27baa7c3781445212dea16; wdr-ticket=ee5e0ea331a1f5360d3decf2bbb7d851-c881790b671adff0b5a2482381253d29'
    # Split the header string into individual name/value cookies. Previously the
    # whole string was sent as the value of a single cookie named "cookies_are".
    cookies = dict(pair.split('=', 1) for pair in raw_cookie.split('; '))
    # A timeout keeps the script from hanging forever on an unresponsive server.
    r = requests.get(apiUrl, params=payload, cookies=cookies, timeout=30)
    print(r.url)
    # Fail loudly on HTTP errors (e.g. an expired session) instead of a KeyError below.
    r.raise_for_status()
    return r.json()["data"]
def get_sum_delay_time(datas):
    """Sum, in hours, how far jobs finished past their deadline.

    Only records that carry both a ``deadlineTime`` and an ``actualDoneTime``
    are considered, and only positive overruns are added; jobs that finished
    on time contribute nothing.

    Args:
        datas: Iterable of dict records, or ``None``/empty when there is no data.

    Returns:
        The accumulated overrun in hours; ``0`` when nothing was late.
    """
    total_time = 0
    # Treat a missing payload (None) the same as an empty list.
    for data in datas or []:
        deadline = data.get('deadlineTime')
        actual_done = data.get('actualDoneTime')
        # Skip records where either time is absent or empty.
        if not (deadline and actual_done):
            continue
        overrun = calculationc(actual_done) - calculationc(deadline)
        if overrun > 0:
            total_time = total_time + overrun
    return total_time
# Convert a clock time to hours.
def calculationc(datatime):
    """Convert a colon-separated time string to a number of hours.

    Accepts ``'HH:MM:SS'`` and also ``'HH:MM'`` (seconds default to 0).

    Args:
        datatime: Time string such as ``'01:30:00'``.

    Returns:
        The time expressed in hours as a float, e.g. ``1.5``.

    Raises:
        IndexError: If the string has no minutes component.
        ValueError: If a component is not numeric.
    """
    timelist = datatime.split(':')
    hours = float(timelist[0])
    minutes = float(timelist[1])
    seconds = float(timelist[2]) if len(timelist) > 2 else 0.0
    return hours + minutes / 60 + seconds / 3600
def insertDb(datatime):
    """Compute the total delay hours for one date and store them in ``data_delay_time``.

    Fetches the job records of *datatime*, sums their delay, inserts one
    ``(date, time)`` row through the module-level cursor and commits.

    Args:
        datatime: Date string; used both as the API query date and as the
            value of the ``date`` column.
    """
    delay_hours = get_sum_delay_time(query_data_job(datatime))
    mycursor.execute(
        "INSERT INTO data_delay_time (date, time) VALUES (%s, %s)",
        (datatime, delay_hours),
    )
    # The table content changed, so the transaction has to be committed.
    mydb.commit()
# Backfill one row per day for '2018-10-1' through '2018-10-29'.
# NOTE(review): days 30 and 31 are not covered and the day is not
# zero-padded -- confirm both are intended.
for day in range(1, 30):
    datatime = '2018-10-' + str(day)
    insertDb(datatime)