zoukankan      html  css  js  c++  java
  • kafka 生产者基本操作

    kafka自带了一个在终端演示生产者发布消息的脚本--kafka-console-producer.sh

    运行该脚本会启动一个进程,在运行该脚本时可以传递相应配置以覆盖默认配置。

    参数--

      -- producer.config,用于加载一个生产者级别相关的配置文件

      -- producer-property 直接在启动命令中设置参数,覆盖默认配置的参数

      -- property 通过该命令可以设置消费者相关的配置

    1. 启动生产者

    必传参数 broker-list(指定kafka的代理地址列表)  和  topic(消息被发送的目标主题)

    执行下列命令,启动一个向主题 kafka-action 发送消息的生产者,同时指定每条消息包含有key

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-action --property parse.key=true
    

     由于以上命令没有指定消息key与消息净荷(payload)之间的分隔符,默认为制表符,如果希望修改分隔符

    通过key.separator指定。

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-action --property parse.key=true --property key.separator=' '
    

     然后在控制台输入一批消息,输入一下命令验证是否发送成功

    ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka-action --time -1
    

    输出如下

    kafka-action:2:4
    kafka-action:1:4
    kafka-action:0:0
    

     分别表示:主题名,分区,消息偏移量,共8条消息

    开启一个消费者:

    [root@haha ~]# kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic
    kafka-action
    Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. aaaabbb bbbbb 
    98230841023
    123123
    3333

     没法送一条会依次存储在 三个不同的分区  

    例如:分别发送了 1111 2222  33333  444 555 6666 六条消息

    那么

    partition-1 log日志文件存储的是  1111  444 

    partition-2 log日志文件存储的是 2222 555

    partition-3 log日志文件存储的是 33333 6666

    2.创建主题,如果开启了 auto.create.topics.enable=true,当生产者向一个不存在的主题发送消息时,kafka会自动创建主题,

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic producer-create-topic
    Producer sends message to a topic that doesn't exist yet
    [2018-10-25 09:56:16,888] WARN Error while fetching metadata with correlation id 1 : {producer-create-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
    

     producer-create-topic 并不存在,控制台抛出警告

    3. 查看消息内容,使用kafka.tools.DumpLogSegments工具

    [root@haha ~]# kafka-run-class.sh kafka.tools.DumpLogSegments --files /data/appData/kafka/test3-0/00000000000000000000.log
    Dumping /data/appData/kafka/test3-0/00000000000000000000.log
    Starting offset: 0
    offset: 0 position: 0 CreateTime: 1541061414817 isvalid: true payloadsize: 13 magic: 1 compresscodec: NONE crc: 686614980
    offset: 1 position: 47 CreateTime: 1541061439270 isvalid: true payloadsize: 0 magic: 1 compresscodec: NONE crc: 988414825
    offset: 2 position: 81 CreateTime: 1541061441260 isvalid: true payloadsize: 11 magic: 1 compresscodec: NONE crc: 2421154020
    offset: 3 position: 126 CreateTime: 1541061575221 isvalid: true payloadsize: 6 magic: 1 compresscodec: NONE crc: 905626548
    offset: 4 position: 166 CreateTime: 1541061703621 isvalid: true payloadsize: 4 magic: 1 compresscodec: NONE crc: 2986172870
    offset: 5 position: 204 CreateTime: 1541061741306 isvalid: true payloadsize: 3 magic: 1 compresscodec: NONE crc: 2132028322
    offset: 6 position: 241 CreateTime: 1541061762979 isvalid: true payloadsize: 16 magic: 1 compresscodec: NONE crc: 1956799962
    

    4.生产者测试工具

  • 相关阅读:
    LeetCode 40. 组合总和 II(Combination Sum II)
    LeetCode 129. 求根到叶子节点数字之和(Sum Root to Leaf Numbers)
    LeetCode 60. 第k个排列(Permutation Sequence)
    LeetCode 47. 全排列 II(Permutations II)
    LeetCode 46. 全排列(Permutations)
    LeetCode 93. 复原IP地址(Restore IP Addresses)
    LeetCode 98. 验证二叉搜索树(Validate Binary Search Tree)
    LeetCode 59. 螺旋矩阵 II(Spiral Matrix II)
    一重指针和二重指针
    指针的意义
  • 原文地址:https://www.cnblogs.com/sunshine-long/p/9847948.html
Copyright © 2011-2022 走看看