zoukankan      html  css  js  c++  java
  • Hive 简单udf入门--自然周差异计算

      Hive sql与我们普通使用的sql基本差异不大,但在大数据领域往往存在很多未知的需求,所以往往都有一个支持自定义功能函数编写的口子,让用户实现其特定的需求。(这往往并非hive独有,几乎都是标配)

      而要写udf往往也是比较简单,看几个例子,依葫芦画瓢总能搞几个。

      今天我们就来简单写一个“自然周差异计算”week_diff函数吧。

    1. pom依赖

      依赖是环境必备。实际上,hive udf 分为几种类型,我们本文就来看看最简单的一种实现, 继承 UDF 类。

      pom.xml 必备依赖:

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
    </dependency>

      以上依赖,也就是一些接口定义,以及必备环境的类库引入,然后你就可以进行编写自己的UDF了。

    2. 编写UDF实现

      这是用户要做的一件事也是唯一件可做的事,本篇是实现 UDF 功能。 UDF 是hive中一对一关系的函数调用,即给一个输入,给出一个输出。样例如下:

    import com.y.udf.exception.UdfDataException;
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDF;
    
    import java.time.DayOfWeek;
    import java.time.LocalDate;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import java.util.Date;
    
    /**
     * 功能描述: 自然周偏移计算函数
     *          <p>周偏移计算</p>
     *
     */
    @Description(name = "week_diff",
            value = "_FUNC_(week_diff(date dayForJudge [, date dateReferer]) - Returns day1 与 day2 的自然周差异数, 如 -3, -1, 0, n... 
    "
                    + "_FUNC_(week_diff('2020-07-30')) - Returns 0 
    "
                    + "_FUNC_(week_diff('2020-01-01', '2020-01-08 10:00:01')) - Returns -1 
    " +
                    "_FUNC_(week_diff(to_date(from_unixtime(UNIX_TIMESTAMP('2020-01-01','yyyy-MM-dd'))), current_date))")
    public class WeekDiffUdf extends UDF {
    
        /**
         * 一天的毫秒数常量
         */
        private static final long ONE_DAY_MILLIS = 3600_000 * 24;
    
        /**
         * 日期格式定义
         */
        private final DateTimeFormatter dayFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    
        /**
         * 与当前日期为依据,计算日期偏移 (结果一般都为 -xx)
         *
         * @param weekDayForCompare 要比较的日期格式
         * @return 周差异(-1, 0, n...)
         */
        public int evaluate(String weekDayForCompare) {
            if(weekDayForCompare.length() < 10) {
                throw new UdfDataException("要比较的日期 day1 数据格式错误, 请确认是否为 yyyy-MM-dd 格式");
            }
            weekDayForCompare = weekDayForCompare.substring(0, 10);
            LocalDate day1 = LocalDate.parse(weekDayForCompare, dayFormatter);
            return evaluate(day1, LocalDate.now());
        }
    
        /**
         * 日期格式入参调用计算周偏移
         */
        public int evaluate(Date weekDayForCompare) {
            LocalDate day1 = weekDayForCompare.toInstant()
                                .atZone(ZoneOffset.ofHours(8)).toLocalDate();
            return evaluate(day1, LocalDate.now());
        }
    
        /**
         * 两个日期比较周差异 (string -> string)
         *
         * @param weekDayForCompare 被比较的日期
         * @param weekDayRef 参照日期
         * @return day1与day2 的周差异
         * @throws UdfDataException 格式错误时抛出
         */
        public int evaluate(String weekDayForCompare, String weekDayRef) throws Exception {
            if(weekDayForCompare.length() < 10) {
                throw new UdfDataException("要比较的日期 day1 数据格式错误, 请确认是否为 yyyy-MM-dd 格式");
            }
            if(weekDayRef.length() < 10) {
                throw new UdfDataException("参考日期 day2 数据格式错误, 请确认是否为 yyyy-MM-dd 格式");
            }
            weekDayForCompare = weekDayForCompare.substring(0, 10);
            weekDayRef = weekDayRef.substring(0, 10);
            LocalDate day1 = LocalDate.parse(weekDayForCompare, dayFormatter);
            LocalDate day2 = LocalDate.parse(weekDayRef);
            return evaluate(day1, day2);
        }
    
        /**
         * 两个日期比较周差异 (date -> date)
         */
        public int evaluate(Date weekDayForCompare, Date weekDayRef) {
            LocalDate day1 = weekDayForCompare.toInstant()
                                .atZone(ZoneOffset.ofHours(8)).toLocalDate();
            LocalDate day2 = weekDayRef.toInstant()
                                .atZone(ZoneOffset.ofHours(8)).toLocalDate();
            long day1WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day1);
            long day2WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day2);
            // 计算周差异算法很简单,就是获取日期所在周的第一天的时间戳相减,然后除以周单位即可得到周差异
            long diffWeeks = (day1WeekFirstTimestamp - day2WeekFirstTimestamp)
                    / (ONE_DAY_MILLIS * 7);
            return (int) diffWeeks;
        }
    
        public int evaluate(LocalDate day1, LocalDate day2) {
            long day1WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day1);
            long day2WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day2);
            long diffWeeks = (day1WeekFirstTimestamp - day2WeekFirstTimestamp)
                    / (ONE_DAY_MILLIS * 7);
            return (int) diffWeeks;
        }
    
        /**
         * 获取指定日期所在自然周的 第一天的时间戳 (周一为第1天)
         * localDate 的周起始时间计算
         *
         * @param day 指定日期
         * @return 1434543543 时间戳
         * @see #getDayOfWeekFirstTimestamp(LocalDate)
         */
        private long getDayOfWeekFirstTimestamp(LocalDate day) {
            DayOfWeek dayOfWeek = day.getDayOfWeek();
            // 以周一为起始点 日_周 偏移, 周一: 2, 周三: 4, SUNDAY=7,MONDAY=1
            int realOffsetFromMonday = dayOfWeek.getValue() - 1;
            return day.atStartOfDay(ZoneOffset.ofHours(8)).toInstant().toEpochMilli()
                        - realOffsetFromMonday * ONE_DAY_MILLIS;
        }
    
    }

      从上面可以看出,我们写了n个 evaluate() 方法,而这些方法都是可能被hive作为函数入口调用的,我们可以简单认为就是evaluate的重载函数。所以,不需要向外暴露的方法,就不要命名为 evaluate了。上面实现了这么多,主要就是考虑外部可能传入的不同数据类型,做的适配工作。可以适当推测,hive是通过硬反射调用 udf 的。

      可以看到,具体的函数实现比较简单,因为我们的需求就在那儿。倒也不是想炫技什么的,主要是hive也不会支持你这种需求,所以简单也还得自己来。

    3. 编写udf单元测试

      这准确的说,是java基础知识,但这里的单元测试远比我们在hive进行函数测试来得容易,所以是有必要的。

    import org.junit.Assert;
    import org.junit.Test;
    
    import java.text.SimpleDateFormat;
    
    /**
     * 功能描述: 周函数单元测试
     *
     */
    public class WeekDiffUdfTest {
    
        @Test
        public void testEvaluate() throws Exception {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
            WeekDiffUdf udf = new WeekDiffUdf();
            int weekOffset;
    
            // 单入参函数测试
            String day1 = "2020-07-18";
            weekOffset = udf.evaluate(format.parse(day1));
            Assert.assertTrue("__FUNC__(string)周偏移计算错误",
                    weekOffset <= -2);
            int weekOffset2 = udf.evaluate(day1);
            Assert.assertEquals("__FUNC__(string) != __FUNC__(date)",
                            weekOffset, weekOffset2);
    
            day1 = "2020-08-02";
            weekOffset = udf.evaluate(format.parse(day1));
            Assert.assertTrue("__FUNC__(string)周末边界偏移计算错误",
                    weekOffset <= 0);
    
            day1 = "2020-07-27";
            weekOffset = udf.evaluate(format.parse(day1));
            Assert.assertTrue("__FUNC__(string)周一边界偏移计算错误",
                    weekOffset <= 0);
    
    
            // 两个函数参数入参测试
            day1 = "2020-08-02";
            String day2 = "2020-07-25 10:09:01";
            weekOffset = udf.evaluate(day1, day2);
            Assert.assertEquals("__FUNC__(string, string)周偏移计算错误",
                                1, weekOffset);
    
            day1 = "2020-07-27";
            day2 = "2020-07-30 10:00:01";
            weekOffset = udf.evaluate(day1, day2);
            Assert.assertEquals("__FUNC__(string, string)周偏移计算错误",
                            0, weekOffset);
    
            day1 = "2020-07-27";
            day2 = "2020-08-02";
            weekOffset = udf.evaluate(format.parse(day1), format.parse(day2));
            Assert.assertEquals("__FUNC__(date, date)周一周末偏移计算错误",
                            0, weekOffset);
    
            day1 = "2019-12-30";
            day2 = "2020-01-02";
            weekOffset = udf.evaluate(day1, day2);
            Assert.assertEquals("__FUNC__(string, string)跨年周偏移计算错误",
                                0, weekOffset);
    
            day1 = "2019-12-20";
            day2 = "2020-01-01";
            weekOffset = udf.evaluate(day1, day2);
            Assert.assertEquals("__FUNC__(string, string)跨年周偏移计算错误",
                                -2, weekOffset);
            System.out.println("ok。offset:" + weekOffset);
        }
    }

      测试通过,核心功能无误,可以准备打包发布hive环境了。当然是打jar包了。

    4. 注册udf并测试

      将前面打好的包放到hive环境可触达的地方,运行加载命令!

    add jar /home/hadoop/WeekDiffUdf.jar

      运行hive测试用命:(即相当于将前面的单元测试,翻译成sql在hive中进行测试)

    # 创建临时函数,以便进行测试
    create temporary function week_diff as "com.y.udf.WeekDiffUdf";    
    select week_diff('2020-07-29') = 0 from default.dual;
    select week_diff('2020-07-20') = -1 from default.dual;
    select week_diff('2020-01-01', '2020-01-08 10:00:01') = -1 from default.dual;
    select week_diff('2020-01-01', '2019-12-30 10:00:01') = 1 from default.dual;
    select week_diff(to_date(from_unixtime(UNIX_TIMESTAMP('2020-07-28',"yyyy-MM-dd")))) = 0  from default.dual;
    select week_diff(to_date(from_unixtime(UNIX_TIMESTAMP('2020-07-28',"yyyy-MM-dd"))), current_date) = 0 from default.dual;
    # hive 外部会解析好字段值,再代入计算的
    select my_date,week_diff(my_date) from default.account_for_test;

      如上结果,你应该会得到n个true返回值,否则单测不通过。最后一个sql只是为了验证实际运行时被代入变量的情况,意义不大。

      运行单测完成后,功能就算完成了。我们可以正式发布了,进行永久注册!如下:

    CREATE FUNCTION week_diff AS 'com.y.udf.WeekDiffUdf' USING JAR 'hdfs://hadoop001:9000/lib/hadoop/WeekDiffUdf.jar';

      如此,一个自然周偏移函数udf 就完成了,你就可以像使用hive通用sql也一样去写业务了。

      可以通过 show functions; 查看已经注册了的函数列表。

      要删除已经注册的函数:

    drop temporary function week_diff;
    drop function week_diff;
  • 相关阅读:
    反序列化
    反序列化使用
    Serializer序列器
    DRF工程搭建
    JDK目录介绍
    Java环境变量配置
    Java语言的特性
    Java语言概述
    计算机编程语言介绍
    软件开发介绍
  • 原文地址:https://www.cnblogs.com/yougewe/p/13409127.html
Copyright © 2011-2022 走看看