工具类
pymsql工具类
# 编写settings.py,用于管理数据库基本信息
MYSQL_HOST = "localhost"
CHARSET = "utf8"
DB = "animal"
USER = "root"
MYSQL_PORT = 3306
PASSWORD = "bb961202"
# 编写工具类base_mysql.py
# encoding=utf-8
import pymysql
# 导入所有Mysql配置常量,请自行指定文件
from conf.settings import *
class MysqlConnection(object):
"""
mysql操作类,对mysql数据库进行增删改查
"""
def __init__(self,config):
# Connect to the database
self.connection = pymysql.connect(**config)
# True 在execute时就会立即向数据库发出操作请求,而不是等待运行到commit()时再一起执行
# False 会等到执行commit才会提交
self.connection.autocommit(True)
self.cursor = self.connection.cursor()
def QueryAll(self, sql):
"""
查询所有数据
:param sql:
:return:
"""
# 数据库若断开即重连
self.reConnect()
self.cursor.execute(sql)
# 返回结果行(所有)
return self.cursor.fetchall()
def QueryMany(self, sql, n):
"""
查询某几条数据数据
:param sql:
:return:
"""
# 数据库若断开即重连
self.reConnect()
self.cursor.execute(sql)
# 返回结果行(传入参数n,n=5,返回5条)
return self.cursor.fetchmany(n)
def QueryOne(self, sql):
"""
查询某几条数据数据
:param sql:
:return:
"""
# 数据库若断开即重连
self.reConnect()
self.cursor.execute(sql)
# 返回一条结果行
return self.cursor.fetchone()
def reConnect(self):
"""
重连机制
:return:
"""
try:
self.connection.ping()
except:
self.connection()
def Operate(self, sql, params=None, DML=True):
"""
数据库操作:增删改查
DML: insert / update / delete
DDL: CREATE TABLE/VIEW/INDEX/SYN/CLUSTER
"""
try:
# 数据库若断开即重连
self.reConnect()
with self.connection.cursor() as cursor:
cursor.execute(sql, params)
self.connection.commit()
except Exception as e:
if DML:
# 涉及DML操作时,若抛异常需要回滚
self.connection.rollback()
print(e)
def __del__(self):
"""
MysqlConnection实例对象被释放时调用此方法,用于关闭cursor和connection连接
"""
self.cursor.close()
self.connection.close()
if __name__ == "__main__":
# 初始化MysqlConnection实例对象需要传Mysql配置信息的字典
config = {'host': MYSQL_HOST, 'charset': CHARSET, 'db': DB, 'user': USER, 'port': MYSQL_PORT, 'password': PASSWORD}
msc=MysqlConnection(config)
sql="select * from dog"
print(msc.QueryOne(sql))
时间工具类
import datetime
import time
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
TIME_FORMAT = "%H:%M:%S"
# 当前毫秒数
def cur_millisecond():
return int(time.time() * 1000)
# 当前秒数
def cur_second():
return int(time.time())
# 当前日期 格式%Y-%m-%d %H:%M:%S
def cur_datetime():
return datetime.datetime.strftime(datetime.datetime.now(), DATETIME_FORMAT)
# 当前日期 格式%Y-%m-%d
def cur_date():
return datetime.date.today()
# 当前时间 格式%Y-%m-%d
def cur_time():
return time.strftime(TIME_FORMAT)
# 秒转日期
def second_to_datetime(seconds):
return time.strftime(DATETIME_FORMAT, time.localtime(seconds))
# 毫秒转日期
def millisecond_to_datetime(milli_second):
return time.strftime(DATETIME_FORMAT, time.localtime(milli_second // 1000))
# 日期转毫秒
def datetime_to_millisecond(dates_time):
data = time.strptime(dates_time, DATETIME_FORMAT)
return int(time.mktime(data)) * 1000
# 日期转秒
def datetime_to_second(dates_time):
data = time.strptime(dates_time, DATETIME_FORMAT)
return int(time.mktime(data))
# 当前年
def cur_year():
return datetime.datetime.now().year
# 当前月
def cur_month():
return datetime.datetime.now().month
# 当前日
def cur_day():
return datetime.datetime.now().day
# 当前时
def cur_hour():
return datetime.datetime.now().hour
# 当前分
def cur_minute():
return datetime.datetime.now().minute
# 当前秒
def cur_seconds():
return datetime.datetime.now().second
# 星期几
def cur_week():
return datetime.datetime.now().weekday()
# 几天前的时间
def now_days_ago(days):
days_ago_time = datetime.datetime.now() - datetime.timedelta(days=days)
return time.strftime(DATETIME_FORMAT, days_ago_time.timetuple())
# 几天后的时间
def now_days_after(days):
days_after_time = datetime.datetime.now() + datetime.timedelta(days=days)
return time.strftime(DATETIME_FORMAT, days_after_time.timetuple())
# 某个日期几天前的时间
def date_time_days_ago(dates_time, days):
data_time_ago = datetime.datetime.strptime(dates_time, DATETIME_FORMAT) - datetime.timedelta(days=days)
return time.strftime(DATETIME_FORMAT, data_time_ago.timetuple())
# 某个日期几天前的时间
def date_time_days_after(dates_time, days):
data_time_after = datetime.datetime.strptime(dates_time, DATETIME_FORMAT) + datetime.timedelta(days=days)
return time.strftime(DATETIME_FORMAT, data_time_after.timetuple())
# # 输出当前毫秒
# millisecond = cur_millisecond()
# # 输出当前秒
# second = cur_second()
# 输出当前日期(带时分秒)
# dates_time = cur_datetime()
# # 输出当前日期(不带时分秒)
# date = cur_date()
# print(date)
# 输出当前时间
# now_time = cur_time()
# # 秒转时间
# s_t_d = second_to_datetime(second)
# # 毫秒转时间
# m_t_d = millisecond_to_datetime(millisecond)
# # 时间转毫秒
# d_t_m = datetime_to_millisecond(now_time)
# # 时间转秒
# d_t_s = datetime_to_second(now_time)
# # 当前年
# year = cur_year()
# # 当前月
# month = cur_month()
# # 当前天
# day = cur_day()
# print(day)
# 当前小时
# hour = cur_hour()
# # 当前分钟
# minute = cur_minute()
# # 当前秒
# seconds = cur_seconds()
# # 当前周
# week = cur_week()
# # 几天前的时间
# n_d_ago = now_days_ago(2)
# # 几天后的时间
# n_d_after = now_days_after(2)
# # 某个日期几天前的时间
# d_t_d_ago = date_time_days_ago(date_time, 2)
# # 某个日期几天后的时间
# d_t_d_after = date_time_days_after(date_time, 2)
数据驱动类
json读取
# 操作json
import json
def read_json(fileName):
filepath = "C:/Users/17327/PycharmProjects/yunhe/data/" + fileName
arr = []
with open(filepath, "r", encoding="utf-8") as file:
datas = json.load(file)
for data in datas.values():
arr.append(
(data['username'], data['password'], data['sure_password'], data['success'])
)
return ar
# 读取json
{
"data1": {
"username": "",
"password": 123456,
"sure_password": 123456,
"success": false
},
"data2": {
"username": "sdlsieq",
"password": "",
"sure_password": 123456,
"success": false
},
"data3": {
"username": "sdlsieq",
"password": "",
"sure_password": 123456,
"success": false
}
}
txt读取
# 操作txt
def read_txt(fileName):
filepath = "C:/Users/17327/PycharmProjects/yunhe/data/" + fileName
arr = []
with open(filepath, "r", encoding="utf-8") as file:
datas = file.readlines()
for data in datas:
arr.append(tuple(data.strip().split(",")))
return arr