zoukankan      html  css  js  c++  java
  • Flink kuduSink开发

    1、继承RichSinkFunction

    (1)首先在构造方式传入kudu的masterAddress地址、默认表名、TableSerializationSchema、KuduTableRowConverter、Properties配置对象

    (2)重写open方法

    初始化KuduClient对象操作kudu,KuduSession对象并传入一堆配置

    (3)重写invoke方法

    核心是如果已传入TableSerializationSchema对象,则通过其serializeTable方法从输入的json数据里提取表名,如果未定义则直接取默认表名。拿到表名后就能使用KuduClient对象对其操作了

    if (schema != null) {
    String serializeTableName = schema.serializeTable(row);
    if (serializeTableName == null) return;
    table = client.openTable(serializeTableName);
    }
    else
    table = client.openTable(tableName);
    insert = table.newInsert();

    2、定义KuduTableRowConverter接口,将每一条输入数据转换成TableRow对象

    public interface KuduTableRowConverter<IN> extends Serializable {
    TableRow convert(IN value);
    }

    定义TableRow类,代表一行数据,key是字串型的键名,value是Object型的键值

    public class TableRow implements Serializable {
    private static final long serialVersionUID = 1L;
    private Map<String, Object> pairs = new HashMap<>();
    public int size() {return pairs.size();}
    public Map<String, Object> getPairs() {return pairs;}
    public Object getElement(String key) {return pairs.get(key);}
    public void putElement(String key, Object value) {pairs.put(key, value);}
    }

    定义JsonKuduTableRowConverter实现KuduTableRowConverter接口,对于输入的json数据,通过一系列转换逻辑转换成TableRow对象

    3、定义TableSerializationSchema接口,从每一条输入数据里提取表名

    public interface TableSerializationSchema<IN> extends Serializable {
    String serializeTable(IN value);
    }

    定义JsonLogidKeyTableSerializationSchema实现TableSerializationSchema接口,对于输入的json数据,使用指定key值提取value值,然后再从一个预先获取的map里找到这个value对应的表名,然后加上必要的前缀与后缀组成impala的表名

  • 相关阅读:
    二十七、正则表达式补充
    二十六、python中json学习
    二十五、python中pickle序列学习(仅python语言中有)
    MongoDB系列
    产品经理思考
    摩拜数据产品
    龙珠直播之swot
    ahp层次分析法软件
    用户画像之门店用户类型的体系
    汽车后市场SWOT分析
  • 原文地址:https://www.cnblogs.com/codetouse/p/12968238.html
Copyright © 2011-2022 走看看