1.什么是拉链表
拉链表是针对数据仓库设计中表存储数据的方式而定义的,顾名思义,所谓拉链,就是记录历史。记录一个事物从开始,一直到当前状态的所有变化的信息。
我们先看一个示例,这就是一张拉链表,存储的是汇率以及每条记录的生命周期。我们可以使用这张表拿到最新的当天的最新数据以及之前的历史数据。
我们首先介绍一下我们公司用到的汇率分区拉链表
每个公司的拉链表设计可能并不相同但是拉链表以记录生命周期的设计目的是不会改变的。
2.汇率拉链表转日连续流水表
进行对间断的时间序列补全,然后对null补全(这里的规则是取同类上一条数据的非空值)
3.汇率拉链表转日连续流水表
代码实现思路是
step1.使用utf生成连续的时间序列 left join exchangeRate拉链表
step2.使用开窗函数解决补空值问题
为了简单我们用下面这个表代替
1.udtf函数
public class GenDay extends GenericUDTF { private PrimitiveObjectInspector poi1; private PrimitiveObjectInspector poi2; @Override public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { if (argOIs.getAllStructFieldRefs().size() != 2) { throw new UDFArgumentException("参数个数只能为2"); } //如果输入字段类型非String,则抛异常 ObjectInspector oi1 = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector(); if (oi1.getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("参数非基本类型,需要基本类型"); } //如果输入字段类型非String,则抛异常 ObjectInspector oi2 = argOIs.getAllStructFieldRefs().get(1).getFieldObjectInspector(); if (oi2.getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("参数非基本类型,需要基本类型"); } //强转为基本类型对象检查器 poi1 = (PrimitiveObjectInspector) oi1; if (poi1.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) { throw new UDFArgumentException("参数1非string,需要基本类型string"); } poi2 = (PrimitiveObjectInspector) oi2; if (poi2.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) { throw new UDFArgumentException("参数1非string,需要基本类型string"); } //构造字段名,word List<String> fieldNames = new ArrayList<String>(); fieldNames.add("everyday"); //构造字段类型,string List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); //通过基本数据类型工厂获取java基本类型oi fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); //构造对象检查器 return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } @Override public void process(Object[] args) throws HiveException { Date dBegin=null; Date dEnd=null; //得到一行数据 String start = (String) poi1.getPrimitiveJavaObject(args[0]); String end = (String) poi2.getPrimitiveJavaObject(args[1]); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); try { dBegin = sdf.parse(start); dEnd = sdf.parse(end); } catch (ParseException e) { e.printStackTrace(); } assert dEnd != null; List<String> lDate=getDatesBetweenTwoDate(dBegin,dEnd); StringBuilder stringBuffer = new StringBuilder(); for (int i=0;i<lDate.size(); i += 1) { if (i!=0){ stringBuffer.append(" ").append(lDate.get(i)); }else { stringBuffer.append(lDate.get(i)); } } String s = stringBuffer.toString(); Object[] objs = new Object[1]; objs[0]= s; forward(objs); } @Override public void close() throws HiveException { } public List<String> getDatesBetweenTwoDate(Date beginDate, Date endDate) { List<String> lDate = new ArrayList<String>(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); lDate.add(sdf.format(beginDate)); Calendar cal = Calendar.getInstance(); // 使用给定的 Date 设置此 Calendar 的时间 cal.setTime(beginDate); while (true) { // 根据日历的规则,为给定的日历字段添加或减去指定的时间量 cal.add(Calendar.DAY_OF_MONTH, 1); // 测试此日期是否在指定日期之后 if (endDate.after(cal.getTime())) { lDate.add(sdf.format(cal.getTime())); } else { break; } } lDate.add(sdf.format(endDate));// 把结束时间加入集合 return lDate; } }
2.先用笛卡尔积找到所有的uid和连续完全的时间序列的组合,然后left join得到 时间连续但有空值的 序列。
select c.uid,c.everyday,d.event from (select a.uid,b.everyday from (select uid from group by big12.test) a join (select expode(split(everyday,' ')) as everyday select everyday from GenDay('2018-01-01','2018-12-31'))b --笛卡尔积 on 1=1) c left join test d on c.uid=d.uid and c.everyday=d.time;
像是这样:
3.1用上一条数据补充字段空值(我自己想的)
不过必须单节点 对于汇率来说,一般我的口径里只用到3-5个汇率,这样最多1500条。数据量不大。有风险(自己玩吧别去生产)
package udf; import org.apache.hadoop.hive.ql.exec.UDF; public class GetNotNull extends UDF { private static String lrkey = null; private static String lrvalue = null; public String evaluate(String key, String value) { if (key.equals(lrkey)) { if (value.isEmpty()) { value = lrvalue; }else{ lrvalue=value; } } else { lrkey = key; lrvalue = value; } return value; } }
使用静态类保存上一条非空值。
3.2用上一条数据补充字段空值
drop table if exists big12.test; create table big12.test( uid int, time string, event string )comment '' row format delimited fields terminated by '