UDAF函数用户自定义函数:
用java代码实现UDAF函数
SparkConf conf = new SparkConf(); conf.setAppName("UDAF").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext hc = new SQLContext(sc); JavaRDD<String> rdd = sc.parallelize(Arrays.asList("zhangsan", "lisi", "wangwu", "zhsangsan", "lisi", "zhsnagsan", "wangwu", "zhangsan", "lisi", "wangwu", "zhsangsan", "lisi", "zhsnagsan", "wangwu")); //将RDD转换成rddrow JavaRDD<Row> rddrow = rdd.map(new Function<String, Row>() { @Override public Row call(String str) throws Exception { return RowFactory.create(str); } }); StructField name = DataTypes.createStructField("name", DataTypes.StringType, true); List<StructField> fields = Arrays.asList(name); StructType schema = DataTypes.createStructType(fields); //创建DataType Dataset<Row> df = hc.createDataFrame(rddrow, schema); df.show(); df.registerTempTable("user"); /** * 注册UDAF函数 * */ hc.udf().register("strCount", new UserDefinedAggregateFunction() { @Override //定义输入数据的类型 public StructType inputSchema() { return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name",DataTypes.StringType,true))); } @Override //输出结果的类型 public DataType dataType() { return DataTypes.IntegerType; } @Override //判断当相同的值输入的时候是否有相同的输出 public boolean deterministic() { return true; } @Override public void update(MutableAggregationBuffer buffer, Row row) { buffer.update(0,buffer.getInt(0)+1); } @Override //缓存区定义的数据类型 public StructType bufferSchema() { return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("buffer",DataTypes.IntegerType,true))); } @Override public void merge(MutableAggregationBuffer buffer, Row row) { buffer.update(0,buffer.getInt(0)+row.getInt(0)); } @Override //对数据进行初始化 public void initialize(MutableAggregationBuffer buffer) { buffer.update(0,0); } @Override /** * 定义返回的结果 * */ public Object evaluate(Row row) { return row.getInt(0); } }); hc.sql("select name,strCount(name) from user group by name").show();
UDAF函数实现时,方法UserDefinedAggregateFunction的八个需要实现方法的作用如上代码中注释,其中最重要的三个方法是:
update: 对相同元素进行合并时,row个数的更新
merge: shuffer归并的时候,将相同元素拉取的时候,对拉取的元素的总数进行累加
init:初始化函数。初始化函数在整个过程中总共调用了两次,作用是将数组中的元素进行初始化,第一次被调用初始化的时候,是在同一个分区相同数据进行统计的时候,第二次进行初始化的时候,是在讲相同元素从不同分区拉倒一起统计的数据,会生成数组,然后首先会对这个数据进行初始化。
本地执行Hive的条件:
1、拷贝当前配置文件到src目录:hive-site.xml,core-site.xml,hdfs-site.xml
2、添加jar,以data开头的三个jar 文件
3、window环境必须是以root用户名命名的
4、执行的时候可能内存不够,添加VM参数配置 -server -Xmas512M -Xmx1024M -XX:PermSize = 256M -XX:MaxNewSize = 512M -XX:MaxPermSize = 512M