zoukankan      html  css  js  c++  java
  • DataStax Bulk Loader教程(二)

    DataStax Bulk Loader系列教程共分为六篇。

    通过作者Brian Hess对概念的清晰解释再辅以丰富的示例代码,用户可以较为全面细致地了解DataStax Bulk Loader这种高效易用的批量加载工具,从而实现轻松在数据模型、DSE系统或者其它数据系统之间的数据迁移。

    另外,我们也建议您在使用dsbulk时参考dsbulk 文档页面,从而了解所有的相关参数和选项。


    在之前的博客文章中,我们引入了dsbulk 命令行,一些基本的加载范例,并介绍了一些映射(Mapping)。在本博客文章中,我们将研究一些其它加载功能的元素。

    范例 5: 注释 (Comment)

    dsbulk允许在你的输入文件中加入注释。默认情况下是没有注释字符的。 iris_with_comment.csv 文件在开头有一个注释,我们可以这样使用dsbulk来忽略注释。

    $ dsbulk load -url /tmp/dsbulkblog/iris_with_comment.csv -k dsbulkblog -t iris_with_id -comment "#"

    范例 6: 空值(NULL)

    有时候我们的数据包含了空值。 默认情况下,空字符串会作为空值插入Cassandra。 iris_with_null.csv文件中species的列含有了空字符串。我们可以用默认设置方法加载它。首先,让我们清除表(TRUNCATE)。

    $ cqlsh -e "TRUNCATE dsbulkblog.iris_with_id;"

    现在,让我们加载数据:

    $ dsbulk load -url /tmp/dsbulkblog/iris_with_nulls.csv -k dsbulkblog -t iris_with_id

    我们可以快速对iris_with_id 表格使用SELECT来检查species列是空值。

    $ cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id LIMIT 3;"
    
    id  | petal_length | petal_width | sepal_length | sepal_width | species
    
    -----+--------------+-------------+--------------+-------------+---------  
    
    23 |          1.7 | 0.5 |          5.1 | 3.3 | null
    
    114|          5.1 | 2.4 |          5.8 | 2.8 | null  
    
    53 |            4 | 1.3 |          5.5 | 2.3 | null
    
     
    
    (3 rows)

    我们也可以选择通过指明--connector.csv.nullValue 的参数将这些空字符串转化成非空值。例如, 如果我们有一个没有值的species,我们可以将这个值设置为IRIS:

    $ cqlsh -e "TRUNCATE dsbulkblog.iris_with_id;"
    
    $ dsbulk load -url /tmp/dsbulkblog/iris_with_nulls.csv -k dsbulk
    
    blog -t iris_with_id --connector.csv.nullValue "IRIS"
    
    Operation directory: /tmp/logs/LOAD_20190320-190450-807527
    
    total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches 
    
    150 |      0 | 346 | 0.01 |   0.04 | 11.49 | 39.58 |  41.42 | 1.00
    
    Operation LOAD_20190320-190450-807527 completed successfully in 0 seconds.
    
    Last processed positions can be found in positions.txt $
    
    cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id LIMIT 3;"
    
    id  | petal_length | petal_width | sepal_length | sepal_width | species
    
    -----+--------------+-------------+--------------+-------------+---------  
    
    23 |          1.7 | 0.5 |          5.1 | 3.3 | IRIS
    
    114|          5.1 | 2.4 |          5.8 | 2.8 | IRIS 
    
    53 |            4 | 1.3 |          5.5 | 2.3 | IRIS
    
    (3 rows)

     

    范例 6.1:空字符串 (Null Strings)

    当然,有时候数据会附带一条”NULL“ 的字符串,如使用NULL字符串表示NULL值的iris_with_null_string.csv 文件。如果我们用默认值加载该文件,如下面案例:

    $ dsbulk load -url /tmp/dsbulkblog/iris_with_null_string.csv -k dsbulkblog -t iris_with_id

    我们不会得到预期的结果:

    $ cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id LIMIT 3;"
    
    id  | petal_length | petal_width | sepal_length | sepal_width | species
    
    -----+--------------+-------------+--------------+-------------+---------  
    
    23 |          1.7 | 0.5 |          5.1 | 3.3 | NULL
    
    114|          5.1 | 2.4 |          5.8 | 2.8 | NULL 
    
    53 |            4 | 1.3 |          5.5 | 2.3 | NULL (3 rows)

    请注意到这是一条 ”NULL“ 的字符串,而不是一个空值。

     

    我们可以通过使用-nullStrings 的参数加载空值。首先,让我们清除表(TRUNCATE):

    $ cqlsh -e "TRUNCATE dsbulkblog.iris_with_id;"

    现在,让我们使用-nullStrings 的参数:

    $ dsbulk load -url /tmp/dsbulkblog/iris_with_null_string.csv -k dsbulkblog -t iris_with_id -nullStrings "NULL"

    这会给我们所期望的结果:

    $ cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id LIMIT 3;"
    
    id  | petal_length | petal_width | sepal_length | sepal_width | species
    
    -----+--------------+-------------+--------------+-------------+---------  
    
    23 |          1.7 | 0.5 |          5.1 | 3.3 | null
    
    114|          5.1 | 2.4 |          5.8 | 2.8 | null 
    
    53 |            4 | 1.3 |          5.5 | 2.3 | null (3 rows)

    请注意到我们现在有了一个空值,而不是一个“NULL” 的字符串。

     

    范例6.2: 未设置值(Unset)VS 空值 (NULL)

    在DSE 5.0 (或者Cassandra 3.0),我们引入了将INSERT字段指定为“未设置字段”的功能。在此之前,我们必须将指明一个缺失字段为空,而且做以下两件事:

    1. 无论此INSERT语句是一个UPDATE还是覆盖,我们可以删除在这字段中任何的数据。

    2. 如果此INSERT语句是一个新记录,我们会插入一个空值。DSE会将此记录设为空,被当作墓碑处理,这可能会成为一个问题。

     

    dsbulk既可以将输入的空数据当作一个空值,也可以当成一个未设置值。默认情况下是当作未设置值,但我们通过加入--schema.nullToUnset 的参数将它们设为空:

    $ dsbulk load -url /tmp/dsbulkblog/iris_with_null_string.csv -k dsbulkblog -t iris_with_id
    
    -nullStrings "NULL" --schema.nullToUnset false

      

    请注意,尽管dsbulk仅支持DSE 5.0 或者更高版本,一些用户仍可以在DSE 4.8 进行load。因此,您必须特别设置-schema.nullToUnset 为false,因为DSE 5.0 之前不支持未设置的值。

     

    范例 7:  定界符 (Delimeters)

    不是所有数据的定界符都为逗号,但这是dsbulk 默认的定界符。例如,president_birthdates.psv 文件的定界符是管道符 “|”。我们可以这样通过以下方法加载数据:

    $ dsbulk load -url /tmp/dsbulkblog/president_birthdates.psv -k dsbulkblog -t president_birthdates -delim "|"

    -delim 参数是 --connector.csv.delimiter 的简写。

     

    范例 8:日期格式

    你可能也注意到其中有一条记录加载失败了。我们可以通过mapping-errors.log来查询错误。

    Resource: file:/tmp/dsbulkblog/president_birthdates.psv
    
    Position: 6
    
    Source: John Quincy Adams|July 11, 1767|1767.07.11u000a
    
    java.time.format.DateTimeParseException: Text '1767.07.11' could not be parsed at index 4       
    
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)        
    
          at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1819)        
    
          at com.datastax.dsbulk.engine.internal.codecs.util.SimpleTemporalFormat.parse(SimpleTemporalFormat.java:41)       
    
    at com.datastax.dsbulk.engine.internal.codecs.string.StringToTemporalCodec.parseTemporalAccessor(StringToTemporalCodec.java:39)       
    
    at com.datastax.dsbulk.engine.internal.codecs.string.StringToLocalDateCodec.externalToInternal(StringToLocalDateCodec.java:30)

    因为John Quincy Adams这条记录,我们使用了一个不同的日期格式。正如之前所述,加载失败的记录储存在mapping.bad 文件中。我们可以通过一种不同的加载方式加载此文件, 如:

    $ dsbulk load -url /tmp/logs/LOAD_20190314-164545-280529/mapping.bad -k dsbulkblog -t president_birthdates -delim "|" -header false -m "president,birthdate_string,birthdate" --codec.date "y.M.d"

    请注意这里有几件事。首先,mapping.bad 文件位置是针对这次的运行。它很有可能会有不同的路径,并且您也可以看到这个“失败的文件” 是在运行失败的过程中产生的。第二,我们需要指明正确的日期格式。为此,我们需要为每一个Java DateFormatter 创建一个字符串,并通过--codec.date parameter 参数将数据传递给dsbulk。第三,错误的文件没有标题行,所以我们需要提供它,为此需要提供两个东西:

    1. 我们需要告诉dsbulk没有标题行: -header false。

    2. 我们要提供映射,就需要用到: -m "president,birthdate_string,birthdate"。

     

    顺便说一句,我们也可以使用Linux 的映射功能,同时从president_birthdates.psv 原文件中获取标题行(虽然我们只需要将 “,” 替换为 “|”):

    $ dsbulk load -url /tmp/logs/LOAD_20190314-164545-280529/mapping.bad -k dsbulkblog -t president_birthdates -delim "|" -header false -m `head -1 /tmp/dsbulkblog/president_birthdates.psv | sed 's/|/,/g'` --codec.date "y.M.d"

    范例 9:CurrentDate(),Now(),等

    有时候我们希望能用当前日期或者时间戳填充数据。CQL允许你在做INSERT时调用  CurrentDate() 函数(还有 Now(), CurrentTimestamp() 等 )。如果我们在dsbulk中也想这样做,我们可以使用 -query 的指令。例如,如果我们想将下面所有的总统(president)的生日设置为今天(仅是将一个人作为例子),我们可以如以下方法进行操作:

    $ dsbulk load -url /tmp/dsbulkblog/president_birthdates.psv -delim "|" -query "INSERT INTO dsbulkblog.president_birthdates(president,birthdate,birthdate_string) VALUES (:president, CurrentDate(), :birthdate_string)"

    范例 9.1:映射 CurrentDate(),Now(),等

    我们也可以用 --schema.mapping 选项来完成:

    $ dsbulk load -url /tmp/dsbulkblog/president_birthdates.psv -k dsbulkblog -t president_birthdates -delim "|" -m "president=president, CurrentDate()=birthdate, birthdate_string=birthdate_string"

    范例 10: 集合 (Collections)

    Dsbulk可以将用户自定义类型数据(user-defined types UDTs)加载到集合和列中。要注意的是:dsbulk 替换了现有的集合,而不是追加到现有集合中。集合中数据的格式是 JSON。例如,这是 sportsteam.csv 中的第一行:

    "Baseball Teams","["Boston Red Sox", "Atlanta Braves", "San Diego Padres", "Washington Senators"]"

    我们不需要做任何特别的操作就可以将数据加载到 dsbulkblog.categories_list 中:

    $ dsbulk load -url /tmp/dsbulkblog/sportsteams.csv -k dsbulkblog -t categories_list

    我们也可以将相同的文件加载到 dsbulkblog.categories_set 里:

    $ dsbulk load -url /tmp/dsbulkblog/sportsteams.csv -k dsbulkblog -t categories_set

    还有一件值得注意的事情是, JSON格式的集合还有JSON数组中单独的元素必须要用双引号引起来。然而,字符串的类别不需要用引号括起来。。这里是 sportsteams.csv 完整的文件:

    category,examples
    
    "Baseball Teams","["Boston Red Sox","Atlanta Braves","San Diego Padres","Washington Senators"]"
    
    "Football Teams","["New England Patriots","Seattle Seahawks","Detroit Lions","Chicago Bears"]"
    
    "Soccer Teams","["New England Revolution","New York Red Bulls","D. C. United","Chicago Fire Soccer Club"]"
    
    "Basketball Teams","["Boston Celtics","Los Angeles Lakers","Chicago Bulls","Detroit Pistons"]"
    
    "Hockey Teams","["Boston Bruins","Philadelphia Flyers","Washington Capitals","Edmonton Oilers"]"

    关于集合还有一点,当 JSON 集合被引起来时,您必须确保在定界符和含有引号的JSON字符串之间没有任何空格,也不能利用 --connector.csv.ignoreLeadingWhitespaces 参数,这我们已经在范例 3.7中进行了讨论。

     

    范例 11: 存活时间TTLs 与 时间戳 (Timestamps)

    范例 11.1: TTLs

    我们可以通过  --schema.queryTtl 参数来指定加载数据的TTL。

    $ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id --schema.queryTtl 3600

    范例 11.2: 时间戳

    类似的,您可以用 --schema.queryTimestamp 来设置时间戳:

    $ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id --schema.queryTimestamp "2018-01-01T12:34:56+00:00"

    范例 11.3: 输入数据的TTL

    有时候我们希望根据输入数据本身设置TTL或者写入时间。dsbulk 允许通过两种方法。 第一种: 我们可以在--schema.mapping parameter 参数里定义特殊的  __ttl 列, 例如:

    $ cat /tmp/dsbulkblog/iris_no_header.csv | awk -F, '{printf("%s,%d
    ", $0, $6+10000);}' | dsbulk load -k dsbulkblog -t iris_with_id -m "sepal_length,sepal_width,petal_length,petal_width,species,id,__ttl" -header false

    这里我们通过awk添加列一个TTL的额外列(这里将TTL的值设为id列加上10000)。我们可以通过cqlsh检查TTL是否已经设置好。

    $ cqlsh -e "SELECT id, species, Ttl(species) FROM dsbulkblog.iris_with_id LIMIT 5"
    
    id | species         | ttl(species)
    
    -----+-----------------+--------------  
    
    23 |     Iris-setosa |         9986
    
    114|  Iris-virginica |        10078 
    
    53 | Iris-versicolor |        10016
    
    110|  Iris-virginica |        10074 
    
    91 | Iris-versicolor |        10054
    
    (5 rows)

    范例 11.4:自定义查询数据的TTL

    我们也可以用自定义查询来执行此操作:

    $ cat /tmp/dsbulkblog/iris_no_header.csv | awk -F, 'BEGIN{printf("sepal_length,sepal_width,petal_length,petal_width,species,id,ttl_to_use
    ");} {printf("%s,%d
    ", $0, $6+10000);}' | dsbulk load -query "INSERT INTO dsbulkblog.iris_with_id(sepal_length,sepal_width,petal_length,petal_width,species,id) VALUES (:sepal_length,:sepal_width,:petal_length,:petal_width,:species,:id) USING TTL :ttl_to_use"

      

    因为 USING TTL 子句需要绑定到命名变量,所以我们需要提供标题行。

     

    类似的操作可以通过设置写入数据的时间戳来完成。

     

    案例 12: 试运行 (Dry Run)

    有时候您想在加载之前验证您的数据和参数。dsbulk 有一个 -dryRun 的选项,也是 --engine.dryRun 的简写来试运行:

    $ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -dryRun true

    输出将如下所示:

    Operation directory: /tmp/logs/LOAD_20190314-165538-413122
    
    Dry-run mode enabled.
    
    total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches  
    
    150 |      0 | 0 | 0.00 |   0.00 | 0.00 | 0.00 |   0.00 | 1.00
    
    Operation LOAD_20190314-165538-413122 completed successfully in 0 seconds.
    
    Last processed positions can be found in positions.txt 

     

    我们接着可以检查日志目录(在这里是/tmp/logs/LOAD_20190314-165538-413122)并检查是否有任何映射错误或者验证错误。因为本运行的结果是成功的,所以我们只看到有 operation.log 和 positions.txt 文件产生。

     

    范例 13: 控制写入率 和 实时请求 (in-flight requests)

    我们可以在任何时候通过 Executor 选项控制每秒钟写入的数量和实时写入的数量。

     

    范例 13.1: 控制写入率

    我可以通过 --executor.maxPerSecond 的参数控制每秒钟写入的数量:

    $ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id --executor.maxPerSecond 10

    范例 13.2: 控制实时请求的数量

    同理,我们可以使用 --executor.maxInFlight 的参数限制任何实时的并发量。

    $ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id --executor.maxInFlight 3 

     


     

    点击这里下载 DataStax Bulk Loader。

     

    本系列所有文章:

  • 相关阅读:
    Qt5.3.0 for android windows平台下搭建及demo(虫子的博客)
    不分享,用户很少,什么都没有,没有秒传
    Entity Framework 5.0系列之约定配置
    学习SQL关联查询
    SQL语句调优
    数组总结篇(上)
    实现同一套代码针对不同平台工程的编辑和编译
    dispatch队列
    SOCKET网络编程快速上手(一)
    Javascript判断两个日期是否相等
  • 原文地址:https://www.cnblogs.com/datastax/p/13679783.html
Copyright © 2011-2022 走看看