zoukankan      html  css  js  c++  java
  • BitMap算法知识笔记以及在大数据方向的使用

    概述

    所谓的BitMap算法就是位图算法,简单说就是用一个bit位来标记某个元素所对应的value,而key即是该元素,由于BitMap使用了bit位来存储数据,因此可以大大节省存储空间,这是很常用的数据结构,比如用于Bloom Filter中、用于无重复整数的排序等等。bitmap通常基于数组来实现,数组中每个元素可以看成是一系列二进制数,所有元素组成更大的二进制集合。

    基本思想

     我用一个简单的例子来详细介绍BitMap算法的原理。假设我们要对0-7内的5个元素(4,7,2,5,3)进行排序(这里假设元素没有重复)。我们可以使用BitMap算法达到排序目的。要表示8个数,我们需要8个byte,

      1.首先我们开辟一个字节(8byte)的空间,将这些空间的所有的byte位都设置为0

      2.然后便利这5个元素,第一个元素是4,因为下边从0开始,因此我们把第五个字节的值设置为1

      3.然后再处理剩下的四个元素,最终8个字节的状态如下图

           4.现在我们遍历一次bytes区域,把值为1的byte的位置输出(2,3,4,5,7),这样便达到了排序的目的

    从上面的例子我们可以看出,BitMap算法的思想还是比较简单的,关键的问题是如何确定10进制的数到2进制的映射图。

    MAP映射:

    假设需要排序或则查找的数的总数N=100000000,BitMap中1bit代表一个数字,1个int = 4Bytes = 4*8bit = 32 bit,那么N个数需要N/32 int空间。所以我们需要申请内存空间的大小为int a[1 + N/32],其中:a[0]在内存中占32为可以对应十进制数0-31,依次类推:

      a[0]-----------------------------> 0-31

      a[1]------------------------------> 32-63

      a[2]-------------------------------> 64-95

      a[3]--------------------------------> 96-127

    那么十进制数如何转换为对应的bit位,下面介绍用位移将十进制数转换为对应的bit位:

      1.求十进制数在对应数组a中的下标

      十进制数0-31,对应在数组a[0]中,32-63对应在数组a[1]中,64-95对应在数组a[2]中………,使用数学归纳分析得出结论:对于一个十进制数n,其在数组a中的下标为:a[n/32]

      2.求出十进制数在对应数a[i]中的下标

      例如十进制数1在a[0]的下标为1,十进制数31在a[0]中下标为31,十进制数32在a[1]中下标为0。 在十进制0-31就对应0-31,而32-63则对应也是0-31,即给定一个数n可以通过模32求得在对应数组a[i]中的下标。

      3.位移

      对于一个十进制数n,对应在数组a[n/32][n%32]中,但数组a毕竟不是一个二维数组,我们通过移位操作实现置1

      a[n/32] |= 1 << n % 32
      移位操作:
      a[n>>5] |= 1 << (n & 0x1F)

      n & 0x1F 保留n的后五位 相当于 n % 32 求十进制数在数组a[i]中的下标。

    BitMap简单使用示例

    用户标签使用BitMap的数据结构来存储,比如表示用户对应的标签表如下所示:

     如果使用标签,也就是一个标签对应多个用户,如下所示,比较简单一看就会:

     让每一个标签存储包含此标签的所有用户 ID,每一个标签都是一个独立的 Bitmap。这样,实现用户的去重和查询统计,就变得一目了然:

     对上面例子的使用:

    在用户群做交集和并集运算的时候,例如:

    1,如何查找使用苹果手机的程序员用户?

     2.如何查找所有男性或者00后的用户?

     3,同样是刚才的例子,我们给定 90 后用户的 Bitmap,再给定一个全量用户的 Bitmap。最终要求出的是存在于全量用户,但又不存在于 90 后用户的部分。

     如何求出呢?我们可以使用异或操作,即相同位为 0,不同位为 1。 (1^1)

    BitMap的代码实现

    java实现: 

     1 /**
     2  * ClassName BitMap4.java
     3  * author Rhett.wang
     4  * version 1.0.0
     5  * Description TODO
     6  * createTime 2020年01月24日 07:53:00
     7  */
     8 public class BitMap4 {
     9     //保存数据的
    10     private byte[] bits;
    11 
    12     //能够存储多少数据
    13     private int capacity;
    14 
    15 
    16     public BitMap4(int capacity){
    17         this.capacity = capacity;
    18 
    19         //1bit能存储8个数据,那么capacity数据需要多少个bit呢,capacity/8+1,右移3位相当于除以8
    20         bits = new byte[(capacity >>3 )+1];
    21     }
    22 
    23     public void add(int num){
    24         // num/8得到byte[]的index
    25         int arrayIndex = num >> 3;
    26 
    27         // num%8得到在byte[index]的位置
    28         int position = num & 0x07;
    29 
    30         //将1左移position后,那个位置自然就是1,然后和以前的数据做|,这样,那个位置就替换成1了。
    31         bits[arrayIndex] |= 1 << position;
    32     }
    33 
    34     public boolean contain(int num){
    35         // num/8得到byte[]的index
    36         int arrayIndex = num >> 3;
    37 
    38         // num%8得到在byte[index]的位置
    39         int position = num & 0x07;
    40 
    41         //将1左移position后,那个位置自然就是1,然后和以前的数据做&,判断是否为0即可
    42         return (bits[arrayIndex] & (1 << position)) !=0;
    43     }
    44 
    45     public void clear(int num){
    46         // num/8得到byte[]的index
    47         int arrayIndex = num >> 3;
    48 
    49         // num%8得到在byte[index]的位置
    50         int position = num & 0x07;
    51 
    52         //将1左移position后,那个位置自然就是1,然后对取反,再与当前值做&,即可清除当前的位置了.
    53         bits[arrayIndex] &= ~(1 << position);
    54 
    55     }
    56 
    57     public static void main(String[] args) {
    58         BitMap4 bitmap = new BitMap4(100);
    59         bitmap.add(7);
    60         System.out.println("插入7成功");
    61 
    62         boolean isexsit = bitmap.contain(7);
    63         System.out.println("7是否存在:"+isexsit);
    64 
    65         bitmap.clear(7);
    66         isexsit = bitmap.contain(7);
    67         System.out.println("7是否存在:"+isexsit);
    68     }
    69 }

    PYTHON代码实现:

     1 class BitMap():
     2     def __init__(self,max):
     3         self.size=int((max +31 -1)/31)
     4         self.array=[0 for i in range(self.size)]
     5 
     6     def bitindex(self,num):
     7         return num%31
     8 
     9     def set_1(self,num):
    10         elemindex=(num//31)
    11         byteindex=self.bitindex(num)
    12         ele=self.array[elemindex]
    13         self.array[elemindex] = ele| (1<< byteindex)
    14 
    15 
    16     def test_1(self,i):
    17         elemindex=(i//31)
    18         bytearray= self.bitindex(i)
    19         if self.array[elemindex] & (1 << bytearray):
    20             return True
    21         return False
    22 
    23 if __name__ =="__main__":
    24     Max = ord('z')
    25     shuffle_array=[x for x in 'qwelajkda']
    26     ret =[]
    27     bitmap =BitMap(Max)
    28     for c in shuffle_array:
    29         bitmap.set_1(ord(c))
    30 
    31     for i in range(Max+1):
    32         if bitmap.test_1(i):
    33             ret.append(chr(i))
    34 
    35     print(u'原始数组是:%s' % shuffle_array)
    36     print(u'排序以后的数组是:%s' % ret)

    scala代码实现

     1 /**
     2   * ClassName BitMap.java
     3   * author Rhett.wang
     4   * version 1.0.0
     5   * Description TODO
     6   * createTime 2020年01月24日 10:30:00
     7   */
     8 class BitMap(bitmap:Array[Byte], length:Int) {
     9 
    10 }
    11 object BitMap {
    12   var bitmap:Array[Int]=Array()
    13   def main(args: Array[String]) {
    14 
    15     var bitmaps=TestBit(100)
    16     setBit(32)
    17     println(getBit(32))
    18     println(getBit(11))
    19   }
    20   def TestBit(length:Int):Unit={
    21     bitmap= new Array[Int]((length >> 5).toInt + (if ((length & 31) > 0) 1
    22     else 0))
    23   }
    24 
    25   def getBit(index: Long): Int = {
    26     var intData: Int = bitmap(((index - 1) >> 5).toInt)
    27     var offset: Int = ((index - 1) & 31).toInt
    28     return intData >> offset & 0x01
    29   }
    30 
    31   def setBit(index: Long) {
    32     var belowIndex: Int = ((index - 1) >> 5).toInt
    33     var offset: Int = ((index - 1) & 31).toInt
    34     var inData: Int = bitmap(belowIndex)
    35     bitmap(belowIndex) = inData | (0x01 << offset)
    36   }
    37 
    38    /* def clear(num:Int):Unit={
    39       var arrayIndex: Int = num >> 5
    40       var position: Int = num & 0x1F
    41       bitmap(arrayIndex) =(bitmap(arrayIndex) & ~(1 << position)).toByte
    42     }*/
    43 
    44 
    45 }

    Roaring BitMap的原理和使用

    位图索引被广泛用于数据库和搜索引擎中,通过利用位级并行,它们可以显著加快查询速度。但是,位图索引会占用大量的内存,因此我们会更喜欢压缩位图索引。 Roaring Bitmaps 就是一种十分优秀的压缩位图索引,后文统称 RBM。压缩位图索引有很多种,比如基于 RLE(Run-Length Encoding运行长度编码)的WAH (Word Aligned Hybrid Compression Scheme) 和 Concise (Compressed ‘n’ Composable Integer Set)。相比较前者, RBM 能提供更优秀的压缩性能和更快的查询效率。

    RBM 的用途和 Bitmap 很差不多(比如说索引),只是说从性能、空间利用率各方面更优秀了。目前 RBM 已经在很多成熟的开源大数据平台中使用,简单列几个作为参考。

    1 Apache Lucene and derivative systems such as Solr and Elasticsearch,
    2 Metamarkets’ Druid,
    3 Apache Spark,
    4 Apache Hive,
    5 eBay’s Apache Kylin,
    6 ……

    RBM 的主要思想并不复杂,简单来讲,有如下三条:

    1,我们将 32-bit 的范围 ([0, n)) 划分为 2^16 个桶,每一个桶有一个 Container 来存放一个数值的低16位;

    2,在存储和查询数值的时候,我们将一个数值 k 划分为高 16 位(k % 2^16)和低 16 位(k mod 2^16),取高 16 位找到对应的桶,然后在低 16 位存放在相应的 Container 中;

    3,容器的话, RBM 使用两种容器结构: Array Container 和 Bitmap Container。Array Container 存放稀疏的数据,Bitmap Container 存放稠密的数据。即,若一个 Container 里面的 Integer 数量小于 4096,就用 Short 类型的有序数组来存储值。若大于 4096,就用 Bitmap 来存储值。

    如下图,就是官网给出的一个例子,三个容器分别代表了三个数据集:

    the list of the first 1000 multiples of 62

    all integers [216, 216 + 100)

    all even numbers in [2216, 3216)

    举例说明:

    看完前面的还不知道在说什么?没关系,举个栗子说明就好了。现在我们要将 821697800 这个 32 bit 的整数插入 RBM 中,整个算法流程是这样的:

    821697800 对应的 16 进制数为 30FA1D08, 其中高 16 位为 30FA, 低16位为 1D08。

    我们先用二分查找从一级索引(即 Container Array)中找到数值为 30FA 的容器(如果该容器不存在,则新建一个),从图中我们可以看到,该容器是一个 Bitmap 容器。

    找到了相应的容器后,看一下低 16 位的数值 1D08,它相当于是 7432,因此在 Bitmap 中找到相应的位置,将其置为 1 即可。

    下面介绍到的是RoaringBitmap的核心,三种Container。

    通过上面的介绍我们知道,每个32位整形的高16位已经作为key存储在RoaringArray中了,那么Container只需要处理低16位的数据。

    ArrayContainer

    结构很简单,只有一个short[] content,将16位value直接存储。

    short[] content始终保持有序,方便使用二分查找,且不会存储重复数值。

    因为这种Container存储数据没有任何压缩,因此只适合存储少量数据。

    ArrayContainer占用的空间大小与存储的数据量为线性关系,每个short为2字节,因此存储了N个数据的ArrayContainer占用空间大致为2N字节。存储一个数据占用2字节,存储4096个数据占用8kb。

    根据源码可以看出,常量DEFAULT_MAX_SIZE值为4096,当容量超过这个值的时候会将当前Container替换为BitmapContainer。

    BitmapContainer

    这种Container使用long[]存储位图数据。我们知道,每个Container处理16位整形的数据,也就是0~65535,因此根据位 图的原理,需要65536个比特来存储数据,每个比特位用1来表示有,0来表示无。每个long有64位,因此需要1024个long来提供65536个 比特。

    因此,每个BitmapContainer在构建时就会初始化长度为1024的long[]。这就意味着,不管一个BitmapContainer中只存储了1个数据还是存储了65536个数据,占用的空间都是同样的8kb。

    解释一下为什么这里用的 4096 这个阈值?因为一个 Integer 的低 16 位是 2Byte,因此对应到 Arrary Container 中的话就是 2Byte * 4096 = 8KB;同样,对于 Bitmap Container 来讲,2^16 个 bit 也相当于是 8KB。

    RunContainer

    RunContainer中的Run指的是行程长度压缩算法(Run Length Encoding),对连续数据有比较好的压缩效果。

    它的原理是,对于连续出现的数字,只记录初始数字和后续数量。即:

    对于数列11,它会压缩为11,0;
    对于数列11,12,13,14,15,它会压缩为11,4;
    对于数列11,12,13,14,15,21,22,它会压缩为11,4,21,1;
    源码中的short[] valueslength中存储的就是压缩后的数据。

    这种压缩算法的性能和数据的连续性(紧凑性)关系极为密切,对于连续的100个short,它能从200字节压缩为4字节,但对于完全不连续的100个short,编码完之后反而会从200字节变为400字节。

    如果要分析RunContainer的容量,我们可以做下面两种极端的假设:

    最好情况,即只存在一个数据或只存在一串连续数字,那么只会存储2个short,占用4字节

    最坏情况,0~65535的范围内填充所有的奇数位(或所有偶数位),需要存储65536个short,128kb

    代码测试示例:

     1 import org.roaringbitmap.RoaringBitmap;
     2 
     3 import java.util.function.Consumer;
     4 
     5 /**
     6  * ClassName RBitMap.java
     7  * author Rhett.wang
     8  * version 1.0.0
     9  * Description TODO
    10  * createTime 2020年01月25日 21:09:00
    11  */
    12 public class RBitMap {
    13     public static void main(String[] args) {
    14         test1();
    15     }
    16     private static void test1(){
    17         //向rr中添加1、2、3、1000四个数字
    18         RoaringBitmap rr = RoaringBitmap.bitmapOf(1,2,3,1000);
    19         //创建RoaringBitmap rr2
    20         RoaringBitmap rr2 = new RoaringBitmap();
    21         //向rr2中添加10000-12000共2000个数字
    22         rr2.add(10000L,12000L);
    23         //返回第3个数字是1000,第0个数字是1,第1个数字是2,则第3个数字是1000
    24         rr.select(3);
    25         //返回value = 2 时的索引为 1。value = 1 时,索引是 0 ,value=3的索引为2
    26         rr.rank(2);
    27         //判断是否包含1000
    28         rr.contains(1000); // will return true
    29         //判断是否包含7
    30         rr.contains(7); // will return false
    31 
    32         //两个RoaringBitmap进行or操作,数值进行合并,合并后产生新的RoaringBitmap叫rror
    33         RoaringBitmap rror = RoaringBitmap.or(rr, rr2);
    34         //rr与rr2进行位运算,并将值赋值给rr
    35         rr.or(rr2);
    36         //判断rror与rr是否相等,显然是相等的
    37         boolean equals = rror.equals(rr);
    38         if(!equals) throw new RuntimeException("bug");
    39         // 查看rr中存储了多少个值,1,2,3,1000和10000-12000,共2004个数字
    40         long cardinality = rr.getLongCardinality();
    41         System.out.println(cardinality);
    42         //遍历rr中的value
    43         for(int i : rr) {
    44             System.out.println(i);
    45         }
    46         //这种方式的遍历比上面的方式更快
    47         rr.forEach((Consumer<? super Integer>) i -> {
    48             System.out.println(i.intValue());
    49         });
    50     }
    51 }

    在RoaringBitmap中,32位整数被分成了2^16个块。任何一个32位整数的前16位决定放在哪个块里。后16位就是放在这个块里的内容。比如0xFFFF0000和0xFFFF0001,前16位都是FFFF,表明这两个数应该放在一个块里。后16位分别是0和1。在这个块中指保存0和1就可以了,不需要保存完整的整数。

    在最开始的时候,一个块中包含一个长度为4的short数组,后16位所对应的值就存在这个short数组里。注意在插入的时候要保持顺序性,这里就需要用到二分查找来加快速度了。如果当块中的元素大于short数组的长度时,就需要重新分配更大的数组,把当前数组copy过去,并把新值插入对应的位置。扩展数组大小和STL中vector的方式类似,不过并不是完全的加倍,而且上限是4096,也就是说最多只保存4096个元素。那么问题来了,超过了4096怎么办呢?

    一个块里最多可能需要存放2^16个元素,那么如果是用short来存放,最多需要65536个short,那么就是131072个byte。如果换一种方式,用位来存储元素,那么就需要65536个bit,相当于1024个long型数组,即2048个int,也就是4096个short。

    所以,当一个块中元素数量小于等于4096的时候,用有序short数组来保存元素,而当元素数量大于4096的时候,用长度为1024的long数组来按位表示元素是否存在。

    当bitmap中有多个块的时候,块的信息是用数组来保存的。这个数组同样需要保持顺序性,也是用二分查找找到一个块的位置。所以,当一个整数过来之后,首先根据前16位计算出块的key,然后在块的数组中二分查找。找到的话,就把后16位保存在这个块中。找不到,就创建一个新块,把后16位保存在块中,再把块插入对应的位置。

     大数据分析常用去重算法分析-Kylin

    首先,请大家思考一个问题:在大数据处理领域中,什么环节是你最不希望见到的?以我的观点来看,shuffle 是我最不愿意见到的环节,因为一旦出现了非常多的 shuffle,就会占用大量的磁盘和网络 IO,从而导致任务进行得非常缓慢。而今天我们所讨论的去重分析,就是一个会产生非常多 shuffle 的场景,先大概介绍一下shuffle原理:

    Shuffle描述着数据从map task输出到reduce task输入的这段过程。在分布式情况下,reduce task需要跨节点去拉取其它节点上的map task结果。这一过程将会产生网络资源消耗和内存,磁盘IO的消耗。

    先来看以下场景:

    我们有一张商品访问表,表上有 item 和 user_id 两个列,我们希望求商品的 UV,这是去重非常典型的一个场景。我们的数据是存储在分布式平台上的,分别在数据节点 1 和 2 上。

    我们从物理执行层面上想一下这句 SQL 背后会发生什么故事:首先分布式计算框架启动任务, 从两个节点上去拿数据, 因为 SQL group by 了 item 列, 所以需要以 item 为 key 对两个表中的原始数据进行一次 shuffle。我们来看看需要 shuffle 哪些数据:因为 select/group by了 item,所以 item 需要 shuffle 。但是,user_id  我们只需要它的一个统计值,能不能不 shuffle 整个 user_id 的原始值呢?

    如果只是简单的求 count 的话, 每个数据节点分别求出对应 item 的 user_id 的 count, 然后只要 shuffle 这个 count 就行了,因为count 只是一个数字, 所以 shuffle 的量非常小。但是由于分析的指标是 count distinct,我们不能简单相加两个节点user_id 的 count distinct 值,我们只有得到一个 key 对应的所有 user_id 才能统计出正确的 count distinct值,而这些值原先可能分布在不同的节点上,所以我们只能通过 shuffle 把这些值收集到同一个节点上再做去重。而当 user_id 这一列的数据量非常大的时候,需要 shuffle 的数据量也会非常大。我们其实最后只需要一个 count 值,那么有办法可以不 shuffle 整个列的原始值吗?我下面要介绍的两种算法就提供了这样的一种思路,使用更少的信息位,同样能够求出该列不重复元素的个数(基数)

    第一种要介绍的算法是一种精确的去重算法,主要利用了 Bitmap 的原理。Bitmap 也称之为 Bitset,它本质上是定义了一个很大的 bit 数组,每个元素对应到 bit 数组的其中一位。例如有一个集合[2,3,5,8]对应的 Bitmap 数组是[001101001],集合中的 2 对应到数组 index 为 2 的位置,3 对应到 index 为 3 的位置,下同,得到的这样一个数组,我们就称之为 Bitmap。很直观的,数组中 1 的数量就是集合的基数。追本溯源,我们的目的是用更小的存储去表示更多的信息,而在计算机最小的信息单位是 bit,如果能够用一个 bit 来表示集合中的一个元素,比起原始元素,可以节省非常多的存储。

    这就是最基础的 Bitmap,我们可以把 Bitmap 想象成一个容器,我们知道一个 Integer 是32位的,如果一个 Bitmap 可以存放最多 Integer.MAX_VALUE 个值,那么这个 Bitmap 最少需要 32 的长度。一个 32 位长度的 Bitmap 占用的空间是512 M (2^32/8/1024/1024),这种 Bitmap 存在着非常明显的问题:这种 Bitmap 中不论只有 1 个元素或者有 40 亿个元素,它都需要占据 512 M 的空间。回到刚才求 UV 的场景,不是每一个商品都会有那么多的访问,一些爆款可能会有上亿的访问,但是一些比较冷门的商品可能只有几个用户浏览,如果都用这种 Bitmap,它们占用的空间都是一样大的,这显然是不可接受的。

     对于上节说的问题,有一种设计的非常的精巧 Bitmap,叫做 Roaring Bitmap,能够很好地解决上面说的这个问题。我们还是以存放 Integer 值的 Bitmap 来举例,Roaring Bitmap 把一个 32 位的 Integer 划分为高 16 位和低 16 位,取高 16 位找到该条数据所对应的 key,每个 key 都有自己的一个 Container。我们把剩余的低 16 位放入该 Container 中。依据不同的场景,有 3 种不同的 Container,分别是 Array Container、Bitmap Container 和 Run Container,下文将一一介绍。

     首先第一种,是 Roaring Bitmap 初始化时默认的 Container,叫做 Array Container。Array Container 适合存放稀疏的数据,Array Container 内部的数据结构是一个 short array,这个 array 是有序的,方便查找。数组初始容量为 4,数组最大容量为 4096。超过最大容量 4096 时,会转换为 Bitmap Container。这边举例来说明数据放入一个 Array Container 的过程:有 0xFFFF0000 和 0xFFFF0001 两个数需要放到 Bitmap 中, 它们的前 16 位都是 FFFF,所以他们是同一个 key,它们的后 16 位存放在同一个 Container 中; 它们的后 16 位分别是 0 和 1, 在 Array Container 的数组中分别保存 0 和 1 就可以了,相较于原始的 Bitmap 需要占用 512M 内存来存储这两个数,这种存放实际只占用了 2+4=6 个字节(key 占 2 Bytes,两个 value 占 4 Bytes,不考虑数组的初始容量)。

    第二种 Container 是 Bitmap Container,其原理就是上文说的 Bitmap。它的数据结构是一个 long 的数组,数组容量固定为 1024,和上文的 Array Container 不同,Array Container 是一个动态扩容的数组。这边推导下 1024 这个值:由于每个 Container 还需处理剩余的后 16 位数据,使用 Bitmap 来存储需要 8192 Bytes(2^16/8), 而一个 long 值占 8 个 Bytes,所以一共需要 1024(8192/8)个 long 值。所以一个 Bitmap container 固定占用内存 8 KB(1024 * 8 Byte)。当 Array Container 中元素到 4096 个时,也恰好占用 8 k(4096*2Bytes)的空间,正好等于 Bitmap 所占用的 8 KB。而当你存放的元素个数超过 4096 的时候,Array Container 的大小占用还是会线性的增长,但是 Bitmap Container 的内存空间并不会增长,始终还是占用 8 K,所以当 Array Container 超过最大容量(DEFAULT_MAX_SIZE)会转换为 Bitmap Container。

    我们自己在 Kylin 中实践使用 Roaring Bitmap 时,我们发现 Array Container 随着数据量的增加会不停地 resize 自己的数组,而 Java 数组的 resize 其实非常消耗性能,因为它会不停地申请新的内存,同时老的内存在复制完成前也不会释放,导致内存占用变高,所以我们建议把 DEFAULT_MAX_SIZE 调得低一点,调成 1024 或者 2048,减少 Array Container 后期 reszie 数组的次数和开销。

     最后一种 Container 叫做Run Container,这种 Container 适用于存放连续的数据。比如说 1 到 100,一共 100 个数,这种类型的数据称为连续的数据。这边的Run指的是Run Length Encoding(RLE),它对连续数据有比较好的压缩效果。原理是对于连续出现的数字, 只记录初始数字和后续数量。例如: 对于 [11, 12, 13, 14, 15, 21, 22],会被记录为 11, 4, 21, 1。很显然,该 Container 的存储占用与数据的分布紧密相关。最好情况是如果数据是连续分布的,就算是存放 65536 个元素,也只会占用 2 个 short。而最坏的情况就是当数据全部不连续的时候,会占用 128 KB 内存。

     总结:用一张图来总结3种 Container 所占的存储空间,可以看到元素个数达到 4096 之前,选用 Array Container 的收益是最好的,当元素个数超过了 4096 时,Array Container 所占用的空间还是线性的增长,而 Bitmap Container 的存储占用则与数据量无关,这个时候 Bitmap Container 的收益就会更好。而 Run Container 占用的存储大小完全看数据的连续性, 因此只能画出一个上下限范围 [4 Bytes, 128 KB]。

     我们再来看一下Bitmap 在 Kylin 中的应用,Kylin 中编辑 measure 的时候,可以选择 Count Distinct,且Return Type 选为 Precisely,点保存就可以了。但是事情没有那么简单,刚才上文在讲 Bitmap 时,一直都有一个前提,放入的值都是数值类型,但是如果不是数值类型的值,它们不能够直接放入 Bitmap,这时需要构建一个全区字典,做一个值到数值的映射,然后再放入 Bitmap 中。

    在 Kylin 中构建全局字典,当列的基数非常高的时候,全局字典会成为一个性能的瓶颈。针对这种情况,社区也一直在努力做优化,这边简单介绍几种优化的策略,更详细的优化策略可以见文末的参考链接。

     1)当一个列的值完全被另外一个列包含,而另一个列有全局字典,可以复用另一个列的全局字典

     2)当精确去重指标不需要跨 Segment 聚合的时候,可以使用这个列的 Segment 字典代替(这个列需要字典编码)。在 Kylin 中,Segment 就相当于时间分片的概念。当不会发生跨 Segments 的分析时,这个列的 Segment 字典就可以代替这个全局字典。

     3)如果你的 cube 包含很多的精确去重指标,可以考虑将这些指标放到不同的列族上。不止是精确去重,像一些复杂 measure,我们都建议使用多个列族去存储,可以提升查询的性能。

     快手BitBase数据库

     如上图所示,首先将原始数据的一列的某个值抽象成 bitmap(比特数组),举例:city=bj,city 是维度,bj (北京) 是维度值,抽象成 bitmap 值就是10100,表示第0个用户在 bj,第1个用户不在北京,依次类推。然后将多维度之间的组合转换为 bitmap 计算:bitmap 之间做与、或、非、异或,举例:比如在北京的用户,且兴趣是篮球,这样的用户有多少个,就转换为图中所示的两个 bitmap 做与运算,得到橙色的 bitmap,最后,再对 bitmap 做 count 运算。count 表示统计“1”的个数,list 是列举“1”所在的数组 index,业务上表示 userId。

    SparkSQL自定义聚合函数(UDAF)实现bitmap函数

    创建表:使用phoenix在HBase中创建测试表,字段使用VARBINARY类型

    1 CREATE TABLE IF NOT EXISTS test_binary (
    2 date VARCHAR NOT NULL,
    3 dist_mem VARBINARY
    4  CONSTRAINT test_binary_pk PRIMARY KEY (date)
    5  ) SALT_BUCKETS=6;

    创建完成后使用RoaringBitmap序列化数据存入数据库:

     自定义代码:

      1 import org.apache.spark.sql.Row;
      2 import org.apache.spark.sql.expressions.MutableAggregationBuffer;
      3 import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
      4 import org.apache.spark.sql.types.DataType;
      5 import org.apache.spark.sql.types.DataTypes;
      6 import org.apache.spark.sql.types.StructField;
      7 import org.apache.spark.sql.types.StructType;
      8 import org.roaringbitmap.RoaringBitmap;
      9  
     10 import java.io.*;
     11 import java.util.ArrayList;
     12 import java.util.List;
     13  
     14 /**
     15  * 实现自定义聚合函数Bitmap
     16  */
     17 public class UdafBitMap extends UserDefinedAggregateFunction {
     18     @Override
     19     public StructType inputSchema() {
     20         List<StructField> structFields = new ArrayList<>();
     21         structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
     22         return DataTypes.createStructType(structFields);
     23     }
     24  
     25     @Override
     26     public StructType bufferSchema() {
     27         List<StructField> structFields = new ArrayList<>();
     28         structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
     29         return DataTypes.createStructType(structFields);
     30     }
     31  
     32     @Override
     33     public DataType dataType() {
     34         return DataTypes.LongType;
     35     }
     36  
     37     @Override
     38     public boolean deterministic() {
     39         //是否强制每次执行的结果相同
     40         return false;
     41     }
     42  
     43     @Override
     44     public void initialize(MutableAggregationBuffer buffer) {
     45         //初始化
     46         buffer.update(0, null);
     47     }
     48  
     49     @Override
     50     public void update(MutableAggregationBuffer buffer, Row input) {
     51         // 相同的executor间的数据合并
     52         // 1. 输入为空直接返回不更新
     53         Object in = input.get(0);
     54         if(in == null){
     55             return ;
     56         }
     57         // 2. 源为空则直接更新值为输入
     58         byte[] inBytes = (byte[]) in;
     59         Object out = buffer.get(0);
     60         if(out == null){
     61             buffer.update(0, inBytes);
     62             return ;
     63         }
     64         // 3. 源和输入都不为空使用bitmap去重合并
     65         byte[] outBytes = (byte[]) out;
     66         byte[] result = outBytes;
     67         RoaringBitmap outRR = new RoaringBitmap();
     68         RoaringBitmap inRR = new RoaringBitmap();
     69         try {
     70             outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes)));
     71             inRR.deserialize(new DataInputStream(new ByteArrayInputStream(inBytes)));
     72             outRR.or(inRR);
     73             ByteArrayOutputStream bos = new ByteArrayOutputStream();
     74             outRR.serialize(new DataOutputStream(bos));
     75             result = bos.toByteArray();
     76         } catch (IOException e) {
     77             e.printStackTrace();
     78         }
     79         buffer.update(0, result);
     80     }
     81  
     82     @Override
     83     public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
     84         //不同excutor间的数据合并
     85         update(buffer1, buffer2);
     86     }
     87  
     88     @Override
     89     public Object evaluate(Row buffer) {
     90         //根据Buffer计算结果
     91         long r = 0l;
     92         Object val = buffer.get(0);
     93         if (val != null) {
     94             RoaringBitmap rr = new RoaringBitmap();
     95             try {
     96                 rr.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) val)));
     97                 r = rr.getLongCardinality();
     98             } catch (IOException e) {
     99                 e.printStackTrace();
    100             }
    101         }
    102         return r;
    103     }
    104 }

    调用例子:

     1  /**
     2      * 使用自定义函数解析bitmap
     3      *
     4      * @param sparkSession
     5      * @return
     6      */
     7     private static void udafBitmap(SparkSession sparkSession) {
     8         try {
     9             Properties prop = PropUtil.loadProp(DB_PHOENIX_CONF_FILE);
    10             // JDBC连接属性
    11             Properties connProp = new Properties();
    12             connProp.put("driver", prop.getProperty(DB_PHOENIX_DRIVER));
    13             connProp.put("user", prop.getProperty(DB_PHOENIX_USER));
    14             connProp.put("password", prop.getProperty(DB_PHOENIX_PASS));
    15             connProp.put("fetchsize", prop.getProperty(DB_PHOENIX_FETCHSIZE));
    16             // 注册自定义聚合函数
    17             sparkSession.udf().register("bitmap",new UdafBitMap());
    18             sparkSession
    19                     .read()
    20                     .jdbc(prop.getProperty(DB_PHOENIX_URL), "test_binary", connProp)
    21                     // sql中必须使用global_temp.表名,否则找不到
    22                     .createOrReplaceGlobalTempView("test_binary");
    23             //sparkSession.sql("select YEAR(TO_DATE(date)) year,bitmap(dist_mem) memNum from global_temp.test_binary group by YEAR(TO_DATE(date))").show();
    24             sparkSession.sql("select date,bitmap(dist_mem) memNum from global_temp.test_binary group by date").show();
    25         } catch (Exception e) {
    26             e.printStackTrace();
    27         }
    28     }  

    总结

    感谢网络大神的分享:技术共勉

    https://kyligence.io/zh/blog/count-distinct-bitmap/

    https://www.jianshu.com/p/ded6e8ecd0d1

    https://blog.csdn.net/xiongbingcool/article/details/81282118

    http://www.sohu.com/a/327627247_315839

    https://blog.csdn.net/xywtalk/article/details/52590275

    https://blog.csdn.net/qq_22520215/article/details/77893326

     

     

  • 相关阅读:
    HDOJ/HDU 1015 Safecracker(枚举、暴力)
    nodejs之入门
    git错误收集总结
    git基本操作
    git使用前配置
    花开花落花非花、缘起缘灭缘随缘
    js之定时器
    js之Date(日期对象)
    es5严格模式简谈
    try...catch
  • 原文地址:https://www.cnblogs.com/boanxin/p/12232604.html
Copyright © 2011-2022 走看看