- (数据库)CSV操作
import pandas as pd

# Read data from a CSV file, skipping malformed rows instead of aborting.
# `on_bad_lines="skip"` replaces `error_bad_lines=False`, which was deprecated
# in pandas 1.3 and removed in pandas 2.0.
df = pd.read_csv('pandas.csv', encoding="utf-8", delimiter=",", on_bad_lines="skip")
# Read a local CSV file by absolute path.
df = pd.read_csv("C:/Users/fuqia/Desktop/example.csv", sep=',')
- (数据库)使用sqlite3
import pandas as pd
import altair as alt
import sqlite3
# Open (or create) the SQLite database file.
con = sqlite3.connect("pandas.db")
# Example: create a table whose timestamp column defaults to the current time.
# con.execute("CREATE TABLE t (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)")
# Example: create the `person2` table used by the statements below.
# con.execute("CREATE TABLE person2(\
# id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,\
# time TEXT NOT NULL,\
# name varchar(100) NOT NULL\
# )")
# Example: insert one row into the table.
# con.execute("insert into person2(time, name) values('1', 'bill')")
# Commit the statements executed so far.
# con.commit()

# pandas: read rows from the database into a DataFrame.
df = pd.read_sql("select id,time, name from person2", con)

# pandas: store a DataFrame in the database.
df = pd.DataFrame({'time': [11, 12, 13, 14], 'name': ['zhangsan', 'lisi', 'wangwu', 'zhuliu']})
# The read above only succeeds when `person2` already exists, so the default
# if_exists="fail" would always raise ValueError here; append the rows instead.
df.to_sql('person2', con, index=False, if_exists='append')

# Read data from a CSV file, skipping malformed rows. `on_bad_lines="skip"`
# replaces `error_bad_lines=False`, which was removed in pandas 2.0.
df = pd.read_csv('pandas.csv', encoding="utf-8", delimiter=",", on_bad_lines="skip")
df  # bare expression: displays the frame when run in a notebook/REPL cell

# Store the CSV data in the database (raises ValueError if `hf` already exists).
df.to_sql('hf', con, index=False)

# Release the database file handle once the example is done.
con.close()
- (数据库)mysql 读取插入pandas数据
import pandas as pd
import pymysql
import sys
from sqlalchemy import create_engine
def read_mysql_and_insert():
    """Copy the rows of ``person2`` into ``add_table`` via a DataFrame.

    The rows are read through a PyMySQL connection with ``pd.read_sql`` and
    appended to ``add_table`` through a SQLAlchemy engine with
    ``DataFrame.to_sql``. The first rows of the frame are printed, then
    ``'ok'`` once the write has finished.

    Returns:
        The DataFrame read from ``person2``.

    Raises:
        SystemExit: With status 1 when one of the handled connection/engine/
            query errors occurs (the error is printed first).
    """
    # Imported locally: the file only does `from sqlalchemy import
    # create_engine`, so the bare name `sqlalchemy` used by the original
    # exception handlers was never bound and would raise NameError.
    from sqlalchemy import exc as sa_exc

    # PyMySQL connection, used by pd.read_sql.
    try:
        conn = pymysql.connect(host='127.0.0.1', user='root', password='pass',
                               db='db1', charset='utf8')
    except pymysql.err.OperationalError as e:
        print('Error is ' + str(e))
        sys.exit(1)

    # From here on the connection is open; always close it, including when
    # sys.exit() is raised by one of the handlers below.
    try:
        # SQLAlchemy engine, used by DataFrame.to_sql.
        try:
            engine = create_engine('mysql+pymysql://root:pass@127.0.0.1:3306/db1')
        except (sa_exc.OperationalError, sa_exc.InternalError) as e:
            print('Error is ' + str(e))
            sys.exit(1)

        try:
            # Read the table through PyMySQL; the result is a DataFrame.
            sql = 'select * from person2'
            df = pd.read_sql(sql, con=conn)
        except pymysql.err.ProgrammingError as e:
            print('Error is ' + str(e))
            sys.exit(1)

        print(df.head())
        # Append the DataFrame to the target table through SQLAlchemy.
        df.to_sql(name='add_table', con=engine, if_exists='append', index=False)
    finally:
        conn.close()

    print('ok')
    return df
# Run the example only when this file is executed directly as a script.
if __name__ == "__main__":
    df = read_mysql_and_insert()
