1 Presto Introduction
Presto is a distributed query engine open-sourced by Facebook, and it carries much of our interactive query workload. As more and more people use SQL to analyze data on Presto, we found we needed to implement some business logic as UDFs, similar to Hive UDFs, both to make SQL users more productive and to keep the UDFs consistent between the Hive and Presto environments.
1.1 Presto Function Types
Functions in Presto fall broadly into three categories: scalar, aggregation, and window functions.
1) scalar: a scalar function is essentially a static Java method with no state of its own.
2) aggregation: a function that accumulates state, such as count or avg. On a single node the state could simply live in a variable, but Presto is a distributed engine and state data moves between nodes, so it must be serialized into Presto's internal format before it can be transferred.
3) window: window functions, similar to those in Spark SQL.
2 Implementing Custom Functions
Official documentation: https://prestodb.github.io/docs/current/develop/functions.html
2.1 Implementing a Custom Scalar Function
2.1.1 Define a Java Class
1) Mark the static method that implements the business logic with the @ScalarFunction annotation.
2) Use @Description to describe what the function does; this text is shown by SHOW FUNCTIONS.
3) Use @SqlType to declare the function's return type; to return a string, for example, use StandardTypes.VARCHAR.
4) The Java method must return values in Presto's internal representation, so a string must be returned as a Slice; Slices.utf8Slice conveniently converts a Java String into a Slice.
Example code:
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

// Note: the exact com.facebook.presto.spi.* package paths vary across Presto versions.
public class LiulishuoFunctions
{
    public static final String DATE_FORMAT = "yyyy-MM-dd";

    @ScalarFunction
    @Description("hive to_date function")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice to_date(@SqlType(StandardTypes.TIMESTAMP) long input)
    {
        // TIMESTAMP values arrive as epoch milliseconds
        final DateFormat format = new SimpleDateFormat(DATE_FORMAT);
        return Slices.utf8Slice(format.format(new Date(input)));
    }
}
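Once the jar is deployed (see 2.1.2 below), the function can be called like any built-in function, e.g. SELECT to_date(ts) FROM ... (the column name ts here is purely illustrative).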
2.1.2 The Presto Plugin Mechanism
Unlike Hive, Presto does not load custom UDFs through configuration; they have to be delivered through its plugin mechanism. The plugin (Plugin) mechanism is the core of Presto's ability to integrate many data sources: by implementing different plugins, Presto lets users run JOINs and other computations across different kinds of sources. All of Presto's data sources are implemented as plugins, for example MySQL, Hive, and HBase. The plugin mechanism not only loads Connectors for access to different data sources, it also loads a FunctionFactory to register UDFs. Presto plugins follow the Java ServiceLoader convention, so implementing one is straightforward.
Implement the Plugin interface, for example:
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

public class PrestoFunctionsPlugin implements Plugin
{
    @Override
    public Set<Class<?>> getFunctions()
    {
        return ImmutableSet.<Class<?>>builder()
                .add(PvFlowStatsAggregation.class)
                .add(AvgAggregationDemo.class)
                .build();
    }
}
Finally, build the jar and copy it into Presto's plugin directory; Presto must be restarted for the custom functions in the jar to be loaded.
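Because discovery follows the ServiceLoader convention, the jar also needs a provider-configuration file so Presto can find the Plugin implementation. As an assumption to verify against your Presto version's developer docs, that is a file named

META-INF/services/com.facebook.presto.spi.Plugin

whose single line is the fully qualified class name of PrestoFunctionsPlugin.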
2.2 Custom Aggregation Functions
2.2.1 How It Works
Presto decomposes an aggregation function into three execution steps (a minimal sketch follows this list):
1. input(state, data): the input function runs for every row. It runs in parallel on every node that holds data, producing multiple partial accumulated states.
2. combine(state1, state2): merges the state data from all nodes, running repeatedly until everything has been reduced to a single final state, from which the aggregation's result is produced.
3. output(final_state, out): writes the final result into a BlockBuilder.
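To make the three steps concrete, here is a minimal sketch of what the AvgAggregationDemo registered in the plugin above might look like. The function name my_avg and the state interface are our assumptions; the sketch also relies on Presto generating the state implementation and serializer automatically for a state interface with only primitive getters and setters (verify this against your version), whereas complex states need the explicit metadata described in the next section.

import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.AccumulatorState;
import com.facebook.presto.spi.function.AggregationFunction;
import com.facebook.presto.spi.function.AggregationState;
import com.facebook.presto.spi.function.CombineFunction;
import com.facebook.presto.spi.function.InputFunction;
import com.facebook.presto.spi.function.OutputFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.StandardTypes;

@AggregationFunction("my_avg")
public class AvgAggregationDemo
{
    // Sum and count fully describe the partial state of an average.
    public interface SumAndCountState extends AccumulatorState
    {
        long getSum();

        void setSum(long sum);

        long getCount();

        void setCount(long count);
    }

    @InputFunction
    public static void input(@AggregationState SumAndCountState state, @SqlType(StandardTypes.BIGINT) long value)
    {
        // step 1: runs once per row, in parallel on every node
        state.setSum(state.getSum() + value);
        state.setCount(state.getCount() + 1);
    }

    @CombineFunction
    public static void combine(@AggregationState SumAndCountState state, @AggregationState SumAndCountState other)
    {
        // step 2: merge two partial states into one
        state.setSum(state.getSum() + other.getSum());
        state.setCount(state.getCount() + other.getCount());
    }

    @OutputFunction(StandardTypes.DOUBLE)
    public static void output(@AggregationState SumAndCountState state, BlockBuilder out)
    {
        // step 3: emit the final result from the fully merged state
        if (state.getCount() == 0) {
            out.appendNull();
        }
        else {
            DoubleType.DOUBLE.writeDouble(out, (double) state.getSum() / state.getCount());
        }
    }
}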
2.2.2 Implementation Steps
1. Define a Java class and mark it as an aggregation function with @AggregationFunction.
2. Mark the computation, merge, and final-output methods with @InputFunction, @CombineFunction, and @OutputFunction respectively, and register the aggregation class in the Plugin.
3. Define a State interface extending AccumulatorState, with get and set methods.
4. Annotate it with @AccumulatorStateMetadata to supply the serializer (stateSerializerClass) and the factory (stateFactoryClass); you write the serializer class and the factory class yourself.
Core code example:
1 @AggregationFunction("pv_stats") 2 public class PvFlowStatsAggregation { 3 private PvFlowStatsAggregation() {} 4 @AccumulatorStateMetadata(stateSerializerClass = PvFlowStatsStateSerializer.class, stateFactoryClass = PvFlowStatsFactory.class) 5 6 public interface State extends AccumulatorState { 7 PvFlowStats get(); 8 void set(PvFlowStats value); 9 } 10 11 @InputFunction 12 public static void input(@AggregationState State state, @SqlType(StandardTypes.BIGINT) long id, 13 14 @SqlType(StandardTypes.VARBINARY) Slice serialisedTree, @SqlType(StandardTypes.VARCHAR) Slice pvOrder) { 15 handleDataInput(state, id, serialisedTree, pvOrder, null); 16 } 17 18 @InputFunction 19 20 public static void input(@AggregationState State state, @SqlType(StandardTypes.BIGINT) long id, 21 22 @SqlType(StandardTypes.VARBINARY) Slice serialisedTree, @SqlType(StandardTypes.VARCHAR) Slice pvOrder, 23 @SqlType(StandardTypes.VARCHAR) Slice endUrl) { 24 handleDataInput(state, id, serialisedTree, pvOrder, endUrl); 25 } 26 27 private static void handleDataInput(State state, long id, Slice serialisedTree, Slice pvOrder, Slice endUrl) { 28 29 PvFlowStats stats = state.get(); 30 if (stats == null) { 31 stats = new PvFlowStats(); 32 state.set(stats); 33 } 34 ...... 35 } 36 @CombineFunction 37 public static void combine(@AggregationState State state, State other) { 38 39 PvFlowStats input = other.get(); 40 PvFlowStats previous = state.get(); 41 if (previous == null) { 42 state.set(input); 43 } else { 44 previous.mergeWith(input); 45 } 46 } 47 @OutputFunction(StandardTypes.VARCHAR) 48 public static void output(@AggregationState State state, BlockBuilder out) { 49 PvFlowStats stats = state.get(); 50 if (stats == null) { 51 out.appendNull(); 52 return; 53 } 54 // 统计 55 Slice result = stats.statisticNextPage(); 56 if (result == null) { 57 out.appendNull(); 58 } else { 59 VarcharType.VARCHAR.writeSlice(out, result); 60 } 61 } 62 }
2.2.3 Complex Data Types (list, map, or custom classes)
For complex types you must provide your own serializer and factory classes and implement the state's serialization and deserialization yourself.
An example follows.
Main class:
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.AccumulatorState;
import com.facebook.presto.spi.function.AccumulatorStateMetadata;
import com.facebook.presto.spi.function.AggregationFunction;
import com.facebook.presto.spi.function.AggregationState;
import com.facebook.presto.spi.function.CombineFunction;
import com.facebook.presto.spi.function.InputFunction;
import com.facebook.presto.spi.function.OutputFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.VarcharType;
import io.airlift.slice.Slice;

/**
 * Example input:
 *
 *  id | value
 * ----+-------
 *   2 | ddd
 *   2 | ddd
 *   1 | bbb
 *   1 | bbb
 *   1 | ccc
 *   1 | aaa
 *   1 | bbb
 *   2 | aaa
 *   2 | ccc
 *   1 | ccc
 *
 * Returns:
 * [{id:1,{aaa:1,ccc:2,bbb:3}},{id:2,{aaa:1,ccc:1,ddd:2}}]
 */
@AggregationFunction("presto_collect")
public class CollectListAggregation
{
    @AccumulatorStateMetadata(stateSerializerClass = CollectListStatsSerializer.class, stateFactoryClass = CollectListStatsFactory.class)
    public interface CollectState extends AccumulatorState
    {
        CollectListStats get();

        void set(CollectListStats value);
    }

    @InputFunction
    public static void input(@AggregationState CollectState state,
            @SqlType(StandardTypes.VARCHAR) Slice id,
            @SqlType(StandardTypes.VARCHAR) Slice key)
    {
        try {
            CollectListStats stats = state.get();
            if (stats == null) {
                stats = new CollectListStats();
                state.set(stats);
            }
            int inputId = Integer.parseInt(id.toStringUtf8());
            String inputKey = key.toStringUtf8();
            stats.addCollectList(inputId, inputKey, 1);
        }
        catch (Exception e) {
            throw new RuntimeException("input error", e);
        }
    }

    @CombineFunction
    public static void combine(@AggregationState CollectState state, CollectState otherState)
    {
        try {
            CollectListStats collectListStats = state.get();
            CollectListStats oCollectListStats = otherState.get();
            if (collectListStats == null) {
                state.set(oCollectListStats);
            }
            else {
                collectListStats.mergeWith(oCollectListStats);
            }
        }
        catch (Exception e) {
            throw new RuntimeException("combine error", e);
        }
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(@AggregationState CollectState state, BlockBuilder out)
    {
        try {
            CollectListStats stats = state.get();
            if (stats == null) {
                out.appendNull();
                return;
            }
            // build the statistics string
            Slice result = stats.getCollectResult();
            if (result == null) {
                out.appendNull();
            }
            else {
                VarcharType.VARCHAR.writeSlice(out, result);
            }
        }
        catch (Exception e) {
            throw new RuntimeException("output error", e);
        }
    }
}
The main class is fairly simple: just input, combine, and output.
The class that holds the data: this class must implement its own serialization and deserialization, which is the most critical and most fiddly part. The key is managing the storage space yourself and writing the data in a fixed order, then reading it back in exactly that order. For a string, first write its byte length and then its bytes; to read it back, first read the length, then read that many bytes and convert them back into a string.
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import org.openjdk.jol.info.ClassLayout;

import java.util.HashMap;
import java.util.Map;

public class CollectListStats
{
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(CollectListStats.class).instanceSize();

    // <id, <key, count>>
    private Map<Integer, Map<String, Integer>> collectContainer = new HashMap<>();
    private long contentEstimatedSize = 0;
    private int keyByteLen = 0;   // total bytes of all stored keys
    private int keyListLen = 0;   // total number of stored keys

    CollectListStats() {}

    CollectListStats(Slice serialized)
    {
        deserialize(serialized);
    }

    void addCollectList(Integer id, String key, int value)
    {
        if (collectContainer.containsKey(id)) {
            Map<String, Integer> tmpMap = collectContainer.get(id);
            if (tmpMap.containsKey(key)) {
                tmpMap.put(key, tmpMap.get(key) + value);
            }
            else {
                tmpMap.put(key, value);
                contentEstimatedSize += key.getBytes().length + SizeOf.SIZE_OF_INT;
                keyByteLen += key.getBytes().length;
                keyListLen++;
            }
        }
        else {
            Map<String, Integer> tmpMap = new HashMap<>();
            tmpMap.put(key, value);
            keyByteLen += key.getBytes().length;
            keyListLen++;
            collectContainer.put(id, tmpMap);
            // account for the id itself plus the new key/value pair
            contentEstimatedSize += SizeOf.SIZE_OF_INT + key.getBytes().length + SizeOf.SIZE_OF_INT;
        }
    }

    // produces e.g. [{id:1,{aaa:3,fadf:6}},...]
    Slice getCollectResult()
    {
        Slice jsonSlice = null;
        try {
            StringBuilder jsonStr = new StringBuilder();
            jsonStr.append("[");
            int collectLength = collectContainer.entrySet().size();
            for (Map.Entry<Integer, Map<String, Integer>> mapEntry : collectContainer.entrySet()) {
                Integer id = mapEntry.getKey();
                Map<String, Integer> vMap = mapEntry.getValue();
                jsonStr.append("{id:").append(id).append(",{");
                int vLength = vMap.entrySet().size();
                for (Map.Entry<String, Integer> vEntry : vMap.entrySet()) {
                    jsonStr.append(vEntry.getKey()).append(":").append(vEntry.getValue());
                    vLength--;
                    if (vLength != 0) {
                        jsonStr.append(",");
                    }
                }
                jsonStr.append("}}");
                collectLength--;
                if (collectLength != 0) {
                    jsonStr.append(",");
                }
            }
            jsonStr.append("]");
            jsonSlice = Slices.utf8Slice(jsonStr.toString());
        }
        catch (Exception e) {
            throw new RuntimeException("getCollectResult error", e);
        }
        return jsonSlice;
    }

    public void deserialize(Slice serialized)
    {
        try {
            SliceInput input = serialized.getInput();
            // size of the outer map
            int collectStatsEntrySize = input.readInt();
            for (int collectCnt = 0; collectCnt < collectStatsEntrySize; collectCnt++) {
                int id = input.readInt();
                int keyEntrySize = input.readInt();
                for (int idCnt = 0; idCnt < keyEntrySize; idCnt++) {
                    // length-prefixed key bytes, then the value
                    int keyBytesLen = input.readInt();
                    byte[] keyBytes = new byte[keyBytesLen];
                    for (int byteIdx = 0; byteIdx < keyBytesLen; byteIdx++) {
                        keyBytes[byteIdx] = input.readByte();
                    }
                    String key = new String(keyBytes);
                    int value = input.readInt();
                    addCollectList(id, key, value);
                }
            }
        }
        catch (Exception e) {
            throw new RuntimeException("deserialize error", e);
        }
    }

    public Slice serialize()
    {
        // space for everything appended to the SliceOutput below
        int requiredBytes =
                SizeOf.SIZE_OF_INT                                     // outer map size
                + collectContainer.size() * SizeOf.SIZE_OF_INT * 2     // per id: id value + inner map size
                + keyListLen * SizeOf.SIZE_OF_INT                      // per key: key byte length
                + keyByteLen                                           // key bytes
                + keyListLen * SizeOf.SIZE_OF_INT;                     // per key: value
        try {
            SliceOutput builder = Slices.allocate(requiredBytes).getOutput();
            // outer map size, written once up front to match deserialize()
            builder.appendInt(collectContainer.entrySet().size());
            for (Map.Entry<Integer, Map<String, Integer>> entry : collectContainer.entrySet()) {
                builder.appendInt(entry.getKey());
                Map<String, Integer> kMap = entry.getValue();
                builder.appendInt(kMap.entrySet().size());
                for (Map.Entry<String, Integer> vEntry : kMap.entrySet()) {
                    byte[] keyBytes = vEntry.getKey().getBytes();
                    builder.appendInt(keyBytes.length);
                    builder.appendBytes(keyBytes);
                    builder.appendInt(vEntry.getValue());
                }
            }
            return builder.getUnderlyingSlice();
        }
        catch (Exception e) {
            throw new RuntimeException("serialize error: requiredBytes = " + requiredBytes
                    + " keyByteLen = " + keyByteLen + " keyListLen = " + keyListLen, e);
        }
    }

    long estimatedInMemorySize()
    {
        return INSTANCE_SIZE + contentEstimatedSize;
    }

    void mergeWith(CollectListStats other)
    {
        if (other == null) {
            return;
        }
        for (Map.Entry<Integer, Map<String, Integer>> cEntry : other.collectContainer.entrySet()) {
            Integer id = cEntry.getKey();
            for (Map.Entry<String, Integer> kEntry : cEntry.getValue().entrySet()) {
                addCollectList(id, kEntry.getKey(), kEntry.getValue());
            }
        }
    }
}
Serializer class:
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.AccumulatorStateSerializer;
import com.facebook.presto.spi.type.Type;

import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;

public class CollectListStatsSerializer implements AccumulatorStateSerializer<CollectListAggregation.CollectState>
{
    @Override
    public Type getSerializedType()
    {
        return VARBINARY;
    }

    @Override
    public void serialize(CollectListAggregation.CollectState state, BlockBuilder out)
    {
        if (state.get() == null) {
            out.appendNull();
        }
        else {
            VARBINARY.writeSlice(out, state.get().serialize());
        }
    }

    @Override
    public void deserialize(Block block, int index, CollectListAggregation.CollectState state)
    {
        state.set(new CollectListStats(VARBINARY.getSlice(block, index)));
    }
}
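getSerializedType() declares the wire type: the intermediate state travels between nodes as VARBINARY, produced by serialize() and restored by deserialize().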
Factory class:
import com.facebook.presto.array.ObjectBigArray; // package path varies by Presto version
import com.facebook.presto.spi.function.AccumulatorStateFactory;
import com.facebook.presto.spi.function.GroupedAccumulatorState;

public class CollectListStatsFactory implements AccumulatorStateFactory<CollectListAggregation.CollectState>
{
    @Override
    public CollectListAggregation.CollectState createSingleState()
    {
        return new SingleState();
    }

    @Override
    public Class<? extends CollectListAggregation.CollectState> getSingleStateClass()
    {
        return SingleState.class;
    }

    @Override
    public CollectListAggregation.CollectState createGroupedState()
    {
        return new GroupState();
    }

    @Override
    public Class<? extends CollectListAggregation.CollectState> getGroupedStateClass()
    {
        return GroupState.class;
    }

    public static class GroupState implements GroupedAccumulatorState, CollectListAggregation.CollectState
    {
        // one CollectListStats per group, indexed by groupId
        private final ObjectBigArray<CollectListStats> collectStatsList = new ObjectBigArray<>();
        private long size;
        private long groupId;

        @Override
        public void setGroupId(long groupId)
        {
            this.groupId = groupId;
        }

        @Override
        public void ensureCapacity(long size)
        {
            collectStatsList.ensureCapacity(size);
        }

        @Override
        public CollectListStats get()
        {
            return collectStatsList.get(groupId);
        }

        @Override
        public void set(CollectListStats value)
        {
            CollectListStats previous = get();
            if (previous != null) {
                size -= previous.estimatedInMemorySize();
            }
            collectStatsList.set(groupId, value);
            size += value.estimatedInMemorySize();
        }

        @Override
        public long getEstimatedSize()
        {
            return size + collectStatsList.sizeOf();
        }
    }

    public static class SingleState implements CollectListAggregation.CollectState
    {
        private CollectListStats stats;

        @Override
        public CollectListStats get()
        {
            return stats;
        }

        @Override
        public void set(CollectListStats value)
        {
            stats = value;
        }

        @Override
        public long getEstimatedSize()
        {
            if (stats == null) {
                return 0;
            }
            return stats.estimatedInMemorySize();
        }
    }
}
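The factory supplies two flavors of state: SingleState serves a plain aggregation over all rows, while GroupState serves GROUP BY queries, keeping one CollectListStats per group in an ObjectBigArray indexed by groupId. getEstimatedSize() reports the state's memory footprint so Presto can account for it in its memory management.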
2.2.4 Using Slice Can Substantially Improve Performance
Slice performs memory operations through Unsafe#copyMemory, which makes memory copies very efficient.
The trade-off is that with Slice you must manage the stored data by hand: tracking capacity and used length, growing the buffer, and so on.
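Before the full example, here is a minimal, self-contained sketch (class and variable names are ours) of the length-prefixed pattern the example below builds on: write an int length followed by the raw bytes, advancing the offset by hand, then read them back in the same order.

import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

public class SliceBufferSketch
{
    public static void main(String[] args)
    {
        Slice value = Slices.utf8Slice("page_view");

        // write: length prefix first, then the raw bytes
        Slice buffer = Slices.allocate(SizeOf.SIZE_OF_INT + value.length());
        int writePos = 0;
        buffer.setInt(writePos, value.length());
        writePos += SizeOf.SIZE_OF_INT;
        buffer.setBytes(writePos, value);

        // read: length prefix first, then exactly that many bytes
        int readPos = 0;
        int len = buffer.getInt(readPos);
        readPos += SizeOf.SIZE_OF_INT;
        String decoded = buffer.slice(readPos, len).toStringUtf8();
        System.out.println(decoded); // page_view
    }
}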
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.AccumulatorState;
import com.facebook.presto.spi.function.AggregationFunction;
import com.facebook.presto.spi.function.AggregationState;
import com.facebook.presto.spi.function.CombineFunction;
import com.facebook.presto.spi.function.InputFunction;
import com.facebook.presto.spi.function.OutputFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.VarcharType;
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import java.util.ArrayList;
import java.util.List;

public class RouteUserAggregationBase
{
    // ...... other shared static definitions (including EVENT_CONCAT_GROUP_VALUE used below)

    /**
     * Slice-backed state holding the intermediate data buffer.
     */
    public interface SliceState extends AccumulatorState
    {
        Slice getSlice();

        void setSlice(Slice slice);
    }
}

@AggregationFunction("function_name") // placeholder name
public class RouteUserGroupAggregation extends RouteUserAggregationBase
{
    /** initial byte capacity of the buffer body **/
    private static final int STORED_DATA_BODY_INIT_BYTE_SIZE = 64;

    /** byte offsets of the buffer header fields **/
    private static final int VALUES_OFFSET_HEADER_BYTE_LEN = 0;
    private static final int VALUES_OFFSET_BODY_BYTE_SIZE = 4;
    private static final int VALUES_OFFSET_BODY_BYTE_USED = 8;
    private static final int VALUES_OFFSET_CONTAIN_TARGET_EVENT = 12;
    private static final int VALUES_OFFSET_TARGET_EVENT_TYPE = 13;
    private static final int VALUES_OFFSET_ROUTE_INTERVAL = 17;
    private static final int VALUES_OFFSET_TARGET_EVENT_LEN = 21;
    private static final int VALUES_OFFSET_TARGET_EVENT_BYTES = 25;

    @InputFunction
    public static void input(SliceState state,
            // target event
            @SqlType(StandardTypes.VARCHAR) Slice targetEvent,
            // target event type
            @SqlType(StandardTypes.BIGINT) long targetType,
            // event interval
            @SqlType(StandardTypes.BIGINT) long eventInterval,
            // current event name
            @SqlType(StandardTypes.VARCHAR) Slice currEvent,
            // current event time
            @SqlType(StandardTypes.BIGINT) long eventTime)
    {
        handleInput(state, targetEvent, (int) targetType, (int) eventInterval, currEvent, (int) eventTime, null, null);
    }

    private static void handleInput(SliceState state, Slice targetEvent, int targetType, int eventInterval,
            Slice currEvent, int eventTime, Slice groupByEvent, Slice groupByProp)
    {
        // fetch the buffered data
        Slice storedData = state.getSlice();

        // first call: initialize the header with the values that never change
        // (target event, target type, interval)
        if (storedData == null) {
            /*
             * header byte length
             * body total byte size
             * body bytes used
             * contains-target-event flag
             * target event type
             * event interval
             */
            int headerByteLen = SizeOf.SIZE_OF_INT
                    + SizeOf.SIZE_OF_INT
                    + SizeOf.SIZE_OF_INT
                    + SizeOf.SIZE_OF_BYTE
                    + SizeOf.SIZE_OF_INT
                    + SizeOf.SIZE_OF_INT;
            int targetLength = SizeOf.SIZE_OF_INT + targetEvent.length();
            headerByteLen += targetLength;

            storedData = Slices.allocate(headerByteLen + STORED_DATA_BODY_INIT_BYTE_SIZE);
            storedData.setInt(VALUES_OFFSET_HEADER_BYTE_LEN, headerByteLen);
            storedData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, STORED_DATA_BODY_INIT_BYTE_SIZE);
            storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, 0);
            // contains-target-event flag
            storedData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, 0);
            // cache the immutable parameters
            storedData.setInt(VALUES_OFFSET_TARGET_EVENT_TYPE, targetType);
            storedData.setInt(VALUES_OFFSET_ROUTE_INTERVAL, eventInterval);
            storedData.setInt(VALUES_OFFSET_TARGET_EVENT_LEN, targetEvent.length());
            storedData.setBytes(VALUES_OFFSET_TARGET_EVENT_BYTES, targetEvent);
        }

        int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
        int bodyByteSize = storedData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
        int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);

        // mark that the target event has been seen
        if (currEvent.toStringUtf8().equals(targetEvent.toStringUtf8())) {
            storedData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, 1);
        }
        // if grouping applies and the current event is the group-by event,
        // concatenate the group value onto the event name
        if (groupByEvent != null && groupByEvent.toStringUtf8().equals(currEvent.toStringUtf8())) {
            String newEventKey = currEvent.toStringUtf8() + EVENT_CONCAT_GROUP_VALUE + groupByProp.toStringUtf8();
            currEvent = Slices.utf8Slice(newEventKey);
        }

        // bytes to append: entry-length int + eventTime int + event-name bytes
        int entryByteLen = SizeOf.SIZE_OF_INT * 2 + currEvent.length();
        if (bodyByteUsed + entryByteLen > bodyByteSize) {
            // grow: double the body size
            int newBodyByteSize = bodyByteSize * 2;
            Slice newStoredData = Slices.allocate(headerByteLen + newBodyByteSize);
            // copy the old data into the new Slice, then record the new capacity
            newStoredData.setBytes(0, storedData.getBytes());
            newStoredData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, newBodyByteSize);
            storedData = newStoredData;
        }
        // locate the write position and append the entry
        int writePos = headerByteLen + bodyByteUsed;
        storedData.setInt(writePos, entryByteLen);
        writePos += SizeOf.SIZE_OF_INT;
        storedData.setInt(writePos, eventTime);
        writePos += SizeOf.SIZE_OF_INT;
        storedData.setBytes(writePos, currEvent);
        storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, bodyByteUsed + entryByteLen);
        // store the updated buffer
        state.setSlice(storedData);
    }

    @CombineFunction
    public static void combine(SliceState state, SliceState other)
    {
        // fetch both buffers
        Slice storedData = state.getSlice();
        Slice otherStoredData = other.getSlice();

        // merge the two buffers
        if (storedData == null) {
            state.setSlice(otherStoredData);
        }
        else {
            int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
            int bodyByteSize = storedData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
            int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
            int otherHeaderByteLen = otherStoredData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
            int otherBodyByteSize = otherStoredData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
            int otherBodyByteUsed = otherStoredData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
            byte containTargetEvent = 0;
            if (storedData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 1
                    || otherStoredData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 1) {
                containTargetEvent = 1;
            }
            Slice finalStoredData;
            int finalBodyByteUsed = bodyByteUsed + otherBodyByteUsed;
            if (bodyByteSize >= finalBodyByteUsed) {
                // left buffer has room: copy only the data after the header (events and times)
                storedData.setBytes(headerByteLen + bodyByteUsed, otherStoredData, otherHeaderByteLen, otherBodyByteUsed);
                storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
                finalStoredData = storedData;
            }
            else if (otherBodyByteSize >= finalBodyByteUsed) {
                // right buffer has room
                otherStoredData.setBytes(otherHeaderByteLen + otherBodyByteUsed, storedData, headerByteLen, bodyByteUsed);
                otherStoredData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
                finalStoredData = otherStoredData;
            }
            else {
                // grow until the combined body fits
                int newBodyByteSize = bodyByteSize;
                while (newBodyByteSize < finalBodyByteUsed) {
                    newBodyByteSize *= 2;
                }
                Slice newStoredData = Slices.allocate(headerByteLen + newBodyByteSize);
                newStoredData.setBytes(0, storedData.getBytes());
                newStoredData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, newBodyByteSize);
                storedData = newStoredData;

                storedData.setBytes(headerByteLen + bodyByteUsed, otherStoredData, otherHeaderByteLen, otherBodyByteUsed);
                storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
                finalStoredData = storedData;
            }
            // propagate the contains-target-event flag
            finalStoredData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, containTargetEvent);
            state.setSlice(finalStoredData);
        }
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(@AggregationState SliceState state, BlockBuilder out)
    {
        // fetch the buffered data
        Slice storedData = state.getSlice();

        // no data, or the target event never occurred
        if ((storedData == null) || (storedData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 0)) {
            out.appendNull();
            return;
        }
        // match the routes
        Slice makeRoute = makeRoute(storedData);
        if (makeRoute == null) {
            out.appendNull();
        }
        else {
            VarcharType.VARCHAR.writeSlice(out, makeRoute);
        }
    }

    private static Slice makeRoute(Slice storedData)
    {
        // read the header fields (used by the elided matching logic)
        int interval = storedData.getInt(VALUES_OFFSET_ROUTE_INTERVAL);
        int targetType = storedData.getInt(VALUES_OFFSET_TARGET_EVENT_TYPE);
        int targetLength = storedData.getInt(VALUES_OFFSET_TARGET_EVENT_LEN);
        String targetEvent = new String(storedData.getBytes(VALUES_OFFSET_TARGET_EVENT_BYTES, targetLength));
        List<Slice> timeEventSeries = new ArrayList<>();
        int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
        int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
        int bound = headerByteLen + bodyByteUsed;
        int idx = headerByteLen;
        while (idx < bound) {
            // read each entry: time, then event name
            int entryByteLen = storedData.getInt(idx);
            Slice entry = storedData.slice(idx + SizeOf.SIZE_OF_INT, entryByteLen - SizeOf.SIZE_OF_INT);
            idx += entryByteLen;
            timeEventSeries.add(entry);
        }
        // route-matching logic elided; assume it yields a List<String> routes
        ......
        // build the result: each route is written length-prefixed
        Slice result = null;
        if (routes.size() > 0) {
            for (String route : routes) {
                Slice routeSlice = Slices.utf8Slice(route);
                Slice routeInfo = Slices.allocate(SizeOf.SIZE_OF_INT + routeSlice.length());
                routeInfo.setInt(0, routeSlice.length());
                routeInfo.setBytes(SizeOf.SIZE_OF_INT, routeSlice);
                if (result == null) {
                    result = routeInfo;
                }
                else {
                    Slice newSlice = Slices.allocate(result.length() + routeInfo.length());
                    newSlice.setBytes(0, result);
                    newSlice.setBytes(result.length(), routeInfo, 0, routeInfo.length());
                    result = newSlice;
                }
            }
        }
        return result;
    }
}