zoukankan      html  css  js  c++  java
  • Python常用功能函数系列总结(四)之数据库操作

    本节目录

    常用函数一:redis操作

    常用函数二:mongodb操作

    常用函数三:数据库连接池操作

    常用函数四:pandas连接数据库

    常用函数一:redis操作

    # -*- coding: utf-8 -*-
    
    """
    Datetime: 2020/07/06
    Author: Zhang Yafei
    Description: 
    """
    import redis
    
    
    def get_redis_conn():
        """Return a Redis client for the server at 127.0.0.1:6379."""
        return redis.Redis(host='127.0.0.1', port=6379)
    
    
    # Connection pools keyed by (host, port, max_connections). Reusing the pool is
    # the point of pooling: building a new ConnectionPool on every call would give
    # each returned client its own private pool.
    _REDIS_POOLS = {}


    def get_redis_conn_pool(host='127.0.0.1', port=6379, max_connections=1000):
        """Return a Redis client that draws connections from a shared pool.

        The first call for a given (host, port, max_connections) creates the
        pool; later calls with the same arguments reuse it.

        :param host: Redis server host
        :param port: Redis server port
        :param max_connections: upper bound on connections the pool may create
        :return: redis.Redis client bound to the pool
        """
        pool_key = (host, port, max_connections)
        pool = _REDIS_POOLS.get(pool_key)
        if pool is None:
            pool = redis.ConnectionPool(host=host, port=port, max_connections=max_connections)
            _REDIS_POOLS[pool_key] = pool
        return redis.Redis(connection_pool=pool)
    
    
    def redis_string_practice():
        """Walk through the Redis string commands: create, count, delete, modify, read.

        The expected values shown in the comments assume that 'str_k', 'str_k1'
        and 'str_k2' did not exist before the call.
        """
        conn = get_redis_conn_pool()

        # --- create ---
        conn.set('str_k', 'hello')  # {'str_k': 'hello'}
        conn.mset({'str_k': 'hello', 'str_k1': 'world'})  # several key/value pairs at once
        # MSETNX writes only if none of the given keys exist; 'str_k' exists, so
        # nothing is written and the call returns False.
        conn.msetnx({'str_k': 'msetnx_hello'})
        # Keyword arguments sidestep the positional-order difference between
        # redis-py versions (3.x expects setex(name, time, value)).
        conn.setex('str_k2', time=2, value='str_v2')  # expires after 2 seconds

        # --- counters ---
        # Reset first: incrbyfloat leaves a non-integer value behind, which would
        # make decr/incr fail the next time this function runs.
        conn.set('num', 0)
        conn.decr('num', amount=1)
        conn.incr('num', amount=1)
        conn.incrbyfloat('num', amount=1.5)

        # --- delete ---
        conn.delete('str_k1')

        # --- modify ---
        conn.append('str_k', ' world')  # b'hello world'
        # Overwrites starting at offset 5: b'hello world' -> b'helloworldd'
        conn.setrange('str_k', 5, 'world')

        # --- read ---
        print(conn.get('str_k'))
        print(conn.get('num'))
        print(conn.getrange('str_k', 0, 100))
        print(conn.keys())
        print(conn.strlen('str_k'))  # length of the stored value
        print(conn.exists('str_k'))
        conn.expire('str_k1', 5)  # 'str_k1' was deleted above, so no TTL is applied
        print(conn.get('str_k1'))  # None

        # GETSET stores the new value and returns the previous one.
        print(conn.getset('str_k2', 'str_v2'))  # b'str_v2' while the 2-second TTL has not elapsed
    
    
    def redis_dict_practice():
        """Walk through the Redis hash commands on key 'k4'.

        Data layout being built:

            k4 -> {'username': 'zhangyafei', 'age': 23, 'hobby': 'basketball'}

        The expected values shown in the comments assume 'k4' did not exist
        before the call.
        """
        conn = get_redis_conn_pool()

        # 1. Write fields
        conn.hset('k4', 'username', 'zhangyafei')
        conn.hset('k4', 'age', 23)
        # HSETNX writes only when the field is absent (1 on write, 0 otherwise).
        conn.hsetnx('k4', 'username', 'root')  # 0: field already exists
        conn.hsetnx('k4', 'hobby', 'basketball')  # 1
        # NOTE(review): newer redis-py releases deprecate hmset in favour of
        # hset(name, mapping=...) - confirm against the installed version.
        conn.hmset('k4', {'username': 'zhangyafei', 'age': 23})

        # 2. Read fields
        conn.hget('k4', 'username')  # b'zhangyafei'
        # Several fields of one hash need HMGET; MGET reads whole string keys and
        # cannot address hash fields. Both calling styles below are accepted.
        conn.hmget('k4', ['username', 'age'])  # [b'zhangyafei', b'23']
        conn.hmget('k4', 'username', 'age')  # [b'zhangyafei', b'23']
        # Every field and value at once
        vals = conn.hgetall('k4')
        print(vals)
        # Sizes, keys, values, membership
        conn.hlen('k4')  # 3
        conn.hstrlen('k4', 'username')  # 10
        conn.hkeys('k4')  # field names
        conn.hvals('k4')  # field values
        conn.hexists('k4', 'username')  # True

        # Further examples, left disabled so the hash stays intact:
        # conn.hdel('k4', 'age', 'username')
        # conn.hincrby('k4', 'age', amount=2)
        # conn.hincrbyfloat('k4', 'age', amount=-1.5)

        # For a very large hash (say 10 million fields) hgetall would pull the
        # whole thing into memory in one reply; hscan_iter walks it incrementally
        # and yields (field, value) pairs.
        for item in conn.hscan_iter('k4'):
            print(item)
    
    
    def redis_list_practice():
        """
        Demonstrate the Redis list commands on key 'k1'.

        Data layout:
            k1 -> [1, 2, 3, ...]
        """
        client = get_redis_conn_pool()
        list_key = 'k1'

        # push onto the left end twice, then onto the right end
        client.lpush(list_key, 11)
        client.lpush(list_key, 22)
        client.rpush(list_key, 33)

        # pop from the left: plain, then the blocking variant, which waits up to
        # `timeout` seconds while the list is empty
        client.lpop(list_key)
        client.blpop(list_key, timeout=10)
        # pop from the right: plain, then blocking
        client.rpop(list_key)
        client.brpop(list_key, timeout=10)

        sample_values = (12, 3, 1, 21, 21, 1, 212, 11, 1, 1, 1, 2, 2, 34, 5, 5, 5)
        client.lpush(list_key, *sample_values)

        def list_iter(key, count=3):
            """Yield the elements of list `key`, fetching `count` of them per LRANGE call."""
            start = 0
            chunk = client.lrange(key, start, start + count - 1)
            while chunk:
                yield from chunk
                start += count
                chunk = client.lrange(key, start, start + count - 1)

        # first 101 elements in a single request
        print(client.lrange(list_key, 0, 100))

        # the same list, read three elements at a time
        for element in list_iter(list_key, 3):
            print(element)
    
    
    def redis_pipeline_practice():
        """
        Queue several writes on a transactional pipeline and send them together.

        The commands are buffered on the pipeline object and only reach the
        server when ``execute()`` is called.
        """
        client = get_redis_conn_pool()
        txn = client.pipeline(transaction=True)
        txn.multi()

        # a queued command returns the pipeline itself, so the calls can be chained
        (
            txn.set('k2', '123')
            .hset('k3', 'n1', 666)
            .lpush('k4', 'laonanhai')
            .execute()
        )
    
    def redis_set_practice():
        """
        Demonstrate the Redis set commands on 'set_k' and 'set_k1'.

        Data layout:
            {
                'set_k': {v1, v2, v3},
            }
        """
        client = get_redis_conn_pool()
        first, second = 'set_k', 'set_k1'

        # add the same members to both sets
        client.sadd(first, 3, 4, 5, 6)
        client.sadd(second, 3, 4, 5, 6)

        # remove: pop one member, then remove a specific value
        popped = client.spop(first)
        print(popped)
        client.srem(first, 2)

        # move member 1 from the first set to the second
        client.smove(first, second, 1)

        # inspect: members, random sample, cardinality, membership
        print(client.smembers(first))
        print(client.smembers(second))
        print(client.srandmember(first, 3))
        print(client.scard(first))
        print(client.sismember(first, 2))

        # difference: computed directly, then stored under its own key
        print(client.sdiff(first, second))
        client.sdiffstore('set_k_k1', first, second)
        print(client.smembers('set_k_k1'))

        # intersection
        print(client.sinter(first, second))
        client.sinterstore('set_k_k1_inter', first, second)
        print(client.smembers('set_k_k1_inter'))

        # union
        print(client.sunion(first, second))
        client.sunionstore('set_k_k1_union', first, second)
        print(client.smembers('set_k_k1_union'))
    
    
    def redis_zset_practice():
        """
        Query the sorted set stored under 'zset_k'.

        Data layout:
            {
                'zset_k': {member1: score1, member2: score2, member3: score3},
            }

        The write examples are disabled, so the queries only return data when
        'zset_k' has been populated beforehand.
        """
        client = get_redis_conn_pool()
        zkey = 'zset_k'

        # Write examples (disabled):
        # client.zadd('zset_k', 'math', 99, 'english', 80, 'chinese', 85, 'sport', 100, 'music', 60)
        #   NOTE(review): redis-py 3.x expects a mapping here instead,
        #   e.g. zadd('zset_k', {'math': 99, ...}) - confirm against the installed version.
        # client.zrem('zset_k', 'music')
        # client.zremrangebyrank('zset_k', 0, 0)    # remove by rank range
        # client.zremrangebyscore('zset_k', 0, 90)  # remove by score range

        # members by rank, ascending then descending
        print(client.zrange(zkey, 0, 100))
        print(client.zrevrange(zkey, 0, 100))
        # members whose score lies between 0 and 100
        in_score_range = client.zrangebyscore(zkey, 0, 100)
        print(in_score_range)
        # size, count within a score range, rank and score of one member
        print(client.zcard(zkey))
        print(client.zcount(zkey, 0, 90))
        print(client.zrank(zkey, 'chinese'))
        print(client.zscore(zkey, 'chinese'))
        print(client.zrange(zkey, 0, 100))
    
    
    if __name__ == "__main__":
        # Run the string demo when this file is executed as a script.
        redis_string_practice()

    常用函数二:mongodb操作

    import json
    import pymongo
    import pandas as pd
    
    
    class MongoPipeline(object):
        """Small convenience wrapper around one pymongo database.

        Methods:
            save(data, collection)                   insert every row of a DataFrame into a collection
            read(table)                              load a whole collection into a DataFrame
            insert(table, dict_data)                 insert one document or several
            delete(table, condition)                 delete the documents matching a filter
            update(table, condition, new_dict_data)  update or replace the first matching document
            dbFind(table, condition=None)            lazily iterate the matching documents
            findAll(table)                           lazily iterate every document
            close()                                  close the client

        The write methods use insert_one/insert_many, update_one/replace_one and
        delete_many; the legacy Collection.insert/update/remove calls are not
        available in PyMongo 4.
        """

        def __init__(self, mongo_db, mongo_uri='localhost'):
            """Connect to `mongo_uri` and select the database named `mongo_db`."""
            self.mongo_uri = mongo_uri
            self.mongo_db = mongo_db
            self.client = pymongo.MongoClient(self.mongo_uri)
            self.db = self.client[self.mongo_db]

        def close(self):
            """Close the underlying MongoClient."""
            self.client.close()

        def save(self, data, collection):
            """
            Insert every row of a DataFrame into a collection, one document per row.

            Failures are reported on stdout together with a traceback; nothing is raised.
            :param data: pandas DataFrame to store
            :param collection: name of the target collection
            :return: None
            """
            self.collection = self.db[collection]
            try:
                # After the transpose each original row is one top-level JSON
                # entry, so the values are the per-row documents.
                documents = list(json.loads(data.T.to_json()).values())
                result = self.collection.insert_many(documents)
                if result.inserted_ids:
                    print('mongodb insert {} sucess.'.format(collection))
            except Exception as e:
                print('insert error:', e)
                import traceback
                # print_exc() takes no exception argument; its first parameter is `limit`.
                traceback.print_exc()

        def read(self, table):
            """
            Load every document of a collection into a DataFrame.

            :param table: collection name
            :return: DataFrame, or None if reading failed (the traceback is printed)
            """
            try:
                return pd.DataFrame(list(self.db[table].find()))
            except Exception:
                import traceback
                traceback.print_exc()
                return None

        def insert(self, table, dict_data):
            """
            Insert one document (a dict) or several (an iterable of dicts).

            Errors are printed, not raised.
            :param table: collection name
            :param dict_data: a single document or an iterable of documents
            :return: None
            """
            try:
                target = self.db[table]
                if isinstance(dict_data, dict):
                    target.insert_one(dict_data)
                else:
                    target.insert_many(list(dict_data))
                print("插入成功")
            except Exception as e:
                print(e)

        def update(self, table, condition, new_dict_data):
            """
            Update the first document matching `condition`.

            As with the legacy ``Collection.update`` call, a document whose first
            key starts with '$' is applied as update operators; any other
            document replaces the matched one. Errors are printed, not raised.
            :param table: collection name
            :param condition: filter selecting the document to change
            :param new_dict_data: update-operator document or replacement document
            :return: None
            """
            try:
                target = self.db[table]
                first_key = next(iter(new_dict_data), '')
                if str(first_key).startswith('$'):
                    target.update_one(condition, new_dict_data)
                else:
                    target.replace_one(condition, new_dict_data)
                print("更新成功")
            except Exception as e:
                print(e)

        def delete(self, table, condition):
            """
            Delete every document matching `condition`.

            Passing None deletes all documents, as the legacy ``remove(None)`` did.
            Errors are printed, not raised.
            :param table: collection name
            :param condition: filter selecting the documents to delete
            :return: None
            """
            try:
                query = {} if condition is None else condition
                self.db[table].delete_many(query)
                print("删除成功")
            except Exception as e:
                print(e)

        def dbFind(self, table, condition=None):
            """
            Lazily iterate over the documents matching `condition`.

            :param table: collection name
            :param condition: filter passed to ``find``; None matches everything
            :return: generator of documents
            """
            yield from self.db[table].find(condition)

        def findAll(self, table):
            """
            Lazily iterate over every document of a collection.

            :param table: collection name
            :return: generator of documents
            """
            yield from self.db[table].find()
    
    
    if __name__ == '__main__':
        mongo = MongoPipeline('flask')
        try:
            # Whole-collection alternative:
            # data = mongo.read('label')
            # print(data.head())
            condition = {"药品ID": 509881}
            data = mongo.dbFind('label', condition)
            # dbFind is a generator function, so this prints the generator object itself
            print(data)
            for i in data:
                print(i)
        finally:
            # release the client even if the query fails
            mongo.close()

    常用函数三:数据库连接池操作

    # -*- coding: utf-8 -*-
    
    """
    Datetime: 2020/07/06
    Author: Zhang Yafei
    Description: 
    """
    from DBUtils.PooledDB import PooledDB
    
    
    class DBPoolHelper(object):
        """Run SQL through a connection borrowed from a DBUtils ``PooledDB`` pool.

        Supported back ends: PostgreSQL (psycopg2), MySQL (pymysql), SQLite (sqlite3).
        """

        # Port used when the caller does not pass one; SQLite takes no port.
        _DEFAULT_PORTS = {'postgresql': 5432, 'mysql': 3306}

        def __init__(self, dbname, user=None, password=None, db_type='postgresql', host='localhost', port=None):
            """
            Create a pool for the chosen back end and borrow one connection from it.

            :param dbname: database name; for sqlite, the path of the database file
            :param user: login user (not used for sqlite)
            :param password: login password (not used for sqlite)
            :param db_type: 'postgresql', 'mysql' or 'sqlite'; the old misspelling
                'postgressql' is still accepted
            :param host: server host (not used for sqlite)
            :param port: server port; None selects 5432 for postgresql and 3306 for mysql
            :raises ValueError: if db_type is not one of the supported values
            """
            # 'postgressql' used to be the (misspelled) default; keep accepting it
            # so existing callers continue to work.
            if db_type == 'postgressql':
                db_type = 'postgresql'
            if port is None:
                port = self._DEFAULT_PORTS.get(db_type)

            if db_type == 'postgresql':
                import psycopg2
                pool = PooledDB(creator=psycopg2, host=host, port=port, user=user, password=password,
                                database=dbname)
            elif db_type == 'mysql':
                import pymysql
                # the second positional argument (5) is PooledDB's mincached
                pool = PooledDB(pymysql, 5, host=host, user=user, password=password, database=dbname,
                                port=port)
            elif db_type == 'sqlite':
                import sqlite3
                pool = PooledDB(sqlite3, maxcached=50, maxconnections=1000, maxusage=1000, database=dbname)
            else:
                raise ValueError('请输入正确的数据库类型, db_type="postgresql" or db_type="mysql" or db_type="sqlite"')
            self.pool = pool
            self.conn = pool.connection()
            self.cursor = self.conn.cursor()

        def close(self):
            """Close the cursor and connection if they exist; safe to call repeatedly."""
            cursor = getattr(self, 'cursor', None)
            conn = getattr(self, 'conn', None)
            self.cursor = None
            self.conn = None
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.close()

        def __connect_close(self):
            """Close the connection (kept for compatibility; delegates to close())."""
            self.close()

        def execute(self, sql, params=tuple()):
            """Execute one statement with `params` and commit."""
            self.cursor.execute(sql, params)
            self.conn.commit()

        def execute_many(self, sql, params=tuple()):
            """Execute `sql` once per parameter set in `params` and commit."""
            self.cursor.executemany(sql, params)
            self.conn.commit()

        def fetchone(self, sql, params=tuple()):
            """Execute a query and return its first row."""
            self.cursor.execute(sql, params)
            return self.cursor.fetchone()

        def fetchall(self, sql, params=tuple()):
            """Execute a query and return all of its rows."""
            self.cursor.execute(sql, params)
            return self.cursor.fetchall()

        def __del__(self):
            # __del__ also runs when __init__ raised before conn/cursor were set;
            # close() tolerates the missing attributes.
            print("dbclass del ----------------")
            self.close()

    常用函数四:pandas连接数据库

    # -*- coding: utf-8 -*-
    
    """
    Datetime: 2020/07/06
    Author: Zhang Yafei
    Description: pandas连接数据库
    """
    from sqlalchemy import create_engine
    from pandas import read_sql
    
    
    def pandas_db_helper(db_url="sqlite:///sqlite3.db"):
        """
        Open a SQLAlchemy connection that pandas can read from.

        Example URLs:
            'postgresql://postgres:0000@127.0.0.1:5432/xiaomuchong'
            "mysql+pymysql://root:0000@127.0.0.1:3306/srld?charset=utf8mb4"
            "sqlite:///sqlite3.db"

        :param db_url: SQLAlchemy database URL; defaults to the local file sqlite3.db
        :return: an open connection; the caller is responsible for closing it
        """
        engine = create_engine(db_url)
        return engine.connect()
    
    
    if __name__ == '__main__':
        conn = pandas_db_helper()
        try:
            data = read_sql("select * from articles", con=conn)
            # DataFrame.info() prints its report itself and returns None, so it
            # is called directly instead of being wrapped in print().
            data.info()
        finally:
            # release the connection even if the query fails
            conn.close()
  • 相关阅读:
    jquery+flask+keras+nsfw快速搭建一个简易鉴黄工具
    基于LSTM + keras 的诗歌生成器
    Bilateral Multi-Perspective Matching for Natural Language Sentences---读书笔记
    text matching(文本匹配) 相关资料总结
    redis实战---读书笔记
    effective java(第三版)---读书笔记
    深入理解java虚拟机---读书笔记
    这就是搜索引擎(核心技术讲解)---读书笔记
    google nmt 实验踩坑记录
    python打包到pypi小结
  • 原文地址:https://www.cnblogs.com/zhangyafei/p/13253773.html
Copyright © 2011-2022 走看看