zoukankan      html  css  js  c++  java
  • 使用 Python 开发 EMQ X MQTT 服务器插件

    从 v4.1 版本开始,EMQ X MQTT 服务器 提供了专门的多语言支持插件 emqx_extension_hook ,现已支持使用其他编程语言来处理 EMQ X 中的钩子事件,开发者可以使用 Python 或者 Java 快速开发自己的插件,在官方功能的基础上进行扩展,满足自己的业务场景。例如:

    • 验证某客户端的登录权限:客户端连接时触发对应函数,通过参数获取客户端信息后通过读取数据库、比对等操作判定是否有登录权限
    • 记录客户端在线状态与上下线历史:客户端状态变动时触发对应函数,通过参数获取客户端信息,改写数据库中客户端在线状态
    • 校验某客户端的 PUB/SUB 的操作权限:发布/订阅时触发对应函数,通过参数获取客户端信息与当前主题,判定客户端是否有对应的操作权限
    • 处理会话 (Sessions) 和 消息 (Message) 事件,实现订阅关系与消息处理/存储:消息发布、状态变动时触发对应函数,获取当前客户端信息、消息状态与消息内容,转发到 Kafka 或数据库进行存储。

    注:消息(Message) 类钩子,仅在企业版中支持。

    Python 和 Java 驱动基于 Erlang/OTP-Port 进程间通信实现,本身具有非常高的吞吐性能,本文以 Python 拓展为例介绍 EMQ X 跨语言拓展使用方式。

    img

    Python 拓展使用示例

    要求

    • EMQ X 所在服务器需安装 Python 3.6 以上版本

    使用步骤

    1. 通过 pip 安装 Python SDK
    2. 调整 EMQ X 配置,确保相关配置项正确指向 Python 项目
    3. 引入 SDK 编写代码

    Python 插件安装

    通过 pip 命令在本地安装 SDK,确保使用 pip3 进行安装

    pip3 install emqx-extension-sdk
    

    修改配置

    修改 emqx-extension-hook 插件配置,正确使用拓展:

    ## Setup the supported drivers
    ##
    ## Value: python2 | python3 | java
    exhook.drivers = python3
    
    ## Search path for scripts/library
    exhook.drivers.python3.path = data/extension/hooks.py
    
    ## Call timeout
    ##
    ## Value: Duration
    ##exhook.drivers.python3.call_timeout = 5s
    
    ## Initial module name
    ## Your filename or module name
    exhook.drivers.python3.init_module = hooks
    

    编写代码

    emqx/data/extension 目录下新建 hooks.py 文件,引入 SDK 编写业务逻辑,示例程序如下:

    ## data/extension/hooks.py
    
    from emqx_extension.hooks import EmqxHookSdk, hooks_handler
    from emqx_extension.types import EMQX_CLIENTINFO_PARSE_T, EMQX_MESSAGE_PARSE_T
    
    
    # 继承 SDK HookSdk 类
    class CustomHook(EmqxHookSdk):
    
      	# 使用装饰器注册 hooks
        @hooks_handler()
        def on_client_connect(self,
                              conninfo: EMQX_CLIENTINFO_PARSE_T = None,
                              props: dict = None,
                              state: list = None):
            print(f'[Python SDK] [on_client_connect] {conninfo.clientid} connecte')
    
        @hooks_handler()
        def on_client_connected(self,
                                clientinfo: EMQX_CLIENTINFO_PARSE_T,
                                state: list = None):
            print(
                f'[Python SDK] [on_client_connected] {clientinfo.clientid} connected')
    
        @hooks_handler()
        def on_client_check_acl(self, clientinfo: EMQX_CLIENTINFO_PARSE_T,
                                pubsub: str,
                                topic: str,
                                result: bool,
                                state: tuple) -> bool:
            print(
                f'[Python SDK] [on_client_check_acl] {clientinfo.username} check ACL: {pubsub} {topic}')
            # 用户名为空时,ACL 验证不通过
            if clientinfo.username == '':
                return False
            return True
    
        @hooks_handler()
        def on_client_authenticate(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, authresult,
                                   state) -> bool:
            print(
                f'[Python SDK] [on_client_authenticate] {clientinfo.clientid} authenticate')
            # clientid 不为空时,验证通过
            if clientinfo.clientid != '':
                return True
            return False
    
        # on_message_* 仅支持企业版
        @hooks_handler()
        def on_message_publish(self, message: EMQX_MESSAGE_PARSE_T, state):
            print(
                f'[Python SDK] [on_message_publish] {message.topic} {message.payload}')
    
    
    emqx_hook = CustomHook(hook_module=f'{__name__}.emqx_hook')
    
    
    def init():
        return emqx_hook.start()
    
    
    def deinit():
        return
    

    启动

    启动 emqx_extension_hook 插件,如果配置错误或代码编写错误将无法正常启动。启动后尝试建立 MQTT 连接并观察业务运行情况。

    ./bin/emqx_ctl plugins load emqx_extension_hook
    

    进阶开发

    目前 EMQ X Python 拓展 SDK 是开源的,如果对可控性、性能要求更高,或需要使用 Python 2.7 版本的运行环境,欢迎贡献代码或基于原始示例进行开发:

    • 代码仓库:emqx-extension-python-sdk
    • Python 原始示例,可使用该示例自行封装:[emqx-extension-hook main.py

    版权声明: 本文为 EMQ 原创,转载请注明出处。

    原文链接:https://www.emqx.io/cn/blog/develop-emqx-plugin-using-python

  • 相关阅读:
    [C#]获取指定文件文件名、后缀、所在目录等
    Mysql 存储引擎中InnoDB与Myisam的主要区别
    MySQL的btree索引和hash索引的区别
    Mysql事务的隔离级别
    AE序列号
    mysql索引类型说明
    去除url中自带的jsessionid
    redirect传值非url(springmvc3)
    ueditor的使用
    mysql用户管理(开户、权限)
  • 原文地址:https://www.cnblogs.com/emqx/p/13280208.html
Copyright © 2011-2022 走看看