zoukankan      html  css  js  c++  java
  • 分布式超大规模数据的实时快速排序算法

    作者:xiaofei,UC技术专家
    来源:云栖社区

    引言

    对数据进行处理的同学,经常会遇到排序需求,无论是内存数据还是磁盘数据。

    对于单点的数据,我们的处理比较简单,比如:

    select field_a from table_b order by field_a limit 100, 10;
    db.collection_b.find().sort({"field_a":1}).skip(100).limit(10);

    存储服务的处理流程一般可抽象如下:
    640?wx_fmt=png

    信息爆炸的时代,数据早已不是单点所能承载的了,数据一般分布在大量节点上,假设某库中的数据均匀地分布在以下的所有节点上。

    640?wx_fmt=jpeg

    这时sort, limit的一般方法是选择一个中间节点或者中间件来做合并处理:
    640?wx_fmt=jpeg

    一般处理流程的动态表示如下:

    640?wx_fmt=gif

    我们将过程抽象,流程简化如下:
    640?wx_fmt=jpeg

    注意第三步在数据节点中的查询结果范围为[0,skip+limit]。当我们想查询[skip=1000000, limit=200]的数据,意味着需要在各节点

    上先查询[skip=0, limit=1000000+200]的数据,再由归并服务对结果进行[skip=1000000, limit=200]的排序, 对存储IO与网络IO的处理量级与skip成正比,

    对于T级以上规模所数据处理,无法做到实时处理。

    下面来讨论另一种方式

    理论基础

    在一般对数据的处理方法中,我们基于一个共同的假设:各数据存储节点只具备简单的对外查询功能,相互之间的连接功能是很弱的,主要有主从,选举,更一步的功能就少了。

    现在我们要改变这一假设。

    理论描述

    假设各存储节点具备相互对话的能力。比如,"hey,你那里skip为100的数据是哪个", “好的,我这里skip为100的数为m。”

    对话分成几种,第一种是扩散请求,当其中一节点收到一次请求后, 此节点会将请求迅速扩散到所有其他相关的节点。

    第二种对话是应答式,简单的你问我答型。

    假设有一个排序全网排序请求,在某一节点获得请求后,扩散给网络需要对此请求处理的请求,各节点在进经n次对话后,产生最终的结果。

    概念定义

    在一堆数据中,数据m前面有n-1个数,则m的排序索引为n。

    640?wx_fmt=png

    通过问答式查询,我们可以轻而易举地获得某个数在全网中的排序索引,只需要将各节点上排在此节点前的个数相加即可。

    推导

    简单点,如果我们要在一批数据中查询skip=100, limit=20的数据有哪些,我们的目标是在全网数据中获取b,e.

    b的索引为第100,

    e的索引为120。

    则所有在[b,e]之前的数都是我们的目标数据。

    实际上还要考虑数据重复,即在b的索引为98个, b个数为4,e的索引118, e个数为5,则目标数据以[b, b]开头, 以[e,e,e]结尾。

    技术使用

    某节点想知道数据m前面有多少个数, 则直接向其他数据节点发送对话,所有节点(包含自身)只需要返回本节点中在m前面的数据个数, 假设各节点上的查询结果个数为n1,n2,n3,...n10,

    则全量数据中数据m前面的数据个数n=(n1+n2+n3+.....+n10)。

    以数据m做为递度对象, n做为结果向skip, skip+limit逼近,在全量数据中获取最终的b,e。

    架构设计

    请求处理流程

    640?wx_fmt=jpeg

    如图所示,对于一次请求我们会分成三个部分

    640?wx_fmt=png

    节点确认阶段

    640?wx_fmt=png

    此阶段确认哪些节点参与发现。

    结果同步阶段

    640?wx_fmt=jpeg

    同步的过程是相互的,相互猜测,查询对应数据的索引。

    这一步是处理的核心步骤,通过相互确认,最终逼近索引在[100,120]之间的数是哪些。

    结果合并

    640?wx_fmt=png

    各数据节点将数据结果同步出去,如果skip=100万, limit=20,最多也就同步20条数据,
    不再于skip正成比。

    模型假设

    假设存在m个节点,各节点上的数据都是各自排好序的,各节点间平均来回时间为t1,
    单次查询确认程序执行时间为t2,

    每次确认的数据个数为p,假设结果确认阶段平均某节点的对外请求次数不在于s。

    节点确认时间为 t1

    结果确认阶段时间<=s*(t1+t2)

    结果合并阶段时间为t1

    则总共所需时间为 (2 + s)t1 + st2

    从上面结果得出,请求所需时间与节点个数不成正比,与节点间的平均网络时间及算法次数相关。

    假设各节点在同一个局域网中,相互间的来回网络时间t1<1ms, 程序执行时间t2 < 1ms,单节点对外请求次数不超过100,

    则总共所需求时间不超过 (2+100)+100=202ms

    理论要求

    如果希望在200ms内完成一次查询,则平均某个节点对外请求次数不超过100,对应的查询数据总次数则不超过100*p,假设p为100,则总次确认的总次数可以达到10000次。

    下面我们来模拟一次真实的操作吧。

    模拟操作

    数据准备

    假设对应的数据为正整型,在10个节点中查询skip=100, limit=20的所有数据。

    则我们要通过对话确认索引分别为100,120的数为哪个。

    考虑到数据重复,我们为各个数建立向量(数据,索引,个数),假设索引为100的数为b,个数为c(b), 索引为120的数为e, 个数为c(e),则我们所要获得的向量为( b,100,c(b)), (e, 120, c(e))。

    首轮

    由于所有数据都是正整型,则我们知道最小的数为0, 最大的数为2^31,

    因此第一次待确认队列里可以包含[0, max=2*31]在所有节点上的情况,得到(0, i(0), c(0)), (max, i(max), c(max))。

    同时为了更好地得到逼近效果,先做一次全范围猜测,比如max/100做猜测,

    以得到(max/100, i(max/100), c(max/100), (max2/100, i(max2/100), c(max*2/100)), .......(max^99/100,i(max^99/100),c(max^99/100))。

    其实0可以认为是max0, 则第一次做逼近的数据可以是(nmax/100), n~[0,100]。

    目标逼近

    经过第一轮猜测后,全网络都知道了(n*max/100), n~[0,100]对应的向量。

    存在2个数n1, n2满足 i(n1max/100=s1) + c(s1)<=100, i(n2max/100=s2) + c(s2)>=100.(如果不存在n2, 则表明不存在这个数,其全局索引>=100, 因此结果为空,直接跳到数据合并阶段)

    存在2个数n3, n4满足 i(n3max/100=s3) + c(s3)<=120, i(n4max/100=s4) + c(s4)>=120.(如果不存在n4, 则表明不存在这个数,其全局索引>=120, 假设存在n2, 则数据大于等于n2*max/100的数都是目标数据。)

    则有 s1<= b <=s2, s3<=e<=s4,

    我们对再[s1, s2], [s3, s4]做相应的逼近,直至获取到最终b,e, 满足 c(b) + i(b) <= 100, c(e) + i(e) <= 120。

    提升规模

    从上面的结论来看,数据规模对时间的影响不大,假设数据模块为T级或者P级, 直接影响的是查询某个数在此数据节点上前面有多少个数。

    为了降低响应时间,我们只需要设计好数据结构,以支持快速的向量查询。

    假设单个节点的数据是G级,假设我们用红黑树存储,是T/P级,我们用B+数,假设每颗节点存储着其子节点的个数。

    我们以红黑树为例:
    640?wx_fmt=jpeg

    如果需要获取小于数150前面的个数,则只需要找到其所在左支遍历的个数加上根节点的左侧子节点个数。

    转化为代码即为:

    function getCount(node, child){    if (node.right == child) {        return { node:node.parent, count:node.leftCount + 1 };
        }    else {        return { node:node.parent, count:0};
        }
    }
    node=node_150;var count = node.leftCount;var nc = {node:node, count:0};while(node != null){
        nc = getCount(node.parent, node);
        node = nc.node;
        count += nc.count;
    }console.log("node 150's left count:"+count);

    结论

    我们的理论目标环境:

    1.数据分布在大量的数据节点上,并且在节点上是有序的。

    2.各节点间的网络延时不超过1mm。

    在此分布式环境下可以实现对T/P级数据进行最多200ms延时的实时快速排序。

    往期推荐

     


    【技术篇】 


    【技术篇】 


    【技术篇】 


    【生活篇】 

    640?wx_fmt=jpeg

  • 相关阅读:
    LA 2038 Strategic game(最小点覆盖,树形dp,二分匹配)
    UVA 10564 Paths through the Hourglass(背包)
    Codeforces Round #323 (Div. 2) D 582B Once Again...(快速幂)
    UVALive 3530 Martian Mining(贪心,dp)
    UVALive 4727 Jump(约瑟夫环,递推)
    UVALive 4731 Cellular Network(贪心,dp)
    UVA Mega Man's Mission(状压dp)
    Aizu 2456 Usoperanto (贪心)
    UVA 11404 Plalidromic Subsquence (回文子序列,LCS)
    Aizu 2304 Reverse Roads(无向流)
  • 原文地址:https://www.cnblogs.com/Java-Road/p/11824693.html
Copyright © 2011-2022 走看看