zoukankan      html  css  js  c++  java
  • Kafka入门实战(1)-概念、安装及简单使用

    Kafka是由Linkedin公司开发的,它是一个分布式的,支持多分区、多副本,基于Zookeeper的分布式消息流平台,它同时也是一款基于发布订阅模式的消息引擎系统;Linkedin于2010年将Kafka贡献给了Apache基金会并成为Apache顶级项目。本文主要包括Zookeeper简介、安装、简单操作等;文中所使用到的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、kafka 2.13-2.4.1、Centos 7.6。

    1、简介

    1.1、Broker

    Kafka集群包含一个或多个服务器,服务器节点称为broker。
    broker存储topic的数据;如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
    如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
    如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。

    1.2、Topic(主题)

    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处。类似于数据库的表名。

    对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据:一是基于时间,二是基于Partition文件大小。

    1.3、Partition(分区)

    topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

    1.4、Producer(生产者)

    生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。

    1.5、Consumer(消费者)

    消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

    1.6、Consumer Group(消费者群组)

    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group,若不指定group则属于默认的group)。

    1.7、Leader

    每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。

    1.8、Follower

    Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower挂掉、卡住或与Leader同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

    2、安装

    2.1、单机版安装

    2.1.1、下载并解压Kafka

    http://kafka.apache.org/downloads
    tar zxvf kafka_2.13-2.4.1.tgz -C ../app

    2.1.2、Zookeeper配置及启停

    Kafka依赖Zookeeper需先启动Zookeeper;这里使用Kafka自带的Zookeeper,也可以使用自己单独安装的Zookeeper。

    修改config/zookeeper.properties:

    dataDir=/home/app/kafka_2.13-2.4.1/data #根据实际情况修改

    启停Zookeeper:

    ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties #启动zookeeper
    ./zookeeper-server-stop.sh #停止zookeeper

    2.1.3、Kafka配置及启停

    对应的配置文件为config/server.properties,这里使用默认配置;实际使用过程中可以根据情况修改里面的参数。

    启停Kafka:

    ./kafka-server-start.sh -daemon ../config/server.properties #启动kafka
    ./kafka-server-stop.sh #停止kafka

    2.2、集群安装

     这里使用自己单独安装的zookeeper,版本为3.6.0,假设已经安装好了,地址为:10.49.196.20:2181,10.49.196.21:2181,10.49.196.22:2181。安装方法可以参见:Zookeeper入门实战(1)-概念、安装及命令行

    2.2.1、集群规划

    主机:10.49.196.20,10.49.196.21,10.49.196.22

    安装目录都为:/home/hadoop/app

    2.2.2、下载并解压Kafka

    http://kafka.apache.org/downloads
    tar zxvf kafka_2.13-2.4.1.tgz -C /home/hadoop/app

    在其中一台机器上解压,然后把解压的包拷贝到其他的机器。

    2.2.3、修改配置文件

    10.49.196.20:/home/hadoop/app/kafka_2.13-2.4.1/server.properties:

    broker.id=1
    log.dirs=/home/hadoop/app/kafka_2.13-2.4.1/kafka-logs #该目录需新建
    zookeeper.connect=10.49.196.20:2181,10.49.196.21:2181,10.49.196.22:2181/kafka

    10.49.196.21:/home/hadoop/app/kafka_2.13-2.4.1/server.properties:

    broker.id=2
    log.dirs=/home/hadoop/app/kafka_2.13-2.4.1/kafka-logs #该目录需新建
    zookeeper.connect=10.49.196.20:2181,10.49.196.21:2181,10.49.196.22:2181/kafka

    10.49.196.22:/home/hadoop/app/kafka_2.13-2.4.1/server.properties:

    broker.id=3
    log.dirs=/home/hadoop/app/kafka_2.13-2.4.1/kafka-logs #该目录需新建
    zookeeper.connect=10.49.196.20:2181,10.49.196.21:2181,10.49.196.22:2181/kafka

    2.2.4、Kafka启停

    启停kafka,依次在各机器上执行:

    ./kafka-server-start.sh -daemon ../config/server.properties #启动kafka
    ./kafka-server-stop.sh #停止kafka

    3、简单使用

    3.1、创建主题

    创建一个单分区单副本的主题

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

    3.2、查看主题列表

    bin/kafka-topics.sh --list --bootstrap-server localhost:9092

    3.3、查看主题信息

    bin/kafka-topics.sh --bootstrap-server --describe --topic test

    3.4、使用生产者发送消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message      #发送的消息
    This is another message#发送的消息

    3.6、查看消费组信息

    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group A

    3.5、使用消费者接受消息

    4、分区原理

            一个topic可以配置几个partition,produce发送的消息分发到不同的partition中,consumer接受数据的时候是按照group来接受,kafka确保每个partition只能被同一个group中的同一个consumer消费,如果想要重复消费,那么需要其他的组来消费。当consumer的数量大于分区的数量的时候,有的consumer会读取不到数据。

           假设一个topic test被groupA消费了,现在启动另外一个新的groupB来消费test,默认test-groupB的offset不是0,而是没有新建立,除非当test有新数据的时候,groupB会收到该数据,该条数据也是第一条数据,groupB的offset也是刚初始化的ofsert, 可以显式的用--from-beginnging 来获取从0开始数据。查看消费组对应topic的offset:

     ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group A

           Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。

  • 相关阅读:
    《Machine Learning in Action》—— 白话贝叶斯,“恰瓜群众”应该恰好瓜还是恰坏瓜
    《Machine Learning in Action》—— 女同学问Taoye,KNN应该怎么玩才能通关
    《Machine Learning in Action》—— Taoye给你讲讲决策树到底是支什么“鬼”
    深度学习炼丹术 —— Taoye不讲码德,又水文了,居然写感知器这么简单的内容
    《Machine Learning in Action》—— 浅谈线性回归的那些事
    《Machine Learning in Action》—— 懂的都懂,不懂的也能懂。非线性支持向量机
    《Machine Learning in Action》—— hao朋友,快来玩啊,决策树呦
    《Machine Learning in Action》—— 剖析支持向量机,优化SMO
    《Machine Learning in Action》—— 剖析支持向量机,单手狂撕线性SVM
    JVM 字节码指令
  • 原文地址:https://www.cnblogs.com/wuyongyin/p/12658863.html
Copyright © 2011-2022 走看看