zoukankan      html  css  js  c++  java
  • pymysql的使用

    pymysql 和 MySQLdb 的用法大同小异,官方文档也只给了一个简单的例子,看一看很容易明白:

    -- Example table used by the Python snippets that follow: an
    -- auto-incrementing integer primary key `id` plus mandatory `email`
    -- and `password` string columns.
    CREATE TABLE `users` (
        `id` int(11) NOT NULL AUTO_INCREMENT,
        `email` varchar(255) COLLATE utf8_bin NOT NULL,
        `password` varchar(255) COLLATE utf8_bin NOT NULL,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin
    AUTO_INCREMENT=1 ;
    import pymysql.cursors
    
    # Open the connection; rows are fetched through pymysql's DictCursor.
    connection = pymysql.connect(
        host='localhost',
        user='user',
        password='passwd',
        db='db',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
    )

    try:
        # Step 1: add one row to `users`, binding the values as parameters.
        with connection.cursor() as writer:
            writer.execute(
                "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)",
                ('webmaster@python.org', 'very-secret'),
            )

        # The connection does not autocommit by default, so the insert is
        # only saved once commit() is called.
        connection.commit()

        # Step 2: read the row back, looked up by e-mail address.
        with connection.cursor() as reader:
            reader.execute(
                "SELECT `id`, `password` FROM `users` WHERE `email`=%s",
                ('webmaster@python.org',),
            )
            result = reader.fetchone()
            print(result)
    finally:
        # Always release the connection, even if a statement failed.
        connection.close()

     我自己总结了一段代码,可以在工程中使用:

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    import pymysql.cursors
    from six import itervalues
    
    class mysqlCs(object):
        """Small convenience wrapper around one PyMySQL connection and cursor.

        The instance owns ``self.conn`` (the connection) and ``self.cur`` (a
        single cursor shared by every method). Write helpers commit on success
        and roll back on failure.

        NOTE: table and column names are interpolated into the SQL text because
        identifiers cannot be bound as parameters; only pass trusted names.
        Row *values* are always bound as parameters.
        """

        def __init__(self, host='localhost', user='spider', password='spiderPass',
                     port=5636, db='spiderdatas', charset='utf8mb4'):
            """Connect to MySQL and create the shared cursor.

            Every parameter defaults to the value that used to be hard-coded,
            so ``mysqlCs()`` connects exactly as before.

            Raises:
                SystemExit: if the connection cannot be established; the
                    original exception is carried as the exit payload.
            """
            try:
                self.conn = pymysql.connect(
                    host=host,
                    user=user,
                    password=password,
                    port=port,
                    db=db,
                    charset=charset,
                    cursorclass=pymysql.cursors.DictCursor
                )
                self.cur = self.conn.cursor()
            except Exception as ex:
                # Same effect as the interactive-only ``exit(ex)`` helper, but
                # does not depend on the ``site`` module being loaded.
                raise SystemExit(ex)

        def __enter__(self):
            """Allow ``with mysqlCs() as db:`` usage."""
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            """Close the cursor and connection when the ``with`` block ends."""
            self.close()
            return False

        def close(self):
            """Close the cursor and the connection; safe to call repeatedly."""
            for attr in ('cur', 'conn'):
                resource = getattr(self, attr, None)
                if resource is None:
                    continue
                try:
                    resource.close()
                except Exception as ex:
                    print(ex)
                setattr(self, attr, None)

        def _rollback(self):
            """Roll back the open transaction without masking the first error."""
            try:
                self.conn.rollback()
            except Exception as ex:
                print(ex)

        def run(self, sql, args=None):
            """Execute ``sql`` and return every fetched row.

            Args:
                sql: statement text, optionally containing ``%s`` placeholders.
                args: optional values bound to the placeholders; prefer this
                    over formatting values into ``sql`` yourself.

            Returns:
                The result of ``fetchall()``, or ``None`` if execution failed
                (the error is printed). No commit is issued here.
            """
            try:
                effect_rows = self.cur.execute(sql, args)
                print("effect rows :", effect_rows)
                return self.cur.fetchall()
            except Exception as ex:
                print(ex)
                return None

        def tabExists(self, table):
            """Return the column rows of ``table``, or ``False`` if not found.

            The lookup is restricted to the connection's current database and
            ordered by column position, so the result can be used directly as
            the column list of an INSERT. The table name is bound as a
            parameter rather than formatted into the query.

            Returns:
                A sequence of ``{'COLUMN_NAME': ...}`` rows, or ``False`` when
                the table has no columns or the query fails.
            """
            sql = ("SELECT COLUMN_NAME FROM information_schema.`COLUMNS` "
                   "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s "
                   "ORDER BY ORDINAL_POSITION")
            try:
                effect_rows = self.cur.execute(sql, (table,))
                if effect_rows > 0:
                    return self.cur.fetchall()
                return False
            except Exception as ex:
                print(ex)
                return False

        def insertstr(self, table, data, seprater=","):
            """Insert one row whose values are packed in a delimited string.

            Args:
                table: target table name (trusted identifier).
                data: the values joined by ``seprater``; one per column.
                seprater: delimiter used to split ``data``.

            Returns:
                The affected-row count, or ``False`` on failure (the
                transaction is rolled back and the error printed).

            Raises:
                SystemExit: if ``data`` is not a ``str``.
            """
            if not isinstance(data, str):
                raise SystemExit("Must be str ...")
            try:
                values = data.split(seprater)
                placeholders = ','.join(['%s'] * len(values))
                sql = "INSERT INTO %s VALUES (%s)" % (table, placeholders)
                effect_rows = self.cur.execute(sql, values)
                self.conn.commit()
                return effect_rows
            except Exception as e:
                self._rollback()
                print(e)
                return False

        def insertdic(self, table, dic):
            """Insert one row given as a ``{column: value}`` mapping.

            Returns:
                The affected-row count, or ``False`` on failure (the
                transaction is rolled back and the error printed).

            Raises:
                SystemExit: if ``dic`` is not a ``dict``.
            """
            if not isinstance(dic, dict):
                raise SystemExit("Must be dict ...")
            try:
                # Take keys once and derive values from them so both lists are
                # guaranteed to be in the same order.
                columns = list(dic)
                values = [dic[name] for name in columns]
                placeholders = ','.join(['%s'] * len(columns))
                sql = "INSERT INTO %s(%s) VALUES (%s)" % (
                    table, ','.join(columns), placeholders)
                effect_rows = self.cur.execute(sql, values)
                self.conn.commit()
                return effect_rows
            except Exception as e:
                self._rollback()
                print(e)
                return False

        def insertlist(self, table, datas):
            """Insert the rows described by ``datas`` into ``table``.

            ``datas`` may be:
              * a list of lists/tuples - each inner sequence is a full row in
                table column order, sent in one ``executemany`` call;
              * a list of dicts - each dict is inserted (and committed)
                individually via ``insertdic``;
              * a flat list of scalars - treated as a single row.

            Returns:
                The number of affected rows (``0`` for an empty list), or
                ``False`` if an insert failed. For a list of dicts every row
                is still attempted; rows that succeeded stay committed even
                when ``False`` is returned.

            Raises:
                SystemExit: if ``datas`` is not a list or the table is unknown.
            """
            if not isinstance(datas, list):
                raise SystemExit("Must be list...")
            ret = self.tabExists(table)
            if not ret:
                raise SystemExit("table %s not exists ..." % table)
            if not datas:
                # Nothing to insert; avoids indexing into an empty list.
                return 0

            first = datas[0]
            if isinstance(first, dict):
                total = 0
                failed = False
                for d in datas:
                    inserted = self.insertdic(table, d)
                    # ``is False`` distinguishes failure from a 0-row result.
                    if inserted is False:
                        failed = True
                    else:
                        total += inserted
                return False if failed else total

            try:
                if isinstance(first, (list, tuple)):
                    cols = [i['COLUMN_NAME'] for i in ret]
                    sql = "INSERT INTO %s(%s) VALUES (%s)" % (
                        table, ','.join(cols), ','.join(['%s'] * len(cols)))
                    effect_rows = self.cur.executemany(sql, datas)
                else:
                    sql = "INSERT INTO %s VALUES (%s)" % (
                        table, ','.join(['%s'] * len(datas)))
                    effect_rows = self.cur.execute(sql, datas)
                self.conn.commit()
                return effect_rows
            except Exception as ex:
                self._rollback()
                print(ex)
                return False
    
    # Instantiate the helper; the class defined in this file is ``mysqlCs``
    # (the previous name ``mysqlClient`` does not exist and raised NameError).
    p = mysqlCs()
  • 相关阅读:
    Java实现 蓝桥杯VIP 算法训练 字符删除
    Java实现 蓝桥杯VIP 算法训练 字符删除
    Java实现 蓝桥杯VIP 算法训练 字符删除
    Java实现 蓝桥杯VIP 算法训练 字符删除
    Java实现 蓝桥杯VIP 算法训练 字符删除
    Java实现 蓝桥杯VIP 算法训练 字符串编辑
    Java实现 蓝桥杯VIP 算法训练 字符串编辑
    Java实现 蓝桥杯VIP 算法训练 字符串编辑
    Java实现 蓝桥杯VIP 算法训练 字符串编辑
    Java实现 蓝桥杯VIP 算法训练 字符串编辑
  • 原文地址:https://www.cnblogs.com/kongzhagen/p/8383751.html
Copyright © 2011-2022 走看看