zoukankan      html  css  js  c++  java
  • spark UDAF 函数解决

    UDAF函数用户自定义函数:

    用java代码实现UDAF函数

    SparkConf conf =  new SparkConf();
            conf.setAppName("UDAF").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext hc = new SQLContext(sc);
    
    
            JavaRDD<String> rdd = sc.parallelize(Arrays.asList("zhangsan", "lisi", "wangwu", "zhsangsan", "lisi", "zhsnagsan", "wangwu", "zhangsan", "lisi", "wangwu", "zhsangsan", "lisi", "zhsnagsan", "wangwu"));
    
            //将RDD转换成rddrow
            JavaRDD<Row> rddrow = rdd.map(new Function<String, Row>() {
                @Override
                public Row call(String str) throws Exception {
                    return RowFactory.create(str);
                }
            });
    
            StructField name = DataTypes.createStructField("name", DataTypes.StringType, true);
            List<StructField> fields = Arrays.asList(name);
            StructType schema = DataTypes.createStructType(fields);
    
            //创建DataType
            Dataset<Row> df = hc.createDataFrame(rddrow, schema);
            df.show();
    
            df.registerTempTable("user");
    
            /**
             * 注册UDAF函数
             *
             */
    
            hc.udf().register("strCount", new UserDefinedAggregateFunction() {
                @Override
                //定义输入数据的类型
                public StructType inputSchema() {
                    return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name",DataTypes.StringType,true)));
                }
    
                @Override
                //输出结果的类型
                public DataType dataType() {
                    return DataTypes.IntegerType;
                }
    
                @Override
                //判断当相同的值输入的时候是否有相同的输出
                public boolean deterministic() {
                    return true;
                }
    
                @Override
                public void update(MutableAggregationBuffer buffer, Row row) {
    
                    buffer.update(0,buffer.getInt(0)+1);
    
                }
    
                @Override
                //缓存区定义的数据类型
                public StructType bufferSchema() {
                    return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("buffer",DataTypes.IntegerType,true)));
                }
    
                @Override
                public void merge(MutableAggregationBuffer buffer, Row row) {
    
                    buffer.update(0,buffer.getInt(0)+row.getInt(0));
                }
    
                @Override
                //对数据进行初始化
                public void initialize(MutableAggregationBuffer buffer) {
    
                    buffer.update(0,0);
                }
    
                @Override
                /**
                 * 定义返回的结果
                 *
                 */
                public Object evaluate(Row row) {
                    return row.getInt(0);
                }
            });
    
    
            hc.sql("select name,strCount(name) from user group by name").show();

    UDAF函数实现时,方法UserDefinedAggregateFunction的八个需要实现方法的作用如上代码中注释,其中最重要的三个方法是:

    update: 对相同元素进行合并时,row个数的更新

    merge: shuffer归并的时候,将相同元素拉取的时候,对拉取的元素的总数进行累加

    init:初始化函数。初始化函数在整个过程中总共调用了两次,作用是将数组中的元素进行初始化,第一次被调用初始化的时候,是在同一个分区相同数据进行统计的时候,第二次进行初始化的时候,是在讲相同元素从不同分区拉倒一起统计的数据,会生成数组,然后首先会对这个数据进行初始化。

    本地执行Hive的条件:

    1、拷贝当前配置文件到src目录:hive-site.xml,core-site.xml,hdfs-site.xml

    2、添加jar,以data开头的三个jar 文件

    3、window环境必须是以root用户名命名的

    4、执行的时候可能内存不够,添加VM参数配置 -server -Xmas512M -Xmx1024M -XX:PermSize = 256M -XX:MaxNewSize = 512M -XX:MaxPermSize = 512M 

  • 相关阅读:
    ubuntu环境下编译linux内核问题解决备忘
    Ubuntu 16.04 安装 arm-linux-gcc 交叉编译工具
    opus代码解析
    google的android工具常用下载路径
    opus在arm的嵌入式平台上的移植和开发
    OGG的孩子-有损音频编码opus
    ogg的孩子-无损音频编解码flac
    音频科普---oggs
    当初我为什么要去创业公司呢?
    python实现桶排序算法
  • 原文地址:https://www.cnblogs.com/wcgstudy/p/11066602.html
Copyright © 2011-2022 走看看