  • kafka

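    1. Download the prebuilt binary package

    Set up the environment variables: add the bin/windows directory to PATH.

    Start ZooKeeper first, then start the Kafka server.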

    C:\tools\kafka_2.12-2.4.1>zookeeper-server-start.bat config\zookeeper.properties

    C:\tools\kafka_2.12-2.4.1>kafka-server-start.bat config\server.properties

    2. Create a topic

    C:\Users\86185>kafka-topics --zookeeper 127.0.0.1:2181 --create --topic first-kafka-topic --partitions 3 --replication-factor 1
    Created topic first-kafka-topic.

    C:\Users\86185>kafka-topics --zookeeper 127.0.0.1:2181 --list
    first-kafka-topic

    If you use --bootstrap-server instead, its value is the address of the Kafka server, not of ZooKeeper:

    C:\Users\86185>kafka-topics --bootstrap-server 127.0.0.1:9092 --list
    __consumer_offsets
    first-kafka-topic

    3. Send messages with the console producer

    C:\Users\86185>kafka-console-producer --broker-list 127.0.0.1:9092 --topic first-kafka-topic
    >first kafka topic
    >second kafka topic
    >third kafka topic
    >

    4. Consume messages with the console consumer; add --from-beginning to read the topic from the start

    Note that the consumer takes --bootstrap-server, not --broker-list.

    C:\Users\86185>kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first-kafka-topic --from-beginning
    first kafka topic
    third kafka topic
    second kafka topic
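
The consumer output above is not in the order the messages were typed. Kafka only guarantees ordering within a single partition, and this topic has 3 partitions. The Python sketch below is a toy model, not Kafka client code: the partition assigned to each message is an assumption chosen for illustration (the session above does not show it), and `read_partition_by_partition` is a hypothetical helper.

```python
from collections import defaultdict


def read_partition_by_partition(records):
    """Group (partition, message) pairs by partition, keeping the
    per-partition order, then read the partitions one after another."""
    logs = defaultdict(list)
    for partition, message in records:
        logs[partition].append(message)
    consumed = []
    for partition in sorted(logs):
        consumed.extend(logs[partition])
    return consumed


# Production order, with an ASSUMED partition for each message.
produced = [
    (0, "first kafka topic"),
    (2, "second kafka topic"),
    (1, "third kafka topic"),
]

consumed = read_partition_by_partition(produced)
print(consumed)
```

A real consumer does not necessarily drain partitions in numeric order; the point is only that order across partitions is not preserved. If global ordering matters, a single-partition topic is the simplest option.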

    5. Show consumer group information with kafka-consumer-groups

    C:\Users\86185>kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group my-first-application

    GROUP                 TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                           HOST          CLIENT-ID
    my-first-application  first-kafka-topic  0          16              16              0    consumer-my-first-application-1-3aec1668-ce55-4c63-8961-e9811e325447  /192.168.1.8  consumer-my-first-application-1
    my-first-application  first-kafka-topic  1          13              13              0    consumer-my-first-application-1-3aec1668-ce55-4c63-8961-e9811e325447  /192.168.1.8  consumer-my-first-application-1
    my-first-application  first-kafka-topic  2          11              11              0    consumer-my-first-application-1-c80c459d-3e3f-43cd-850d-0cb2658272a2  /192.168.1.8  consumer-my-first-application-1

    C:\Users\86185>kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group my-first-application

    Consumer group 'my-first-application' has no active members.

    GROUP                 TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
    my-first-application  first-kafka-topic  0          16              16              0    -            -     -
    my-first-application  first-kafka-topic  2          11              11              0    -            -     -
    my-first-application  first-kafka-topic  1          13              13              0    -            -     -
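
In the --describe output, LAG is LOG-END-OFFSET minus CURRENT-OFFSET for each partition. As a minimal sketch, the Python below recomputes it from the text of the output. `lag_by_partition` is a hypothetical helper; it assumes whitespace-separated columns in the order shown above and skips any row whose offsets are not numeric.

```python
def lag_by_partition(describe_output):
    """Return {partition: LOG-END-OFFSET - CURRENT-OFFSET} for each data row."""
    lags = {}
    for line in describe_output.splitlines():
        fields = line.split()
        # Data rows have numeric PARTITION, CURRENT-OFFSET and LOG-END-OFFSET
        # in columns 3 to 5; the header and status messages do not.
        if len(fields) < 6 or not all(f.isdigit() for f in fields[2:5]):
            continue
        partition, current, log_end = (int(f) for f in fields[2:5])
        lags[partition] = log_end - current
    return lags


sample = """
Consumer group 'my-first-application' has no active members.

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-first-application first-kafka-topic 0 16 16 0 - - -
my-first-application first-kafka-topic 2 11 11 0 - - -
my-first-application first-kafka-topic 1 13 13 0 - - -
"""

lags = lag_by_partition(sample)
print(lags)
```

A lag of 0 on every partition means the group has consumed everything written to the topic so far.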

    6. Run --reset-offsets --to-earliest --execute to consume from the beginning again

    The group must have no active members when its offsets are reset, so stop the consumer first.

    C:\Users\86185>kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --to-earliest --execute --group my-first-application --topic first-kafka-topic

    GROUP                 TOPIC              PARTITION  NEW-OFFSET
    my-first-application  first-kafka-topic  0          0
    my-first-application  first-kafka-topic  2          0
    my-first-application  first-kafka-topic  1          0

    7. Run --reset-offsets --shift-by -2 --execute

    --shift-by -2 moves the committed offset of every partition back by 2. My topic has 3 partitions, so after the shift the consumer re-reads 3 x 2 = 6 messages when it resumes.

    C:\Users\86185>kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --shift-by -2 --execute --group my-first-application --topic first-kafka-topic

    GROUP                 TOPIC              PARTITION  NEW-OFFSET
    my-first-application  first-kafka-topic  0          20
    my-first-application  first-kafka-topic  2          12
    my-first-application  first-kafka-topic  1          16
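
The arithmetic behind --shift-by can be sketched in a few lines of Python. This is an illustration, not the tool's implementation: `shift_by` is a hypothetical helper, the pre-reset offsets (22, 18, 14) are an assumption inferred from the NEW-OFFSET values above plus 2, and the clamping to the available offset range reflects my understanding of how the tool treats out-of-range targets.

```python
def shift_by(committed, shift, log_start, log_end):
    """Shift each committed offset by `shift`, clamped to the partition's
    [log_start, log_end] range."""
    return {
        p: max(log_start[p], min(log_end[p], offset + shift))
        for p, offset in committed.items()
    }


# ASSUMED state before the reset: the group was fully caught up, so the
# committed offsets equal the log-end offsets (22, 18, 14).
log_end = {0: 22, 1: 18, 2: 14}
committed = dict(log_end)
log_start = {0: 0, 1: 0, 2: 0}

new_offsets = shift_by(committed, -2, log_start, log_end)
# Messages the group will read again: the gap between log end and new offset.
replayed = sum(log_end[p] - new_offsets[p] for p in new_offsets)
print(new_offsets, replayed)
```

With three partitions each moved back by 2, the gap between the new offsets and the log end adds up to 6 messages.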

    C:\Users\86185>kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first-kafka-topic --group my-first-application
    k
    n
    b
    i
    l
    m

    C:\Users\86185>kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group my-first-application

    GROUP                 TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                           HOST          CLIENT-ID
    my-first-application  first-kafka-topic  0          22              22              0    consumer-my-first-application-1-3ea7c558-e31f-432e-a198-526ec421d94a  /192.168.1.8  consumer-my-first-application-1
    my-first-application  first-kafka-topic  1          18              18              0    consumer-my-first-application-1-3ea7c558-e31f-432e-a198-526ec421d94a  /192.168.1.8  consumer-my-first-application-1
    my-first-application  first-kafka-topic  2          14              14              0    consumer-my-first-application-1-3ea7c558-e31f-432e-a198-526ec421d94a  /192.168.1.8  consumer-my-first-application-1

    8. Problems encountered:
    (1) Creating a topic keeps timing out

    Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

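    Fix: change --bootstrap-server to --zookeeper in the command. This timeout usually means the tool could not reach a broker at the given address, so it is also worth checking that the Kafka server is running and that the address is correct.

    (2) Passing ZooKeeper's address to --bootstrap-server when creating a topic produces this error: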

    [2020-03-28 21:10:31,116] WARN Exception causing close of session 0x0: Unreasonable length = 308375649 (org.apache.zookeeper.server.NIOServerCnxn)

    --bootstrap-server must be given the Kafka server's address (127.0.0.1:9092 here), not ZooKeeper's (127.0.0.1:2181).
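
Both problems above come down to which address the tool is pointed at. As a rough first check, the Python sketch below tests whether anything is listening on the ZooKeeper and Kafka ports used in this post. `port_is_open` is a hypothetical helper; an open port only shows that some process accepts TCP connections there, not that it is actually Kafka or ZooKeeper.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # Ports taken from the commands in this post.
    for name, port in (("ZooKeeper", 2181), ("Kafka server", 9092)):
        state = "open" if port_is_open("127.0.0.1", port) else "closed"
        print(f"{name} port {port}: {state}")
```

If the Kafka server port is reported closed, a command using --bootstrap-server 127.0.0.1:9092 has nothing to connect to, which is consistent with the timeout in problem (1).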

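    (3) On Windows, after deleting a topic, the Kafka server keeps failing when restarted

    Fix: in a local test environment you can delete Kafka's log files. This also deletes every other topic, so do it with care.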

    (4) Renaming a log file fails

    While producing messages, Kafka reports: Failed to rename [D:\kafka_2.12-2.4.1/logs/controller.log] to [D:\kafka_2.12-2.4.1/logs/

    Fix: delete the ZooKeeper logs.

  • Original article: https://www.cnblogs.com/starof/p/12536328.html