zoukankan      html  css  js  c++  java
  • 测试平台系列(79) 编写Redis配置功能(下)

    大家好~我是米洛

    我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的完整教程,希望大家多多支持。

    欢迎关注我的公众号米洛的测开日记,获取最新文章教程!

    回顾

    上一节我们提出了优化Dao逻辑的想法,那今天就试着来兑现之,并运用到Redis配置管理的开发中去。

    初步构思list方法

    我们在dao/init.py新建类: Mapper,以后所有的dao类都继承自它。

    想想list需要什么,一般需要,字段参数是like还是等于这3个重要的信息。

    明白这个以后,我们的伪代码就好编写了:

    # 1. 获取session
    async with async_session() as session:
      condition = [model.deleted_at == 0]
      # 2. 根据参数里的字段,字段值构造查询条件
      DatabaseHelper.where(字段,字段值,condition)
      # 3. 查询出结果,后续与原来的方式一致,就不写了
    

    可以看到,这边除了需要上面的3个信息以外,还需要try去写入日志,也就是说需要我们平时经常创建的log成员变量,还需要model这个类,否则你不知道改的是什么表。

    思考

    我们平时的参数,都是key-value的形式,一旦value不为空,那么说明我们要根据这个条件来查询。

    而model和log,我们可以通过类的装饰器,由子类传递给父类(这里会用到setattr和getattr)

    class Mapper(object):
        log = None
        model = None
    
        @classmethod
        async def list_data(cls, **kwargs):
            try:
                async with async_session() as session:
                    # 构造查询条件,默认是数据未被删除
                    condition = [getattr(cls.model, "deleted_at") == 0]
                    # 遍历参数,当参数不为None的时候传递
                    for k, v in kwargs.items():
                        # 判断是否是like的情况 TODO: 这里没支持in查询
                        like = v is not None and len(v) > 2 and v.startswith("%") and v.endswith("%")
                        # 如果是like模式,则使用Model.字段.like 否则用 Model.字段 等于
                        DatabaseHelper.where(v, getattr(cls.model, k).like(v) if like else getattr(cls.model, k) == v,
                                             condition)
                    result = await session.execute(select(cls.model).where(*condition))
                    return result.scalars().all()
            except Exception as e:
                # 这边调用cls本身的log参数,写入日志+抛出异常
                cls.log.error(f"获取{cls.model}列表失败, error: {e}")
                raise Exception(f"获取数据失败")
    

    注释写的比较详细,由于现在字段是个变量,所以我们不能用model.字段来取值,所以取而代之的是getattr(model, 字段)。不熟悉的朋友可以去搜索下getattr

    这样,一个粗略的list方法就写好了,但是这个是不带分页的,所以我们还需要补充一个分页的模式,其实也很简单。

        @classmethod
        async def list_data_with_pagination(cls, page, size, **kwargs):
            try:
                async with async_session() as session:
                    condition = [getattr(cls.model, "deleted_at") == 0]
                    for k, v in kwargs.items():
                        like = v is not None and len(v) > 2 and v.startswith("%") and v.endswith("%")
                        sql = DatabaseHelper.where(v, getattr(cls.model, k).like(v) if like else getattr(cls.model, k) == v,
                                                   condition)
                    return await DatabaseHelper.pagination(page, size, session, sql)
            except Exception as e:
                cls.log.error(f"获取{cls.model}列表失败, error: {e}")
                raise Exception(f"获取数据失败")
    

    基本上长的差不多,只是最后返回那里,调用了pagination相关方法。

    看看dao装饰器

    这也是为什么要用classmethod而不是staticmethod的原因

    我们很坏,把这个装饰器套到子类上,并把model和log传给父类。毕竟方法都是在调用父类的方法,父类无法直接拿到子类的数据

    用的话,就是把model和log传入dao参数

    这样就避免了在调用list方法的时候,还需要传入model和log的尴尬情况。

    完善其他方法

    list搞定以后,其他的还会远吗?但还真的好像还有点问题,因为我们一般会有一些重名判断,但没关系,我们可以编写一个query方法。

        @classmethod
        def query_wrapper(cls, **kwargs):
            condition = [getattr(cls.model, "deleted_at") == 0]
            # 遍历参数,当参数不为None的时候传递
            for k, v in kwargs.items():
                # 判断是否是like的情况 TODO: 这里没支持in查询
                like = v is not None and len(v) > 2 and v.startswith("%") and v.endswith("%")
                # 如果是like模式,则使用Model.字段.like 否则用 Model.字段 等于
                DatabaseHelper.where(v, getattr(cls.model, k).like(v) if like else getattr(cls.model, k) == v,
                                     condition)
            return select(cls.model).where(*condition)
    
        @classmethod
        async def query_record(cls, **kwargs):
            try:
                async with async_session() as session:
                    sql = cls.query_wrapper(**kwargs)
                    result = await session.execute(sql)
                    return result.scalars().first()
            except Exception as e:
                cls.log.error(f"查询{cls.model}失败, error: {e}")
                raise Exception(f"查询数据失败")
    

    由于查询方法太过于通用,所以抽成了query_wrapper方法。

    • 正常编写insert接口
        @classmethod
        async def insert_record(cls, model):
            try:
                async with async_session() as session:
                    async with session.begin():
                        session.add(model)
                        await session.flush()
                        session.expunge(model)
                        return model
            except Exception as e:
                cls.log.error(f"添加{cls.model}记录失败, error: {e}")
                raise Exception(f"添加记录失败")
    

    这边返回了model,如果不需要也可以不用,但咱还是给返回。

    • insert接口层

    和以前一样,先查,如果没有再关掉。

    但这样会产生2个session,开->关->开->关

    但也解耦了查询和插入2个操作。

    • 编写删除和修改方法
        @classmethod
        async def update_record_by_id(cls, user, model, not_null=False):
            try:
                async with async_session() as session:
                    async with session.begin():
                        query = cls.query_wrapper(id=model.id)
                        result = await session.execute(query)
                        original = result.scalars().first()
                        if original is None:
                            raise Exception("记录不存在")
                        DatabaseHelper.update_model(original, model, user, not_null)
                        await session.flush()
                        session.expunge(original)
                        return original
            except Exception as e:
                cls.log.error(f"更新{cls.model}记录失败, error: {e}")
                raise Exception(f"更新记录失败")
    
        @classmethod
        async def delete_record_by_id(cls, user, id):
            """
            逻辑删除
            :param user:
            :param id:
            :return:
            """
            try:
                async with async_session() as session:
                    async with session.begin():
                        query = cls.query_wrapper(id=id)
                        result = await session.execute(query)
                        original = result.scalars().first()
                        if original is None:
                            raise Exception("记录不存在")
                        DatabaseHelper.delete_model(original, user)
            except Exception as e:
                cls.log.error(f"删除{cls.model}记录失败, error: {e}")
                raise Exception(f"删除记录失败")
    
        @classmethod
        async def delete_by_id(cls, id):
            """
            物理删除
            :param id:
            :return:
            """
            try:
                async with async_session() as session:
                    async with session.begin():
                        query = cls.query_wrapper(id=id)
                        result = await session.execute(query)
                        original = result.scalars().first()
                        if original is None:
                            raise Exception("记录不存在")
                        session.delete(original)
            except Exception as e:
                cls.log.error(f"逻辑删除{cls.model}记录失败, error: {e}")
                raise Exception(f"删除记录失败")
    

    删除这边支持了物理删除和逻辑删除,当然我们一般是用软删除

    完善其他接口

    可以看到删除和修改和以前是差不多的(也是内部进行数据是否存在判断),这些步骤做完。redis的管理工作就可以顺利进行了,接着我们需要为之编写页面咯!

    由于是很基础的表格页面,所以我们不赘述。下一节我们会利用配置的redis连接编写redisManager,管理我们的连接数据,为之后在线执行redis以及将它作为前置条件打下基础。

  • 相关阅读:
    LR11
    安装Nginx+uWSGI+Django环境
    MYSQL 安装更新,使用,管理,备份和安全等
    oracle里要查看一条sql的执行情况,有没有走到索引,怎么看?索引不能提高效率?
    大神:python怎么爬取js的页面
    Sublime 编译出来的是 dos格式,不是unix格式
    前天搞了一天?昨天搞了一天?今天搞了半小时
    搞了一宿,弄完了一个POP3协议
    我竟然。。。傻了近一年
    跨进程信息交互真个费事。
  • 原文地址:https://www.cnblogs.com/we8fans/p/15588682.html
Copyright © 2011-2022 走看看