zoukankan      html  css  js  c++  java
  • Kafka 核心概念/安装/命令行实战

    一、简介

    Kafka 是最初由 Linkedin 公司开发,Linkedin 于2010年贡献给了 Apache 基金会并成为顶级开源项目,也是一个开源【分布式流处理平台】,由 Scala 和 Java 编写,(也当做 MQ 系统,但不是纯粹的消息系统)。一句话概括:Kafka 是一种高吞吐量的分布式流处理平台,它可以处理消费者在网站中的所有动作流数据。比如网页浏览,搜索和其他用户的行为等,应用于大数据实时处理领域。

    二、认识概念

    Broker

    • Kafka 的服务端程序,可以认为一个 mq 节点就是一个Broker
    • Broker 存储 Topic 的数据

    Producer 生产者

    • 创建消息 Message,然后发布到 MQ 中
    • 该角色将消息发布到 Kafka 的 Topic 中

    Consumer 消费者

    • 消费队列里面的消息

    1、核心概念

    ConsumerGroup 消费者组

    • 同个Topic,广播发送给不同的Group,一个 Group 中只有一个 Consumer 可以消费此消息

    Topic

    • 每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为Topic,主题的意思

    Partition 分区

    • Kafka 数据存储的基本单元,Topic 中的数据分割为一个或多个Partition,每个 Topic 至少有一个Partition,是有序的
    • 一个 Topic 的多个Partitions,被分布在 Kafka 集群中的多个 Server 上
    • 消费者数量 <=小于或者等于 Partition 数量

    Replication 副本(备胎)

    • 同个 Partition 会有多个副本Replication ,多个副本的数据是一样的,当其他 Broker 挂掉后,系统可以主动用副本提供服务
    • 默认每个 Topic 的副本都是1(默认是没有副本,节省资源),也可以在创建 Topic 的时候指定
    • 如果当前 Kafka 集群只有3个 Broker 节点,则 Replication-Factor 最大就是3了,如果创建副本为4,则会报错

    ReplicationLeader、ReplicationFollower

    • Partition 有多个副本,但只有一个 ReplicationLeader 负责该 Partition 和生产者消费者交互
    • ReplicationFollower 只是做一个备份,从 ReplicationLeader 进行同步

    ReplicationManager

    • 负责 Broker 所有分区副本信息,Replication 副本状态切换

    Offset

    • 每个 Consumer 实例需要为他消费的 Partition 维护一个记录自己消费到哪里的偏移Offset
    • Kafka 把 Offset 保存在消费端的消费者组里

     

    2、特点总结

    • 多订阅者
      • 一个 Topic 可以有一个或者多个订阅者
      • 每个订阅者都要有一个Partition,所以订阅者数量要少于等于 Partition 数量
    • 高吞吐量、低延迟: 每秒可以处理几十万条消息
    • 高并发:几千个客户端同时读写
    • 容错性:多副本、多分区,允许集群中节点失败,如果副本数据量为n,则可以n-1个节点失败
    • 扩展性强:支持热扩展

    3、基于消费者组可以实现

    • 基于队列的模型:所有消费者都在同一消费者组里,每条消息只会被一个消费者处理
    • 基于发布订阅模型:消费者属于不同的消费者组,假如每个消费者都有自己的消费者组,这样 Kafka 消息就能广播到所有消费者实例上

    三、安装 Zookeeper

    步骤一:下载安装包并解压

    步骤二:复制配置文件

    将 zoo_sample.cfg 复制一份并命名为 zoo.cfg

    步骤三:启动

    进入 bin 目录,使用命令启动

    ./zkServer.sh start
    
    默认2181端口
    默认配置文件 zoo.cfg

    四、安装Kafka 

    步骤一:下载安装包并解压

    步骤二:修改server.properties

     

    #标识broker编号,集群中有多个broker,则每个broker的编号需要设置不同
    broker.id=0#修改下面两个配置(listeners 配置的ip和advertised.listeners相同时启动kafka会报错)
    listeners(内网Ip)
    advertised.listeners(公网ip)
    ​
    #修改zk地址,默认地址,因为zookeeper和kafka部署在一起,可以这样,否则需要具体的IP
    zookeeper.connection=localhost:2181

    默认端口是 9092 端口

    步骤三:启动

    进入 bin 目录

    #启动
    ./kafka-server-start.sh  ../config/server.properties &
    ​
    #启动:守护进程方式
    ./kafka-server-start.sh -daemon ../config/server.properties &
    
    #停止
    kafka-server-stop.sh

    五、命令行生产者发送消息和消费者消费消息实战

    1、创建topic

    ./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 2 --topic xdclass-topic

    2、查看topic

    ./kafka-topics.sh --list --zookeeper 112.74.55.160:2181

    3、生产者发送消息

    ./kafka-console-producer.sh --broker-list 112.74.55.160:9092 --topic version1-topic

    4、消费者消费消息

    --from-beginning:会把主题中以往所有的数据都读取出来, 重启后会有这个重复消费,实际上不加
    ./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1

    5、删除topic

    ./kafka-topics.sh --zookeeper 112.74.55.160:2181 --delete --topic t1

    6、查看broker节点topic状态信息

    ./kafka-topics.sh --describe --zookeeper 112.74.55.160:2181 --topic xdclass-topic

    六、实现点对点消费

    步骤一:编辑消费者配置

    确保同个名称 group.id 一样,这里我们使用默认的就可以了,当然也可以改成自己喜欢的名称,编辑config/consumer.properties,

    步骤二:启动两个消费者试验

    #创建topic, 1个分区
    ./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 2 --topic t1
    
    #下面的命令用两个窗口运行,指定配置文件启动两个消费者
    ./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer.properties

    只有一个消费者可以消费到数据,一个分区只能被同个消费者组下的某个消费者进行消费

    七、实现发布订阅消费

    步骤一:编辑消费者配置

    确保group.id 不一样

    • 编辑 config/consumer-1.properties
    • 编辑 config/consumer-2.properties

    步骤二:启动两个消费者试验

    #创建topic, 2个分区
    ./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 2 --topic t2
    
    #指定配置文件启动两个消费者
    ./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-1.properties
    ​
    ./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-2.properties

    两个不同消费者组的节点,都可以消费到消息,实现发布订阅模型

    八、Kafka 数据存储流程、原理、LEO+HW讲解

    Partition

    • Topic 物理上的分组,一个 Topic 可以分为多个Partition,每个 Partition 是一个有序的队列
    • 是以文件夹的形式存储在具体 Broker 本机上

    LEO(LogEndOffset)

    • 表示每个 Partition 的 log 最后一条 Message 的位置。

    HW(HighWatermark)

    • 表示 Partition 各个 Replicas 数据间同步且一致的 offset 位置,即表示 allreplicas 已经 commit 的位置
    • HW 之前的数据才是 Commit 后的,对消费者才可见
    • ISR 集合里面最小leo

    offset

    • 每个 Partition 都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 Partition 中
    • Partition 中的每个消息都有一个连续的序列号叫做offset,用于 Partition 唯一标识一条消息
    • 可以认为 offset 是 Partition 中 Message 的 id

    Segment:每个 Partition 又由多个 segment file 组成;

    • segment file 由2部分组成,分别为 index file 和 data file(log file),
    • 两个文件是一一对应的,后缀“.index”和“.log”分别表示索引文件和数据文件
    • 命名规则:Partition 的第一个 segment 从0开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的offset+1

    Kafka 高效文件存储设计特点:

    • Kafka 把 Topic 中一个 Parition 大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
    • 通过索引信息可以快速定位message
    • Producer 生产数据,要写入到 log 文件中,写的过程中一直追加到文件末尾,为顺序写,官网数据表明。同样的磁盘,顺序写能到600M/S,而随机写只有100K/S
  • 相关阅读:
    JAVA-throw new IOException报错unhandled exception:java.lang.Exception 2021年6月7日
    GIt保持远程 源仓库与Fork仓库同步--2017年6月13日
    Python的getattr()-2017年6月7日
    JavaScript学习-2017年5月18日
    Writing your first Django app--2017年5月9日
    M4-AC6 Oh,Trojan Again--2017年5月9日
    吴军硅谷来信
    【1】Prologue--A Game of Thrones--2017年4月8日
    M4-PC9 Read 10,000 Books,Travel 10,000 Miles--2017年5月8日
    资源分配图RAG的化简
  • 原文地址:https://www.cnblogs.com/jwen1994/p/14793051.html
Copyright © 2011-2022 走看看