zoukankan      html  css  js  c++  java
  • hive 汇率拉链表转日连续流水表

    1.什么是拉链表

    拉链表是针对数据仓库设计中表存储数据的方式而定义的,顾名思义,所谓拉链,就是记录历史。记录一个事物从开始,一直到当前状态的所有变化的信息。

    我们先看一个示例,这就是一张拉链表,存储的是汇率以及每条记录的生命周期。我们可以使用这张表拿到当天的最新数据以及之前的历史数据。
    我们首先介绍一下我们公司用到的汇率分区拉链表

    每个公司的拉链表设计可能并不相同但是拉链表以记录生命周期的设计目的是不会改变的。

     2.汇率拉链表转日连续流水表

    进行对间断的时间序列补全,然后对null补全(这里的规则是取同类上一条数据的非空值)

     3.汇率拉链表转日连续流水表

    代码实现思路是

    step1.使用udtf生成连续的时间序列,再 left join exchangeRate拉链表

    step2.使用开窗函数解决补空值问题

    为了简单我们用下面这个表代替

     

    1.udtf函数

    /**
     * Hive UDTF that expands an inclusive date range into a single output row.
     *
     * <p>It takes two string arguments, the first and last day of the range, both
     * formatted as {@code yyyy-MM-dd}. It forwards one row with one string column
     * named {@code everyday} containing every day of the range, separated by a
     * single space. Callers are expected to {@code split} and {@code explode} it.
     *
     * <p>No row is forwarded when either argument is NULL or when the end date is
     * before the start date.
     */
    public class GenDay extends GenericUDTF {
        /** Date pattern accepted for both arguments and used for every emitted day. */
        private static final String DATE_PATTERN = "yyyy-MM-dd";

        private PrimitiveObjectInspector poi1;
        private PrimitiveObjectInspector poi2;

        /**
         * Validates that exactly two string arguments were supplied and declares the
         * output schema (a single string column named {@code everyday}).
         *
         * @param argOIs inspectors describing the arguments passed to the function
         * @return the struct inspector describing the forwarded rows
         * @throws UDFArgumentException if the argument count is not 2 or an argument
         *     is not a primitive string
         */
        @Override
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
            if (argOIs.getAllStructFieldRefs().size() != 2) {
                throw new UDFArgumentException("参数个数只能为2");
            }
            poi1 = requireStringInspector(argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector(), 1);
            poi2 = requireStringInspector(argOIs.getAllStructFieldRefs().get(1).getFieldObjectInspector(), 2);

            // Output column name.
            List<String> fieldNames = new ArrayList<String>();
            fieldNames.add("everyday");

            // Output column type: a Java String.
            List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
                    fieldOIs);
        }

        /**
         * Checks that an argument inspector describes a primitive string.
         *
         * @param oi the inspector of the argument
         * @param position 1-based argument position, used in the error message
         * @return the inspector cast to {@link PrimitiveObjectInspector}
         * @throws UDFArgumentException if the argument is not a primitive string
         */
        private static PrimitiveObjectInspector requireStringInspector(ObjectInspector oi, int position)
                throws UDFArgumentException {
            if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentException("参数非基本类型,需要基本类型");
            }
            PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
            if (poi.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
                // The original reported "参数1" for both arguments; report the real position.
                throw new UDFArgumentException("参数" + position + "非string,需要基本类型string");
            }
            return poi;
        }

        /**
         * Parses the two date arguments and forwards the space-separated list of days.
         *
         * @param args the two arguments of the current input row
         * @throws HiveException if either argument cannot be parsed as {@code yyyy-MM-dd}
         */
        @Override
        public void process(Object[] args) throws HiveException {
            String start = (String) poi1.getPrimitiveJavaObject(args[0]);
            String end = (String) poi2.getPrimitiveJavaObject(args[1]);
            if (start == null || end == null) {
                // SQL NULL in, no rows out (previously this ended in a NullPointerException).
                return;
            }

            SimpleDateFormat sdf = new SimpleDateFormat(DATE_PATTERN);
            Date dBegin;
            Date dEnd;
            try {
                dBegin = sdf.parse(start);
                dEnd = sdf.parse(end);
            } catch (ParseException e) {
                // Fail the query with a clear message instead of printing the stack
                // trace and hitting a NullPointerException further down.
                throw new HiveException("GenDay expects dates formatted as " + DATE_PATTERN
                        + " but got start='" + start + "', end='" + end + "'", e);
            }

            List<String> days = getDatesBetweenTwoDate(dBegin, dEnd);
            if (days.isEmpty()) {
                return;
            }
            StringBuilder joined = new StringBuilder();
            for (int i = 0; i < days.size(); i++) {
                if (i != 0) {
                    joined.append(" ");
                }
                joined.append(days.get(i));
            }
            forward(new Object[] {joined.toString()});
        }

        /** Nothing to release: the function keeps no resources between rows. */
        @Override
        public void close() throws HiveException {

        }

        /**
         * Lists every calendar day from {@code beginDate} to {@code endDate}, both
         * inclusive, formatted as {@code yyyy-MM-dd}.
         *
         * <p>Each day appears exactly once, so a range whose begin and end fall on the
         * same day yields a single entry. Time-of-day components are ignored.
         *
         * @param beginDate first day of the range
         * @param endDate last day of the range
         * @return the formatted days in ascending order; empty when either argument is
         *     {@code null} or the end day is before the begin day
         */
        public List<String> getDatesBetweenTwoDate(Date beginDate, Date endDate) {
            List<String> lDate = new ArrayList<String>();
            if (beginDate == null || endDate == null) {
                return lDate;
            }
            SimpleDateFormat sdf = new SimpleDateFormat(DATE_PATTERN);
            Calendar cursor = startOfDay(beginDate);
            Calendar last = startOfDay(endDate);
            while (!cursor.after(last)) {
                lDate.add(sdf.format(cursor.getTime()));
                cursor.add(Calendar.DAY_OF_MONTH, 1);
            }
            return lDate;
        }

        /** Returns a calendar set to midnight (default time zone) of the given date. */
        private static Calendar startOfDay(Date date) {
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);
            cal.set(Calendar.HOUR_OF_DAY, 0);
            cal.set(Calendar.MINUTE, 0);
            cal.set(Calendar.SECOND, 0);
            cal.set(Calendar.MILLISECOND, 0);
            return cal;
        }

    }

    2.先用笛卡尔积找到所有的uid和连续完全的时间序列的组合,然后left join得到 时间连续但有空值的 序列。

    -- Build every (uid, day) combination for 2018, then left join the facts so that
    -- days without a record show up with a NULL event.
    -- NOTE(review): the join compares c.everyday ('yyyy-MM-dd') with d.time as-is;
    -- this assumes d.time holds day-level strings -- confirm against the real table.
    select c.uid, c.everyday, d.event
    from
    (
        select a.uid, b.everyday
        from
        (select uid from big12.test group by uid) a
        -- Cartesian product: every uid paired with every day
        cross join
        (
            -- GenDay returns all days as one space-separated string; split it into rows
            select explode(split(g.everyday, ' ')) as everyday
            from (select GenDay('2018-01-01', '2018-12-31') as everyday) g
        ) b
    ) c
    left join big12.test d
    on c.uid = d.uid and c.everyday = d.time;

    像是这样:

    3.1用上一条数据补充字段空值(我自己想的)

    不过必须单节点 对于汇率来说,一般我的口径里只用到3-5个汇率,这样最多1500条。数据量不大。有风险(自己玩吧别去生产)

    package udf;
    
    import org.apache.hadoop.hive.ql.exec.UDF;
    
    /**
     * Hive UDF that fills a missing value with the last non-missing value seen for
     * the same key.
     *
     * <p>A value counts as missing when it is {@code null} or the empty string.
     * The previous key and value are kept in static fields, so the result depends
     * on the order in which rows reach {@link #evaluate}. Rows must therefore arrive
     * grouped by key and in the desired order, processed by a single task.
     */
    public class GetNotNull extends UDF {

        // Key of the most recently evaluated row.
        private static String lrkey = null;
        // Last non-missing value seen for lrkey (or the first value of that key).
        private static String lrvalue = null;


        /**
         * Returns {@code value}, or the remembered value of the same key when
         * {@code value} is missing.
         *
         * @param key grouping key of the current row; a {@code null} key never
         *     matches the previous row
         * @param value value of the current row; may be {@code null} or empty
         * @return {@code value} when present; otherwise the last remembered value
         *     for {@code key}, which may itself be {@code null} or empty when the
         *     key has no earlier non-missing value
         */
        public String evaluate(String key, String value) {
            // SQL NULL (e.g. the unmatched side of a left join) arrives as Java null,
            // so it must be treated like the empty string instead of dereferenced.
            boolean missing = value == null || value.isEmpty();

            if (key != null && key.equals(lrkey)) {
                if (missing) {
                    return lrvalue;
                }
                lrvalue = value;
                return value;
            }

            // First row of a new key: start remembering from here.
            lrkey = key;
            lrvalue = value;
            return value;
        }
    }

    使用静态变量保存上一条非空值。

    3.2用上一条数据补充字段空值

    -- Recreate the demo table used by the forward-fill query below.
    DROP TABLE IF EXISTS big12.test;
    CREATE TABLE big12.test (
        uid   INT,
        time  STRING,
        event STRING
    )
    COMMENT ''
    ROW FORMAT DELIMITED
    -- NOTE(review): '31' is presumably meant as the byte value 31 (unit separator);
    -- confirm how the target Hive version interprets a numeric delimiter string.
    FIELDS TERMINATED BY '31'
    STORED AS TEXTFILE
    ;

    -- Sample rows: an empty event ('') marks a value that has to be filled in.
    INSERT INTO big12.test VALUES (1, '2018-12-02 11:00:29', '1');
    INSERT INTO big12.test VALUES (1, '2018-12-02 11:00:30', '');
    INSERT INTO big12.test VALUES (1, '2018-12-02 11:00:31', '2');
    INSERT INTO big12.test VALUES (1, '2018-12-02 11:00:32', '');
    INSERT INTO big12.test VALUES (1, '2018-12-02 11:00:33', '');
    INSERT INTO big12.test VALUES (2, '2018-12-02 11:00:40', '3');
    INSERT INTO big12.test VALUES (2, '2018-12-02 11:00:41', '');
    INSERT INTO big12.test VALUES (2, '2018-12-02 11:00:42', '4');
    INSERT INTO big12.test VALUES (2, '2018-12-02 11:00:44', '');
    
    
    use big12;

    -- Forward-fill blank events with the previous non-blank event of the same uid.
    --
    -- For each uid, rows are numbered twice in time order:
    --   all_rn : position among all rows of the uid
    --   grp_rn : position among the rows of the same kind (blank / non-blank)
    -- For a blank row, all_rn - grp_rn is the number of non-blank rows before it,
    -- which is exactly the grp_rn of the non-blank row to copy from. A result of 0
    -- means the uid has no earlier non-blank row, so the event stays NULL.
    --
    -- Both window functions and the join are scoped to uid so that a leading blank
    -- of one uid is never filled from another uid. The aliases avoid "row", which is
    -- a reserved keyword in newer Hive versions.
    select
          t1.uid,
          t1.time,
          t2.event
    from
    (
        -- Blank rows that need a value.
        select
              uid,
              time,
              event,
              grp_rn,
              all_rn
        from
        (
            select
                  uid,
                  time,
                  event,
                  row_number() over (partition by uid, case when event is not null and trim(event) <> '' then 1 else 0 end order by time asc) as grp_rn,
                  row_number() over (partition by uid order by time asc) as all_rn
            from test
        ) t
        where event is null or trim(event) = ''
    ) t1
    left join
    (
        -- Non-blank rows that can donate a value.
        select
              uid,
              time,
              event,
              grp_rn,
              all_rn
        from
        (
            select
                  uid,
                  time,
                  event,
                  row_number() over (partition by uid, case when event is not null and trim(event) <> '' then 1 else 0 end order by time asc) as grp_rn,
                  row_number() over (partition by uid order by time asc) as all_rn
            from test
        ) t
        where event is not null and trim(event) <> ''
    ) t2
    on  t1.uid = t2.uid
    and t1.all_rn - t1.grp_rn = t2.grp_rn
    union all
    -- Rows that already carry a value pass through unchanged.
    select
          uid,
          time,
          event
    from test
    where event is not null and trim(event) <> '';

     

  • 相关阅读:
    [jQuery学习系列六]6-jQuery实际操作小案例
    [Java拾遗一] XML的书写规范与解析.
    [数据库操作]Java中的JDBC的使用方法.
    [Java拾遗二]Tomact及Http 部分总结.
    [Java拾遗三]JavaWeb基础之Servlet
    [Java拾遗四]JavaWeb基础之Servlet_Request&&Response
    [Java拾遗五]使用Session防止表单重复提交
    [数据库连接池] Java数据库连接池--DBCP浅析.
    [开发工具]Java开发常用的在线工具
    [数据库连接池二]Java数据库连接池--C3P0和JDNI.
  • 原文地址:https://www.cnblogs.com/wqbin/p/10549859.html
Copyright © 2011-2022 走看看