import pymssql host = '127.0.0.1' user = 'sa' password = '******' database = 'WMS' # 数据库连接 def db_connect(): # 打开数据库连接 db = pymssql.connect(host, user, password, database) # 使用 cursor()方法创建一个游标对象 cursor cursor = db.cursor() # 使用 execute()方法执行 SQL 查询 cursor.execute('select @@version') # 使用 fetchone()方法获取单条数据 data = cursor.fetchone() print(f'Database version: {data[0]}') # 关闭数据库连接,此时自动关闭多有游标对象 db.close() # 创建数据库表 def create_table(): # 打开数据库连接 db = pymssql.connect(host, user, password, database) # 使用 cursor()方法创建一个游标对象 cursor cursor = db.cursor() # 使用 execute()方法执行 SQL,如果表存在就删除 cursor.execute('drop table if exists teacher') # 使用预处理语句创建表 sql = '''create table teacher( id int primary key, name varchar(20) not null, sex varchar(2) not null, age int not null)''' try: cursor.execute(sql) # 提交到数据库执行 db.commit() print('CREATE TABLE SUCCESS.') # 捕获与数据库相关的错误 except pymssql.Error as err: print(f'CREATE TABLE FAILED, CASE:{err}') # 如果发生错误就回滚 db.rollback() finally: # 关闭数据库连接 db.close() # 删除数据库表 def drop_table(): # 打开数据库连接 db = pymssql.connect(host, user, password, database) # 使用 cursor()方法创建一个游标对象 cursor cursor = db.cursor() # 使用预处理语句删除表 sql = 'drop table teacher' try: cursor.execute(sql) # 提交到数据库执行 db.commit() print('DROP TABLE SUCCESS.') # 捕获与数据库相关的错误 except pymssql.Error as err: print(f'DROP TABLE FAILED, CASE:{err}') # 如果发生错误就回滚 db.rollback() finally: # 关闭数据库连接 db.close() # 数据库插入 def insert_record(): # 打开数据库连接 db = pymssql.connect(host, user, password, database) # 使用 cursor()方法创建一个游标对象 cursor cursor = db.cursor() # SQL 插入语句 sql = "insert information values(1001, '小明', '男', 18)" try: # 执行 SQL 语句 cursor.execute(sql) # 提交到数据库执行 db.commit() print('INSERT SUCCESS.') # 捕获与数据库相关的错误 except pymssql.Error as err: print(f'INSERT FAILED, CASE:{err}') # 如果发生错误就回滚 db.rollback() finally: # 关闭数据库连接 db.close() # 数据库删除 def delete_record(): # 打开数据库连接 db = pymssql.connect(host, user, password, database) # 使用 cursor()方法创建一个游标对象 cursor cursor = db.cursor() # SQL 删除语句 sql = "delete from information where sex='女'" try: # 执行 SQL 语句 cursor.execute(sql) # 提交到数据库执行 db.commit() print('DELETE SUCCESS.') # 捕获与数据库相关的错误 except pymssql.Error as err: print(f'DELETE FAILED, CASE:{err}') # 如果发生错误就回滚 db.rollback() finally: # 关闭数据库连接 db.close() # 数据库更新 def update_record(): # 打开数据库连接 db = pymssql.connect(host, user, password, database) # 使用 cursor()方法创建一个游标对象 cursor cursor = db.cursor() # SQL 更新语句 sql = "update information set age=25 where name='小明'" try: # 执行 SQL 语句 cursor.execute(sql) # 提交到数据库执行 db.commit() print('UPDATE SUCCESS.') # 捕获与数据库相关的错误 except pymssql.Error as err: print(f'UPDATE FAILED, CASE:{err}') # 如果发生错误就回滚 db.rollback() finally: # 关闭数据库连接 db.close() # 数据库查询 def query_record(): # 打开数据库连接 db = pymssql.connect(host, user, password, database, charset='utf8') # 使用 cursor()方法创建一个游标对象 cursor cursor = db.cursor() # SQL 查询语句 sql = 'select * from information' try: # 执行 SQL 语句 cursor.execute(sql) ''' fetchone():该方法获取下一个查询结果集,结果集是一个对象。 fetchall():接收全部返回结果行。 rowcount:返回执行 execute()方法后影响的行数 ''' # 获取所有记录列表 results = cursor.fetchall() for row in results: num = row[0] # 解决中文乱码问题 name = row[1].encode('latin-1').decode('gbk') sex = row[2].encode('latin-1').decode('gbk') age = row[3] # 输出结果 print(f'id:{num}, name:{name}, sex:{sex}, age:{age}') # 捕获与数据库相关的错误 except pymssql.Error as err: print(f'QUERY MySQL table FAILED, CASE:{err}') finally: # 关闭数据库连接 db.close() if __name__ == '__main__': db_connect()