zoukankan      html  css  js  c++  java
  • python 操作数据库

    Python操作数据库规范

     

    pymysql + configparser 简单实例

    • 本地mysql种的数据:

    • 和项目脚本同级目录种存放mysql的配置文件:mysql.ini

      [MYSQL]
      HOST = 127.0.0.1
      PORT = 3306
      USER = root
      PASSWORD = 123
      DATABASE = s2
      CHARSET = utf8
    • 通过脚本pyDemo.py来访问数据库

      # -*- coding:utf-8 -*-
      import os
      import pymysql
      import configparser
      ​
      # 获取配置
      current_path = os.path.abspath(".")
      config = configparser.ConfigParser()
      config.read(os.path.join(current_path, "mysql.ini"))
      ​
      mysql_conf = dict(
          host=config["MYSQL"]["HOST"],
          port=int(config["MYSQL"]["PORT"]),  # 注意这里得把port转换成int!
          user=config["MYSQL"]["USER"],
          password=config["MYSQL"]["PASSWORD"],
          database=config["MYSQL"]["DATABASE"],
          charset=config["MYSQL"]["CHARSET"],
      )
      ​
      # 连接数据库 —— 注意这里password得写成字符串类型
      conn = pymysql.connect(**mysql_conf)
      # 获取光标对象
      cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
      ​
      ## 执行sql语句
      sql_str1 = """ select * from user """
      # 获得受影响的信息条数
      res_count = cursor.execute(sql_str1)
      print(res_count)  # 2 两条数据
      # 获取数据
      res_datas = cursor.fetchall()
      print(res_datas)  # [{'id': 1, 'username': 'TOM2'}, {'id': 2, 'username': 'TOM2'}]
      cursor.close()
      conn.close()
      

        

      注意:将连接数据库封装为一个方法,每次需要连接的时候调用该方法获取conn和cursor对象,但是这样会有损耗,因为每次都需要 建立连接->执行数据库操作->释放连接。而数据库连接池为维护一个保存有多个数据库连接的池子,每次需要连接数据库时,从连接池中取出一个连接进行使用即可,使用完毕后连接不会释放,而是归还给连接池进行管理,节省了不断建立连接和释放连接的过程。

     

    使用DBUtils+pymysql建立数据库连接池

    • 安装DBUtils模块

      pip install -i https://pypi.douban.com/simple DBUtils
    • 配置文件依旧,操作文件如下:

      # -*- coding:utf-8 -*-
      import os
      import pymysql
      import configparser
      from DBUtils.PooledDB import PooledDB
      ​
      ​
      # 获取配置
      current_path = os.path.abspath(".")
      config = configparser.ConfigParser()
      config.read(os.path.join(current_path,"settings.ini"))
      ​
      mysql_conf = dict(
          host=config["MYSQL"]["HOST"],
          port=int(config["MYSQL"]["PORT"]), # 注意这里得把port转换成int!
          user=config["MYSQL"]["USER"],
          password=config["MYSQL"]["PASSWORD"],
          database=config["MYSQL"]["DATABASE"],
          charset=config["MYSQL"]["CHARSET"],
      )
      ​
      class MySQLPool(object):
          # 类属性
          pool = PooledDB(creator=pymysql,**mysql_conf) # 注意creator参数
      ​
          def __enter__(self):
              self.conn = MySQLPool.pool.connection()
              self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
              return self
      ​
          def __exit__(self, exc_type, exc_val, exc_tb):
              # 关闭链接
              self.cursor.close()
              self.conn.close()
      ​
      # 自定义异常类
      class MyException(BaseException):
          def __init__(self,msg):
              super().__init__()
              self.msg = msg
      ​
          def __str__(self):
              return self.msg
      ​
      ## 装饰器方式执行 —— 执行事物是出现异常的测试
      def mysql_wrapper(func):
          def wrapper(*args,**kwargs):
              with MySQLPool() as db:
                  result = func(db,*args,**kwargs)
              # return
              return result
          return wrapper
      ​
      flag = True
      ​
      @mysql_wrapper
      def func2(db,*args,**kwargs):
          # 执行事物
          try:
              # 添加一条数据
              sql_insert = """insert into test_table(name,age) values(%s,%s)"""
              # 修改一条数据
              sql_update = """ update test_table set age=23 where name='whw' """
              db.cursor.execute(sql_insert,["whw123",20])
              db.cursor.execute(sql_update)
              # 这里做一个逻辑判断 —— 可根据具体的业务而定
              if flag:
                  raise MyException("执行事物是发生异常!")
          except MyException as e:
              # 回滚
              db.conn.rollback()
              print(f"事物执行失败:{e}")
          except Exception as e:
              # 回滚
              db.conn.rollback()
              print(f"事物执行失败:{e}")
          else:
              # 提交事物
              db.conn.commit()
              print("事物执行成功:",db.cursor.rowcount)
      # func2
      func2()
      

     

     

    参考文章:

    https://www.cnblogs.com/paulwhw/articles/12498430.html?share_from=com.rae.cnblogs

     

     

  • 相关阅读:
    notepad++中使用正则表达式
    python 使用多线程
    EMS查看及修改邮箱发送和接受邮件大小的方法
    EMS修改邮箱容量限制的方法
    EMS查看邮箱容量限制的方法
    EMS批量为用户分配邮箱
    EMS已有用户分配邮箱方法
    EMS创建独立新用户并分配邮箱
    【转载】【zabbix】自定义监控项key值
    Centos6.9 安装zabbix3.4 过程
  • 原文地址:https://www.cnblogs.com/double-W/p/12508803.html
Copyright © 2011-2022 走看看