zoukankan      html  css  js  c++  java
  • kafka 导入数据到 doris 

    具体见官网:http://doris.apache.org/master/zh-CN/sql-reference/sql-statements/Data%20Manipulation/ROUTINE%20LOAD.html#description

    下面是个人测试例子:

    1.创建DB

    CREATE TABLE IF NOT EXISTS user
    (
        siteid INT DEFAULT '10',
        citycode SMALLINT,
        username VARCHAR(32) DEFAULT '',
        pv BIGINT SUM DEFAULT '0'
    )
    AGGREGATE KEY(siteid, citycode, username)
    DISTRIBUTED BY HASH(siteid) BUCKETS 10
    PROPERTIES("replication_num" = "1");

    2.准备数据 (kafka  topic  --doris)

    pls input topic:doris
    pls input msg:6|12|pp|123
    send over !!!
    pls input msg:7|32|ww|231
    send over !!!
    pls input msg:8|12|ee|213
    send over !!!
    pls input msg:

    3.导入数据到doris         sea.user:任务标示(唯一)     columus:列名

     CREATE ROUTINE LOAD sea.user ON 
     user COLUMNS TERMINATED BY "|",
     COLUMNS(siteid,citycode,username,pv)
     PROPERTIES(
     "desired_concurrent_number"="1",
     "max_batch_interval"="20",
     "max_batch_rows"="300000",
     "max_batch_size"="209715200")
     FROM KAFKA(
     "kafka_broker_list"="192.168.18.129:9092",
     "kafka_topic"="doris",
     "property.group.id"="gid",
     "property.clinet.id"="cid",
     "property.kafka_default_offsets"="OFFSET_BEGINNING");
    1) OFFSET_BEGINNING: 从有数据的位置开始订阅。
    2) OFFSET_END: 从末尾开始订阅

    3.1 查看routine load状态

    SHOW ALL ROUTINE LOAD FOR  sea.user;

    3.1.1 显示 example_db 下,所有的例行导入作业(包括已停止或取消的作业)。结果为一行或多行。

    use example_db;
    SHOW ALL ROUTINE LOAD;
    Ⅴ).查看routine load状态
    SHOW ALL ROUTINE LOAD FOR datasource_name.kafka_load;
    Ⅵ).常用routine load命令
    a).暂停routine load
    PAUSE ROUTINE LOAD FOR datasource_name.kafka_load;
    b).恢复routine load
    RESUME ROUTINE LOAD FOR datasource_name.kafka_load;
    c).停止routine load
    STOP ROUTINE LOAD FOR datasource_name.kafka_load;
    d).查看所有routine load
    SHOW [ALL] ROUTINE LOAD FOR datasource_name.kafka_load;
    e).查看routine load任务
    SHOW ROUTINE LOAD TASK datasource_name.kafka_load;
    Ⅶ).查看数据
    SELECT * FROM datasource_name.table_name LIMIT 10;

    4.查看数据

     

    官网:

    example:

    4. 简单模式导入json
        CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
        COLUMNS(category,price,author)
        PROPERTIES
        (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json"
        )
        FROM KAFKA
        (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2",
        "kafka_offsets" = "0,0,0"
        );
        支持两种json数据格式:
      1){"category":"a9jadhx","author":"test","price":895}
      2)[
                {"category":"a9jadhx","author":"test","price":895},
                {"category":"axdfa1","author":"EvelynWaugh","price":1299}
         ]

    说明: 1)如果json数据是以数组开始,并且数组中每个对象是一条记录,则需要将strip_outer_array设置成true,表示展平数组。

       2)如果json数据是以数组开始,并且数组中每个对象是一条记录,在设置jsonpath时,我们的ROOT节点实际上是数组中对象。

    6. 用户指定根节点json_root
        CREATE ROUTINE LOAD example_db.test1 ON example_tbl
        COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
        PROPERTIES
        (
            "desired_concurrent_number"="3",
            "max_batch_interval" = "20",
            "max_batch_rows" = "300000",
            "max_batch_size" = "209715200",
            "strict_mode" = "false",
            "format" = "json",
            "jsonpaths" = "["$.category","$.author","$.price","$.timestamp"]",
            "strip_outer_array" = "true",
            "json_root" = "$.RECORDS"
        )
        FROM KAFKA
        (
            "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
            "kafka_topic" = "my_topic",
            "kafka_partitions" = "0,1,2",
            "kafka_offsets" = "0,0,0"
        );

    json数据格式:

    {
      "RECORDS": [
        {
          "category": "11",
          "title": "SayingsoftheCentury",
          "price": 895,
          "timestamp": 1589191587
        },
        {
          "category": "22",
          "author": "2avc",
          "price": 895,
          "timestamp": 1589191487
        },
        {
          "category": "33",
          "author": "3avc",
          "title": "SayingsoftheCentury",
          "timestamp": 1589191387
        }
      ]
    }
  • 相关阅读:
    AspDotNetStorefront客户化开始
    "超时时间已到。在操作完成之前超时时间已过或服务器未响应。"另一个原因
    转:只打开一个窗口和关闭窗口而不出现提示
    .net 数据格式设置
    SQLServer导出数据表中数据的存储过程
    游标、临时表、嵌套游标使用一列
    转:将图片转换成16进制的代码写入文本
    根据文件后缀返回Http的ContentType类型的函数
    正确配置p6spy后没有日志输出的一个可能的原因
    C99 声明 + 表达式 + 词法 部分Grammar
  • 原文地址:https://www.cnblogs.com/lshan/p/14790587.html
Copyright © 2011-2022 走看看