zoukankan      html  css  js  c++  java
  • doris stream load 导入

    官网地址:https://doris.apache.org/zh-CN/administrator-guide/load-data/stream-load-manual.html#基本原理

    doris 可以通过 insert into 语句插入单条数据,或者批量插入,但生产环境不推荐这样做。

    通过http client 进行数据导入

    1. curl 格式(这个格式也是尝试了好多次才成功):8030 是 FE 的 HTTP 端口,8040 是 BE 的 HTTP 端口。

    注意:文件内容的最后一行之后不要有空行(编辑时光标不能停在末尾的空行上)。

    密码和用户名,都是doris的用户名和密码,不是服务器的。

    -T 后面是要上传文件的路径,需要带上后缀名(官网示例里没有加)。

    -H "label:..." 中的 label 每次导入都不能重复;有多个参数时,每个参数单独写一个 -H。

      //密码为fe的密码
      curl --location-trusted -u root -T /home/label.csv -H "label:label12"  http://192.168.0.18:8030/api/example_db/table4/_stream_load  
      curl --location-trusted -u root -T /home/label.csv -H "label:label14" -H "column_separator:,"  http://192.168.0.18:8030/api/example_db/table3/_stream_load

    2.http client  

       1.doc 文件

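          请求 FE 时总是返回 no valid Basic authorization。网上的说法是 FE 会通过 307 跳转把请求转给 BE 导致的:HttpWebRequest 自动跟随跳转时会清除 Authorization 头(curl 也需要 --location-trusted 才会在跳转后继续携带认证信息)。所以我这边直接调用 BE 的机器。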

     /// <summary>
     /// Streams a local delimited-text (CSV) file into a Doris table through the BE stream-load HTTP endpoint.
     /// </summary>
     /// <remarks>
     /// Host, port, credentials, target table and file path are hard-coded below.
     /// The request goes straight to a BE (port 8040) rather than the FE (port 8030).
     /// The response body is read but not returned or inspected.
     /// NOTE(review): callers cannot see the load "Status" field; consider returning or checking it.
     /// </remarks>
     public static void StreamLoadBEDoc()
     {
         string host = "192.168.0.74"; // BE host
         int port = 8040;              // BE HTTP port (the FE HTTP port would be 8030)
         string database = "example_db";
         string table = "table3";
         string user = "root";
         string passwd = "";
         string load_file_name = "E:/label.csv";
         // A fresh label for every load: Doris labels must not be reused.
         string label = Guid.NewGuid().ToString();
         string loadUrl = string.Format("http://{0}:{1}/api/{2}/{3}/_stream_load", host, port, database, table);
         HttpWebRequest request = BuildRequestDoc(loadUrl, user, passwd, label);

         // Open read-only: the data file is never written, and read-only files must still load.
         using (FileStream file = new FileStream(load_file_name, FileMode.Open, FileAccess.Read))
         {
             // Write buffering is disabled on this request, so the body length is declared up front.
             request.ContentLength = file.Length;
             using (Stream body = request.GetRequestStream())
             {
                 // CopyTo keeps reading until EOF; a single Read call may return fewer bytes than asked for.
                 file.CopyTo(body);
             }
         }

         string result = "";
         using (WebResponse response = request.GetResponse())
         using (Stream stream = response.GetResponseStream())
         {
             if (stream != null)
             {
                 using (var reader = new StreamReader(stream, Encoding.UTF8))
                 {
                     // JSON load report; its "Status" is "Success" when the load succeeded.
                     result = reader.ReadToEnd();
                 }
             }
         }
     }
     /// <summary>
     /// Builds a PUT request for a Doris stream load of delimited-text (CSV) data.
     /// </summary>
     /// <param name="url">Stream-load endpoint, e.g. http://host:port/api/{db}/{table}/_stream_load.</param>
     /// <param name="user">Doris user name (a Doris account, not an OS account).</param>
     /// <param name="passwd">Doris password for <paramref name="user"/>.</param>
     /// <param name="label">Load label; must be unique for every load.</param>
     /// <param name="columnSeparator">
     /// Value of the column_separator header. Defaults to ","; when null or empty, the header is not sent.
     /// </param>
     /// <returns>
     /// A configured request. Write buffering is disabled, so the caller should set
     /// <see cref="HttpWebRequest.ContentLength"/> before writing the body.
     /// </returns>
     public static HttpWebRequest BuildRequestDoc(string url, string user, string passwd, string label, string columnSeparator = ",")
     {
         HttpWebRequest client = (HttpWebRequest)WebRequest.Create(url);
         // Basic credentials are sent preemptively instead of waiting for a 401 challenge.
         client.Headers.Set("Authorization", "Basic " + Convert.ToBase64String(Encoding.UTF8.GetBytes($"{user}:{passwd}")));
         // NOTE(review): Expect is set by hand while ServicePoint.Expect100Continue is set to false below;
         // whether this header is actually sent depends on the runtime — confirm on the target framework.
         client.Headers.Set("Expect", "100-continue");
         client.Headers.Set("label", label);
         if (!string.IsNullOrEmpty(columnSeparator))
         {
             client.Headers.Set("column_separator", columnSeparator);
         }
         // Use the property: Content-Type is a restricted header for HttpWebRequest on .NET Framework,
         // where setting it through Headers throws.
         client.ContentType = "text/plain; charset=UTF-8";
         client.Method = HttpMethod.Put.Method;

         // ServicePoint settings may be shared by other requests to the same host.
         client.ServicePoint.ConnectionLimit = int.MaxValue;
         client.ServicePoint.Expect100Continue = false;
         client.ServicePoint.UseNagleAlgorithm = false;
         client.AllowWriteStreamBuffering = false;
         client.Proxy = null;       // bypass any system proxy
         client.KeepAlive = false;  // no persistent connection
         return client;
     }

      2.json 传输

         json 和doc 差不多

         

     /// <summary>
     /// Loads JSON rows into a Doris table through the BE stream-load HTTP endpoint.
     /// </summary>
     /// <param name="jsonData">
     /// Request body. The request declares format=json and strip_outer_array=true,
     /// so this is expected to be a JSON array of row objects.
     /// </param>
     /// <exception cref="ArgumentNullException"><paramref name="jsonData"/> is null.</exception>
     /// <remarks>
     /// Host, port, credentials and target table are hard-coded below.
     /// The response body is read but not returned or inspected.
     /// NOTE(review): callers cannot see the load "Status" field; consider returning or checking it.
     /// </remarks>
     public static void StreamLoadBEJson(string jsonData)
     {
         if (jsonData == null)
         {
             throw new ArgumentNullException(nameof(jsonData));
         }

         string host = "192.168.0.74"; // BE host
         int port = 8040;              // BE HTTP port
         string database = "example_db";
         string table = "table3";
         string user = "root";
         string passwd = "";
         // A fresh label for every load: Doris labels must not be reused.
         string label = Guid.NewGuid().ToString();
         string loadUrl = string.Format("http://{0}:{1}/api/{2}/{3}/_stream_load", host, port, database, table);
         HttpWebRequest request = BuildRequestJson(loadUrl, user, passwd, label);

         byte[] payload = Encoding.UTF8.GetBytes(jsonData);
         // Write buffering is disabled on this request, so the body length must be declared before writing.
         request.ContentLength = payload.Length;
         using (Stream body = request.GetRequestStream())
         {
             body.Write(payload, 0, payload.Length);
         }

         string result = "";
         using (WebResponse response = request.GetResponse())
         using (Stream stream = response.GetResponseStream())
         {
             if (stream != null)
             {
                 using (var reader = new StreamReader(stream, Encoding.UTF8))
                 {
                     // JSON load report; its "Status" is "Success" when the load succeeded.
                     result = reader.ReadToEnd();
                 }
             }
         }
     }
     /// <summary>
     /// Creates a PUT request for a Doris stream load whose body is JSON.
     /// </summary>
     /// <param name="url">Stream-load endpoint, e.g. http://host:port/api/{db}/{table}/_stream_load.</param>
     /// <param name="user">Doris user name (a Doris account, not an OS account).</param>
     /// <param name="passwd">Doris password for <paramref name="user"/>.</param>
     /// <param name="label">Load label; must be unique for every load.</param>
     /// <returns>
     /// A request carrying format=json and strip_outer_array=true, so the body is expected to be a JSON array.
     /// Write buffering is disabled, so the body length should be declared before writing.
     /// </returns>
     public static HttpWebRequest BuildRequestJson(string url, string user, string passwd, string label)
     {
         var request = (HttpWebRequest)WebRequest.Create(url);
         string token = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{user}:{passwd}"));

         // Header name/value pairs, applied in this order.
         string[,] streamLoadHeaders =
         {
             { "Authorization", "Basic " + token },
             { "Expect", "100-continue" },
             { "label", label },
             { "column_separator", "," },
             { "Content-Type", "text/plain; charset=UTF-8" },
             { "format", "json" },
             { "strip_outer_array", "true" },
         };
         for (int row = 0; row < streamLoadHeaders.GetLength(0); row++)
         {
             request.Headers.Set(streamLoadHeaders[row, 0], streamLoadHeaders[row, 1]);
         }
         request.Method = HttpMethod.Put.Method;

         // ServicePoint settings may be shared by other requests to the same host.
         ServicePoint endpoint = request.ServicePoint;
         endpoint.ConnectionLimit = int.MaxValue;
         endpoint.Expect100Continue = false;
         endpoint.UseNagleAlgorithm = false;

         request.AllowWriteStreamBuffering = false;
         request.Proxy = null;      // bypass any system proxy
         request.KeepAlive = false; // no persistent connection
         return request;
     }

    返回格式

    {
        "TxnId": 1003,
        "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
        "Status": "Success",
        "ExistingJobStatus": "FINISHED", // optional
        "Message": "OK",
        "NumberTotalRows": 1000000,
        "NumberLoadedRows": 1000000,
        "NumberFilteredRows": 1,
        "NumberUnselectedRows": 0,
        "LoadBytes": 40888898,
        "LoadTimeMs": 2144,
        "BeginTxnTimeMs": 1,
        "StreamLoadPutTimeMs": 2,
        "ReadDataTimeMs": 325,
        "WriteDataTimeMs": 1933,
        "CommitAndPublishTimeMs": 106,
        "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
    }

    返回结果中 Status 为 Success 即表示导入成功。

    唯一遗憾的是通过 FE 的连接没有测试通过,不知道有没有老铁解决了。(一个可能的思路:设置 AllowAutoRedirect = false,读取 307 响应中的 Location,再带上 Authorization 头向该地址重新发送请求。)

  • 相关阅读:
    课堂Scrum站立会议演示
    每周工作进度及工作量统计
    连连看的设计与实现——四人小组项目(GUI)
    连连看的设计与实现——四人小组项目(NABCD)
    用户模板和用户场景
    对MySQL 存储过程中乱码的破解
    MySQL数据库同步的实现
    解决MySQL无法远程访问的3方案
    使用SQL Server 2014内存数据库时需要注意的地方
    navicat for sql server中文版|SQL Server管理及开发工具(Navicat for SQL Server)下载 v11.2.13
  • 原文地址:https://www.cnblogs.com/elsons/p/15801156.html
Copyright © 2011-2022 走看看