zoukankan      html  css  js  c++  java
  • Kafka入门实战(1)-概念、安装及简单使用

    Kafka是由Linkedin公司开发的,它是一个分布式的,支持多分区、多副本,基于Zookeeper的分布式消息流平台,它同时也是一款基于发布订阅模式的消息引擎系统;Linkedin于2010年将Kafka贡献给了Apache基金会并成为Apache顶级项目。本文主要包括Zookeeper简介、安装、简单操作等;文中所使用到的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、kafka 2.13-2.4.1、Centos 7.6。

    1、简介

    1.1、Broker

    Kafka集群包含一个或多个服务器,服务器节点称为broker。
    broker存储topic的数据;如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
    如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
    如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。

    1.2、Topic(主题)

    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处。类似于数据库的表名。

    对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据:一是基于时间,二是基于Partition文件大小。

    1.3、Partition(分区)

    topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

    1.4、Producer(生产者)

    生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。

    1.5、Consumer(消费者)

    消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

    1.6、Consumer Group(消费者群组)

    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group,若不指定group则属于默认的group)。

    1.7、Leader

    每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。

    1.8、Follower

    Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower挂掉、卡住或与Leader同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

    2、安装

    2.1、单机版安装

    2.1.1、下载并解压Kafka

    http://kafka.apache.org/downloads
    tar zxvf kafka_2.13-2.4.1.tgz -C ../app

    2.1.2、Zookeeper配置及启停

    Kafka依赖Zookeeper需先启动Zookeeper;这里使用Kafka自带的Zookeeper,也可以使用自己单独安装的Zookeeper。

    修改config/zookeeper.properties:

    dataDir=/home/app/kafka_2.13-2.4.1/data #根据实际情况修改

    启停Zookeeper:

    ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties #启动zookeeper
    ./zookeeper-server-stop.sh #停止zookeeper

    2.1.3、Kafka配置及启停

    对应的配置文件为config/server.properties,这里使用默认配置;实际使用过程中可以根据情况修改里面的参数。

    启停Kafka:

    ./kafka-server-start.sh -daemon ../config/server.properties #启动kafka
    ./kafka-server-stop.sh #停止kafka

    2.2、集群安装

     这里使用自己单独安装的zookeeper,版本为3.6.0,假设已经安装好了,地址为:10.49.196.20:2181,10.49.196.21:2181,10.49.196.22:2181。安装方法可以参见:Zookeeper入门实战(1)-概念、安装及命令行

    2.2.1、集群规划

    主机:10.49.196.20,10.49.196.21,10.49.196.22

    安装目录都为:/home/hadoop/app

    2.2.2、下载并解压Kafka

    http://kafka.apache.org/downloads
    tar zxvf kafka_2.13-2.4.1.tgz -C /home/hadoop/app

    在其中一台机器上解压,然后把解压的包拷贝到其他的机器。

    2.2.3、修改配置文件

    10.49.196.20:/home/hadoop/app/kafka_2.13-2.4.1/server.properties:

    broker.id=1
    log.dirs=/home/hadoop/app/kafka_2.13-2.4.1/kafka-logs #该目录需新建
    zookeeper.connect=10.49.196.20:2181,10.49.196.21:2181,10.49.196.22:2181/kafka

    10.49.196.21:/home/hadoop/app/kafka_2.13-2.4.1/server.properties:

    broker.id=2
    log.dirs=/home/hadoop/app/kafka_2.13-2.4.1/kafka-logs #该目录需新建
    zookeeper.connect=10.49.196.20:2181,10.49.196.21:2181,10.49.196.22:2181/kafka

    10.49.196.22:/home/hadoop/app/kafka_2.13-2.4.1/server.properties:

    broker.id=3
    log.dirs=/home/hadoop/app/kafka_2.13-2.4.1/kafka-logs #该目录需新建
    zookeeper.connect=10.49.196.20:2181,10.49.196.21:2181,10.49.196.22:2181/kafka

    2.2.4、Kafka启停

    启停kafka,依次在各机器上执行:

    ./kafka-server-start.sh -daemon ../config/server.properties #启动kafka
    ./kafka-server-stop.sh #停止kafka

    3、简单使用

    3.1、创建主题

    创建一个单分区单副本的主题

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

    3.2、查看主题列表

    bin/kafka-topics.sh --list --bootstrap-server localhost:9092

    3.3、查看主题信息

    bin/kafka-topics.sh --bootstrap-server --describe --topic test

    3.4、使用生产者发送消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message      #发送的消息
    This is another message#发送的消息

    3.6、查看消费组信息

    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group A

    3.5、使用消费者接受消息

    4、分区原理

            一个topic可以配置几个partition,produce发送的消息分发到不同的partition中,consumer接受数据的时候是按照group来接受,kafka确保每个partition只能被同一个group中的同一个consumer消费,如果想要重复消费,那么需要其他的组来消费。当consumer的数量大于分区的数量的时候,有的consumer会读取不到数据。

           假设一个topic test被groupA消费了,现在启动另外一个新的groupB来消费test,默认test-groupB的offset不是0,而是没有新建立,除非当test有新数据的时候,groupB会收到该数据,该条数据也是第一条数据,groupB的offset也是刚初始化的ofsert, 可以显式的用--from-beginnging 来获取从0开始数据。查看消费组对应topic的offset:

     ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group A

           Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。

  • 相关阅读:
    编写StyleCop自定义规则教程(二)校验规则增加配置参数
    XtraReport学习笔记之Bookmark (一)
    Web Application Firewall(WAF)
    CAN/CFN/CPN
    什么是 SASE?
    FEC 前向纠错
    VPP usage
    强化学习
    What Is a RAN Intelligent Controller (RIC)?
    Read Up: Connection Management xAPP for ORAN RIC: A Graph Neural Network and Reinforcement Learning Approach
  • 原文地址:https://www.cnblogs.com/wuyongyin/p/12658863.html
Copyright © 2011-2022 走看看