zoukankan      html  css  js  c++  java
  • python3 Flask Redis 如何切换数据库

    项目背景:

    之前在做项目的时候,由于是采用微服务架构,所有服务通信使用 Redis 作为数据交互层。需求是不同类型的数据存储在不同的数据库中,不同的数据库就需要动态切换 Redis 数据库。

    Redis 默认有 16 个数据库,index(值范围:0~15),默认 index=0。切换数据库命令为:select index。

    但是呢,flask_redis 却没有这个功能。为什么会没有呢?因为 redis-py 就没有。flask_redis 是基于 redis-py 进行二次封装的库,redis-py 为了保证 Redis 实例的线程安全,没有实现 SELECT 指令。如果有不明白的欢迎扣扣裙询问哦(++六零六九一六八三一)

    Redis
    客户端实例可以安全地在线程间共享。从内部实现来说,只有在命令执行时才获取连接实例,完成后直接返回连接池,命令永不修改客户端实例的状态。但是,有一点需要注意:SELECT
    命令。SELECT
    命令允许切换当前连接使用的数据库。新的数据库保持被选中状态,直到选中另一个数据库或连接关闭。这会导致在返回连接池时,连接可能指定了别的数据库。因此,redis-py
    没有在客户端实例中实现 SELECT 命令。如果要在同一个应用中使用多个 Redis
    数据库,应该给第一个数据库创建独立的客户端实例(可能也需要独立的连接池)。

    解决方案

    flask_redis 既然不能动态切换数据库,那我们就从根源入手——使用前都重新连接并指定数据库。

    感觉是挺暴力的,其实,这是官方建议的做法。自己动手丰衣足食,接下来我们就来封装一个支持动态切换数据库的 Redis,需要满足用户在使用的时候无感知。

    废话不多说,直接上代码:

    #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    
    import copy
    import datetime
    
    from redis import StrictRedis, ConnectionPool
    
    
    class RedisLib(object):
    
        def __init__(self, redis_db, redis_url, blacklist=None):
            self.redis = None
            self.redis_db = redis_db
            self.redis_url = redis_url
            self.blacklist = blacklist
            self.blacklist_data = None
    
        def select(self, db):
            url = '%s/%s' % (self.redis_url, db.split('db')[1])
            pool = ConnectionPool.from_url(url=url, decode_responses=True)
            self.redis = StrictRedis(connection_pool=pool)
    
        def push_redis(self, db, data):
    
            def handle_data():
    
                self.blacklist_data = [value for value in map(
                    lambda index: data.pop(index) if data.get(index) else None, self.blacklist)]
    
                key = '%s:%s' % (self.redis_db[db], data['id'])
    
                for k, v in data.items():
                    self.redis.hset(key, k, v.strftime("%Y-%m-%d %H:%M:%S") if isinstance(
                        v, datetime.datetime) else (v if v else ''))
    
            self.select(db)
    
            if isinstance(data, list):
                for obj in data:
                    data = copy.deepcopy(obj.__dict__)
                    handle_data()
            else:
                data = copy.deepcopy(data.__dict__)
                handle_data()
    
        def pull_redis(self, db, _id=None):
    
            self.select(db)
    
            key = '%s:%s' % (self.redis_db[db], _id if _id else '')
    
            if _id is None:
                data = self.redis.dbsize()
            elif _id == 'key':
                data = self.redis.keys()
            elif _id == '*':
                data = [self.redis.hgetall(key) for key in self.redis.keys()]
            else:
                data = self.redis.hgetall(key)
            return data
    
        def __del__(self):
          self.redis.connection_pool.disconnect()
    

      

    以上代码就是实现了新增数据、获取数据、切换数据库的功能,在实例化的时候需要进行配置。虽然可以使用了,但是并不友好,因为我每次都得需要指定数据库,那有没有什么方法可以连数据库指定都不用呢?

    当然有的啦!

    那就是再写一个[狗头]哈哈!

    class HandleQueue(RedisLib):
    
        def __init__(self):
    
            self.redis_db = {
                'db0': None,
                'db1': 'oss:aop:user',
                'db9': 'oss:aop:role'
            }
    
            self.redis_url = 'redis://127.0.0.1:6379'
    
            # Flask app config from redis url
            # from app import app
            # self.redis_url = app.config['REDIS_URL']
    
            self.blacklist = ['_sa_instance_state', 'version', 'status']
    
            RedisLib.__init__(self, self.redis_db, self.redis_url, self.blacklist)
    
        def set_user_data(self, data):
            return self.push_redis('db1', data)
    
        def get_user_data(self, user_id=None):
            return self.pull_redis('db1', user_id)
    
        def set_role_data(self, data):
            return self.push_redis('db9', data)
    
        def get_role_data(self, dict_id=None):
            return self.pull_redis('db9', dict_id)
    
    
    if __name__ == '__main__':
        handle_queue = HandleQueue()
        return_value = handle_queue.get_user_data(1)
        print(return_value)
    

      

    运行结果:

    {'username': '极客点儿', 'role_id': '2', 'phone': '', 'account': 'GeekDot', 'mail': '', 'update_time': '2020-09-22 15:52:32', 'create_time': '2020-08-13 16:51:51', 'token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MDA4NDc1NTIsImlhdCI6MTYwMDc2MTE1MiwiaXNzIjoiMjF2aWFuZXQiLCJkYXRhIjp7ImlkIjoxfX0.fEAfuRC3tvA-qv5j6Xjbqu9W8ksY4PZwYwFoI3ST3xU', 'number': '', 'id': '1'}
    

    通过类继承把切换数据库等基本功都保留,然后通过构造函数配置数据库,最后用到哪些数据和数据库就写一个很简单的方法即可。

    因为我们是 Flask 项目,所以要使用 Flask 的配置文件,方法如下,也很简单。

    # Flask app config from redis url
    # from app import app
    # self.redis_url = app.config['REDIS_URL']
    

    因为项目组中用的是 ORM 所以 push_redis 现在只能接受 ORM 对象。如果想支持其他对象,可以自己改写,也简单。

    最后别忘了使用析构函数将数据库关闭!以上为部分代码,需要源码的可以来扣扣裙,群里有免费的源码和python学习资料。
    在这里插入图片描述

  • 相关阅读:
    华为数据之道03
    机器学习10讲 第二讲
    机器学习10讲 第一讲
    07.TensorFlow双向循环神经网络
    06.TensorFlow循环神经网络
    华为数据之道:华为数据治理2
    线性回归
    MySQL配置文件my.cnf的理解
    Linux怎么查看软件安装路径 查看mysql安装在哪
    hadoop-2.7.4-nodemanager无法启动问题解决方案
  • 原文地址:https://www.cnblogs.com/python-miao/p/14210192.html
Copyright © 2011-2022 走看看