- (python模块)pymysql,sqlalchemy
import pandas as pd
import numpy as np
import altair as alt
import pymysql
from sqlalchemy import create_engine, text

# pymysql: execute a raw SQL statement.
conn = pymysql.connect(host='127.0.0.1', user='root', password='xxx', db='db1', charset='utf8')
try:
    # The cursor is closed when the `with` block ends.
    with conn.cursor() as cursor:
        cursor.execute("CREATE TABLE t3 (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)")
finally:
    conn.close()

# DataFrame-related usage
# 1. pymysql + sqlalchemy: create the engine.
engine = create_engine('mysql+pymysql://root:xxx@127.0.0.1:3306/db1')
# 2. sqlalchemy: execute a SQL statement. `Engine.execute()` was removed in
#    SQLAlchemy 2.0; run the statement on a connection inside a transaction.
with engine.begin() as connection:
    connection.execute(text("CREATE TABLE t2 (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"))
# 3. sqlalchemy: read a query result into a DataFrame.
sql = ''' select * from employee; '''
df = pd.read_sql_query(sql, engine)
print(df)
- (python模块)使用pymysql
import pymysql
# Connection parameters are passed by keyword: PyMySQL 1.0+ no longer accepts
# host/user/password as positional arguments.
# NOTE(review): the credentials are hard-coded in this example; load them from
# configuration or the environment in real code.
con = pymysql.connect(host='localhost',
                      user='root',
                      password='eve136712',
                      db='model_save',
                      charset='utf8',
                      use_unicode=True)
try:
    # Execute the SQL statement through a cursor; the cursor is closed when
    # the `with` block ends.
    with con.cursor() as cursor:
        cursor.execute("create table person(id varchar(32) primary key, name varchar(100) )")
finally:
    con.close()
- (python模块)使用sqlalchemy
import pandas as pd
from sqlalchemy import create_engine
# Create the engine, option 1 (the pymysql driver is still required).
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/test')
# Create the engine, option 2 (the pymysql driver is still required).
db_info = {'user': 'root',
           'password': '123456',
           'host': 'localhost',
           'port': 3306,
           'database': 'test'
           }
# The character set is selected with `?charset=utf8` in the URL. The former
# `encoding='utf-8'` keyword of create_engine() was removed in SQLAlchemy 2.0
# and raises TypeError there, so it is no longer passed.
engine = create_engine(
    'mysql+pymysql://%(user)s:%(password)s@%(host)s:%(port)d/%(database)s?charset=utf8' % db_info)

# sqlalchemy: read a query result from the database into a DataFrame.
sql = ''' select * from employee; '''
df = pd.read_sql_query(sql, engine)
print(df)

df = pd.DataFrame({'id': [1, 2, 3, 4], 'name': ['zhangsan', 'lisi', 'wangwu', 'zhuliu']})
# Store the DataFrame as a MySQL table, including the index column.
df.to_sql('mydf', engine, index=True)

# Store the DataFrame as a MySQL table without the index column (index=False).
# if_exists:
#   1. fail:    if the table exists, raise ValueError (this is the default)
#   2. replace: if the table exists, drop it, recreate it and insert the data
#   3. append:  if the table exists, insert the data; otherwise create it first
# Way 1 (module-level helper; not part of the documented public pandas API)
pd.io.sql.to_sql(df, 'example', con=engine, index=False, if_exists='replace')
# Way 2 (index=False added so it matches way 1 and the comment above)
df.to_sql('example', con=engine, index=False, if_exists='replace')
- (python模块)使用Django (应用)
def mysql_to_pd(case_name='XXXXXTraffic'):
    """Load the report details of one case into a date-indexed DataFrame.

    Rows of ``ReportDetail`` (presumably a Django model declared elsewhere in
    the project) are filtered by ``case_name``. The ``platform_name`` column
    is renamed to ``HW``, a ``time`` column holding ``report_create_time``
    formatted as ``YYYY-MM-DD`` is added, and the frame is indexed by that
    date and sorted by index.

    :param case_name: value matched against the ``case_name`` field
    :return: the sorted, date-indexed DataFrame
    """
    wanted_fields = (
        'report_create_time',
        'platform_name',
        'throughputd',
        'throughputu',
        'case_name',
    )
    matching = ReportDetail.objects.filter(case_name=case_name)
    records = list(matching.values(*wanted_fields))

    frame = pd.DataFrame(records).rename(columns={'platform_name': 'HW'})
    # Day-resolution string derived from the creation timestamp.
    frame['time'] = frame['report_create_time'].dt.strftime('%Y-%m-%d')
    # Index by the parsed date; the `time` column itself stays in the frame.
    frame = frame.set_index(pd.to_datetime(frame["time"]))
    return frame.sort_index()
