zoukankan      html  css  js  c++  java
  • Kafka 教程(二)-安装与基础操作

    单机安装

    1. 安装 java

    2. 安装 zookeeper    【这一步可以没有,因为 kafka 自带了 zookeeper】

    3. 安装 kafka

    下载链接 kafka 

    kafka 是 scalar 开发的,解压后的后的文件名 包含两个版本号,第一个是 scalar 版本,第二个是 kafka 版本,注意一定要比较新的版本。

    解压

    启动 zookeeper 服务;如果使用 kafka 自带的 zookeeper,也需要先启动 ,自带 zk 启动命令为

    bin/zookeeper-server-start.sh config/zookeeper.properties

    修改 kafka 配置,server.properties,一般不用大改,需要注意的就是 设定 zookeeper.connect=localhost:port 的端口等于启动的 zookeeper 服务的端口

    启动 kafka 服务器,输出如下所示

    [root@localhost kafka_2.11-0.9.0.0]# bin/kafka-server-start.sh config/server.properties 
    [2019-09-04 14:31:23,069] INFO KafkaConfig values: 
        advertised.host.name = null
        metric.reporters = []
        quota.producer.default = 9223372036854775807
        offsets.topic.num.partitions = 50
        log.flush.interval.messages = 9223372036854775807
        auto.create.topics.enable = true

    启动服务器之后,可以 ps 查看到进程;可以在 logs/server.log 查看启动日志

    停止服务器

    bin/kafka-server-stop.sh config/server.properties

    后台启动服务器

    加参数  -daemon

    基本操作

    主题 topic 操作

    kafka-topics.sh:用于在服务器上各种操作 topic,该命令位于 kafka 安装路径的bin下

    创建 topic

    --create:创建 topic

    --zookeeper:指定 zookeeper

    --replication-factor:指定副本数,副本数不能大于 broker 数

    --partitions:指定分区

    --topic:指定 topic name

    [root@localhost kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2191 --replication-factor 1 --partitions 1 --topic test
    Created topic "test".

    创建成功后,可以在 server.properties 配置文件 log.dirs 指定的路径下看到创建 topic 的 log

    查看 topic 名字

    [root@localhost kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2191
    test

    查看某个 topic 具体信息

    [root@localhost kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2191 --replication-factor 3 --partition 4 --topic sq
    Created topic "sq".
    
    [root@localhost kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2191 --topic sq
    Topic:sq    PartitionCount:4    ReplicationFactor:3    Configs:
        Topic: sq    Partition: 0    Leader: 0    Replicas: 0,10,11    Isr: 0,10,11
        Topic: sq    Partition: 1    Leader: 10    Replicas: 10,11,12    Isr: 10,11,12
        Topic: sq    Partition: 2    Leader: 11    Replicas: 11,12,0    Isr: 11,12,0
        Topic: sq    Partition: 3    Leader: 12    Replicas: 12,0,10    Isr: 12,0,10

    Replicas: 代表 副本位于哪个服务器,排在最前面的起作用,就是每个分区的 leader,优先使用这个服务器上的 副本,如果挂了,再切到其他服务器的副本;

    Isr:kafka 集群中可用的 broker.id 列表,用来同步副本

    消息队列

    启动生产者创建消息

    producer.properties 指定了生产者的属性,如 metadata.broker.list=localhost:9092,...

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    --broker-list:代理列表,当前数据流向哪个 broker

    --topic:主题名

    启动消费者获取消息

    consumer.properties 指定了消费者的属性,如 zookeeper.connect=127.0.0.1:2181,...

    bin/kafka-console-consumer.sh --zookeeper localhost:2191 --topic test --from-beginning

    --zookeeper :kafka topic 的元数据存放在 zk 中,所以指定 zk

    --from-beginning:从头开始获取消息

      // 具体来说是这样的,比如生产者一连发了好多消息,消费者不管是发送的时候已经启动,还是发完再启动,它都能获取 1-end 全部消息,即使 生产者已经关闭了,也可以获取

      // 如果没有 这个参数,消费者只能获取 他被启动以后 发送的消息

    =========================== 2020/4/10 日更新 ===========================

    新版本更新 (kafka_2.11-2.2.0)

    1. 新版本的 生产者、消费者 都交给了 kafka 节点;

    消费者的操作更新为

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 9911

    zookeeper 换成了  bootstrap-server;后面的 ip-port 换成了 kafka 服务,而不是 zookeeper 服务

    2. 数据保存在 kafka 中,而 原来保存在 zk

    数据保存的配置在 server.properties/log.dirs=/tmp/kafka-logs;

    而之前版本在 zookeeper.properties/dataDir=/tmp/zookeeper

    这是 consumer 的 offset,保存在 kafka 的 log_dirs

    安装配置 与 操作的对应关系

    描述了 在 实际 操作中,哪些 操作 对应了 哪些 配置参数

    消费者--黑名单(blacklist)和白名单(whitelist)选项:

    --blacklist 后面跟需要过滤的topic的列表,使用","隔开,意思是除了列表中的topic之外,都能接收其它topic的数据
    --whitelist 后面跟需要过滤的topic的列表,使用","隔开,意思是除了列表中的topic之外,都不能接收其它topic的数据
    eg:
    kafka-console-consumer.sh --zookeeper uplooking01:2181 --from-beginning --blacklist hadoop,hive
    kafka-console-consumer.sh --zookeeper uplooking01:2181 --from-beginning --whitelist hadoop,flume

    集群安装

    由于只有一台服务器,本次执行伪分布式集群安装,与真正的集群安装过程雷同。

    kafka 没有主从关系;

    在安装分布式 kafka 之前,可以不安装分布式的 zookeeper,单机 zk 即可

    安装过程

    以3个节点为例

    第一步,新建3个配置文件,server1.properties,server2.properties,server3.properties    【或者建2个,原来的也算一个,总共3个】

    修改以下几个参数

    broker.id=10
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs1

    3个配置文件中这几个参数不能相同

    第二步,新建上面的3个路径,存储 topic 日志

    第三步,分别启动这3个 kafka

    kafka 启动成功后,会在 zookeeper 中创建节点,进入 zookeeper 客户端即可查看

    bin/zkCli.sh -server localhost:2191
    [zk: localhost:2191(CONNECTED) 1] ls /
    [admin, brokers, config, consumers, controller, controller_epoch, isr_change_notification, zookeeper]
    [zk: localhost:2191(CONNECTED) 2] get brokers/ids/10
    Path must start with / character
    [zk: localhost:2191(CONNECTED) 3] get /brokers/ids/10
    {"jmx_port":-1,"timestamp":"1567661983831","endpoints":["PLAINTEXT://localhost:9093"],"host":"localhost","version":2,"port":9093}

    我们看到了 kafka 服务的配置

    基本操作

    主题 topic 操作

    基本同单机

    创建 topic

    bin/kafka-topics.sh --create --zookeeper localhost:2191 --replication-factor 3 --partitions 1 --topic yanshw

    3 台服务器,所以 副本设置为3,当然可以是1,2,比3小就行了

    describe

    [root@localhost kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2191 --topic yanshw
    Topic:yanshw    PartitionCount:1    ReplicationFactor:3    Configs:
        Topic: yanshw    Partition: 0    Leader: 12    Replicas: 12,0,10    Isr: 12,0,10

    输出第一行代表分区摘要,分别是 主题,分区,副本,

    输出第二行代表 领导者 所在分区,Leader:12 中的 12 是 server.properties 中的参数 broker.id,Replicas: 12,0,10 代表 副本位于哪个服务器,排在最前面的起作用;Isr:kafka 集群中可用的 broker.id 列表

    这里有个小发现:我实验单机时,开启一个kafka服务,id=0,后来我实验集群,复制了3份配置文件,分别设置 id 为 10,11,12,后来都启动成功了,但是之前的 单机服务没关闭,也就是 变成了 4个节点,而副本数是3,于是随机选了3台存放副本

    消息队列

    方式同单机

    生产者

    [root@localhost kafka_2.11-0.9.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic yanshw
    a
    b

    这个端口是 producer.properties 生产者配置中的 metadata.broker.list=localhost:9092

    消费者

    [root@localhost kafka_2.11-0.9.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2191 --topic yanshw --from-beginning
    a
    b

    其他操作

    修改主题

    只能修改分区个数,且只能增加,不能减少

    bin/kafka-topics.sh --zookeeper localhost:2191 --alter --topic yanshw --partitions 4

    修改为4个分区

    删除主题

    [root@localhost kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --zookeeper localhost:2191 --delete --topic yanshw 
    Topic yanshw is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.

    注意,这个操作之后, topic 只是标记为 待删除,并没有真正删除。

    如果 delete.topic.enable 没有设置为 true,这个操作不会有任何影响。

    查看消费者分组

    消费者分组无需事先指定,在读数据时随意指定名字,会自动创建分组

    [root@node0 kafka_2.11-2.2.0]# bin/kafka-consumer-groups.sh --bootstrap-server node0:9092 --list
    111
    123456
    console-consumer-17565

     查看某个分组的信息

    [root@node0 kafka_2.11-2.2.0]# bin/kafka-consumer-groups.sh --bootstrap-server node0:9092 --group 111 --describe
    
    TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
    91107           0          9               9               0               kafka-python-1.4.6-a6a9e3ff-e677-4ed9-80aa-ee1dd9ea35d8 /172.16.95.55   kafka-python-1.4.6
    91107           1          6               6               0               kafka-python-1.4.6-a6a9e3ff-e677-4ed9-80aa-ee1dd9ea35d8 /172.16.95.55   kafka-python-1.4.6
    91107           2          5               5               0               kafka-python-1.4.6-a6a9e3ff-e677-4ed9-80aa-ee1dd9ea35d8 /172.16.95.55   kafka-python-1.4.6
    91101           1          179             179             0               -                                                       -               -
    91101           2          206             222             16              -                                                       -               -

    参考资料:

    https://www.w3cschool.cn/apache_kafka/apache_kafka_installation_steps.html  Apache Kafka 安装步骤

    https://www.w3cschool.cn/apache_kafka/apache_kafka_basic_operations.html  Apache Kafka 基本操作

    https://blog.51cto.com/xpleaf/2090847    讲些比较深一些

    https://blog.csdn.net/hg_harvey/article/details/79174104  各种情况下的安装

  • 相关阅读:
    pgspider sqlite mysql docker 镜像
    pgspider docker 镜像
    pgspider基于pg 的高性能数据可视化sql 集群引擎
    diesel rust orm 框架试用
    golang 条件编译
    Performance Profiling Zeebe
    bazel 学习一 简单java 项目运行
    一个好用node http keeplive agnet
    gox 简单灵活的golang 跨平台编译工具
    mailhog 作为smtp server mock工具
  • 原文地址:https://www.cnblogs.com/yanshw/p/11460255.html
Copyright © 2011-2022 走看看