zoukankan      html  css  js  c++  java
  • Spark开发-Spark UDAF(一)

    示例

     使用org.roaringbitmap.Roaringbitmap ,通过序列化和反序列化的方式来进行计算
      这里有两个例子,主要是最终的返回结果类型不同
     使用org.roaringbitmap.buffer.*的方式,使用内存映射文件来进行计算,主要是 ImmutableRoaringBitmap 和 MutableRoaringBitmap
    

    适用场景

    多维度条件查询
    多组标签的人群圈选
      01.预计算 - 预聚合和重聚合
      02.BitMap Index
      03.Inverted index
        正排索引(forward index)与倒排索引(inverted index)
    	倒排索引是一种以关键字和文档编号结合,并以关键字作为主键的索引结构
    
    预计算是数据仓库领域常见的一种提升查询效率的方式,通过将全部或部分计算结果提前计算好并存储下来,
    对于后续的相关的查询可以直接重用之前的预计算结果,从而加速查询速度。
    在多维分析或报表等查询模式相对比较固定的场景中,我们可以通过预聚合,将需要处理的数据量下降成百上千倍。
    此外对于预计算来说,由于用户的查询维度,过滤条件,统计方式非常多,考虑到预计算的计算和存储代价,
    不太可能把每种可能的查询条件都进行预计算,
    通常的方式是按照较细粒度进行分组聚合,然后对于后续更粗粒度的分组聚合查询,可以使用预计算的结果进行重聚合,
    

    Roaringbitmap for Spark 聚合代码

    01.数据映射
      不能在 bitmap 中直接使用,需要用 INT 类型的用户 ID 来标识所有的用户。
      同时原 hive 表中也是不包含 INT 类型的用户 ID 这个字段的,
      所以需要提前准备好 bitmap 分群方案所需的 bitmap_hive 表
         一个映射函数,能够将统计字段的取值范围映射成自然数
    	      1)row_number() over() 函数,但是在操作亿级别行的数据时,会造成数据倾斜
    		  2)一种针对亿级行大数据量的全局唯一连续数字 ID 生成方法
    02.bit_mapping: 接受Integer类型字段作为参数,内部维护Bitmap数据结构,将输入数据插入Bitmap中,
       并把Bitmap序列化二进制数据作为输出结果。
      如何将 Hive 表中的关系型数据以 bitmap 的形式保存 ? 字节数组  .在程序中序列化roaringbitmap,将二进制数据写入数据库
        a binaryFile data sourceon Spark, Array[Byte] is represented as a BinaryType
        The data type representing Array[Byte] values. Please use the singleton DataTypes.BinaryType
    03.数据类型
    hive类型        说明         java类型      实例
       1).tinyint     1byte有符号的整数   byte        30
       2).smalint      2byte有符号的整数    short        30
       3).int        4byte有符号的整数   int        30
       4).bigint      8byte有符号的整数   long        30
       5).boolean      布尔类型true或false    boolean     true
       6).float       单精度         float             3.33
       7).double      双精度         double            3.22
       8).string      字符序列,单双即可   string          'ggj';"tyhjk"
       9).timestamp     时间戳,精确的纳秒   timestamp        '169030219333'
      10).binary     字节数组        byte[] 
    04.re_count_distinct: 接受二进制数据作为参数,反序列化位Bitmap,merge同一分区的多个Bitmap,把Bitmap的cardinality作为结果输出。
    

    RoaringBitMap

      * In-place bitwise OR (union) operation. The current bitmap is modified.
         public void or(final RoaringBitmap x2) {}
      *  serialized Roaring objects with an incorrect byte order  
    	    public void serialize(DataOutput out) throws IOException {}
    		public int serializedSizeInBytes()
       * Deserialize (retrieve) this bitmap.
    	        public void deserialize(DataInput in) throws IOException {}
    

    相关系统案例

    01.Bitmap索引是应该在数据写入的同时实时构建呢,还是应该在数据从内存persist到硬盘的时候批量构建
    02.如何分别为每个维度列构建Bitmap索引
    03.Bitmap索引如何进行压缩处理?
    标签属性
        标签划分成枚举类型(enum)、连续值类型(continuous)、日期类型(date)
    	  nominal  标称- 枚举: 标签取值从维表中选择,标签和取值之间的逻辑关系只有等于、不等于,共 2 种
    标签构成 : 标签划分成单一标签和复合标签
       每种标签的 bitmap 构建和运算转换规则
       对部分标签的边界值情况进行处理
    

    代码示例 读取IntegerType数据

     读取Inter数据,利用Roaringbitmap,返回不重复的个数
    `
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.expressions.MutableAggregationBuffer;
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.roaringbitmap.RoaringBitmap;
    
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    
    public class RoaringBitMapByteUDAF extends UserDefinedAggregateFunction {
    
    /**
     * // 聚合函数的输入数据结构
     */
    @Override
    public StructType inputSchema() {
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("field", DataTypes.IntegerType, true));
        return DataTypes.createStructType(structFields);
    }
    
    /**
     * 聚缓存区数据结构   //聚合的中间过程中产生的数据的数据类型定义
     */
    @Override
    public StructType bufferSchema() {
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
        return DataTypes.createStructType(structFields);
    }
    
    /**
     * 聚合函数返回值数据结构
     */
    @Override
    public DataType dataType() {
        return DataTypes.LongType;
    }
    
    /**
     * 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
     */
    @Override
    public boolean deterministic() {
        //是否强制每次执行的结果相同
        return true;
    }
    
    /**
     * 初始化缓冲区
     */
    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        //初始化
        buffer.update(0, null);
    }
    
    /**
     *  给聚合函数传入一条新数据进行处理
     *  buffer.getInt(0)获取的是上一次聚合后的值
     *   //用输入数据input更新buffer值,类似于combineByKey
     */
    
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        // 相同的executor间的数据合并
        Object in = input.get(0);
        Object out = buffer.get(0);
        RoaringBitmap outRR = new RoaringBitmap();
        // 1. 输入为空直接返回不更新
        if(in == null){
            return ;
        }
    
        // 2. 源为空则直接更新值为输入
        int inInt = Integer.valueOf(in.toString());
        byte[] inBytes = null ;
        if(out == null){
            System.out.println(inInt);
            outRR.add(inInt);
            try{
                // 将RoaringBitmap的数据转成字节数组
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream ndos = new DataOutputStream(bos);
                outRR.serialize(ndos);
                inBytes = bos.toByteArray();
                ndos.close();
            }   catch (IOException e) {
                e.printStackTrace();
            }
            buffer.update(0, inBytes);
            return ;
        }
        // 3. 源和输入都不为空使用 bitmap去重合并
        byte[] outBytes = (byte[]) buffer.get(0);
        System.out.println(outBytes.length);
        byte[] result = outBytes;
        try {
            outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes)));
            outRR.add(inInt);
            System.out.println("去重后的" + String.valueOf(outRR.getCardinality()));
            ByteArrayOutputStream boss = new ByteArrayOutputStream();
            DataOutputStream ndosn = new DataOutputStream(boss);
            outRR.serialize(ndosn);
            result = boss.toByteArray();
            ndosn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        buffer.update(0, result);
    }
    
    
    /**
     *  合并聚合函数缓冲区
     *      //合并两个buffer,将buffer2合并到buffer1.在合并两个分区聚合结果的时候会被用到,类似于reduceByKey
     *    //这里要注意该方法没有返回值,
     *    在实现的时候是把buffer2合并到buffer1中去,你需要实现这个合并细节。
     */
    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        //不同excutor间的数据合并
        // 合并两个聚合buffer,该函数在聚合并两个部分聚合数据集的时候调用
      //update(buffer1, buffer2);
        RoaringBitmap inRBM = new RoaringBitmap();
        RoaringBitmap outRBM = new RoaringBitmap();
        Object out = buffer1.get(0);
        byte[] inBytes = (byte[]) buffer2.get(0);
        if(out == null){
            buffer1.update(0, inBytes);
            return ;
        }
        byte[] outBitBytes = (byte[]) out;
        byte[] resultBit = outBitBytes;
        if (out != null) {
            try {
                outRBM.deserialize(new DataInputStream(new ByteArrayInputStream(outBitBytes)));
                System.out.println("去重后的 outRBM " + String.valueOf(outRBM.getCardinality()));
                inRBM.deserialize(new DataInputStream(new ByteArrayInputStream(inBytes)));
                System.out.println("去重后的 inRBM " + String.valueOf(inRBM.getCardinality()));
                RoaringBitmap rror = RoaringBitmap.or(outRBM, inRBM) ;
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream ndosn = new DataOutputStream(bos);
                rror.serialize(ndosn);
                resultBit = bos.toByteArray();
                ndosn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            buffer1.update(0, resultBit);
        }
    }
    
    /**
     * 计算最终结果
     */
    
    @Override
    public Object evaluate(Row buffer) {
        //根据Buffer计算结果
        long r = 2L;
        Object val = buffer.get(0);
        if (val != null) {
            RoaringBitmap rr = new RoaringBitmap();
            try {
                rr.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) val)));
                r = rr.getCardinality();
                // getLongCardinality()
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return r;
    }
    }
    `
    

    读取BinaryType 返回数据不重复的计数

     读取 BinaryType 数据,即读取RoaringBitmap序列化数据  利用Roaringbitmap,返回不重复的个数 -- 
     参考学习于 sparkSQL自定义聚合函数(UDAF)实现bitmap函数  https://blog.csdn.net/xiongbingcool/article/details/81282118
    `    
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.expressions.MutableAggregationBuffer;
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.roaringbitmap.RoaringBitmap;
    
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * 实现自定义聚合函数Bitmap
     */
    public class RoaringBitMapByteDistinctUDAF extends UserDefinedAggregateFunction {
        /**
         * // 聚合函数的输入数据结构
         */
        @Override
        public StructType inputSchema() {
            List<StructField> structFields = new ArrayList<>();
            structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
            return DataTypes.createStructType(structFields);
        }
    
        /**
         * 聚缓存区数据结构
         */
        @Override
        public StructType bufferSchema() {
            List<StructField> structFields = new ArrayList<>();
            structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
            return DataTypes.createStructType(structFields);
        }
    
        /**
         * 聚合函数返回值数据结构
         */
        @Override
        public DataType dataType() {
            return DataTypes.LongType;
        }
    
        /**
         * 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
         */
        @Override
        public boolean deterministic() {
            //是否强制每次执行的结果相同
            return false;
        }
    
        /**
         * 初始化缓冲区
         */
        @Override
        public void initialize(MutableAggregationBuffer buffer) {
            //初始化
            buffer.update(0, null);
        }
    
        /**
         *  给聚合函数传入一条新数据进行处理
         */
        @Override
        public void update(MutableAggregationBuffer buffer, Row input) {
            // 相同的executor间的数据合并
            // 1. 输入为空直接返回不更新
            Object in = input.get(0);
            if(in == null){
                return ;
            }
            // 2. 源为空则直接更新值为输入
            byte[] inBytes = (byte[]) in;
            Object out = buffer.get(0);
            if(out == null){
                buffer.update(0, inBytes);
                return ;
            }
            // 3. 源和输入都不为空使用bitmap去重合并
            byte[] outBytes = (byte[]) out;
            byte[] result = outBytes;
            RoaringBitmap outRR = new RoaringBitmap();
            RoaringBitmap inRR = new RoaringBitmap();
            try {
                outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes)));
                inRR.deserialize(new DataInputStream(new ByteArrayInputStream(inBytes)));
                outRR.or(inRR);
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                outRR.serialize(new DataOutputStream(bos));
                result = bos.toByteArray();
            } catch (IOException e) {
                e.printStackTrace();
            }
            buffer.update(0, result);
        }
    
        /**
         *  合并聚合函数缓冲区
         */
        @Override
        public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
            //不同excutor间的数据合并
            update(buffer1, buffer2);
        }
    
        /**
         * 计算最终结果
         */
        @Override
        public Object evaluate(Row buffer) {
            //根据Buffer计算结果
            long r = 0L;
            Object val = buffer.get(0);
            if (val != null) {
                RoaringBitmap rr = new RoaringBitmap();
                try {
                    rr.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) val)));
                    r = rr.getCardinality();
                    // getLongCardinality()
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            return r;
        }
    }
    

    `

    使用Buffer方式

    `
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.expressions.MutableAggregationBuffer;
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
    import org.roaringbitmap.buffer.MutableRoaringBitmap;
    import java.io.*;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    
    public class MutableRoaringBitmapUDAF extends UserDefinedAggregateFunction {
        /**
         * // 聚合函数的输入数据结构
         */
        @Override
        public StructType inputSchema() {
            List<StructField> structFields = new ArrayList<>();
            structFields.add(DataTypes.createStructField("field", DataTypes.IntegerType, true));
            return DataTypes.createStructType(structFields);
        }
    
        /**
         * 聚缓存区数据结构   //聚合的中间过程中产生的数据的数据类型定义
         */
        @Override
        public StructType bufferSchema() {
            List<StructField> structFields = new ArrayList<>();
            structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
            return DataTypes.createStructType(structFields);
        }
    
        /**
         * 聚合函数返回值数据结构
         */
        @Override
        public DataType dataType() {
            return DataTypes.LongType;
        }
    
        /**
         * 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
         */
        @Override
        public boolean deterministic() {
            //是否强制每次执行的结果相同
            return true;
        }
    
        /**
         * 初始化缓冲区
         */
        @Override
        public void initialize(MutableAggregationBuffer buffer) {
            //初始化
            buffer.update(0, null);
        }
    
        /**
         *  给聚合函数传入一条新数据进行处理
         *  buffer.getInt(0)获取的是上一次聚合后的值
         *   //用输入数据input更新buffer值,类似于combineByKey
         */
    
        @Override
        public void update(MutableAggregationBuffer buffer, Row input) {
            // 相同的executor间的数据合并
            Object in = input.get(0);
            Object out = buffer.get(0);
            MutableRoaringBitmap outRR =  new MutableRoaringBitmap();
            // 1. 输入为空直接返回不更新
            if(in == null){
                return ;
            }
            // 2. 源为空则直接更新值为输入
            int inInt = Integer.valueOf(in.toString());
            byte[] inBytes = null ;
            if(out == null){
                outRR.add(inInt);
                try{
                    // 将RoaringBitmap的数据转成字节数组
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    DataOutputStream ndos = new DataOutputStream(bos);
                    outRR.serialize(ndos);
                    inBytes = bos.toByteArray();
                    ndos.close();
                }   catch (IOException e) {
                    e.printStackTrace();
                }
                buffer.update(0, inBytes);
                return ;
            }
            // 3. 源和输入都不为空使用 bitmap去重合并
            byte[] outBytes = (byte[]) buffer.get(0);
            byte[] result = outBytes;
            try {
                outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes)));
                outRR.add(inInt);
                ByteArrayOutputStream boss = new ByteArrayOutputStream();
                DataOutputStream ndosn = new DataOutputStream(boss);
                outRR.serialize(ndosn);
                result = boss.toByteArray();
                ndosn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            buffer.update(0, result);
        }
     
        /**
         *  合并聚合函数缓冲区
         *      //合并两个buffer,将buffer2合并到buffer1.在合并两个分区聚合结果的时候会被用到,类似于reduceByKey
         *    //这里要注意该方法没有返回值,
         *    在实现的时候是把buffer2合并到buffer1中去,你需要实现这个合并细节。
         */
        @Override
        public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
            //不同excutor间的数据合并
            // 合并两个聚合buffer,该函数在聚合并两个部分聚合数据集的时候调用
            //update(buffer1, buffer2);
    
            Object out = buffer1.get(0);
            byte[] outBitBytes = (byte[]) out;
            byte[] resultBit = outBitBytes;
            byte[] inBytes = (byte[]) buffer2.get(0);
    
            ImmutableRoaringBitmap inRBM =  new ImmutableRoaringBitmap(ByteBuffer.wrap(inBytes));
            if(out == null){
                buffer1.update(0, inBytes);
                return ;
            }
            if (out != null) {
                try {
                    ImmutableRoaringBitmap outRBM =  new ImmutableRoaringBitmap(ByteBuffer.wrap(outBitBytes));
                    outRBM.toMutableRoaringBitmap().or(inRBM);
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    DataOutputStream ndosn = new DataOutputStream(bos);
                    outRBM.serialize(ndosn);
                    resultBit = bos.toByteArray();
                    ndosn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                buffer1.update(0, resultBit);
            }
        }
    
        /**
         * 计算最终结果
         */
    
        @Override
        public Object evaluate(Row buffer) {
            //根据Buffer计算结果
            long r = 0L;
            Object val = buffer.get(0);
            if (val != null) {
                ImmutableRoaringBitmap rr = new ImmutableRoaringBitmap(ByteBuffer.wrap((byte[]) val));
                r = rr.getCardinality();
            }
            return r;
        }
    }
    `
    

    附录

    倒排索引
     文档检索系统中最常用的数据结构。通过倒排索引,可以根据单词快速获取包含这个单词的文档列表。
     倒排索引主要由两个部分组成:“单词词典”和“倒排文件”。
     ①单词词典包含了所有粒度的拆分词;
     ②倒排文件则保存了该词对应的所有相关信息。
    

    参考:

       hive udf 读写存储到hbase的roaringbitmap https://blog.csdn.net/qq_34748569/article/details/105252559
    如何在Spark中实现Count Distinct重聚合 https://developer.aliyun.com/article/723521
    SparkSQL用UDAF实现Bitmap函数 https://my.oschina.net/wangzhiwubigdata/blog/4392249
    Spark笔记之使用UDAF(User Defined Aggregate Function) https://www.cnblogs.com/cc11001100/p/9471859.html
    Inverted index 倒排索引  https://www.cnblogs.com/ycx95/p/9177274.html
    时序数据库技术体系 – Druid 多维查询之Bitmap索引 https://blog.csdn.net/matrix_google/article/details/82878214
    java 读取文件流 https://www.cnblogs.com/zhzhlong/p/11420084.html
    bitmap用户分群方法在贝壳DMP的实践和应用 https://cloud.tencent.com/developer/news/683175
    JDBC 将RoaringBitmap写入greenplum https://www.jianshu.com/p/af6a7ef67518
     https://stackoverflow.com/questions/53075020/why-does-spark-infer-a-binary-instead-of-an-arraybyte-when-creating-a-datafram
       User Defined Aggregate Functions (UDAFs) http://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html
       User-Defined Aggregate Functions(UDAF) Using Apache Spark https://www.nitendratech.com/spark/udaf-apache-spark/
       基于bitmap实现用户画像的标签圈人功能 https://blog.51cto.com/sbp810050504/2420208
       https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/
       spark 编写udaf函数求中位数  https://cloud.tencent.com/developer/article/1507271
      RoaringBitmap的使用 https://www.liangzl.com/get-article-detail-148556.html
  • 相关阅读:
    HDU 6103 Kirinriki【尺取法】【思维题】【好题】
    HDU 6103 Kirinriki【尺取法】【思维题】【好题】
    HDU 6095 Rikka with Competition【阅读题】【水题】
    HDU 6095 Rikka with Competition【阅读题】【水题】
    HDU 2844 Coins[【经典题】【模板题】
    HDU 2844 Coins[【经典题】【模板题】
    HDU 6090 Rikka with Graph【思维题】
    HDU 6090 Rikka with Graph【思维题】
    Codeforces Round #318(Div. 1) 573 D. Bear and Cavalry【dp+矩阵+线段树优化】
    Codeforces Round #318(Div. 1) 573 D. Bear and Cavalry【dp+矩阵+线段树优化】
  • 原文地址:https://www.cnblogs.com/ytwang/p/13986000.html
Copyright © 2011-2022 走看看