zoukankan      html  css  js  c++  java
  • python/pandas 操作数据库

    1. (数据库)CSV操作
    # Read a CSV file, skipping rows that cannot be parsed.
    # `error_bad_lines=False` was deprecated in pandas 1.3 and removed in 2.0;
    # `on_bad_lines="skip"` is the supported spelling of the same behaviour.
    df = pd.read_csv('pandas.csv', encoding="utf-8", delimiter=",", on_bad_lines="skip")

    # Read a local CSV file from an absolute path.
    df = pd.read_csv("C:/Users/fuqia/Desktop/example.csv", sep=',')
    
    1. (数据库)使用sqlite3
    import pandas as pd
    import altair as alt
    import sqlite3
    
    con = sqlite3.connect("pandas.db")

    # One-off setup (left commented out): create a table with a timestamp default.
    # con.execute("CREATE TABLE t (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)")

    # One-off setup (left commented out): create the person2 table.
    # con.execute("CREATE TABLE person2(\
    # id  INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,\
    # time  TEXT NOT NULL,\
    # name  varchar(100) NOT NULL\
    # )")

    # # Insert one record into the table.
    # con.execute("insert into person2(time, name) values('1', 'bill')")
    # Commit the statements above.
    # con.commit()

    # Read rows from the database into a DataFrame.
    df = pd.read_sql("select id,time, name from person2", con)

    # Write a DataFrame into the database.
    df = pd.DataFrame({'time': [11, 12, 13, 14], 'name': ['zhangsan', 'lisi', 'wangwu', 'zhuliu']})
    # person2 already exists (it was queried just above) and to_sql defaults to
    # if_exists='fail', which raises ValueError for an existing table, so the
    # rows are appended instead.
    df.to_sql('person2', con, index=False, if_exists='append')


    # Read a CSV file, skipping rows that cannot be parsed.
    # `error_bad_lines` was removed in pandas 2.0; `on_bad_lines="skip"` replaces it.
    df = pd.read_csv('pandas.csv', encoding="utf-8", delimiter=",", on_bad_lines="skip")
    df
    # Store the CSV contents in a new table.
    # NOTE: with the default if_exists='fail' this raises if 'hf' already exists.
    df.to_sql('hf', con, index=False)
    
    
    1. (数据库)mysql 读取插入pandas数据
    import pandas as pd
    import pymysql
    import sys
    from sqlalchemy import create_engine
    
    def read_mysql_and_insert():
        """Copy table ``person2`` into table ``add_table`` via a DataFrame.

        The rows are read through a raw PyMySQL connection and appended through
        an SQLAlchemy engine. On a connection or query error the message is
        printed and the process exits via ``sys.exit()``.

        :return: the DataFrame that was read from ``person2``
        """
        # The original referenced ``sqlalchemy.exc`` although only
        # ``create_engine`` is imported at file level, so reaching those except
        # clauses raised NameError instead of handling the error.
        from sqlalchemy import exc as sa_exc

        # PyMySQL connection, used for DataFrame reads.
        try:
            conn = pymysql.connect(host='127.0.0.1', user='root', password='pass', db='db1', charset='utf8')
        except pymysql.err.OperationalError as e:
            print('Error is ' + str(e))
            sys.exit()

        # From here on the connection is open; close it on every exit path,
        # including the SystemExit raised by sys.exit().
        try:
            # SQLAlchemy engine, used for DataFrame writes.
            try:
                engine = create_engine('mysql+pymysql://root:pass@127.0.0.1:3306/db1')
            except (sa_exc.OperationalError, sa_exc.InternalError) as e:
                print('Error is ' + str(e))
                sys.exit()

            try:
                # Read the table through PyMySQL; the result is a DataFrame.
                sql = 'select * from person2'
                df = pd.read_sql(sql, con=conn)
            except pymysql.err.ProgrammingError as e:
                print('Error is ' + str(e))
                sys.exit()

            print(df.head())
            # Append the DataFrame to the target table through SQLAlchemy.
            df.to_sql(name='add_table', con=engine, if_exists='append', index=False)
        finally:
            conn.close()
        print('ok')
        # The caller assigns the result, so hand the DataFrame back
        # (the original implicitly returned None).
        return df

    if __name__ == '__main__':
        df = read_mysql_and_insert()
    
    
    1. (python模块)pymysql,sqlalchemy
        import pandas as pd
        import numpy as np
        import altair as alt
    
        # Run a raw SQL statement through PyMySQL.
        conn = pymysql.connect(host='127.0.0.1', user='root', password='xxx', db='db1', charset='utf8')
        cursor = conn.cursor()
        cursor.execute("CREATE TABLE t3 (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)")

        # DataFrame-related usage
        # 1. Create an SQLAlchemy engine that uses the PyMySQL driver.
        from sqlalchemy import create_engine, text
        engine = create_engine('mysql+pymysql://root:xxx@127.0.0.1:3306/db1')
        # 2. Run a raw SQL statement through SQLAlchemy.
        #    Engine.execute() was removed in SQLAlchemy 2.0; executing a text()
        #    construct on a connection works on both 1.x and 2.x.
        with engine.begin() as connection:
            connection.execute(text("CREATE TABLE t2 (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"))
        # 3. Read a query result into a DataFrame through the engine.
        sql = ''' select * from employee; '''
        df = pd.read_sql_query(sql, engine)
        print(df)
    
    1. (python模块)使用pymysql
    import pymysql
    
    # PyMySQL 1.0 made the arguments of connect() keyword-only, so the
    # positional host/user/password form raises TypeError there; the keyword
    # form works on old and new releases alike.
    con = pymysql.connect(host='localhost',
                          user='root',
                          password='eve136712',
                          db='model_save',
                          charset='utf8',
                          use_unicode=True)
    cursor = con.cursor()

    # Execute a SQL statement through the cursor's execute() method.
    cursor.execute("create table person(id varchar(32) primary key, name varchar(100) )")
    
    
    1. (python模块)使用sqlalchemy
    
    import pandas as pd
    from sqlalchemy import create_engine
    
    # Engine creation, option 1 (the PyMySQL driver is still required).
    engine = create_engine('mysql+pymysql://root:123456@localhost:3306/test')

    # Engine creation, option 2 (the PyMySQL driver is still required).
    db_info = {'user': 'root',
               'password': '123456',
               'host': 'localhost',
               'port': 3306,
               'database': 'test'
               }
    # The `encoding` argument of create_engine() was deprecated in SQLAlchemy 1.4
    # and removed in 2.0 (passing it raises TypeError); the character set is
    # already selected by `?charset=utf8` in the URL.
    engine = create_engine('mysql+pymysql://%(user)s:%(password)s@%(host)s:%(port)d/%(database)s?charset=utf8' % db_info)

    # Read a query result into a DataFrame through SQLAlchemy.
    sql = ''' select * from employee; '''
    df = pd.read_sql_query(sql, engine)
    print(df)

    df = pd.DataFrame({'id': [1, 2, 3, 4], 'name': ['zhangsan', 'lisi', 'wangwu', 'zhuliu']})
    # Store the DataFrame as a MySQL table, including the index column.
    df.to_sql('mydf', engine, index=True)


    # Store a DataFrame as a MySQL table without the index column (index=False).
    # if_exists:
    # 1. fail:    if the table exists, raise ValueError (this is the default)
    # 2. replace: if the table exists, drop it, create a new one and insert the data
    # 3. append:  if the table exists, insert the data; if not, create it first
    # Option 1
    pd.io.sql.to_sql(df, 'example', con=engine, index=False, if_exists='replace')
    # Option 2 (NOTE: index is not disabled here, so the index column is stored)
    df.to_sql('example', con=engine,  if_exists='replace')
    
    
    
    1. (python模块)使用Django (应用)
    
    def mysql_to_pd(case_name='XXXXXTraffic'):
        """Load the ReportDetail rows of one test case into a date-indexed DataFrame.

        :param case_name: value matched against ``ReportDetail.case_name``
        :return: DataFrame with ``platform_name`` renamed to ``HW``, an extra
                 ``time`` column (``report_create_time`` formatted as
                 ``YYYY-MM-DD``), indexed by that date and sorted by index
        """
        wanted_fields = (
            'report_create_time',
            'platform_name',
            'throughputd',
            'throughputu',
            'case_name',
        )
        rows = ReportDetail.objects.filter(case_name=case_name).values(*wanted_fields)
        frame = pd.DataFrame(list(rows)).rename(columns={'platform_name': 'HW'})
        # Day-resolution string of the creation time; it is parsed back below to
        # build a datetime index.
        frame['time'] = frame['report_create_time'].dt.strftime('%Y-%m-%d')
        frame = frame.set_index(pd.to_datetime(frame["time"]))
        return frame.sort_index()

    data=mysql_to_pd()
    
    
    
    
    1. (python模块)使用df+mysql (应用)
    import pandas as pd
    import numpy as np
    import altair as alt
    import pymysql
    from sqlalchemy import create_engine
        
    # Query to run, then the engine it runs on.
    sql = ''' select report_uuid  from report_reportinfo; '''
    engine = create_engine('mysql+pymysql://root:password@127.x.x.x:3306/db5')
    # Load the query result into a DataFrame.
    df = pd.read_sql_query(sql=sql, con=engine)
    # Bare expression: displays the frame when run in a notebook cell.
    df
    
    1. (python)动态创建mysql表【自定义模块】
    # 此代码实现将dict写入mysql指定表中。如果指定表不存在,则根据dict中key的字段创建表,然后将dict写入表中
    
    import pymysql
    from scrapy.conf import settings
    
    
    class DataToMysql:
        """Write dicts into MySQL tables, creating a missing table on demand."""

        # MySQL error code reported when the target table does not exist.
        _NO_SUCH_TABLE = 1146

        def __init__(self, host, user, passwd, db, port=3306):
            """
            Open the connection and create a cursor.

            :param host: MySQL server host
            :param user: login user
            :param passwd: login password
            :param db: database name
            :param port: server port; defaults to 3306 (the original accepted
                         this parameter but always connected to 3306)
            :raises pymysql.Error: re-raised after printing a message when the
                                   connection cannot be established
            """
            try:
                self.conn = pymysql.connect(host=host, user=user, passwd=passwd, db=db,
                                            port=port, charset='utf8')  # connect to the database
                self.cursor = self.conn.cursor()
            except pymysql.Error:
                print("数据库连接信息报错")
                raise

        def write(self, table_name, info_dict):
            """
            Insert ``info_dict`` as one row into ``table_name``.

            If the table does not exist, it is created with one ``text`` column
            per key of ``info_dict`` and the insert is retried.

            :param table_name: name of the table to write to; interpolated into
                               the SQL as-is, so it must come from trusted code
            :param info_dict: mapping of column name to value; column names are
                              interpolated as-is (trusted), values are passed
                              to the driver as query parameters
            :return: None
            :raises pymysql.Error: for any database error other than a missing table
            """
            def _ident(name):
                # The driver %-formats the statement when parameters are given,
                # so a literal '%' inside an identifier has to be doubled.
                return name.replace('%', '%%')

            columns = list(info_dict)
            values = [info_dict[column] for column in columns]
            # Values travel as parameters: the driver escapes them, which
            # replaces pymysql.escape_string (removed in PyMySQL 1.0) and also
            # accepts non-string values.
            insert_sql = "INSERT INTO {} ({}) VALUES ({})".format(
                _ident(table_name),
                ', '.join(_ident(column) for column in columns),
                ', '.join(['%s'] * len(columns)),
            )

            try:
                self.cursor.execute(insert_sql, values)
                self.conn.commit()  # commit the current transaction
            except pymysql.Error as e:
                # PyMySQL errors carry (errno, message) in args; compare the
                # code directly instead of parsing str(e).
                if not (e.args and e.args[0] == self._NO_SUCH_TABLE):
                    raise
                # Table is missing: create it with a text column per key.
                column_defs = ', '.join(column + ' text' for column in columns)
                self.cursor.execute("CREATE TABLE %s (%s)" % (table_name, column_defs))
                self.cursor.execute(insert_sql, values)
                self.conn.commit()  # commit the current transaction
    
    
    if __name__ == '__main__':
        # DataToMysql(host, user, passwd, db, port): the original call left out
        # the port argument, which raises TypeError before anything is written.
        mysql = DataToMysql('localhost', 'root', '******', 'pythonDB', 3306)
        di = {"A": "A", "B": "B"}
        mysql.write('te', di)
    
    
    1. (python)动态创建mysql表中的字段【自定义模块】【good】
    
    def check_column_exist(mysql_conn, column_name):
        """Return True if table ``report_parse_soap_result`` has the given column.

        :param mysql_conn: open database connection whose ``cursor().execute()``
                           returns the number of matching rows
        :param column_name: string of the form ``"<column>:<rest>"``; only the
                            part before the first ``:`` is used as column name
        :return: True when the column exists, otherwise False
        :raises ValueError: if ``column_name`` contains no ``:``
        """
        field = column_name[:column_name.index(':')]
        # An exact comparison is used instead of LIKE: '_' and '%' are LIKE
        # wildcards, so e.g. "a_b" would also have matched a column "axb".
        # The name is passed as a parameter so the driver escapes it.
        sql = "SHOW COLUMNS FROM `report_parse_soap_result` WHERE Field = %s"
        cursor = mysql_conn.cursor()
        try:
            return bool(cursor.execute(sql, (field,)))
        finally:
            cursor.close()
    
    
    def add_column(mysql_conn, column_name):
        """Add a nullable INT column to table ``report_parse_soap_result``.

        The column is named after the part of ``column_name`` before its first
        ``:``. Any failure (including a missing ``:``) is printed, not raised.

        :param mysql_conn: open database connection
        :param column_name: string of the form ``"<column>:<rest>"``
        :return: None
        """
        try:
            colon_at = column_name.index(':')
            new_field = column_name[:colon_at]
            statement = f"ALTER TABLE `report_parse_soap_result` ADD COLUMN `{new_field}` INT NULL ;"
            db_cursor = mysql_conn.cursor()
            db_cursor.execute(statement)
        except Exception as e:
            print('add_column:', e)
    
    
    def save_one_soap_result(mysql_conn, report_uuid, soap_result):
        """Store one SOAP result value in the row belonging to ``report_uuid``.

        The column named by the result is created first if it does not exist.
        Failures of the UPDATE are printed, not raised.

        :param mysql_conn: open database connection
        :param report_uuid: value of ``fk_report_uuid_id`` identifying the row
        :param soap_result: dict with exactly one ``"<column>:<rest>": value`` item
        :return: None
        :raises ValueError: if ``soap_result`` does not hold exactly one item, or
                            if its key contains no ``:`` (raised by the column check)
        """
        (column_name, value), = soap_result.items()
        if not check_column_exist(mysql_conn, column_name):
            # The original only created the column in this case and silently
            # dropped the value; create it and then fall through to the UPDATE.
            add_column(mysql_conn, column_name)
        try:
            # Identifiers cannot be sent as parameters; double any '%' because
            # the driver %-formats the statement when parameters are given.
            field = column_name[:column_name.index(':')].replace('%', '%%')
            # The value and the uuid are passed as parameters so the driver
            # escapes them instead of interpolating them into the SQL text.
            sql = (f"UPDATE report_parse_soap_result SET `{field}` = %s "
                   "where fk_report_uuid_id= %s")
            cursor = mysql_conn.cursor()
            try:
                cursor.execute(sql, (value, report_uuid))
            finally:
                cursor.close()
            mysql_conn.commit()
        except Exception as e:
            print('save_one_soap', e)
    
    
    def save_soap_result_list(mysql_conn, report_uuid, soap_result_list):
        """Create the result row for ``report_uuid`` and store each SOAP result in it.

        A failure of the initial INSERT is printed, not raised, and the
        individual results are still processed afterwards.

        :param mysql_conn: open database connection
        :param report_uuid: value written to ``fk_report_uuid_id``
        :param soap_result_list: iterable of single-item dicts, each handed to
                                 ``save_one_soap_result``
        :return: None
        """
        try:
            # The uuid is passed as a parameter so the driver escapes it instead
            # of it being interpolated into the SQL text.
            sql = "INSERT INTO report_parse_soap_result (`fk_report_uuid_id`) VALUES (%s);"
            cursor = mysql_conn.cursor()
            try:
                cursor.execute(sql, (report_uuid,))
            finally:
                cursor.close()
            mysql_conn.commit()
        except Exception as e:
            print(e)
        for result in soap_result_list:
            save_one_soap_result(mysql_conn, report_uuid, result)
    
    
    
    # 使用
    from test import save_soap_result_list
    
    # Connection settings for the result database.
    host = '10.101.35.249'
    user = 'xxx'
    password = 'xxx'
    db = 'db'
    conn = pymysql.connect(host=host, user=user, passwd=password, db=db, port=3306, charset='utf8')

    # Close the connection even if parsing or saving fails; the original never
    # closed it.
    try:
        # Parse the Robot Framework output and store its SOAP results.
        ojb = RobotXML('output.xml')
        report_uuid = ojb.report_reportinfo_dic['report_uuid']
        soap_result_list = ojb.report_soap_parser_result
        save_soap_result_list(conn, report_uuid, soap_result_list)
    finally:
        conn.close()
    
    
  • 相关阅读:
    Tkinter的Radiobutton组件
    Tkinter的Canvas组件
    python小程序-日历查询器
    python语言Tkinter的Button组件
    python异常处理机制(try:except)
    python中OS模块的使用
    正则表达式-常用函数的基本使用
    正则表达式-常用元字符的基本使用
    html框架iframe与frameset的介绍
    常见form表单5种input输入类型
  • 原文地址:https://www.cnblogs.com/amize/p/13942598.html
Copyright © 2011-2022 走看看