zoukankan      html  css  js  c++  java
  • Ubuntu下安装和使用zookeeper和kafka

    1.在清华镜像站下载kafka_2.10-0.10.0.0.tgz 和 zookeeper-3.4.10.tar.gz

    分别解压到/usr/local目录下

    2.进入zookeeper目录,在conf目录下将zoo_sample.cfg文件拷贝,并更名为zoo.cfg

    参考 https://my.oschina.net/phoebus789/blog/730787

    zoo.cfg文件的内容

    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    dataDir=/home/common/zookeeper/zookeeperdir/zookeeper-data
    dataLogDir=/home/common/zookeeper/zookeeperdir/logs
    # the port at which the clients will connect
    clientPort=2181
    server.1=10.10.100.10:2888:3888
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the 
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    

    新建下面这两个目录

    /home/common/zookeeper/zookeeperdir/zookeeper-data
    /home/common/zookeeper/zookeeperdir/logs
    

    在zookeeper-data目录下新建一个myid文件,内容为1,代表这个服务器的编号是1,具体参考上面网址中的内容

    最后在/etc/profile中添加环境变量,并source

    export ZOOKEEPER_HOME=/usr/local/zookeeper
    export PATH=${ZOOKEEPER_HOME}/bin:$PATH
    

    现在zookeeper就安装好了,现在启动zookeeper

    bin/zkServer.sh start
    

    查看状态

    bin/zkServer.sh status
    

    启动客户端脚本

    bin/zkCli.sh -server localhost:2181
    

    停止zookeeper

    bin/zkServer.sh stop
    

    1.现在安装kafka,同样是解压之后就安装好了

    参考 http://www.jianshu.com/p/efc8b9dbd3bd

    2.进入kafka目录下

    kafka需要使用Zookeeper,首先需要启动Zookeeper服务,上面的操作就已经启动了Zookeeper服务

    如果没有的话,可以使用kafka自带的脚本启动一个简单的单一节点Zookeeper实例

    bin/zookeeper-server-start.sh config/zookeeper.properties
    

    启动 Kafka服务

    bin/kafka-server-start.sh config/server.properties
    

    停止 Kafka服务

    bin/kafka-server-stop.sh config/server.properties
    

    3.创建一个主题

    首先创建一个名为test的topic,只使用单个分区和一个复本

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

     现在可以运行list topic命令看到我们的主题

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    

    4.发送消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message
    This is another message
    

    如果要批量导入文件数据到kafka,参考:2.1 本地环境下kafka批量导入数据

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic < file_pat
    

    如果要模拟实时数据到打入kafka的情况,可以写一个shell脚本

    #!/usr/bin/env bash
    
    cat XXXX.log | while read line
    do
        sleep 0.1
        echo "${line}"
        echo "${line}" | /home/lintong/software/apache/kafka_2.11-0.10.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicA
    done
    

    5.启动一个消费者,消费者会接收到消息

    旧版消费者

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 2>/dev/null
    

    新版消费者

    bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic input --from-beginning 2>/dev/null
    

    消费带权限kafka topic

    /opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer --new-consumer --bootstrap-server xxxx:9092 --topic my_topic --consumer.config ./client.jaas > ./test.log
    

    6.查看指定的topic的offset信息

    对于结尾是ZK的消费者,其消费者的信息是存储在Zookeeper中的

    对于结尾是KF的消费者,其消费者的信息是存在在Kafka的broker中的

    都可以使用下面的命令进行查看

    bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group xxx --topic xxx
    

    结果

    bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group test-consumer-group --topic xxx
    [2018-09-03 20:34:57,595] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
    Group           Topic                          Pid Offset          logSize         Lag             Owner
    test-consumer-group xxx              0   509             0               -509            none
    

    或者

    ./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group xxxx --topic xxxx
    

    结果

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test-consumer-group
    [2018-09-03 20:45:02,967] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
    Group           Topic                          Pid Offset          logSize         Lag             Owner
    test-consumer-group xxx              0   509             509             0               none
    

    lag是负数的原因是 topic中的消息数量过期(超过kafka默认的7天后被删除了),变成了0,所以Lag=logSize减去Offset,所以就变成了负数

    7.删除一个topic

    需要在 conf/server.properties 文件中设置

    # For delete topic
    delete.topic.enable=true
    

    否则在执行了以下删除命令后,再 list 查看所有的topic,还是会看到该topic

    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicB
    

    再到 配置文件 中的kafka数据存储地址去删除物理数据了,我的地址为

    /tmp/kafka-logs
    

    最后需要到zk里删除kafka的元数据

    ./bin/zkCli.sh #进入zk shell
    ls /brokers/topics
    rmr /brokers/topics/topicA
    

     参考:kafka 手动删除topic的数据

    8.查看某个group的信息

    新版

    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group xxx
    

    结果

    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group group_id
    GROUP          TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET   LAG                 OWNER
    group_id      xxx              0          509             509             0               consumer-1_/127.0.0.1
    

    如果这时候消费者进程关闭了之后,使用上面的命令和下面的-list命令将不会查出这个group_id,但是当消费者进程重新开启后,这个group_id又能重新查到,且消费的offset不会丢失

    旧版

    bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group xxx --describe
    

    9.查看consumer group的列表

    ZK的消费者可以使用下面命令查看,比如上面的例子中的 test-consumer-group

    bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
    

    KF的消费者可以使用下面命令查看,比如上面的例子中的 console-consumer-xxx ,但是只会查看到类似于 KMOffsetCache-lintong-B250M-DS3H 的结果,这是由于这种消费者的信息是存放在 __consumer_offsets 中

    对于如何查看存储于 __consumer_offsets 中的新版消费者的信息,可以参考huxihx的博文: Kafka 如何读取offset topic内容 (__consumer_offsets)

    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
    

    10.在zk中删除一个consumer group

    rmr /consumers/test-consumer-group
    

    11.查看topic的offset的最小值

    参考:重置kafka的offset

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -2
    xxxx:0:0
    

    12.查看topic的offset的最大值

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -1
    

    13.重置topic的某个消费者的offset为0,需要高版本的kafka才有该命令,在高版本的kafka client对低版本的kafka集群执行该命令是会生效的

    而且需要该group是inactive的,,即该消费组没有消费者,不然会报 Error: Assignments can only be reset if the group 'xxxxxx' is inactive, but the current state is Stable.

    kafka-consumer-groups --bootstrap-server localhost:9092 --group xxx --topic xxx --reset-offsets --to-earliest --execute
    

    14.指定offset和partition进行消费,指定offset的时候必须指定partition

    /opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer --topic xxxx --partition 2 --offset 820000  --bootstrap-server xxx:9092 > ./test2.log
    

    15.查看kafka topic的consumer的某个时间的offset,注意这个--to-datetime是utc时间,需要减去8个小时

    /opt/cloudera/parcels/KAFKA/bin/kafka-consumer-groups --bootstrap-server xxxx:9092 --group xxxx --topic xxxx --command-config ./client.jaas  --reset-offsets --to-datetime 2020-01-01T00:00:00.000
    

    client.jaas

    properties {
    	security.protocol=SASL_PLAINTEXT
    	sasl.mechanism=PLAIN
    	sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" serviceName="kafka" password="xxxx";
    }
    
  • 相关阅读:
    sql 导出大数据量 到excel
    完美输出textarea样式(换行,空格)
    Caliburn.Micro tips
    客户端向服务器提交数据,表单形式
    sevlet生命周期
    Intent(简单介绍)
    return常用用法
    Activity的生命周期
    android.util.AndroidRuntimeException: requestFeature() must be called before adding content
    ListView点击事件不响应。
  • 原文地址:https://www.cnblogs.com/tonglin0325/p/7039747.html
Copyright © 2011-2022 走看看