zoukankan      html  css  js  c++  java
  • nonebot 源码阅读笔记

    前言

    nonebot 是一个 QQ 消息机器人框架,它的一些实现机制,值得参考。

    NoneBot

    初始化(配置加载)

    阅读 nonebot 文档,第一个示例如下:

    import nonebot
    
    if __name__ == '__main__':
        nonebot.init()
        nonebot.load_builtin_plugins()
        nonebot.run(host='127.0.0.1', port=8080)
    

    首先思考一下,要运行几个 QQ 机器人,肯定是要保存一些动态的数据的。但是从上面的示例看,我们并没有创建什么对象来保存动态数据,很简单的就直接调用 nontbot.run() 了。这说明动态的数据被隐藏在了 nonebot 内部。

    接下来详细分析这几行代码:

    第一步是 nonebot.init(),该方法源码如下:

    #  这个全局变量用于保存 NoneBot 对象
    _bot: Optional[NoneBot] = None
    
    def init(config_object: Optional[Any] = None) -> None:
        global _bot
        _bot = NoneBot(config_object)  # 通过传入的配置对象,构造 NoneBot 实例。
    
        if _bot.config.DEBUG:   # 根据是否 debug 模式,来配置日志级别
            logger.setLevel(logging.DEBUG)
        else:
            logger.setLevel(logging.INFO)
    
        # 在 websocket 启动前,先启动 scheduler(通过调用 quart 的 before_serving 装饰器)
        # 这实际上是将 _start_scheduler 包装成一个 coroutine,然后丢到 quart 的 before_serving_funcs 队列中去。
        _bot.server_app.before_serving(_start_scheduler)
    
    def _start_scheduler():
        if scheduler and not scheduler.running:  # 这个 scheduler 是使用的 apscheduler.schedulers.asyncio.AsyncIOScheduler
            scheduler.configure(_bot.config.APSCHEDULER_CONFIG)  # 配置 scheduler 参数,该参数可通过 `nonebot.init()` 配置
            scheduler.start()  # 启动 scheduler,用于定时任务(如定时发送消息、每隔一定时间执行某任务)
            logger.info('Scheduler started')
    

    可以看到,nonebot.init() 做了三件事:

    1. 通过传入的配置对象,构造 NoneBot 实例。该实例对用户不可见
    2. 配置日志级别
    3. 让 quart 在服务启动前,先启动 AsyncIOScheduler
      • AsyncIOScheduler 是一个异步 scheduler,这意味着它本身也会由 asyncio 的 eventloop 调度。它和 quart 是并发执行的。

    1. plugins 加载机制

    第二步是 nonebot.load_builtin_plugins(),直接加载了 nonebot 内置的插件。该函数来自 plugin.py

    class Plugin:
        __slots__ = ('module', 'name', 'usage')
    
        def __init__(self, module: Any,
                     name: Optional[str] = None,
                     usage: Optional[Any] = None):
            self.module = module  # 插件对象本身
            self.name = name   # 插件名称
            self.usage = usage  # 插件的 help 字符串
    
    # 和 `_bot` 类似的设计,用全局变量保存状态
    _plugins: Set[Plugin] = set()
    
    def load_plugin(module_name: str) -> bool:
        try:
            module = importlib.import_module(module_name)   #  通过模块名,动态 import 该模块
            name = getattr(module, '__plugin_name__', None)
            usage = getattr(module, '__plugin_usage__', None)  # 模块的全局变量
            _plugins.add(Plugin(module, name, usage))              # 将加载好的模块放入 _plugins
            logger.info(f'Succeeded to import "{module_name}"')
            return True
        except Exception as e:
            logger.error(f'Failed to import "{module_name}", error: {e}')
            logger.exception(e)
            return False
    
    
    def load_plugins(plugin_dir: str, module_prefix: str) -> int:
        count = 0
        for name in os.listdir(plugin_dir):  # 遍历指定的文件夹
            path = os.path.join(plugin_dir, name)
            if os.path.isfile(path) and 
                    (name.startswith('_') or not name.endswith('.py')):
                continue
            if os.path.isdir(path) and 
                    (name.startswith('_') or not os.path.exists(
                        os.path.join(path, '__init__.py'))):
                continue
    
            m = re.match(r'([_A-Z0-9a-z]+)(.py)?', name)
            if not m:
                continue
    
            if load_plugin(f'{module_prefix}.{m.group(1)}'):  # 尝试加载该模块
                count += 1
        return count
    
    def load_builtin_plugins() -> int:
        plugin_dir = os.path.join(os.path.dirname(__file__), 'plugins')  # 得到内部 plugins 目录的路径
        return load_plugins(plugin_dir, 'nonebot.plugins')  # 直接加载该目录下的所有插件
    
    def get_loaded_plugins() -> Set[Plugin]:
        """
        获取所有加载好的插件,一般用于提供命令帮助。
        比如在收到 "帮助 拆字" 时,就从这里查询到 “拆字” 插件的 usage,返回给用户。
    
        :return: a set of Plugin objects
        """
        return _plugins
    

    这就是插件的动态加载机制,可以看到获取已加载插件的唯一方法,就是 get_loaded_plugins(),而且 plugins 是用集合来保存的。

    1. 优化:仔细想想,我觉得用字典(Dict)来代替 Set 会更好一些,用“插件名”索引,这样可以防止出现同名的插件,而且查询插件时也不需要遍历整个 Set。

    2. 思考:插件是 python 模块,但是这里加载好了,却没有手动将它注册到别的地方,那加载它还有什么用?

      • 插件中的“命令解析器”、“消息处理器”等,都是使用的是 nonebot 的装饰器装饰了的。
      • 该装饰器会直接将命令处理函数,连同命令解析参数等直接注册到 nonebot 的命令集合中。(这个后面会看到。)因此不需要在 load_plugin() 中手动注册。

    这两行之后,就直接 nonebot.run() 启动 quart 服务器了。

    QQ消息的处理

    从第一个例子中,只能看到上面这些。接下来考虑写一个自定义插件,看看 nonebot 的消息处理机制。项目结构如下:

    awesome-bot
    ├── awesome
    │   └── plugins
    │       └── usage.py
    ├── bot.py
    └── config.py  # 配置文件,写法参考 nonebot.default_config,建议使用类方式保存配置
    

    bot.py:

    from os import path
    
    import nonebot
    import config
    
    if __name__ == '__main__':
        nonebot.init(config)  # 使用自定义配置
        nonebot.load_plugins(  # 加载 awesome/plugins 下的自定义插件
            path.join(path.dirname(__file__), 'awesome', 'plugins'),
            'awesome.plugins'
        )
        nonebot.run()
    

    usage.py:

    import nonebot
    from nonebot import on_command, CommandSession
    
    
    @on_command('usage', aliases=['使用帮助', '帮助', '使用方法'])
    async def _(session: CommandSession):
        """之前说过的“帮助”命令"""
        plugins = list(filter(lambda p: p.name, nonebot.get_loaded_plugins()))
        arg = session.current_arg_text.strip().lower()
        if not arg:
            session.finish(
                '我现在支持的功能有:
    
    ' + '
    '.join(p.name for p in plugins))
        for p in plugins:  # 如果 plugins 换成 dict 类型,就不需要循环遍历了
            if p.name.lower() == arg:
                await session.send(p.usage)
    

    查看装饰器 on_command 的内容,有 command/__init__.py

    
    # key: one segment of command name
    # value: subtree or a leaf Command object
    _registry = {}  # type: Dict[str, Union[Dict, Command]]  # 保存命令与命令处理器
    
    # key: alias
    # value: real command name
    _aliases = {}  # type: Dict[str, CommandName_T]  # 保存命令的别名(利用别名,从这里查找真正的命令名称,再用该名称查找命令处理器)
    
    # key: context id
    # value: CommandSession object
    _sessions = {}  # type: Dict[str, CommandSession]  # 保存与用户的会话,这样就能支持一些需要关联上下文的命令。比如赛文续传,或者需要花一定时间执行的命令,Session 有个 is_running。
    
    
    def on_command(name: Union[str, CommandName_T], *,
                   aliases: Iterable[str] = (),
                   permission: int = perm.EVERYBODY,
                   only_to_me: bool = True,
                   privileged: bool = False,
                   shell_like: bool = False) -> Callable:
        """
        用于注册命令处理器
    
        :param name: 命令名称 (e.g. 'echo' or ('random', 'number'))
        :param aliases: 命令别名,建议用元组
        :param permission: 该命令的默认权限
        :param only_to_me: 是否只处理发送给“我”的消息
        :param privileged: 已经存在此 session 时,是否仍然能被运行
        :param shell_like: 使用类似 shell 的语法传递参数
        """
    
        def deco(func: CommandHandler_T) -> CommandHandler_T:
            if not isinstance(name, (str, tuple)):
                raise TypeError('the name of a command must be a str or tuple')
            if not name:
                raise ValueError('the name of a command must not be empty')
    
            cmd_name = (name,) if isinstance(name, str) else name
    
            cmd = Command(name=cmd_name, func=func, permission=permission,
                          only_to_me=only_to_me, privileged=privileged)  # 构造命令处理器
            if shell_like:
                async def shell_like_args_parser(session):
                    session.args['argv'] = shlex.split(session.current_arg)
    
                cmd.args_parser_func = shell_like_args_parser
    
            current_parent = _registry
            for parent_key in cmd_name[:-1]:  # 循环将命令树添加到 _registry
                current_parent[parent_key] = current_parent.get(parent_key) or {}
                current_parent = current_parent[parent_key]
            current_parent[cmd_name[-1]] = cmd
    
            for alias in aliases:  # 保存命令别名
                _aliases[alias] = cmd_name
    
            return CommandFunc(cmd, func)
    
        return deco
    

    该装饰器将命令处理器注册到模块的全局变量中,然后 quart 在收到消息时,会调用该模块的如下方法,查找对应的命令处理器,并使用它处理该命令:

    async def handle_command(bot: NoneBot, ctx: Context_T) -> bool:
        """
        尝试将消息解析为命令,如果解析成功,而且用户拥有权限,就执行该命令。否则忽略。
    
        此函数会被 "handle_message" 调用
        """
        cmd, current_arg = parse_command(bot, str(ctx['message']).lstrip())  # 尝试解析该命令
        is_privileged_cmd = cmd and cmd.privileged
        if is_privileged_cmd and cmd.only_to_me and not ctx['to_me']:
            is_privileged_cmd = False
        disable_interaction = is_privileged_cmd
    
        if is_privileged_cmd:
            logger.debug(f'Command {cmd.name} is a privileged command')
    
        ctx_id = context_id(ctx)
    
        if not is_privileged_cmd:
            # wait for 1.5 seconds (at most) if the current session is running
            retry = 5
            while retry > 0 and 
                    _sessions.get(ctx_id) and _sessions[ctx_id].running:
                retry -= 1
                await asyncio.sleep(0.3)
    
        check_perm = True
        session = _sessions.get(ctx_id) if not is_privileged_cmd else None
        if session:
            if session.running:
                logger.warning(f'There is a session of command '
                               f'{session.cmd.name} running, notify the user')
                asyncio.ensure_future(send(
                    bot, ctx,
                    render_expression(bot.config.SESSION_RUNNING_EXPRESSION)
                ))
                # pretend we are successful, so that NLP won't handle it
                return True
    
            if session.is_valid:
                logger.debug(f'Session of command {session.cmd.name} exists')
                # since it's in a session, the user must be talking to me
                ctx['to_me'] = True
                session.refresh(ctx, current_arg=str(ctx['message']))
                # there is no need to check permission for existing session
                check_perm = False
            else:
                # the session is expired, remove it
                logger.debug(f'Session of command {session.cmd.name} is expired')
                if ctx_id in _sessions:
                    del _sessions[ctx_id]
                session = None
    
        if not session:
            if not cmd:
                logger.debug('Not a known command, ignored')
                return False
            if cmd.only_to_me and not ctx['to_me']:
                logger.debug('Not to me, ignored')
                return False
            session = CommandSession(bot, ctx, cmd, current_arg=current_arg)  # 构造命令 Session,某些上下文相关的命令需要用到。
            logger.debug(f'New session of command {session.cmd.name} created')
    
        return await _real_run_command(session, ctx_id, check_perm=check_perm,  # 这个函数将命令处理函数包装成 task,然后等待该 task 完成,再返回结果。
                                       disable_interaction=disable_interaction)
    

    Web 中的 Session 一般是用于保存登录状态,而聊天程序的 session,则主要是保存上下文。

    如果要做赛文续传与成绩统计,Session 和 Command 肯定是需要的,但是不能像 nonebot 这样做。 NoneBot 的命令格式限制得比较严,没法用来解析跟打器自动发送的成绩消息。也许命令应该更宽松:

    1. 命令前缀仍然通过全局配置来做,但是用 dict 来存,给每个前缀一个名字,默认使用 default。
      • @command 应该给一个参数用于指定前缀:None 为不需要前缀,默认为 config.Prefix.DEFAULT.
    2. 添加一个正则消息匹配的命令注册器,要匹配多个正则,则多次使用该装饰器。正则匹配到的 groupdict 会被传到命令处理器中。

    其他

    还有就是 NoneBot 作者提到的一些问题:

    1. 基于 python-aiocqhttp(跟 酷Q 强耦合),无法支持其它机器人平台:我写 xhup-bot 时,也需要把这一点考虑进去。机器人核心不应该依赖任何平台相关的东西。
    2. 过于以命令为核心:这也是我体会到的。这导致很多功能无法使用 nonebot 实现。只能借助底层的 on_message。
    3. 没有全局黑名单机制,无法简单地屏蔽其它 bot 的消息。全局黑名单感觉还算比较容易做。
    4. 权限控制功能不够强大,无法进行单用户和群组粒度的控制:我这边也有考虑这个。
      • 细粒度权限控制的话,可以将 on_command 的 permission 当成该命令的默认权限。然后可以在 config 里针对不同的群/用户,添加不同的权限。
      • 但是这可能会导致配置变复杂。最好还是通过后端提供的 Web 网页来配置。每个群管理都可以自己配置自己群的一些权限。然后 bot 在启动时通过 http 从后端获取配置信息。
      • 会话只针对单用户,无法简单地实现多用户游戏功能:这个我暂时不需要。。而且我的 xhup-bot 是有后端的,我觉得这个可以放到后端做。

    本文为个人杂谈,不保证正确。如有错误,还请指正。

  • 相关阅读:
    sublime text3在指定浏览器上本地服务器(localhost)运行文件(php)
    关于Content-Type的问题
    为什么开发要用一个大的背景图
    2017-04-17
    我的第一篇博客
    b站计算机网络谢希仁6网络层续3
    b站计算机网络谢希仁6网络层续2
    b站计算机网络谢希仁6网络层续1
    b站操作系统2.10互斥
    b站操作系统2.9并发
  • 原文地址:https://www.cnblogs.com/kirito-c/p/10458068.html
Copyright © 2011-2022 走看看