zoukankan      html  css  js  c++  java
  • Kafka分布式查询引擎

    1.概述

    Kafka是一个分布式消息中间件系统,里面存储着实际场景中的数据。Kafka原生是不支持点查询的,如果我们想对存储在Topic中的数据进行查询,可能需要对Topic中的数据进行消费落地,然后构建索引(或者数据落地到自带所以的存储系统中,例如HBase、Hive等)。今天,笔者就为大家来介绍如何实现Kafka分布式查询引擎。

    2.内容

    对于点查询,我们可以总结为两个要点。其一,有数据供我们查询;其二,对待查询的数据构建索引。在Kafka中,Topic存储数据,满足了第一点,虽然Kafka有索引的概念,但是它的索引是基于Offset的稀疏索引,并不是对每条Message都会构建一个索引。并且,这个Offset索引对于实际情况查询场景来说,也帮助不大。比如,你查询Topic01下的Partition_0,但是,也仅仅只是查下到某个Topic中分区下的Offset对应的一条记录,但是这条记录是啥,你并不知道。真实查询的情况,可能是你需要查询某个ID,或者模糊查询某个Name是否存在。

    2.1 索引

    其实有一种方式,是可行的。就是对Kafka源代码进行改造,在Broker落地每条数据的时候,构建一条索引(其实,这种方式与在原始的Kafka外面加一层Proxy类似,由Proxy充当与Client交互的角色,接收Client的数据存储并构建索引)。这样的实现方式如下图:

    如果对Kafka源代码熟悉,有能力改造其源代码,可以在Kafka中添加对每条数据构建索引的逻辑。如果,觉得怕对Kafka的性能有影响,或者改造有难度。上述流程图的方式,也可以实现这种点查询。

    改造Kafka源代码添加索引,或者是Proxy的方式存储数据并构建索引,这种两种方式来说,数据上都会要冗余一倍左右的的存储容量。

    2.2 单节点查询

    基于上述的问题,我们对这种方案进行升级改造一下。因为很多情况下,生产环境的数据已经是运行了很长时间了,加Proxy或者改造Kafka源代码的方式适合构建一个Kafka的新集群的时候使用。对于已有的Kafka集群,如果我们要查询Topic中的数据,如何实现呢。

    Kafka-Eagle中,我对Topic数据查询实现了基于SQL查询的实现方案。逻辑是这样的,编写SQL查询语句,对SQL进行解析,映射出一个Topic的Schema以及过滤条件,然后根据过滤条件消费Topic对应的数据,最后拿到数据集,通过SQL呈现出最后的结果。流程图如下:

    但是,这样是由局限性的。由于,单节点的计算能力有限,所以对每个Partition默认查询5000条数据,这个记录是可以增加或者减少的。如果在配置文件中对这个属性增大,比如设置为了50000条,那么对应的ke.sh脚本中的内存也需要增加,因为每次查询需要的内存增加了。不然,频繁若干用户同时查询,容易造成OOM的情况。

    但是,通常一个Topic中存储的数据一般达到上亿条数据以上,这种方式要从上亿条或者更多的数据中查询我们想要的数据,可能就满足不了了。

    2.3 分布式查询

    基于这种情况,我们可以对这中单节点查询的方式进行升级改造,将它变为分布式查询。其实,仔细来看,单节点查询的方式,就是一个分布式查询的缩版。那我们需要实现这样一个分布式查询的Kafka SQL引擎呢?

    首先,我们可以借助Hadoop的MapReduce思想,“化繁为简,分而治之”。我们将一个Topic看成一个比较大的数据集,每次我们需要对这个数据集进行查询,可以将待查询的数据进行拆分若干份Segment,然后,充分利用服务器的CPU,进行多线程消费(这样就可以打破Kafka中一个线程只能消费一个分区的局限性)。实现流程图如下:

    上图可知,由客户端发起请求,提交请求到Master节点,然后Master节点解析客户端的请求,并生成待执行策略。比如上述有三个工作节点,按照客户端的情况,Master会将生成的执行策略下发给三个工作节点,让其进行计算。

    这里以其中一个工作节点为例子,比如WorkNode1接收到了Master下发的计算任务,接收到执行指令后,结合工作节点自身的资源情况(比如CPU和内存,这里CPU较为重要),将任务进行拆解为若干个子任务(子任务的个数取决于每个批次的BatchSize,可以在属性中进行配置),然后让生成好的若干个子任务并行计算,得到若干个子结果,然后将若干个子结果汇总为一个最终结果作为当前工作节点的最终计算结果,最后将不同的工作节点的结果进行最后的Merge作为本次查询的结果返回给Master节点(这里需要注意的是,多个工作节点汇总在同一个JobID下)。然后,Master节点收到工作节点返回的结果后,返回给客户端。

    3.结果预览

    查询10条Topic中的数据,工作节点执行如下:

    select * from ke1115 where `partition` in (0) limit 10

     上图显示了,同一WorkNode节点下,同一JobID中,不同线程子任务的计算进度日志。

    KSqlStrategy显示了Master节点下发的待执行策略,msg表示各个工作节点返回的最终结果。

    4.待优化

    目前Kafka分布式查询引擎基础功能已实现可以用,任务托管、子任务查询内存优化等还有优化的空间,计划正在考虑集成到KafkaEagle系统中。 

    5.结束语

    这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

    另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。

  • 相关阅读:
    ovs tag
    从数据库分析OpenStack创建虚机流程
    Neutron中的二层网络服务架构
    Failed to bind port
    OpenStack网络参数segment
    OpenStack与SDN控制器的集成
    HDU 3709 Balanced Number
    HDU 5787 K-wolf Number
    HDU 5803 Zhu’s Math Problem
    CodeForces 258B Little Elephant and Elections
  • 原文地址:https://www.cnblogs.com/smartloli/p/14053206.html
Copyright © 2011-2022 走看看