# Load the report details for the default case name.
data = mysql_to_pd()
- (python模块)使用df+mysql (应用)
import pandas as pd
import numpy as np
import altair as alt
import pymysql
from sqlalchemy import create_engine
# Build the SQLAlchemy engine for the MySQL database (pymysql driver).
engine = create_engine('mysql+pymysql://root:password@127.x.x.x:3306/db5')
# Query every report UUID and load the result into a DataFrame.
sql = ''' select report_uuid from report_reportinfo; '''
df = pd.read_sql_query(sql=sql, con=engine)
# Bare expression: displays the frame when run in a notebook/REPL cell.
df
- (python)动态创建mysql表【自定义模块】
# 此代码实现将dict写入mysql指定表中。如果指定表不存在,则根据dict中key的字段创建表,然后将dict写入表中
import pymysql
from scrapy.conf import settings
class DataToMysql:
    """Write dictionaries into MySQL tables, creating a missing table on demand.

    Each dictionary key becomes a column name and each value the cell value.
    When the target table does not exist it is created with one ``text``
    column per key, and the insert is retried.
    """

    # MySQL server error code for "table doesn't exist".
    _ER_NO_SUCH_TABLE = 1146

    def __init__(self, host, user, passwd, db, port=3306):
        """Open the connection and a cursor.

        :param host: database host
        :param user: database user
        :param passwd: database password
        :param db: database (schema) name
        :param port: server port; defaults to 3306. The original ignored this
            argument and always connected to 3306.
        :raises pymysql.Error: re-raised after printing a message when the
            connection cannot be established
        """
        try:
            self.conn = pymysql.connect(host=host, user=user, passwd=passwd, db=db,
                                        port=port, charset='utf8')  # connect to the database
            self.cursor = self.conn.cursor()
        except pymysql.Error:
            print("数据库连接信息报错")
            raise

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a backtick-quoted MySQL identifier."""
        # Identifiers cannot be bound as query parameters, so they are quoted
        # here; an embedded backtick is escaped by doubling it.
        return '`' + str(name).replace('`', '``') + '`'

    def write(self, table_name, info_dict):
        """Insert ``info_dict`` as one row of ``table_name``.

        The INSERT statement is generated from the dictionary. If the server
        reports that the table does not exist (error 1146), the table is
        created with a ``text`` column per key and the insert is repeated.

        :param table_name: name of the table to write to (``db.table`` allowed)
        :param info_dict: row content; keys are column names
        :return: None
        :raises pymysql.Error: for any database error other than a missing table
        """
        columns = list(info_dict)
        values = tuple(info_dict[column] for column in columns)

        # Quote each dot-separated part so qualified names keep working.
        table_sql = '.'.join(self._quote_identifier(part)
                             for part in str(table_name).split('.'))
        column_sql = ', '.join(self._quote_identifier(column) for column in columns)
        # Values are bound as parameters so the driver escapes them; this
        # replaces pymysql.escape_string, which PyMySQL 1.0 no longer exports.
        placeholders = ', '.join(['%s'] * len(columns))
        insert_sql = "INSERT INTO %s (%s) VALUES (%s)" % (table_sql, column_sql, placeholders)

        try:
            self.cursor.execute(insert_sql, values)
        except pymysql.Error as e:
            # The first element of a MySQL error's args is the numeric code.
            if not (e.args and e.args[0] == self._ER_NO_SUCH_TABLE):
                raise
            # Table is missing: create it from the keys, then insert again.
            column_defs = ', '.join(self._quote_identifier(column) + ' text'
                                    for column in columns)
            self.cursor.execute("CREATE TABLE %s (%s)" % (table_sql, column_defs))
            self.cursor.execute(insert_sql, values)
        self.conn.commit()  # commit the current transaction
if __name__ == '__main__':
    # DataToMysql.__init__ declares a `port` parameter; the original call
    # passed only four arguments and failed with TypeError.
    mysql = DataToMysql('localhost', 'root', '******', 'pythonDB', 3306)
    di = {"A": "A", "B": "B"}
    mysql.write('te', di)
- (python)动态创建mysql表中的字段【自定义模块】【good】
def check_column_exist(mysql_conn, column_name):
    """Return True if ``report_parse_soap_result`` has the given column.

    :param mysql_conn: open database connection providing ``cursor()``
    :param column_name: column key; only the part before the first ``:`` is
        used as the column name
    :return: True when ``SHOW COLUMNS`` reports at least one match
    """
    # partition() keeps a name without ':' intact, where index() raised ValueError.
    field = column_name.partition(':')[0]
    # The pattern is bound as a parameter so the driver escapes it, and the
    # cursor is closed when the `with` block ends.
    with mysql_conn.cursor() as cursor:
        matched = cursor.execute(
            "SHOW COLUMNS FROM `report_parse_soap_result` LIKE %s", (field,))
    return bool(matched)
def add_column(mysql_conn, column_name):
    """Add a nullable INT column to ``report_parse_soap_result``.

    Best effort: any error is printed, not raised.

    :param mysql_conn: open database connection providing ``cursor()``
    :param column_name: column key; only the part before the first ``:`` is
        used as the column name
    """
    field = column_name.partition(':')[0]
    # Identifiers cannot be bound as parameters; quote with backticks and
    # double any embedded backtick so the name cannot break out of the quotes.
    quoted = '`' + field.replace('`', '``') + '`'
    try:
        with mysql_conn.cursor() as cursor:
            cursor.execute(
                "ALTER TABLE `report_parse_soap_result` ADD COLUMN " + quoted + " INT NULL ;")
    except Exception as e:
        print('add_column:', e)
def save_one_soap_result(mysql_conn, report_uuid, soap_result):
    """Store one SOAP result value on the row of ``report_uuid``.

    The column is created first when it does not exist yet, then the value is
    written. The original only created the column in that case and dropped
    the value. Errors during the update are printed, not raised.

    :param mysql_conn: open database connection
    :param report_uuid: value of ``fk_report_uuid_id`` identifying the row
    :param soap_result: dict with exactly one ``{column_key: value}`` item;
        the part of the key before the first ``:`` is the column name
    """
    (column_name, value), = soap_result.items()
    if not check_column_exist(mysql_conn, column_name):
        add_column(mysql_conn, column_name)

    field = column_name.partition(':')[0]
    # Backtick-quote the identifier (embedded backticks doubled). '%' is
    # doubled as well because the driver %-formats the query with the
    # parameters.
    quoted = ('`' + field.replace('`', '``') + '`').replace('%', '%%')
    try:
        # Value and uuid are bound as parameters so the driver escapes them.
        with mysql_conn.cursor() as cursor:
            cursor.execute(
                "UPDATE report_parse_soap_result SET " + quoted + " = %s "
                "where fk_report_uuid_id = %s",
                (value, report_uuid))
        mysql_conn.commit()
    except Exception as e:
        print('save_one_soap', e)
def save_soap_result_list(mysql_conn, report_uuid, soap_result_list):
    """Insert a row for ``report_uuid`` and store every SOAP result on it.

    A failure to insert the row is printed, not raised; the results are
    processed regardless.

    :param mysql_conn: open database connection
    :param report_uuid: value stored in ``fk_report_uuid_id``
    :param soap_result_list: iterable of single-item dicts, each passed to
        ``save_one_soap_result``
    """
    try:
        # The uuid is bound as a parameter so the driver escapes it.
        with mysql_conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO report_parse_soap_result (`fk_report_uuid_id`) VALUES (%s);",
                (report_uuid,))
        mysql_conn.commit()
    except Exception as e:
        print(e)
    for result in soap_result_list:
        save_one_soap_result(mysql_conn, report_uuid, result)
# Usage
import pymysql

from test import save_soap_result_list

host = '10.101.35.249'
user = 'xxx'
password = 'xxx'
db = 'db'
conn = pymysql.connect(host=host, user=user, passwd=password, db=db, port=3306, charset='utf8')
try:
    # NOTE(review): RobotXML is not defined or imported in this snippet;
    # presumably it comes from the project's report parser. Confirm the import.
    ojb = RobotXML('output.xml')
    report_uuid = ojb.report_reportinfo_dic['report_uuid']
    soap_result_list = ojb.report_soap_parser_result
    save_soap_result_list(conn, report_uuid, soap_result_list)
finally:
    # Always release the connection, including when parsing or saving fails.
    conn.close()