zoukankan      html  css  js  c++  java
  • 搭建kafka

    Kafka 被称为下一代分布式消息系统,由 scala 和 Java 编写,是非营利性组织
    ASF(Apache Software Foundation,简称为 ASF)基金会中的一个开源项目,比如
    HTTP Server、Hadoop、ActiveMQ、Tomcat 等开源软件都属于 Apache 基金会的开
    源软件,类似的消息系统还有 RbbitMQ、ActiveMQ、ZeroMQ。

    Kafka®用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,
    快速快速性,可在数千家公司中投入生产。
    

    常用消息队列对比:

    kafka 最主要的优势是其具备分布式功能、并可以结合 zookeeper 可以实现动态扩
    容,Kafka 是一种高吞吐量的分布式发布订阅消息系统。
    

    kafka 优势:

    kafka 通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB 的
    消息存储也能够保持长时间的稳定性能。
    高吞吐量:即使是非常普通的硬件 Kafka 也可以支持每秒数百万的消息。
    支持通过 Kafka 服务器分区消息。
    支持 Hadoop 并行数据加载。
    
    O(1)就是最低的时空复杂度了,也就是耗时/耗空间与输入数据大小无关,无论
    输入数据增大多少倍,耗时/耗空间都不变,哈希算法就是典型的 O(1)时间复杂
    度,无论数据规模多大,都可以在一次计算后找到目标
    

    kafka 角色:

    Broker:

    Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。
    

    Topic :

    每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 topic,(物
    理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic 的消息虽然保
    存于一个或多个 broker 上但用户只需指定消息的 topic 即可生产或消费数据而不
    必关心数据存于何处),topic 在逻辑上对 record(记录、日志)进行分组保存,消
    费者需要订阅相应的 topic 才能消费 topic 中的消息。
    

    Partition :

    是物理上的概念,每个 topic 包含一个或多个 partition,创建 topic 时
    可指定 parition 数量,每个 partition 对应于一个文件夹,该文件夹下存储该
    partition 的数据和索引文件,为了实现实现数据的高可用,比如将分区 0 的数据
    分散到不同的 kafka 节点,每一个分区都有一个 broker 作为 leader 和一个 broker
    作为 Follower。
    

    分区的优势(分区因子为 3):

    一:实现存储空间的横向扩容,即将多个 kafka 服务器的空间结合利用
    二:提升性能,多服务器读写
    三:实现高可用,分区 leader 分布在不同的 kafka 服务器,比如分区 0 的 leader
    为服务器 A,则服务器 B 和服务器 C 为 A 的 follower,而分区 1 的 leader 为服务
    器 B,则服务器 A 和 C 为服务器 B 的 follower,而分区 2 的 leader 为 C,则服务
    器 A 和 B 为 C 的 follower。
    

    Producer:负责发布消息到 Kafka broker。

    Consumer:消费消息,每个 consumer 属于一个特定的 consuer group(可为每个
    consumer 指定 group name,若不指定 group name 则属于默认的 group),使用
    consumer high level API 时,同一 topic 的一条消息只能被同一个 consumer group
    内的一个 consumer 消费,但多个 consumer group 可同时消费这一消息。
    

    kafka 部署:(基于zookeeper)

    ## 官方教程
    https://kafka.apache.org/quickstart
    
    # 下载
    https://kafka.apache.org/downloads
    

    部署三台服务器的高可用 kafka 环境。
    部署环境:

    Server1:172.31.2.41
    Server2:172.31.2.42
    Server3:172.31.2.43
    

    三台都是一样的操作

    下载

    [root@mq1 src]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
    

    创建目录

    [root@mq1 src]# mkdir -p /apps
    [root@mq1 src]# mkdir -p /data/kafka
    [root@mq2 src]# mkdir -p /apps
    [root@mq2 src]# mkdir -p /data/kafka
    [root@mq3 src]# mkdir -p /apps
    [root@mq3 src]# mkdir -p /data/kafka
    

    解压

    [root@mq1 src]# tar xf kafka_2.12-2.8.0.tgz -C /apps/
    [root@mq2 src]# tar xf kafka_2.12-2.8.0.tgz -C /apps/
    [root@mq3 src]# tar xf kafka_2.12-2.8.0.tgz -C /apps/
    

    做成软链接

    [root@mq1 src]# ln -s /apps/kafka_2.12-2.8.0 /apps/kafka
    [root@mq2 src]# ln -s /apps/kafka_2.12-2.8.0 /apps/kafka
    [root@mq3 src]# ln -s /apps/kafka_2.12-2.8.0 /apps/kafka
    

    修改配置

    [root@mq1 ~]# vim /apps/kafka/config/server.properties
    broker.id=41  # 唯一的
    listeners=PLAINTEXT://172.31.2.41:9092  # 本机的
    # 调优
    num.network.threads=8
    num.io.threads=12
    # 数据目录
    log.dirs=/data/kafka
    # zookeeper 集群
    zookeeper.connect=172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181
    

    以守护进程启动

    [root@mq1 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
    
    [root@mq2 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
    
    [root@mq3 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
    

    查看端口9092

    [root@mq1 ~]# ss -tanl
    LISTEN              0                    50                                  [::ffff:172.31.2.41]:9092                                         *:*
    

    验证 zookeeper 中 kafka 元数据:

    1、Broker 依赖于 Zookeeper,每个 Broker 的 id 和 Topic、Partition 这些元数据信
    息都会写入 Zookeeper 的 ZNode 节点中;
    2、Consumer 依赖于 Zookeeper,Consumer 在消费消息时,每消费完一条消息,
    会将产生的 offset 保存到 Zookeeper 中,下次消费在当前 offset 往后继续消费;
    ps:kafka0.9 之前 Consumer 的 offset 存储在 Zookeeper 中,kafka0,9 以后 offset
    存储在本地。
    3、Partition 依赖于 Zookeeper,Partition 完成 Replication 备份后,选举出一个 Leader,
    这个是依托于 Zookeeper 的选举机制实现的;
    

    测试 kafka 读写数据:

    创建 topic:
    创建名为 logstashtest,partitions(分区)为 3,replication(每个分区的副本数/每个
    分区的分区因子)为 3 的 topic(主题):
    在任意 kafaka 服务器操作:

    [root@mq1 kafka]# bin/kafka-topics.sh --create --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --partitions 3  --replication-factor 3 --topic m66
    
    Created topic m66.
    

    验证 topic:
    状态说明:logstashtest 有三个分区分别为 0、1、2,分区 0 的 leader 是 3(broker.id),
    分区 0 有三个副本,并且状态都为 lsr(ln-sync,表示可以参加选举成为 leader)。

    [root@mq1 kafka]# bin/kafka-topics.sh --describe --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --topic m66
    
    Topic: m66      TopicId: NIgq-gm3SFaxsTpvwCoshA PartitionCount: 3       ReplicationFactor: 3    Configs:
            Topic: m66      Partition: 0    Leader: 43      Replicas: 43,41,42      Isr: 43,41,42
            Topic: m66      Partition: 1    Leader: 41      Replicas: 41,42,43      Isr: 41,42,43
            Topic: m66      Partition: 2    Leader: 42      Replicas: 42,43,41      Isr: 42,43,41
    

    获取所有 topic:

    [root@mq1 kafka]# bin/kafka-topics.sh --list --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 m66
    
    m66
    

    测试发送消息:

    [root@mq1 kafka]# bin/kafka-console-producer.sh --broker-list 172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092 --topic m66
    
    >msg1
    >msg2
    >mgs3
    
    

    测试获取消息:

    可以到任意一台 kafka 服务器测试消息获取,只要有相应的消息获取客户端即可。

    [root@mq2 ~]# /apps/kafka/bin/kafka-console-consumer.sh --topic m66 --bootstrap-server 172.31.2.42:9092 --from-beginning msg1
    
    msg1
    msg2
    mgs3
    
    

    删除 topic:(必须有的才能删除,没有就会报错)

    [root@mq3 src]# /apps/kafka/bin/kafka-topics.sh --delete --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --topic m44
    
    Topic m44 is marked for deletion.
    

    范例:

    [root@mq3 src]# /apps/kafka/bin/kafka-topics.sh --delete --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --topic m10
    
    Error while executing topic command : Topic 'm10' does not exist as expected
    [2021-08-17 06:30:06,677] ERROR java.lang.IllegalArgumentException: Topic 'm10' does not exist as expected
            at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:542)
            at kafka.admin.TopicCommand$ZookeeperTopicService.deleteTopic(TopicCommand.scala:500)
            at kafka.admin.TopicCommand$.main(TopicCommand.scala:71)
            at kafka.admin.TopicCommand.main(TopicCommand.scala)
     (kafka.admin.TopicCommand$)
    
  • 相关阅读:
    爬虫-requests-html
    pillow
    bs4-mysql-豌豆荚
    代理池-豆瓣电影
    Codeforces 1373D
    Codeforces 1365D
    AtCoder "NOMURA Programming Competition 2020" C
    Codeforces 1359D
    Codeforces 1359C
    Codeforces 1358D
  • 原文地址:https://www.cnblogs.com/xuanlv-0413/p/15168207.html
Copyright © 2011-2022 走看看