zoukankan      html  css  js  c++  java
  • 初识kafka

    kafka是天然分布式的消息发布订阅系统,其强大的吞吐量能实现海量数据的处理,目前在日志处理领域拥有比较广泛的应用。

    基本术语

    borker是卡夫卡集群的节点,

    topic是一个逻辑概念,一个topic在物理上表现为多个分区(partition)

    partition是一个物理概念,是每个节点上存储数据的分区。

    producer是向kafka节点通过push方式写数据的对象

    consumer是从kafka节点通过pull方式消费数据的对象

    consumer group消费者组,属于同一个group的consumer(group id一样)平均分配partition,每个partition只会被一个consumer消费

    leader是分区副本的领导,producer和consumer的读写只会从leader进行

    follower是分区副本的追随者,follower只会从leader复制消息,leader节点崩溃时会从follower节点中选举新的leader(leader节点和follower节点是相对而言的,对于不同的topic,leader不一定在同一个broker上),实现高可用和防止数据丢失

    消息送达保证机制

    at most once:最多一次,这个和JMS中”非持久化”消息类似,发送一次,无论成败,将不会重发。

    at least once:消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功。

    exactly once:消息只会发送一次,且不管成功与否。

    通常情况下”at-least-once”是我们首选。

    副本分布策略

    Kafka分配Replica的算法如下:

    • 将所有存活的N个Brokers和待分配的Partition排序
    • 将第i个Partition分配到第(i mod n)个Broker上,这个Partition的第一个Replica存在于这个分配的Broker上,并且会作为partition的优先副本
    • 将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上

    注意leader也算是其中的一个副本,如果副本的参数为1,那么分区将只有一个leader。

    假设集群一共有4个brokers,一个topic有4个partition,每个partition有3个副本。下图(懒得画图,盗用一张)是每个Broker上的副本分配情况

    副本同步机制

    poducer在发布消息到某个partition时,先通过ZooKeeper找到该partition的leader,然后无论该Topic的副本数量为多少,producer只将该消息发送到该partition的leader。

    leader会将该消息写入其本地log,每个follower都从leader pull数据,follower在收到该消息并写入其Log后,向Leader发送ACK。

    一旦leader收到了ISR中的所有replica的ACK,该消息就被认为已经commit了,leader将增加HW并且向producer发送ACK。

    consumer读消息也是从leader读取,只有被commit过的消息才会暴露给consumer。

    为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费

    leader选举

    kafka集群节点的维护和副本的leader选举都是通过zookeeper来实现的,由于本文只是对kafka做一个基本的介绍,这个内容就不做具体的介绍了。

    消息方式

    与其它消息系统不同,Kafka broker是无状态的。这意味着消费者必须维护已消费的状态信息。这些信息由消费者自己维护。

    同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。

    参考资料:http://geek.csdn.net/news/detail/229569、http://www.importnew.com/24677.html

    搭建kafka

    一、搭建zookeeper集群

    由于条件限制,就在一台服务器搭建,实现伪集群,供学习。

    首先安装zookeeper环境,下载地址 https://mirrors.cnnic.cn/apache/zookeeper/。解压后复制zoo_sample.cfg文件三份,分别为zoo1.cfg、zoo2.cfg、zoo2.cfg,如下

    修改==三个文件:

    zoo1.cfg

    clientPort=2181

    dataDir=/home/shared_disk/zookeeper-3.4.10/data-1(在此目录下创建myid文件,文件中写一个1,缺少此文件会kafka可能会连不上)
    dataLogDir=dataLogDir=/usr/myapp/zookeeper-3.4.5/logs-1

    server.1=192.168.16.84:2888:3888(其中前一个端口是主从交互数据的端口,后一个是leader选举的端口)
    server.2=192.168.16.84:4888:5888
    server.3=192.168.16.84:6888:7888

    zoo2.cfg

    clientPort=3181

    dataDir=/home/shared_disk/zookeeper-3.4.10/data-2(在此目录下创建myid文件,文件中写一个2)
    dataLogDir=dataLogDir=/usr/myapp/zookeeper-3.4.5/logs-2

    server.1=192.168.16.84:2888:3888
    server.2=192.168.16.84:4888:5888
    server.3=192.168.16.84:6888:7888

    zoo3.cfg

    clientPort=4181

    dataDir=/home/shared_disk/zookeeper-3.4.10/data-3(在此目录下创建myid文件,文件中写一个3)
    dataLogDir=dataLogDir=/usr/myapp/zookeeper-3.4.5/logs-3

    server.1=192.168.16.84:2888:3888
    server.2=192.168.16.84:4888:5888
    server.3=192.168.16.84:6888:7888

    切换到bin目录下

    分别启动三个节点

    nohup ./zkServer.sh start ../conf/zoo1.cfg &

    nohup ./zkServer.sh start ../conf/zoo2.cfg &

    nohup ./zkServer.sh start ../conf/zoo3.cfg &

    启动后查看状态

    ./zkServer.sh status ../conf/zoo1.cfg

    发现节点1为follower模式

    停止命令为./zkServer.sh stop ../conf/zoo1.cfg

    至此zookeeper搭建完成。

    二、搭建kafka集群

    下载kafka并解压 http://kafka.apache.org/downloads.html

    修改conf下的server.properties文件。同样是在本机搭建伪集群,复制三份server-1.properties、server-2.properties、server-3.properties

    修改server配置文件

    server-1.properties

    broker.id=1

    listeners=PLAINTEXT://192.168.16.84:9092

    advertised.listeners=PLAINTEXT://192.168.16.84:9092

    log.dirs=/home/shared_disk/kafka_2.12-1.0.0/logs-1(数据存储目录)

    delete.topic.enable=true(可以删除topic,kafka默认是不能删除的)

    zookeeper.connect=192.168.16.84:2181,192.168.16.84:3181,192.168.16.84:4181(zookeeper集群)

    server-2.properties

    broker.id=2

    listeners=PLAINTEXT://192.168.16.84:9093

    advertised.listeners=PLAINTEXT://192.168.16.84:9093

    log.dirs=/home/shared_disk/kafka_2.12-1.0.0/logs-2

    delete.topic.enable=true

    zookeeper.connect=192.168.16.84:2181,192.168.16.84:3181,192.168.16.84:4181

    server-3.properties

    broker.id=3

    listeners=PLAINTEXT://192.168.16.84:9094

    advertised.listeners=PLAINTEXT://192.168.16.84:9094

    log.dirs=/home/shared_disk/kafka_2.12-1.0.0/logs-3

    delete.topic.enable=true

    zookeeper.connect=192.168.16.84:2181,192.168.16.84:3181,192.168.16.84:4181

    切换到bin目录下,分别启动三个节点。

    nohup ./kafka-server-start.sh ../config/server-1.properties  &

    nohup ./kafka-server-start.sh ../config/server-2.properties  &

    nohup ./kafka-server-start.sh ../config/server-3.properties  &

    ps -ef | grep kafka下看下是否有三个kafka进程。

    至此,kafka的集群搭建已完成。

    可以用kafka-console-producer和kafka-console-consumer进行测试,下面是常用的命令,可以按照下面的命令来测试,首先创建,然后分别打开一个生产者和一个消费者窗口,从生产者输入消息,我们能看到消费者端有接收到(图中的例子是前面已经发送过其他消息了)

    创建topic
    ./kafka-topics.sh --create --zookeeper 192.168.16.84:2181 --replication-factor 3 --partitions 3 --topic Hello-Kafka-Topic


    查看topic状态
    ./kafka-topics.sh --describe --zookeeper 192.168.16.84:2181 --topic Hello-Kafka-Topic


    查看topic列表
    ./kafka-topics.sh --list --zookeeper 192.168.16.84:4181

    删除topic
    ./kafka-topics.sh --delete --zookeeper 192.168.16.84:2181 --topic Hello-Kafka-Topic

    模拟生产者发送消息
    ./kafka-console-producer.sh --broker-list 192.168.16.84:9092,192.168.16.84:9093,192.168.16.84:9094 --topic Hello-Kafka-Topic

    模拟消费者接收消息
    ./kafka-console-consumer.sh --bootstrap-server 192.168.16.84:9092,192.168.16.84:9093,192.168.16.84:9094 --topic Hello-Kafka-Topic --from-beginning

    查看指定topic的指定消费者组消费者情况

    ./kafka-consumer-groups.sh --bootstrap-server 192.168.16.84:9092,192.168.16.84:9093,192.168.16.84:9094 --describe --group car-location-consume-group

    最后附上本人自己写的demo,有原始客户端写的,也有与springboot集成的,连接  https://github.com/littlechaser/kafka-demo

  • 相关阅读:
    SQL Server 复制订阅
    杂谈经验与未来
    泛泰A820L (高通MSM8660 cpu) 3.4内核的CM10.1(Android 4.2.2) 測试版第二版
    hdu1280 前m大的数(数组下标排序)
    Design Pattern Adaptor 适配器设计模式
    ssh命令、ping命令、traceroute 命令所使用的协议
    Android禁止ViewPager的左右滑动
    推荐一款优雅的jquery手风琴特效
    vijos
    iOS 7 UI 过渡指南
  • 原文地址:https://www.cnblogs.com/xiao-tao/p/8831772.html
Copyright © 2011-2022 走看看