双人结对作业疫情可视化(util)
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import traceback
import pymysql
import pymysql.cursors
import time
import main
# 获得conn cursor
def get_conn():
# 打开数据库连接
conn = pymysql.connect(host='localhost',
user='root',
passwd='123',
db='text2',
port=3306,
charset='utf8')
# 创建游标,默认是元组型
cursor = conn.cursor()
return conn, cursor
# 关闭conn cursor
def close_conn(conn, cursor):
if cursor:
cursor.close()
if conn:
conn.close()
def query(sql, *args):
conn, cursor = get_conn()
cursor.execute(sql, args)
res = cursor.fetchall()
close_conn(conn, cursor)
return res
def insert_province():
cursor = None
conn = None
try:
li = main.get_data("https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5")[1]
print(f"{time.asctime()}开始插入历史数据")
conn, cursor = get_conn()
sql = "insert into province values(%s,%s,%s,%s,%s,%s,%s)"
for v in li:
cursor.execute(sql, [v[0], v[1], v[2], v[3], v[4], v[5], v[6]]) # [v.get("ptime"), v.get("pname"), v.get("pconfirm"), v.get("pheal"), v.get("psuspect"), v.get("pdead"), v.get("pconfirmAdd")]
conn.commit()
print(f"{time.asctime()}插入历史数据完毕")
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)
def update_provinces():
cursor = None
conn = None
try:
li = main.get_data("https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5")[1]
conn, cursor = get_conn()
sql = "insert into province values(%s,%s,%s,%s,%s,%s,%s)"
sql_query = 'select %s=(select ptime from province order by ptime desc limit 1)' # 对比最大时间戳
cursor.execute(sql_query, li[0][0])
if not cursor.fetchone()[0]:
print(f"{time.asctime()}开始更新最新数据")
for item in li:
cursor.execute(sql, item)
conn.commit() # 提交事务操作
print(f"{time.asctime()}更新最新数据完成")
else:
print(f"{time.asctime()}已经是最新数据")
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)
def get_province():
sql = "select pname,sum(pconfirm),ANY_VALUE(psuspect),ANY_VALUE(pheal),ANY_VALUE(pdead) from province "
"where ptime=(select ptime from province "
"order by ptime desc limit 1) "
"group by pname"
# sql = "select province,sum(pconfirm) from province where ptime=(select ptime from province) order by ptime desc limit 1 group by province"
res = query(sql)
return res
def insert_area():
cursor = None
conn = None
try:
li = main.get_data("https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5")[2]
print(f"{time.asctime()}开始插入历史数据")
conn, cursor = get_conn()
sql = "insert into area values(%s,%s,%s,%s,%s,%s,%s,%s)"
for v in li:
cursor.execute(sql, [v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7]]) # [v.get("ptime"), v.get("pname"), v.get("pconfirm"), v.get("pheal"), v.get("psuspect"), v.get("pdead"), v.get("pconfirmAdd")]
conn.commit()
print(f"{time.asctime()}插入历史数据完毕")
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)
def update_area():
cursor = None
conn = None
try:
li = main.get_data("https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5")[2]
conn, cursor = get_conn()
sql = "insert into area values(%s,%s,%s,%s,%s,%s,%s,%s)"
sql_query = 'select %s=(select ctime from area order by ctime desc limit 1)' # 对比最大时间戳
cursor.execute(sql_query, li[0][0])
if not cursor.fetchone()[0]:
print(f"{time.asctime()}开始更新最新数据")
for item in li:
cursor.execute(sql, item)
conn.commit() # 提交事务操作
print(f"{time.asctime()}更新最新数据完成")
else:
print(f"{time.asctime()}已经是最新数据")
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)
def get_area(province):
sql = "select cname,cconfirm,csuspect,cheal,cdead from area "
"where cprovince=('"+province+"') and ctime=(select ctime from area "
"order by ctime desc limit 1) "
res = query(sql)
return res
def insert_history():
cursor = None
conn = None
try:
dic = main.Spider.get_data()[0]
print(f"{time.asctime()}开始插入历史数据")
conn, cursor = get_conn()
sql = "insert into history values(%s)"
for k,v in dic.items():
cursor.execute(sql, [k, v.get("???")])
conn.commit()
print(f"{time.asctime()}插入历史数据完毕")
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)
def update_history():
cursor = None
conn = None
try:
dic = main.Spider.get_data()[0]
print(f"{time.asctime()}开始更新历史数据")
conn, cursor = get_conn()
sql = "insert into history values(%s)"
sql_query = "select () from history where ds=%s"
for k, v in dic.items(sql_query, k):
cursor.execute(sql, [k, v.get("???")])
conn.commit()
print(f"{time.asctime()}更新历史数据完毕")
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)