zoukankan      html  css  js  c++  java
  • Kafka 核心概念/安装/命令行实战

    一、简介

    Kafka 是最初由 Linkedin 公司开发,Linkedin 于2010年贡献给了 Apache 基金会并成为顶级开源项目,也是一个开源【分布式流处理平台】,由 Scala 和 Java 编写,(也当做 MQ 系统,但不是纯粹的消息系统)。一句话概括:Kafka 是一种高吞吐量的分布式流处理平台,它可以处理消费者在网站中的所有动作流数据。比如网页浏览,搜索和其他用户的行为等,应用于大数据实时处理领域。

    二、认识概念

    Broker

    • Kafka 的服务端程序,可以认为一个 mq 节点就是一个Broker
    • Broker 存储 Topic 的数据

    Producer 生产者

    • 创建消息 Message,然后发布到 MQ 中
    • 该角色将消息发布到 Kafka 的 Topic 中

    Consumer 消费者

    • 消费队列里面的消息

    1、核心概念

    ConsumerGroup 消费者组

    • 同个Topic,广播发送给不同的Group,一个 Group 中只有一个 Consumer 可以消费此消息

    Topic

    • 每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为Topic,主题的意思

    Partition 分区

    • Kafka 数据存储的基本单元,Topic 中的数据分割为一个或多个Partition,每个 Topic 至少有一个Partition,是有序的
    • 一个 Topic 的多个Partitions,被分布在 Kafka 集群中的多个 Server 上
    • 消费者数量 <=小于或者等于 Partition 数量

    Replication 副本(备胎)

    • 同个 Partition 会有多个副本Replication ,多个副本的数据是一样的,当其他 Broker 挂掉后,系统可以主动用副本提供服务
    • 默认每个 Topic 的副本都是1(默认是没有副本,节省资源),也可以在创建 Topic 的时候指定
    • 如果当前 Kafka 集群只有3个 Broker 节点,则 Replication-Factor 最大就是3了,如果创建副本为4,则会报错

    ReplicationLeader、ReplicationFollower

    • Partition 有多个副本,但只有一个 ReplicationLeader 负责该 Partition 和生产者消费者交互
    • ReplicationFollower 只是做一个备份,从 ReplicationLeader 进行同步

    ReplicationManager

    • 负责 Broker 所有分区副本信息,Replication 副本状态切换

    Offset

    • 每个 Consumer 实例需要为他消费的 Partition 维护一个记录自己消费到哪里的偏移Offset
    • Kafka 把 Offset 保存在消费端的消费者组里

     

    2、特点总结

    • 多订阅者
      • 一个 Topic 可以有一个或者多个订阅者
      • 每个订阅者都要有一个Partition,所以订阅者数量要少于等于 Partition 数量
    • 高吞吐量、低延迟: 每秒可以处理几十万条消息
    • 高并发:几千个客户端同时读写
    • 容错性:多副本、多分区,允许集群中节点失败,如果副本数据量为n,则可以n-1个节点失败
    • 扩展性强:支持热扩展

    3、基于消费者组可以实现

    • 基于队列的模型:所有消费者都在同一消费者组里,每条消息只会被一个消费者处理
    • 基于发布订阅模型:消费者属于不同的消费者组,假如每个消费者都有自己的消费者组,这样 Kafka 消息就能广播到所有消费者实例上

    三、安装 Zookeeper

    步骤一:下载安装包并解压

    步骤二:复制配置文件

    将 zoo_sample.cfg 复制一份并命名为 zoo.cfg

    步骤三:启动

    进入 bin 目录,使用命令启动

    ./zkServer.sh start
    
    默认2181端口
    默认配置文件 zoo.cfg

    四、安装Kafka 

    步骤一:下载安装包并解压

    步骤二:修改server.properties

     

    #标识broker编号,集群中有多个broker,则每个broker的编号需要设置不同
    broker.id=0#修改下面两个配置(listeners 配置的ip和advertised.listeners相同时启动kafka会报错)
    listeners(内网Ip)
    advertised.listeners(公网ip)
    ​
    #修改zk地址,默认地址,因为zookeeper和kafka部署在一起,可以这样,否则需要具体的IP
    zookeeper.connection=localhost:2181

    默认端口是 9092 端口

    步骤三:启动

    进入 bin 目录

    #启动
    ./kafka-server-start.sh  ../config/server.properties &
    ​
    #启动:守护进程方式
    ./kafka-server-start.sh -daemon ../config/server.properties &
    
    #停止
    kafka-server-stop.sh

    五、命令行生产者发送消息和消费者消费消息实战

    1、创建topic

    ./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 2 --topic xdclass-topic

    2、查看topic

    ./kafka-topics.sh --list --zookeeper 112.74.55.160:2181

    3、生产者发送消息

    ./kafka-console-producer.sh --broker-list 112.74.55.160:9092 --topic version1-topic

    4、消费者消费消息

    --from-beginning:会把主题中以往所有的数据都读取出来, 重启后会有这个重复消费,实际上不加
    ./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1

    5、删除topic

    ./kafka-topics.sh --zookeeper 112.74.55.160:2181 --delete --topic t1

    6、查看broker节点topic状态信息

    ./kafka-topics.sh --describe --zookeeper 112.74.55.160:2181 --topic xdclass-topic

    六、实现点对点消费

    步骤一:编辑消费者配置

    确保同个名称 group.id 一样,这里我们使用默认的就可以了,当然也可以改成自己喜欢的名称,编辑config/consumer.properties,

    步骤二:启动两个消费者试验

    #创建topic, 1个分区
    ./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 2 --topic t1
    
    #下面的命令用两个窗口运行,指定配置文件启动两个消费者
    ./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer.properties

    只有一个消费者可以消费到数据,一个分区只能被同个消费者组下的某个消费者进行消费

    七、实现发布订阅消费

    步骤一:编辑消费者配置

    确保group.id 不一样

    • 编辑 config/consumer-1.properties
    • 编辑 config/consumer-2.properties

    步骤二:启动两个消费者试验

    #创建topic, 2个分区
    ./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 2 --topic t2
    
    #指定配置文件启动两个消费者
    ./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-1.properties
    ​
    ./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-2.properties

    两个不同消费者组的节点,都可以消费到消息,实现发布订阅模型

    八、Kafka 数据存储流程、原理、LEO+HW讲解

    Partition

    • Topic 物理上的分组,一个 Topic 可以分为多个Partition,每个 Partition 是一个有序的队列
    • 是以文件夹的形式存储在具体 Broker 本机上

    LEO(LogEndOffset)

    • 表示每个 Partition 的 log 最后一条 Message 的位置。

    HW(HighWatermark)

    • 表示 Partition 各个 Replicas 数据间同步且一致的 offset 位置,即表示 allreplicas 已经 commit 的位置
    • HW 之前的数据才是 Commit 后的,对消费者才可见
    • ISR 集合里面最小leo

    offset

    • 每个 Partition 都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 Partition 中
    • Partition 中的每个消息都有一个连续的序列号叫做offset,用于 Partition 唯一标识一条消息
    • 可以认为 offset 是 Partition 中 Message 的 id

    Segment:每个 Partition 又由多个 segment file 组成;

    • segment file 由2部分组成,分别为 index file 和 data file(log file),
    • 两个文件是一一对应的,后缀“.index”和“.log”分别表示索引文件和数据文件
    • 命名规则:Partition 的第一个 segment 从0开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的offset+1

    Kafka 高效文件存储设计特点:

    • Kafka 把 Topic 中一个 Parition 大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
    • 通过索引信息可以快速定位message
    • Producer 生产数据,要写入到 log 文件中,写的过程中一直追加到文件末尾,为顺序写,官网数据表明。同样的磁盘,顺序写能到600M/S,而随机写只有100K/S
  • 相关阅读:
    HDU 5912 Fraction (模拟)
    CodeForces 722C Destroying Array (并查集)
    CodeForces 722B Verse Pattern (水题)
    CodeForces 722A Broken Clock (水题)
    CodeForces 723D Lakes in Berland (dfs搜索)
    CodeForces 723C Polycarp at the Radio (题意题+暴力)
    CodeForces 723B Text Document Analysis (水题模拟)
    CodeForces 723A The New Year: Meeting Friends (水题)
    hdu 1258
    hdu 2266 dfs+1258
  • 原文地址:https://www.cnblogs.com/jwen1994/p/14793051.html
Copyright © 2011-2022 走看看