zoukankan      html  css  js  c++  java
  • python封装mysq操作,进行数据库的增删改

    python操作mysql进行封装,封装的好处我就不提了,以下是我做项目时的一个封装,大家可以根据实际需要进行自己的一个封装

    我封装的内容:

     1.数据库的配置文件

    2.获取数据配置文件的地址

    3.连接数据库的操作

    4.操作mysql的语句

    5.调用mysql的语句--执行

     封装1:数据库配置文件

    config中db_conf.ini

    里面主要是一些mysql的配置文件

    [mysqlconf]
    host=127.0.0.1
    port=3306
    user=root
    password=123456
    db_name=数据库名
    

    封装2:读取配置文件地址public_data.py

    import os
    # 整个项目的根目录绝对路劲
    baseDir = os.path.dirname(os.path.dirname(__file__))
    
    # 数据库配置你文件绝对路径
    config_path = baseDir + "/config/db_config.ini"
    print(config_path)

      打印目录地址:

     封装3:数据库的连接 config_handler.py

    # -*- coding:utf-8 -*-
    #@Time : 2020/6/14 11:01
    #@Author: 张君
    #@File : config_handler.py
    
    from configparser import ConfigParser
    from config.public_data import config_path
    
    class ConfigParse(object):
        def __init__(self):
            pass
    
        @classmethod
        def get_db_config(cls):
            #cls使用的类方法,cls就是指定本身
    
            cls.cfp = ConfigParser()
            cls.cfp.read(config_path)
            host = cls.cfp.get("mysqlconf", "host")
            port = cls.cfp.get("mysqlconf", "port")
            user = cls.cfp.get("mysqlconf", "user")
            password = cls.cfp.get("mysqlconf", "password")
            db = cls.cfp.get("mysqlconf", "db_name")
            return {"host":host, "port":port, "user":user, "password":password,"db":db}
    
    
    if __name__ == '__main__':
        cp=ConfigParse()
        result=cp.get_db_config()
        print(result)
    

      

     封装4:sql语句的封装  db_handle.py

    import datetime
    
    import pymysql
    from utils.config_handler import ConfigParse
    
    class DB(object):
        def __init__(self):
            #获取mysql连接信息
            self.db_conf = ConfigParse.get_db_config()
            #获取连接对象
            self.conn = pymysql.connect(
                host = self.db_conf["host"],
                port = int(self.db_conf["port"]),
                user = self.db_conf["user"],
                password = self.db_conf["password"],
                database = self.db_conf["db"],
                charset = "utf8"
            )
            #获取数据的游标
            self.cur = self.conn.cursor()
    
        def close_connect(self):
            # 关闭数据连接
            #提交,物理存储
            self.conn.commit()
            #游标关闭
            self.cur.close()
            #连接对象关闭
            self.conn.close()
    
        def get_api_list(self):
            """获取所有的对象数据"""
            sqlStr = "select * from interface_api where status=1"
            self.cur.execute(sqlStr)
            data = self.cur.fetchall()
            #转成一个list
            apiList = list(data)
            return apiList
    
        def get_api_case(self, api_id):
            """获取某一条数据"""
            sqlStr = "select * from interface_test_case where api_id=%s and status=1" %api_id
            self.cur.execute(sqlStr)
            api_case_list = list(self.cur.fetchall())
            return api_case_list
    
        def get_rely_data(self, api_id, case_id):
            """获取所有数据库中的某一条"""
            sqlStr = "select data_store from interface_data_store where api_id=%s and case_id=%s" %(api_id, case_id)
            self.cur.execute(sqlStr)
            rely_data = eval(self.cur.fetchall()[0][0])
            return rely_data
    
        def write_check_result(self, case_id, errorInfo, res_data):
            """更新数据库表"""
            sqlStr = "update interface_test_case set error_info="%s", res_data="%s" where id=%s" %(errorInfo, res_data, case_id)
            self.cur.execute(sqlStr)
            self.conn.commit()
        def insert_dab(self):
            sqlStr="INSERT INTO `interface_api` VALUES (4, '修改博文', 'http://39.106.41.11:8080/getBlogContent/', 'get', 'url','0', '2018-07-27 22:13:30')"
            self.cur.execute(sqlStr)
    
        def get_api_case(self, api_id):
            """获取表中id"""
            sqlStr = "select * from interface_test_case where api_id=%s and status=1" % api_id
            self.cur.execute(sqlStr)
            api_case_list = list(self.cur.fetchall())
            return api_case_list
    
        def get_api_id(self, api_name):
            """获取表中id"""
            sqlStr = "select api_id from interface_api where api_name='%s'" % api_name
            self.cur.execute(sqlStr)
            api_id = self.cur.fetchall()[0][0]
            return api_id
    
        def update_store_data(self, api_id, case_id, store_data):
            
            sqlStr = "select data_store from interface_data_store where api_id=%s and case_id=%s" % (api_id, case_id)
            self.cur.execute(sqlStr)
            if self.cur.fetchall():
                sqlStr = "update interface_data_store set data_store="%s" where api_id=%s and case_id=%s" % (
                store_data, api_id, case_id)
                print(sqlStr)
                self.cur.execute(sqlStr)
                self.conn.commit()
            else:
                sqlStr = "insert into interface_data_store values(%s, %s, "%s", '%s')" % (
                api_id, case_id, store_data, datetime.now())
                self.cur.execute(sqlStr)
                self.conn.commit()
    
    if __name__ == '__main__':
        db=DB()
        print(db.get_api_list())
        print(db.get_api_case(1))
        # print(db.get_rely_data(1,1))
        #print(db.insert_dab())
    

      

     使用ing,我新建了一个test.py文件

    from utils.db_handler import DB
    from action.get_rely import GetRely
    from utils.HttpClient import HttpClient
    
    def main():
        #连接数据库,获取连接实例对象
        db=DB()
        #从数据库中获取需要执行的api执行集合
    
        api_list=db.get_api_list()
        print(api_list)
    
    main()
    

      

     至此一整套封装,与使用就完成了,为什么要分为几个文件夹,我遵循的一个原则, 使用、配置、公共方法是用不的文件夹整理,其实最终也就是你使用怎么方便,维护比较好,就怎么用

  • 相关阅读:
    lvs+nginx负载均衡
    数据库读写分离、分表分库——用Mycat
    RocketMQ最佳实战
    几个常用类
    Future复习笔记
    线程池复习笔记
    HashMap 和 ConcurrentHashMap比较
    HTTP长连接和短连接(转)
    咨询
    RocketMQ 问题汇总
  • 原文地址:https://www.cnblogs.com/chongyou/p/13124269.html
Copyright © 2011-2022 走看看