项目背景:
之前在做项目的时候,由于是采用微服务架构,所有服务通信使用 Redis 作为数据交互层。需求是不同类型的数据存储在不同的数据库中,不同的数据库就需要动态切换 Redis 数据库。
Redis 默认有 16 个数据库,index(值范围:0~15),默认 index=0。切换数据库命令为:select index。
但是呢,flask_redis 却没有这个功能。为什么会没有呢?因为 redis-py 就没有。flask_redis 是基于 redis-py 进行二次封装的库,redis-py 为了保证 Redis 实例的线程安全,没有实现 SELECT 指令。如果有不明白的欢迎扣扣裙询问哦(++六零六九一六八三一)
Redis
客户端实例可以安全地在线程间共享。从内部实现来说,只有在命令执行时才获取连接实例,完成后直接返回连接池,命令永不修改客户端实例的状态。但是,有一点需要注意:SELECT
命令。SELECT
命令允许切换当前连接使用的数据库。新的数据库保持被选中状态,直到选中另一个数据库或连接关闭。这会导致在返回连接池时,连接可能指定了别的数据库。因此,redis-py
没有在客户端实例中实现 SELECT 命令。如果要在同一个应用中使用多个 Redis
数据库,应该给第一个数据库创建独立的客户端实例(可能也需要独立的连接池)。
flask_redis 既然不能动态切换数据库,那我们就从根源入手——使用前都重新连接并指定数据库。
感觉是挺暴力的,其实,这是官方建议的做法。自己动手丰衣足食,接下来我们就来封装一个支持动态切换数据库的 Redis,需要满足用户在使用的时候无感知。
废话不多说,直接上代码:
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import copy import datetime from redis import StrictRedis, ConnectionPool class RedisLib(object): def __init__(self, redis_db, redis_url, blacklist=None): self.redis = None self.redis_db = redis_db self.redis_url = redis_url self.blacklist = blacklist self.blacklist_data = None def select(self, db): url = '%s/%s' % (self.redis_url, db.split('db')[1]) pool = ConnectionPool.from_url(url=url, decode_responses=True) self.redis = StrictRedis(connection_pool=pool) def push_redis(self, db, data): def handle_data(): self.blacklist_data = [value for value in map( lambda index: data.pop(index) if data.get(index) else None, self.blacklist)] key = '%s:%s' % (self.redis_db[db], data['id']) for k, v in data.items(): self.redis.hset(key, k, v.strftime("%Y-%m-%d %H:%M:%S") if isinstance( v, datetime.datetime) else (v if v else '')) self.select(db) if isinstance(data, list): for obj in data: data = copy.deepcopy(obj.__dict__) handle_data() else: data = copy.deepcopy(data.__dict__) handle_data() def pull_redis(self, db, _id=None): self.select(db) key = '%s:%s' % (self.redis_db[db], _id if _id else '') if _id is None: data = self.redis.dbsize() elif _id == 'key': data = self.redis.keys() elif _id == '*': data = [self.redis.hgetall(key) for key in self.redis.keys()] else: data = self.redis.hgetall(key) return data def __del__(self): self.redis.connection_pool.disconnect()
以上代码就是实现了新增数据、获取数据、切换数据库的功能,在实例化的时候需要进行配置。虽然可以使用了,但是并不友好,因为我每次都得需要指定数据库,那有没有什么方法可以连数据库指定都不用呢?
当然有的啦!
那就是再写一个[狗头]哈哈!
class HandleQueue(RedisLib): def __init__(self): self.redis_db = { 'db0': None, 'db1': 'oss:aop:user', 'db9': 'oss:aop:role' } self.redis_url = 'redis://127.0.0.1:6379' # Flask app config from redis url # from app import app # self.redis_url = app.config['REDIS_URL'] self.blacklist = ['_sa_instance_state', 'version', 'status'] RedisLib.__init__(self, self.redis_db, self.redis_url, self.blacklist) def set_user_data(self, data): return self.push_redis('db1', data) def get_user_data(self, user_id=None): return self.pull_redis('db1', user_id) def set_role_data(self, data): return self.push_redis('db9', data) def get_role_data(self, dict_id=None): return self.pull_redis('db9', dict_id) if __name__ == '__main__': handle_queue = HandleQueue() return_value = handle_queue.get_user_data(1) print(return_value)
运行结果:
{'username': '极客点儿', 'role_id': '2', 'phone': '', 'account': 'GeekDot', 'mail': '', 'update_time': '2020-09-22 15:52:32', 'create_time': '2020-08-13 16:51:51', 'token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MDA4NDc1NTIsImlhdCI6MTYwMDc2MTE1MiwiaXNzIjoiMjF2aWFuZXQiLCJkYXRhIjp7ImlkIjoxfX0.fEAfuRC3tvA-qv5j6Xjbqu9W8ksY4PZwYwFoI3ST3xU', 'number': '', 'id': '1'}
通过类继承把切换数据库等基本功都保留,然后通过构造函数配置数据库,最后用到哪些数据和数据库就写一个很简单的方法即可。
因为我们是 Flask 项目,所以要使用 Flask 的配置文件,方法如下,也很简单。
# Flask app config from redis url
# from app import app
# self.redis_url = app.config['REDIS_URL']
因为项目组中用的是 ORM 所以 push_redis 现在只能接受 ORM 对象。如果想支持其他对象,可以自己改写,也简单。
最后别忘了使用析构函数将数据库关闭!以上为部分代码,需要源码的可以来扣扣裙,群里有免费的源码和python学习资料。