  • Developing custom functions for Presto

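    1 Introduction to Presto

    Presto is a distributed query engine open-sourced by Facebook that plays an important role in interactive query workloads. As more and more people use SQL to analyze data on Presto, we found it necessary to package some business logic as functions similar to Hive UDFs. This makes SQL users more productive and keeps the UDFs in the Hive and Presto environments consistent.

    1.1 Presto function types

    Presto functions fall into three broad categories: scalar, aggregation, and window.

    1) Scalar functions. Put simply, a scalar function is a static Java method that holds no state of its own.

    2) Aggregation functions, which accumulate state, such as count and avg. On a single node the state could simply be kept in a variable, but Presto is a distributed engine and state is passed between nodes, so it must be serialized into Presto's internal format before it can be transferred.

    3) Window functions, similar to the window functions in Spark SQL.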

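    2 Implementing custom functions

    Official documentation: https://prestodb.github.io/docs/current/develop/functions.html

    2.1 Implementing a custom scalar function

    2.1.1 Define a Java class

    1) Mark the static method that implements the business logic with the @ScalarFunction annotation.

    2) Describe what the function does with @Description; this text is shown by SHOW FUNCTIONS.

    3) Declare the function's return type with @SqlType. The example returns a string, so the type is StandardTypes.VARCHAR.

    4) The Java method's return value must use Presto's internal representation, so a string must be returned as a Slice. Slices.utf8Slice conveniently converts a String to a Slice.

    Example code: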

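    // Presto package names below match older prestodb releases and may differ in other versions.
    import com.facebook.presto.spi.function.Description;
    import com.facebook.presto.spi.function.ScalarFunction;
    import com.facebook.presto.spi.function.SqlType;
    import com.facebook.presto.spi.type.StandardTypes;
    import io.airlift.slice.Slice;
    import io.airlift.slice.Slices;
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class LiulishuoFunctions {

      public static final String DATE_FORMAT = "yyyy-MM-dd";

      @ScalarFunction
      @Description("hive to_date function")
      @SqlType(StandardTypes.VARCHAR)
      public static Slice to_date(@SqlType(StandardTypes.TIMESTAMP) long input) {
          final DateFormat format = new SimpleDateFormat(DATE_FORMAT);
          return Slices.utf8Slice(format.format(new Date(input)));
      }
    }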
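    The example above formats with SimpleDateFormat, which uses the JVM's default time zone, so the same timestamp could produce different dates on nodes configured differently. The following is a minimal plain-Java sketch of the same formatting logic with java.time. It assumes, as the example does, that the input is epoch milliseconds; the choice of UTC and the names ToDateSketch and toDate are assumptions made for illustration, not part of the original code.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class ToDateSketch {
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is enough.
    // UTC is an assumption; use whichever zone your Hive to_date results are based on.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Formats epoch milliseconds as yyyy-MM-dd in the configured zone.
    static String toDate(long epochMillis) {
        return FORMATTER.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(toDate(0L));           // 1970-01-01
        System.out.println(toDate(86_400_000L));  // 1970-01-02
    }
}
```

    Inside the scalar function, the returned String would still be wrapped with Slices.utf8Slice as shown above.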

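    2.1.2 The Presto plugin mechanism

    Unlike Hive, Presto does not let you register a custom UDF through configuration; UDFs have to be delivered through the plugin mechanism. The plugin mechanism is central to Presto's ability to integrate many data sources. By implementing different plugins, Presto lets users run JOINs and other computations across different kinds of data sources. All data sources in Presto, such as MySQL, Hive, and HBase, are implemented as plugins. Besides loading Connectors to access data sources, the plugin mechanism also loads FunctionFactory to load UDFs. A Presto Plugin follows Java's ServiceLoader convention, so implementing one is straightforward.

    Implement the Plugin interface, for example: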

    import com.facebook.presto.spi.Plugin;

    import com.google.common.collect.ImmutableSet;

    import java.util.Set;

    public class PrestoFunctionsPlugin implements Plugin {
        @Override
        public Set<Class<?>> getFunctions() {
            return ImmutableSet.<Class<?>>builder()
                    .add(PvFlowStatsAggregation.class)
                    .add(AvgAggregationDemo.class)
                    .build();
        }
    }

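    Finally, package the code as a JAR and upload it to Presto's plugin directory. Presto must be restarted before the custom functions in the JAR are loaded.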
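    To make the discovery step more concrete, here is a small self-contained sketch of how a host could scan the classes returned by getFunctions() for annotated static methods. It is an illustration only and not Presto's actual loader: the @Scalar annotation, DemoFunctions, demoClasses, and discover are hypothetical stand-ins so that the example runs without Presto on the classpath.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

final class FunctionScanSketch {
    // Hypothetical stand-in for an annotation such as @ScalarFunction.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Scalar {}

    // Hypothetical function holder class, like the ones a plugin would return.
    static final class DemoFunctions {
        @Scalar
        public static String shout(String s) {
            return s.toUpperCase();
        }

        // Not annotated, so the scan below ignores it.
        public static String helper(String s) {
            return s;
        }
    }

    // Plays the role of Plugin.getFunctions() in this sketch.
    static Set<Class<?>> demoClasses() {
        return Collections.<Class<?>>singleton(DemoFunctions.class);
    }

    // Collects the names of all static methods carrying the @Scalar annotation.
    static Set<String> discover(Set<Class<?>> classes) {
        Set<String> names = new TreeSet<>();
        for (Class<?> clazz : classes) {
            for (Method method : clazz.getDeclaredMethods()) {
                if (method.isAnnotationPresent(Scalar.class) && Modifier.isStatic(method.getModifiers())) {
                    names.add(method.getName());
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(discover(demoClasses())); // [shout]
    }
}
```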

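    2.2 Custom aggregation functions

    2.2.1 How it works

    Presto breaks an aggregation function into three steps:

    1. input(state, data): the input function is executed for every row. This step runs in parallel on every node that holds data, producing multiple accumulated states.

    2. combine(state1, state2): merges the states from all nodes. It is executed repeatedly until all states have been merged into a single final state, which is the result of the aggregation function.

    3. output(final_state, out): writes the final result to a BlockBuilder.

    2.2.2 Implementation steps

    1. Define a Java class and mark it as an aggregation function with @AggregationFunction.

    2. Mark the accumulation, merge, and final output functions with @InputFunction, @CombineFunction, and @OutputFunction respectively, and register the aggregation function in the Plugin.

    3. Define a State interface that extends AccumulatorState and declares get and set methods.

    4. Use @AccumulatorStateMetadata to supply the serializer (stateSerializerClass) and the factory (stateFactoryClass). You write the serializer class and the factory class yourself.

    Core code example: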
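    The three steps above can be sketched in plain Java, with each array standing in for the rows held by one node. This is a simplified illustration of the input/combine/output contract for an average, not Presto code: the State class, the aggregate driver, and returning a Double instead of writing to a BlockBuilder are all assumptions made to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;

final class AvgAggregationSketch {
    // Accumulated state; plays the role of an AccumulatorState.
    static final class State {
        long count;
        double sum;
    }

    // input(state, data): called once per row on the node holding that row.
    static void input(State state, double value) {
        state.count++;
        state.sum += value;
    }

    // combine(state1, state2): folds another partial state into this one.
    static void combine(State state, State other) {
        state.count += other.count;
        state.sum += other.sum;
    }

    // output(final_state, out): returns null for empty input, like appendNull().
    static Double output(State state) {
        if (state.count == 0) {
            return null;
        }
        return state.sum / state.count;
    }

    // Hypothetical driver: one partial state per partition, then merge, then output.
    static Double aggregate(List<double[]> partitions) {
        State merged = new State();
        for (double[] partition : partitions) {
            State partial = new State();
            for (double value : partition) {
                input(partial, value);
            }
            combine(merged, partial);
        }
        return output(merged);
    }

    public static void main(String[] args) {
        List<double[]> partitions = Arrays.asList(new double[] {1, 2, 3}, new double[] {4, 5});
        System.out.println(aggregate(partitions)); // 3.0
    }
}
```

    Keeping count and sum, rather than a running average, is what makes combine correct: two partial averages cannot be merged without knowing how many rows each one covers.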

    @AggregationFunction("pv_stats")
    public class PvFlowStatsAggregation {
        private PvFlowStatsAggregation() {}

        @AccumulatorStateMetadata(stateSerializerClass = PvFlowStatsStateSerializer.class, stateFactoryClass = PvFlowStatsFactory.class)
        public interface State extends AccumulatorState {
            PvFlowStats get();
            void set(PvFlowStats value);
        }

        @InputFunction
        public static void input(@AggregationState State state, @SqlType(StandardTypes.BIGINT) long id,
                                 @SqlType(StandardTypes.VARBINARY) Slice serialisedTree, @SqlType(StandardTypes.VARCHAR) Slice pvOrder) {
            handleDataInput(state, id, serialisedTree, pvOrder, null);
        }

        @InputFunction
        public static void input(@AggregationState State state, @SqlType(StandardTypes.BIGINT) long id,
                                 @SqlType(StandardTypes.VARBINARY) Slice serialisedTree, @SqlType(StandardTypes.VARCHAR) Slice pvOrder,
                                 @SqlType(StandardTypes.VARCHAR) Slice endUrl) {
            handleDataInput(state, id, serialisedTree, pvOrder, endUrl);
        }

        private static void handleDataInput(State state, long id, Slice serialisedTree, Slice pvOrder, Slice endUrl) {
            // Lazily create the accumulated state the first time a row arrives
            PvFlowStats stats = state.get();
            if (stats == null) {
                stats = new PvFlowStats();
                state.set(stats);
            }
            ......
        }

        @CombineFunction
        public static void combine(@AggregationState State state, @AggregationState State other) {
            PvFlowStats input = other.get();
            PvFlowStats previous = state.get();
            if (previous == null) {
                state.set(input);
            } else {
                previous.mergeWith(input);
            }
        }

        @OutputFunction(StandardTypes.VARCHAR)
        public static void output(@AggregationState State state, BlockBuilder out) {
            PvFlowStats stats = state.get();
            if (stats == null) {
                out.appendNull();
                return;
            }
            // Compute the statistics
            Slice result = stats.statisticNextPage();
            if (result == null) {
                out.appendNull();
            } else {
                VarcharType.VARCHAR.writeSlice(out, result);
            }
        }
    }

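    2.2.3 Complex data types (list, map, or custom classes)

    Complex types require a custom serializer class and a custom factory class, and you must implement serialization and deserialization of the class yourself.

    An example follows.

    Main class: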

    /**
     * Input:
     *   id | value
     *  ----+-------
     *    2 | ddd
     *    2 | ddd
     *    1 | bbb
     *    1 | bbb
     *    1 | ccc
     *    1 | aaa
     *    1 | bbb
     *    2 | aaa
     *    2 | ccc
     *    1 | ccc
     *
     * Returns:
     *   [{id:1,{aaa:1,ccc:2,bbb:3},{id:2,{aaa:1,ccc:1,ddd:2}]
     */
    @AggregationFunction("presto_collect")
    public class CollectListAggregation {

        @AccumulatorStateMetadata(stateSerializerClass = CollectListStatsSerializer.class, stateFactoryClass = CollectListStatsFactory.class)
        public interface CollectState extends AccumulatorState {
            CollectListStats get();
            void set(CollectListStats value);
        }

        @InputFunction
        public static void input(@AggregationState CollectState state, @SqlType(StandardTypes.VARCHAR) Slice id, @SqlType(StandardTypes.VARCHAR) Slice key) {
            try {
                CollectListStats stats = state.get();
                if (stats == null) {
                    stats = new CollectListStats();
                    state.set(stats);
                }
                int inputId = Integer.parseInt(id.toStringUtf8());
                String inputKey = key.toStringUtf8();
                stats.addCollectList(inputId, inputKey, 1);
            } catch (Exception e) {
                // Keep the original exception as the cause so the stack trace is not lost
                throw new RuntimeException("input err", e);
            }
        }

        @CombineFunction
        public static void combine(@AggregationState CollectState state, @AggregationState CollectState otherState) {
            try {
                CollectListStats collectListStats = state.get();
                CollectListStats oCollectListStats = otherState.get();
                if (collectListStats == null) {
                    state.set(oCollectListStats);
                } else {
                    collectListStats.mergeWith(oCollectListStats);
                }
            } catch (Exception e) {
                throw new RuntimeException("combine err", e);
            }
        }

        @OutputFunction(StandardTypes.VARCHAR)
        public static void output(@AggregationState CollectState state, BlockBuilder out) {
            try {
                CollectListStats stats = state.get();
                if (stats == null) {
                    out.appendNull();
                    return;
                }
                // Build the result
                Slice result = stats.getCollectResult();
                if (result == null) {
                    out.appendNull();
                } else {
                    VarcharType.VARCHAR.writeSlice(out, result);
                }
            } catch (Exception e) {
                throw new RuntimeException("output err", e);
            }
        }
    }

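    The main class is fairly simple: it only needs input, combine, and output.

    Data-holding class: this class must implement serialization and deserialization of the data, which is the most important and most tedious part. The example below shows the key points: you control how much space is allocated and the order in which fields are written, and you must read them back in the same order. For a string, write its byte length first and then the bytes; when reading, read the length first, then that many bytes, and finally decode them into a string.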

    public class CollectListStats {
        private static final int INSTANCE_SIZE = ClassLayout.parseClass(CollectListStats.class).instanceSize();
        // <id, <key, value>>
        private Map<Integer, Map<String, Integer>> collectContainer = new HashMap<>();
        private long contentEstimatedSize = 0;
        private int keyByteLen = 0;
        private int keyListLen = 0;

        CollectListStats() {
        }

        CollectListStats(Slice serialized) {
            deserialize(serialized);
        }

        void addCollectList(Integer id, String key, int value) {
            // Use an explicit charset (java.nio.charset.StandardCharsets) so sizes match on every node
            int keyBytes = key.getBytes(StandardCharsets.UTF_8).length;
            if (collectContainer.containsKey(id)) {
                Map<String, Integer> tmpMap = collectContainer.get(id);
                if (tmpMap.containsKey(key)) {
                    tmpMap.put(key, tmpMap.get(key) + value);
                } else {
                    tmpMap.put(key, value);
                    contentEstimatedSize += (keyBytes + SizeOf.SIZE_OF_INT);
                    keyByteLen += keyBytes;
                    keyListLen++;
                }
            } else {
                Map<String, Integer> tmpMap = new HashMap<String, Integer>();
                tmpMap.put(key, value);
                keyByteLen += keyBytes;
                keyListLen++;
                collectContainer.put(id, tmpMap);
                contentEstimatedSize += SizeOf.SIZE_OF_INT;
            }
        }

        // [{id:1,{"aaa":3,"fadf":6},{}]
        Slice getCollectResult() {
            Slice jsonSlice = null;
            try {
                StringBuilder jsonStr = new StringBuilder();
                jsonStr.append("[");
                int collectLength = collectContainer.entrySet().size();
                for (Map.Entry<Integer, Map<String, Integer>> mapEntry : collectContainer.entrySet()) {
                    Integer id = mapEntry.getKey();
                    Map<String, Integer> vMap = mapEntry.getValue();
                    jsonStr.append("{id:").append(id).append(",{");
                    int vLength = vMap.entrySet().size();
                    for (Map.Entry<String, Integer> vEntry : vMap.entrySet()) {
                        String key = vEntry.getKey();
                        Integer value = vEntry.getValue();
                        jsonStr.append(key).append(":").append(value);
                        vLength--;
                        if (vLength != 0) {
                            jsonStr.append(",");
                        }
                    }
                    jsonStr.append("}");
                    collectLength--;
                    if (collectLength != 0) {
                        jsonStr.append(",");
                    }
                }
                jsonStr.append("]");
                jsonSlice = Slices.utf8Slice(jsonStr.toString());
            } catch (Exception e) {
                throw new RuntimeException("get CollectResult err", e);
            }
            return jsonSlice;
        }

        public void deserialize(Slice serialized) {
            try {
                SliceInput input = serialized.getInput();
                // Number of entries in the outer map
                int collectStatsEntrySize = input.readInt();
                for (int collectCnt = 0; collectCnt < collectStatsEntrySize; collectCnt++) {
                    int id = input.readInt();
                    int keyEntrySize = input.readInt();
                    for (int idCnt = 0; idCnt < keyEntrySize; idCnt++) {
                        // Length prefix first, then that many key bytes, then the value
                        int keyBytesLen = input.readInt();
                        byte[] keyBytes = new byte[keyBytesLen];
                        for (int byteIdx = 0; byteIdx < keyBytesLen; byteIdx++) {
                            keyBytes[byteIdx] = input.readByte();
                        }
                        String key = new String(keyBytes, StandardCharsets.UTF_8);
                        int value = input.readInt();
                        addCollectList(id, key, value);
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("deserialize err", e);
            }
        }

        public Slice serialize() {
            // Space needed for everything appended to the SliceOutput below
            int requiredBytes =
                    SizeOf.SIZE_OF_INT                                      // number of id entries
                    + collectContainer.size() * SizeOf.SIZE_OF_INT * 2      // per id: id value and number of key entries
                    + keyListLen * SizeOf.SIZE_OF_INT                       // length prefix of each key
                    + keyByteLen                                            // total bytes of all keys
                    + keyListLen * SizeOf.SIZE_OF_INT;                      // values
            try {
                // Serialize
                SliceOutput builder = Slices.allocate(requiredBytes).getOutput();
                // Number of ids: written once, because deserialize() reads it once before its loop
                builder.appendInt(collectContainer.size());
                for (Map.Entry<Integer, Map<String, Integer>> entry : collectContainer.entrySet()) {
                    // id value
                    builder.appendInt(entry.getKey());
                    Map<String, Integer> kMap = entry.getValue();
                    builder.appendInt(kMap.entrySet().size());
                    for (Map.Entry<String, Integer> vEntry : kMap.entrySet()) {
                        byte[] keyBytes = vEntry.getKey().getBytes(StandardCharsets.UTF_8);
                        builder.appendInt(keyBytes.length);
                        builder.appendBytes(keyBytes);
                        builder.appendInt(vEntry.getValue());
                    }
                }
                return builder.getUnderlyingSlice();
            } catch (Exception e) {
                throw new RuntimeException("serialize err  requiredBytes = " + requiredBytes + " keyByteLen= " + keyByteLen + " keyListLen = " + keyListLen, e);
            }
        }

        long estimatedInMemorySize() {
            return INSTANCE_SIZE + contentEstimatedSize;
        }

        void mergeWith(CollectListStats other) {
            if (other == null) {
                return;
            }
            for (Map.Entry<Integer, Map<String, Integer>> cEntry : other.collectContainer.entrySet()) {
                Integer id = cEntry.getKey();
                Map<String, Integer> kMap = cEntry.getValue();
                for (Map.Entry<String, Integer> kEntry : kMap.entrySet()) {
                    addCollectList(id, kEntry.getKey(), kEntry.getValue());
                }
            }
        }
    }
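    The length-prefixed layout described above can be tried out without Presto. The following is a minimal sketch that uses java.nio.ByteBuffer in place of Slice and SliceOutput; the class LengthPrefixedCodec and its methods are hypothetical names introduced for illustration. It writes the number of ids once, then for each id the id value, the number of keys, and each key as a length prefix, UTF-8 bytes, and value, and reads everything back in the same order.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class LengthPrefixedCodec {
    static byte[] serialize(Map<Integer, Map<String, Integer>> data) {
        // First pass: compute the exact number of bytes that will be written.
        int size = Integer.BYTES; // number of ids
        for (Map<String, Integer> inner : data.values()) {
            size += 2 * Integer.BYTES; // id value + number of keys
            for (String key : inner.keySet()) {
                // key length prefix + key bytes + value
                size += Integer.BYTES + key.getBytes(StandardCharsets.UTF_8).length + Integer.BYTES;
            }
        }
        // Second pass: write the fields in a fixed order.
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(data.size());
        for (Map.Entry<Integer, Map<String, Integer>> entry : data.entrySet()) {
            buf.putInt(entry.getKey());
            buf.putInt(entry.getValue().size());
            for (Map.Entry<String, Integer> kv : entry.getValue().entrySet()) {
                byte[] keyBytes = kv.getKey().getBytes(StandardCharsets.UTF_8);
                buf.putInt(keyBytes.length);
                buf.put(keyBytes);
                buf.putInt(kv.getValue());
            }
        }
        return buf.array();
    }

    static Map<Integer, Map<String, Integer>> deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        Map<Integer, Map<String, Integer>> data = new LinkedHashMap<>();
        int idCount = buf.getInt();
        for (int i = 0; i < idCount; i++) {
            int id = buf.getInt();
            int keyCount = buf.getInt();
            Map<String, Integer> inner = new LinkedHashMap<>();
            for (int k = 0; k < keyCount; k++) {
                // Read the length first, then exactly that many bytes, then the value.
                byte[] keyBytes = new byte[buf.getInt()];
                buf.get(keyBytes);
                inner.put(new String(keyBytes, StandardCharsets.UTF_8), buf.getInt());
            }
            data.put(id, inner);
        }
        return data;
    }

    // Small sample with two ids, used by main.
    static Map<Integer, Map<String, Integer>> sample() {
        Map<Integer, Map<String, Integer>> data = new LinkedHashMap<>();
        Map<String, Integer> first = new LinkedHashMap<>();
        first.put("aaa", 1);
        first.put("ccc", 2);
        first.put("bbb", 3);
        Map<String, Integer> second = new LinkedHashMap<>();
        second.put("ddd", 2);
        data.put(1, first);
        data.put(2, second);
        return data;
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(sample());
        System.out.println(bytes.length + " bytes");
        System.out.println(deserialize(bytes).equals(sample())); // true
    }
}
```

    Computing the size in a first pass avoids the separate length counters that the class above maintains, at the cost of encoding each key twice.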

    Serializer class:

    // VARBINARY is assumed to be statically imported from Presto's VarbinaryType
    public class CollectListStatsSerializer implements AccumulatorStateSerializer<CollectListAggregation.CollectState> {
        @Override
        public Type getSerializedType() {
            return VARBINARY;
        }

        @Override
        public void serialize(CollectListAggregation.CollectState state, BlockBuilder out) {
            if (state.get() == null) {
                out.appendNull();
            } else {
                VARBINARY.writeSlice(out, state.get().serialize());
            }
        }

        @Override
        public void deserialize(Block block, int index, CollectListAggregation.CollectState state) {
            state.set(new CollectListStats(VARBINARY.getSlice(block, index)));
        }
    }

    Factory class:

    public class CollectListStatsFactory implements AccumulatorStateFactory<CollectListAggregation.CollectState> {
        @Override
        public CollectListAggregation.CollectState createSingleState() {
            return new SingleState();
        }

        @Override
        public Class<? extends CollectListAggregation.CollectState> getSingleStateClass() {
            return SingleState.class;
        }

        @Override
        public CollectListAggregation.CollectState createGroupedState() {
            return new GroupState();
        }

        @Override
        public Class<? extends CollectListAggregation.CollectState> getGroupedStateClass() {
            return GroupState.class;
        }

        // State used for GROUP BY queries: one CollectListStats per group id
        public static class GroupState implements GroupedAccumulatorState, CollectListAggregation.CollectState {
            private final ObjectBigArray<CollectListStats> collectStatsList = new ObjectBigArray<>();
            private long size;
            private long groupId;

            @Override
            public void setGroupId(long groupId) {
                this.groupId = groupId;
            }

            @Override
            public void ensureCapacity(long size) {
                collectStatsList.ensureCapacity(size);
            }

            @Override
            public CollectListStats get() {
                return collectStatsList.get(groupId);
            }

            @Override
            public void set(CollectListStats value) {
                CollectListStats previous = get();
                if (previous != null) {
                    size -= previous.estimatedInMemorySize();
                }
                collectStatsList.set(groupId, value);
                // value can be null when combine() hands over an empty state
                if (value != null) {
                    size += value.estimatedInMemorySize();
                }
            }

            @Override
            public long getEstimatedSize() {
                return size + collectStatsList.sizeOf();
            }
        }

        // State used when there is no grouping: a single CollectListStats
        public static class SingleState implements CollectListAggregation.CollectState {
            private CollectListStats stats;

            @Override
            public CollectListStats get() {
                return stats;
            }

            @Override
            public void set(CollectListStats value) {
                stats = value;
            }

            @Override
            public long getEstimatedSize() {
                if (stats == null) {
                    return 0;
                }
                return stats.estimatedInMemorySize();
            }
        }
    }

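    2.2.4 Using Slice to improve performance

    Performing memory operations through Slice is efficient: Slice implements fast memory copies with Unsafe#copyMemory.

    The trade-off is that with a Slice you have to manage the stored data by hand: tracking its capacity and length, growing the buffer, and so on.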

    public class RouteUserAggregationBase {

        // ...... other static fields

        /**
         * Slice state:
         * buffer for intermediate data
         */
        public interface SliceState extends AccumulatorState {
            Slice getSlice();

            void setSlice(Slice slice);
        }
    }

    @AggregationFunction("function_name")
    public class RouteUserGroupAggregation extends RouteUserAggregationBase {

        /** Initial byte capacity of the buffer body **/
        private static final int STORED_DATA_BODY_INIT_BYTE_SIZE = 64;

        /** Offsets of the metadata fields in the buffer header **/
        private static final int VALUES_OFFSET_HEADER_BYTE_LEN = 0;
        private static final int VALUES_OFFSET_BODY_BYTE_SIZE = 4;
        private static final int VALUES_OFFSET_BODY_BYTE_USED = 8;

        private static final int VALUES_OFFSET_CONTAIN_TARGET_EVENT = 12;

        private static final int VALUES_OFFSET_TARGET_EVENT_TYPE = 13;
        private static final int VALUES_OFFSET_ROUTE_INTERVAL = 17;
        private static final int VALUES_OFFSET_TARGET_EVENT_LEN = 21;
        private static final int VALUES_OFFSET_TARGET_EVENT_BYTES = 25;

        @InputFunction
        public static void input(@AggregationState SliceState state,
                                 // target event
                                 @SqlType(StandardTypes.VARCHAR) Slice targetEvent,
                                 // target event type
                                 @SqlType(StandardTypes.BIGINT) long targetType,
                                 // event interval
                                 @SqlType(StandardTypes.BIGINT) long eventInterval,
                                 // current event name
                                 @SqlType(StandardTypes.VARCHAR) Slice currEvent,
                                 // current event time
                                 @SqlType(StandardTypes.BIGINT) long eventTime) {

            // Note: the long arguments are narrowed to int here
            handleInput(state, targetEvent, (int) targetType, (int) eventInterval, currEvent, (int) eventTime, null, null);
        }

        private static void handleInput(SliceState state, Slice targetEvent, int targetType, int eventInterval, Slice currEvent, int eventTime, Slice groupByEvent, Slice groupByProp) {
            // Get the buffered data
            Slice storedData = state.getSlice();

            // Initialize the buffer metadata with values that never change: target event, target type, interval
            if (storedData == null) {
                /*
                        header byte length
                        total body bytes
                        used body bytes
                        whether the target event is present
                        target event type
                        event time interval
                 */
                int headerByteLen = SizeOf.SIZE_OF_INT
                        + SizeOf.SIZE_OF_INT
                        + SizeOf.SIZE_OF_INT
                        + SizeOf.SIZE_OF_BYTE
                        + SizeOf.SIZE_OF_INT
                        + SizeOf.SIZE_OF_INT
                        ;
                int targetLength = SizeOf.SIZE_OF_INT + targetEvent.length();
                headerByteLen += targetLength;

                storedData = Slices.allocate(headerByteLen + STORED_DATA_BODY_INIT_BYTE_SIZE);
                storedData.setInt(VALUES_OFFSET_HEADER_BYTE_LEN, headerByteLen);
                storedData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, STORED_DATA_BODY_INIT_BYTE_SIZE);
                storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, 0);
                // Whether the target event is present
                storedData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, 0);
                // Store the constant parameters
                storedData.setInt(VALUES_OFFSET_TARGET_EVENT_TYPE, targetType);
                storedData.setInt(VALUES_OFFSET_ROUTE_INTERVAL, eventInterval);

                storedData.setInt(VALUES_OFFSET_TARGET_EVENT_LEN, targetEvent.length());
                storedData.setBytes(VALUES_OFFSET_TARGET_EVENT_BYTES, targetEvent);
            }

            int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
            int bodyByteSize = storedData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
            int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);

            // Mark that the target event is present
            if (currEvent.toStringUtf8().equals(targetEvent.toStringUtf8())) {
                storedData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, 1);
            }
            // If grouping is used and the current event is the group-by event, append the group value to the event name
            if (groupByEvent != null && groupByEvent.toStringUtf8().equals(currEvent.toStringUtf8())) {
                String newEventKey = currEvent.toStringUtf8() + EVENT_CONCAT_GROUP_VALUE + groupByProp.toStringUtf8();
                currEvent = Slices.utf8Slice(newEventKey);
            }

            // Size of the new entry: entry-length int + eventTime int + event name bytes
            int entryByteLen = SizeOf.SIZE_OF_INT * 2 + currEvent.length();
            if (bodyByteUsed + entryByteLen > bodyByteSize) {
                // Grow: keep doubling until the entry fits (a single doubling may not be enough for a long event name)
                int newBodyByteSize = bodyByteSize * 2;
                while (newBodyByteSize < bodyByteUsed + entryByteLen) {
                    newBodyByteSize *= 2;
                }
                Slice newStoredData = Slices.allocate(headerByteLen + newBodyByteSize);
                // Copy the existing data into the new Slice, then record the new capacity
                newStoredData.setBytes(0, storedData.getBytes());
                newStoredData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, newBodyByteSize);
                storedData = newStoredData;
            }
            // Locate the write position
            int writePos = headerByteLen + bodyByteUsed;
            storedData.setInt(writePos, entryByteLen);
            writePos += SizeOf.SIZE_OF_INT;
            storedData.setInt(writePos, eventTime);
            writePos += SizeOf.SIZE_OF_INT;
            storedData.setBytes(writePos, currEvent);
            storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, bodyByteUsed + entryByteLen);
            // Update the buffered data
            state.setSlice(storedData);
        }


        @CombineFunction
        public static void combine(@AggregationState SliceState state, @AggregationState SliceState other) {
            // Get the buffered data
            Slice storedData = state.getSlice();
            Slice otherStoredData = other.getSlice();

            // Merge the buffers
            if (storedData == null) {
                state.setSlice(otherStoredData);
            } else if (otherStoredData != null) {
                // (when the other state is empty there is nothing to merge)
                int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
                int bodyByteSize = storedData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
                int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
                int otherHeaderByteLen = otherStoredData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
                int otherBodyByteSize = otherStoredData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
                int otherBodyByteUsed = otherStoredData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
                byte containTargetEvent = 0;
                if (storedData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 1 || otherStoredData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 1) {
                    containTargetEvent = 1;
                }
                Slice finalStoredData;
                int finalBodyByteUsed = bodyByteUsed + otherBodyByteUsed;
                if (bodyByteSize >= finalBodyByteUsed) {
                    // The left buffer has room: copy only the body (event names and times), not the header
                    storedData.setBytes(headerByteLen + bodyByteUsed, otherStoredData, otherHeaderByteLen, otherBodyByteUsed);
                    storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
                    finalStoredData = storedData;
                } else if (otherBodyByteSize >= finalBodyByteUsed) {
                    // The right buffer has room
                    otherStoredData.setBytes(otherHeaderByteLen + otherBodyByteUsed, storedData, headerByteLen, bodyByteUsed);
                    otherStoredData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
                    finalStoredData = otherStoredData;
                } else {
                    // Grow
                    int newBodyByteSize = bodyByteSize;
                    while (newBodyByteSize < finalBodyByteUsed) {
                        newBodyByteSize *= 2;
                    }
                    Slice newStoredData = Slices.allocate(headerByteLen + newBodyByteSize);
                    newStoredData.setBytes(VALUES_OFFSET_HEADER_BYTE_LEN, storedData.getBytes());
                    newStoredData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, newBodyByteSize);
                    storedData = newStoredData;

                    storedData.setBytes(headerByteLen + bodyByteUsed, otherStoredData, otherHeaderByteLen, otherBodyByteUsed);
                    storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
                    finalStoredData = storedData;
                }
                // Whether the target event is present
                finalStoredData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, containTargetEvent);
                state.setSlice(finalStoredData);
            }
        }

        @OutputFunction(StandardTypes.VARCHAR)
        public static void output(@AggregationState SliceState state, BlockBuilder out) {
            // Get the buffered data
            Slice storedData = state.getSlice();

            // No data, or the target event never occurred
            if ((storedData == null) || (storedData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 0)) {
                out.appendNull();
                return;
            }
            // Match routes
            Slice makeRoute = makeRoute(storedData);
            if (makeRoute == null) {
                out.appendNull();
            } else {
                VarcharType.VARCHAR.writeSlice(out, makeRoute);
            }
        }

        private static Slice makeRoute(Slice storedData) {
            // Read the header fields
            int interval = storedData.getInt(VALUES_OFFSET_ROUTE_INTERVAL);
            int targetType = storedData.getInt(VALUES_OFFSET_TARGET_EVENT_TYPE);
            int targetLength = storedData.getInt(VALUES_OFFSET_TARGET_EVENT_LEN);
            String targetEvent = new String(storedData.getBytes(VALUES_OFFSET_TARGET_EVENT_BYTES, targetLength), StandardCharsets.UTF_8);
            List<Slice> timeEventSeries = new ArrayList<>();
            int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
            int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
            int bound = headerByteLen + bodyByteUsed;
            int idx = headerByteLen;
            while (idx < bound) {
                // Read each entry: event time and event name
                int entryByteLen = storedData.getInt(idx);
                Slice entry = storedData.slice(idx + SizeOf.SIZE_OF_INT, entryByteLen - SizeOf.SIZE_OF_INT);
                idx += entryByteLen;
                timeEventSeries.add(entry);
            }
            // Processing logic (elided in the original)
            ......
            // Build the result
            Slice result = null;
            if (routes.size() > 0) {
                for (String route : routes) {
                    Slice routeSlice = Slices.utf8Slice(route);
                    Slice routeInfo = Slices.allocate(SizeOf.SIZE_OF_INT + routeSlice.length());
                    routeInfo.setInt(0, routeSlice.length());
                    routeInfo.setBytes(4, routeSlice);
                    if (result == null) {
                        result = routeInfo;
                    } else {
                        Slice newSlice = Slices.allocate(result.length() + routeInfo.length());
                        newSlice.setBytes(0, result);
                        newSlice.setBytes(result.length(), routeInfo, 0, routeInfo.length());
                        result = newSlice;
                    }
                }
            }
            return result;
        }
    }
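    The manual bookkeeping that Slice requires (a header recording capacity and bytes used, followed by a body that grows by doubling) can be exercised in isolation. The following is a simplified plain-Java sketch that uses java.nio.ByteBuffer in place of Slice and a two-field header instead of the full header above; GrowableEventBuffer and its methods are hypothetical names introduced for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class GrowableEventBuffer {
    // Header layout: [body capacity:int][body bytes used:int]
    private static final int OFFSET_BODY_CAPACITY = 0;
    private static final int OFFSET_BODY_USED = Integer.BYTES;
    private static final int HEADER_LEN = 2 * Integer.BYTES;

    static ByteBuffer create(int initialBodyCapacity) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LEN + initialBodyCapacity);
        buf.putInt(OFFSET_BODY_CAPACITY, initialBodyCapacity);
        buf.putInt(OFFSET_BODY_USED, 0);
        return buf;
    }

    static int bodyCapacity(ByteBuffer buf) {
        return buf.getInt(OFFSET_BODY_CAPACITY);
    }

    // Appends [entry length:int][event time:int][event name bytes].
    // Returns the buffer to keep using, which is a new one if growth was needed.
    static ByteBuffer append(ByteBuffer buf, int eventTime, String event) {
        byte[] name = event.getBytes(StandardCharsets.UTF_8);
        int entryLen = 2 * Integer.BYTES + name.length;
        int capacity = buf.getInt(OFFSET_BODY_CAPACITY);
        int used = buf.getInt(OFFSET_BODY_USED);
        if (used + entryLen > capacity) {
            // Double until the new entry fits; a single doubling may not be enough.
            int newCapacity = Math.max(capacity, 1);
            while (newCapacity < used + entryLen) {
                newCapacity *= 2;
            }
            ByteBuffer grown = ByteBuffer.allocate(HEADER_LEN + newCapacity);
            // Copy the header and the used part of the body, then record the new capacity.
            System.arraycopy(buf.array(), 0, grown.array(), 0, HEADER_LEN + used);
            grown.putInt(OFFSET_BODY_CAPACITY, newCapacity);
            buf = grown;
        }
        int pos = HEADER_LEN + used;
        buf.putInt(pos, entryLen);
        buf.putInt(pos + Integer.BYTES, eventTime);
        System.arraycopy(name, 0, buf.array(), pos + 2 * Integer.BYTES, name.length);
        buf.putInt(OFFSET_BODY_USED, used + entryLen);
        return buf;
    }

    // Walks the body entry by entry and returns "time:name" strings.
    static List<String> read(ByteBuffer buf) {
        List<String> entries = new ArrayList<>();
        int end = HEADER_LEN + buf.getInt(OFFSET_BODY_USED);
        int pos = HEADER_LEN;
        while (pos < end) {
            int entryLen = buf.getInt(pos);
            int time = buf.getInt(pos + Integer.BYTES);
            String name = new String(buf.array(), pos + 2 * Integer.BYTES,
                    entryLen - 2 * Integer.BYTES, StandardCharsets.UTF_8);
            entries.add(time + ":" + name);
            pos += entryLen;
        }
        return entries;
    }

    public static void main(String[] args) {
        ByteBuffer buf = create(8);
        buf = append(buf, 1, "login");
        buf = append(buf, 2, "pay");
        System.out.println(read(buf));         // [1:login, 2:pay]
        System.out.println(bodyCapacity(buf)); // 32
    }
}
```

    Storing each entry's own length as its first field is what lets the reader skip from one entry to the next without a separate index.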
  • Original article: https://www.cnblogs.com/lrxvx/p/12558902.html