zoukankan      html  css  js  c++  java
  • EMQ X 与 HStreamDB 集成实践:通过规则引擎实现数据存储

    面对物联网时代海量设备连接及其实时产生的大规模数据流,EMQ 提供从边缘到云的现代数据基础设施,助力云边端物联网数据的统一「连接、移动、处理、分析」。

    如今,可「随处运行、无限连接、任意集成」的云原生分布式消息中间件 EMQ X 已解决了海量连接的挑战,流数据库 HStreamDB 则正试图解决海量物联网数据的存储、处理与实时分析。

    作为首个专为流数据设计的云原生流数据库,HStreamDB 致力于高效的大规模数据流存储和管理。EMQ X 与 HStreamDB 的组合,将使海量数据接入、存储、实时处理与分析的一站式管理变得不再困难。

    EMQ X 与 HStreamDB

    最近发布的 HStreamDB v0.6 新增了数据写入 Rest API,可以使用任何语言通过 Rest API 向 HStreamDB 写入数据,方便开源用户围绕 HStreamDB 进行二次开发。我们也通过这一功能与 EMQ X 开源版的 Webhook 功能结合,实现了 EMQ X 和 HStreamDB 的快速集成。

    本文就将详细介绍使用 HStreamDB 对 EMQ X 的接入数据进行持久化存储的具体操作。

    注:本文介绍基于 EMQ X 4.3 和 hstreamdb/hstream:v0.6.1 镜像。

    启动 EMQ X 和 HStreamDB

    首先我们需要一个运行中的 EMQ X,如何安装、部署并启动请参考:EMQ X 文档

    同时,我们需要一个运行中的 HStreamDB,更详细的如何安装、部署与启动教程请参考:HStreamDB Docs

    对于不熟悉 HStreamDB 的用户,可以先通过 docker-compose 快速启动一个单机的 HStreamDB 集群。

    启动 HStreamDB

    先直接通过链接下载 docker-compose.yaml 文件。

    创建一个用来存储数据库数据的文件:

    mkdir /data/store
    

    在后台启动 HStreamDB:

    docker-compose -f quick-start.yaml up -d
    

    通过:

    docker-compose -f quick-start.yaml logs hstream-http-server
    

    将会看到以下 log:

    Server is configured with:
         gRPCServerHost: hserver
         gRPCServerPort: 6570
         httpServerPort: 6580
     Setting gRPC connection
     Setting HTTP server
     Server started on port 6580 
    

    通过 HStreamDB CLI 创建所需要的 Stream

    Stream 是 HStreamDB 中用来存储流式数据的对象,可以看作是一些数据的集合。

    启动 HStreamDB CLI

    用 docker 启动一个 HStreamDB 的命令行界面:

    docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream hstream-client --port 6570 --client-id 1
    

    你将会进入到以下界面:

          __  _________________  _________    __  ___
         / / / / ___/_  __/ __ \/ ____/   |  /  |/  /
        / /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ /
       / __  /___/ // / / _  _/ /___/ ___ |/ /  / /
      /_/ /_//____//_/ /_/ |_/_____/_/  |_/_/  /_/
    
    >
    

    创建 HStreamDB Stream,用来保存桥接过来的数据:

    > CREATE STREAM emqx_rule_engine_output ;
    emqx_rule_engine_output
    

    当然我们也可以通过 SHOW 得到已经创建好的 Stream:

    > SHOW STREAMS;
    emqx_rule_engine_output
    

    配置 EMQ X

    然后,我们打开 EMQ X 的 Dashboard,点击规则引擎(Rule Engine),进入资源(Resource)界面。

    EMQ X Dashboard 资源页面

    我们可以先创建一个 WebHook 资源,如下图:

    EMQ X Dashboard 创建 WebHook

    Request URL 一栏中填入 hstream-http-server 的监听地址,<host>:6580/streams/emqx_rule_engine_output:publish,然后点击 test connection 测试链接。

    EMQ X Dashboard test connection

    接着,我们来创建所需要的规则引擎规则:

    创建 EMQ X 规则引擎规则

    SELECT 
      payload,                 -- 在 HStreamDB 的 http 协议中,我们需要一个 payload 项
      str(payload) as payload, -- HStreamDB 要求 payload 是一个 JSON String
      0 as flag                -- HStreamDB 中 flag 为 0 表示 payload 是一个JSON String
    FROM 
      "#"                      -- 这个符号会匹配所有的 topic
    

    我们需要增加一个 Action Handler ,选择 ActionData to Web Server

    EMQ X 规则引擎 Action

    Method 设置为 POSTHeader 加入 content-type application/json

    这个时候,我们已经完成了最基本的桥接的设置,接下来让我们通过 websocket 和 hstreamdb-cli 来测试一下吧。

    通过 HStreamDB CLI 观察数据的持久化存储是否完成

    首先我们在刚刚启动的 HStreamDB CLI 中创建一个 Query:

    > SELECT * FROM emqx_rule_engine_output EMIT CHANGES;
    

    在 HStreamDB 中,每一个 Stream 都表示一串动态变化的数据流,因此一个 Query 并不是简单地读取数据,而是会持续读取并输出被写入 Stream 中的数据。在 CLI 中,读取和输出数据的起点是就是成功创建 Query 的这一刻。当前,我们可以观察到的是,CLI 中并没有任何输出。

    此时我们可以通过 EMQ X DashBoard 的 WebSocket 或者其他 MQTT 客户端(例如跨平台 MQTT 5.0 桌面客户端工具 - MQTT X)向 EMQ X 写入数据。

    以下用 WebSocket 举例,我们可以先连接上我们启动的 EMQ X 集群:

    EMQ X DashBoard 的 WebSocket 客户端

    再向指定的 topic 发送数据:

    EMQ X DashBoard 的 WebSocket 发送数据

    如果一切正常的话,我们就可以实时地在 HStreamDB CLI 看到我们发到 EMQ X 的数据。

    > SELECT * FROM emqx_rule_engine_output EMIT CHANGES;
    {"location":{"lng":116.296011,"lat":40.005091},"speed":32.12,"tachometer":9001.0,"ts":1563268202,"direction":198.33212,"id":"NXP-058659730253-963945118132721-22","dynamical":8.93}
    

    至此,我们完成了 EMQ X 接入的数据在 HStreamDB 的持久化存储。

    通过将 EMQ X 与 HStreamDB 集成,我们不仅可以实现对发送到 EMQ X 的数据的持久化存储,还能对这些数据进行实时处理分析,获得进一步的数据洞察。随着两个产品的不断完善,我们相信在未来,EMQ X + HStreamDB 的高效组合将在 IoT 领域实时流数据的分析和处理场景发挥重要作用,成为数据转化与变现过程中的重要一环,为企业数据资产的价值创造提供动力。

    版权声明: 本文为 EMQ 原创,转载请注明出处。

    原文链接:https://www.emqx.com/zh/blog/integration-practice-of-emqx-and-hstreamdb

    技术支持:如对本文或 EMQ 相关产品有疑问,可访问 EMQ 问答社区 https://askemq.com 提问,我们将会及时回复支持。

    更多技术干货,欢迎关注我们公众号【EMQ 中文社区】。

  • 相关阅读:
    LeetCode 109 Convert Sorted List to Binary Search Tree
    LeetCode 108 Convert Sorted Array to Binary Search Tree
    LeetCode 107. Binary Tree Level Order Traversal II
    LeetCode 106. Construct Binary Tree from Inorder and Postorder Traversal
    LeetCode 105. Construct Binary Tree from Preorder and Inorder Traversal
    LeetCode 103 Binary Tree Zigzag Level Order Traversal
    LeetCode 102. Binary Tree Level Order Traversal
    LeetCode 104. Maximum Depth of Binary Tree
    接口和多态性
    C# 编码规范
  • 原文地址:https://www.cnblogs.com/emqx/p/15716340.html
Copyright © 2011-2022 走看看