zoukankan      html  css  js  c++  java
  • Kafka集群部署指南

    一、前言

    1、Kafka简介

    Kafka是一个开源等分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kafka支持以发布/订阅等方式在应用间传递消息,同时基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其它系统的数据(Elasticsearch、Hadoop等)

    Kafka最核心最成熟的它的消息引擎,所以Kafka大部分应用场景是用来作为消息队列削峰平谷,另外Kafka也是目前性能最好的消息中间件。

    2、Kafka架构

    在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。

    概念/对象简单说明
    Broker Kafka节点
    Topic 主题,用来承载消息
    Partition 分区,用于主题分片存储
    Producer 生产者,向主题发布消息的应用
    Consumer 消费者,从主题订阅消息的应用
    Consumer Group 消费者组,由多个消费者组成

    3、准备工作

    软件版本说明

    说明
    Linux Server CentOS 7.6
    Kafka 2.4.1

    服务器采用之前搭建ZooKeeper集群的3台机器,Kafka集群需要依赖ZooKeeper存储Broker、Topic等信息。

    ZooKeeper集群部署步骤参考:https://www.cnblogs.com/winnerREN/p/13406248.html

    二、部署Kafka

    1、创建Kafka相关目录

    cd /opt
    sudo mkdir kafka
    cd /opt/kafka
    sudo mkdir kafkalogs
    

    2、下载&解压

    Kafka官方下载地址:https://downloads.apache.org/kafka/2.4.1/kafka_2.11-2.4.1.tgz

    cd /opt/kafka
    sudo wget https://downloads.apache.org/kafka/2.4.1/kafka_2.11-2.4.1.tgz
    sudo gunzip kafka_2.11-2.4.1.tgz
    sudo tar -xvf kafka_2.11-2.4.1.tgz
    

    3、Kafka节点配置

    cd /opt/kafka/kafka_2.11-2.4.1/
    sudo vim config/server.properties
    broker.id=0
    listeners=PLAINTEXT://172.31.9.177:9092
    port = 9092
    host.name = 172.31.9.177
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafka/kafkalogs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=172.31.9.177:2181,172.31.8.176:2181,172.31.0.70:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0

    broker.id分别在3个节点配置0,1,2

    listeners 配置各节点IP

    4、启动Kafka

    sudo bin/kafka-server-start.sh -daemon config/server.properties

     三、Kafka测试

    1、创建Topic

    在kafka1(Broker.id=0)上创建测试Topic:test-winner,这里我们指定了2个副本、1个分区

    sudo bin/kafka-topics.sh --create --zookeeper 172.31.9.177:2181 --replication-factor 2 --partitions 1 --topic test-winner
    

    Topic在kafka1上创建后也会同步到集群中另外两个Broker:kafka2、kafka3

    2、查看刚创建的Topic

    [centos@cluster-kafka1 kafka_2.11-2.4.1]$ sudo bin/kafka-topics.sh --describe --zookeeper 172.31.9.177:2181 --topic test-winner
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    Topic: test-winner	PartitionCount: 1	ReplicationFactor: 2	Configs:
    	Topic: test-winner	Partition: 0	Leader: 2	Replicas: 2,0	Isr: 2,0
    

    3、查看Topic列表

    [centos@cluster-kafka1 kafka_2.11-2.4.1]$ sudo bin/kafka-topics.sh --list --zookeeper 172.31.9.177:2181
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    test-winner
    

    4、查看集群情况

    [centos@cluster-kafka2 kafka_2.11-2.4.1]$ sudo bin/kafka-topics.sh --describe --zookeeper 172.31.8.176:2181 --topic test-winner
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    Topic: test-winner	PartitionCount: 1	ReplicationFactor: 2	Configs:
    	Topic: test-winner	Partition: 0	Leader: 2	Replicas: 2,0	Isr: 2,0
    [centos@cluster-kafka3 kafka_2.11-2.4.1]$ sudo bin/kafka-topics.sh --describe --zookeeper 172.31.0.70:2181 --topic test-winner
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    Topic: test-winner	PartitionCount: 1	ReplicationFactor: 2	Configs:
    	Topic: test-winner	Partition: 0	Leader: 2	Replicas: 2,0	Isr: 2,0
    

    5、生产消息

    我们向Broker(id=0)的Topic=test-winner 发送消息

     > bin/kafka-console-producer.sh --broker-list 172.31.9.177:9092 -topic test-winner
    #消息内容 
    test by winner
    

    4、消费消息

    在Kafka2和Kafka3上消费Broker(id=0)的消息

    > bin/kafka-console-consumer.sh --zookeeper 172.31.8.176:2181 --from-beginning --topic test-winner
    > bin/kafka-console-consumer.sh --zookeeper 172.31.0.70:2181 --from-beginning --topic test-winner
    
    消费端的内容:test by winner
    

    PS:每个节点既可以作为生产者也可以作为消费者,一个节点要么为生产者要么为消费者

    四、Kafka常用配置项说明:

     

    配置项默认值/示例值说明
    broker.id 0 Broker唯一标识
    listeners PLAINTEXT://172.31.9.177:9092 监听信息,PLAINTEXT表示明文传输
    log.dirs kafka/logs kafka数据存放地址,可以填写多个。用”,”间隔
    message.max.bytes message.max.bytes 单个消息长度限制,单位是字节
    num.partitions 1 默认分区数
    log.flush.interval.messages Long.MaxValue 在数据被写入到硬盘和消费者可用前最大累积的消息的数量
    log.flush.interval.ms Long.MaxValue 在数据被写入到硬盘前的最大时间
    log.flush.scheduler.interval.ms Long.MaxValue 检查数据是否要写入到硬盘的时间间隔。
    log.retention.hours 24 控制一个log保留时间,单位:小时
    zookeeper.connect 172.31.9.177:2181 ZooKeeper服务器地址,多台用”,”间隔

     

      

  • 相关阅读:
    宏队列与微队列
    async 与 await
    promise关键点
    promiseAPI
    promise基本使用
    JS中的错误(Error)即错误处理
    两种类型的回调函数(同步回调与异步回调)
    区别实例对象与函数对象
    在二叉树中查找指定值结点的所有祖先
    关于js点击事件出现 xx is not defined at HTMLAnchorElement.onclick 的问题
  • 原文地址:https://www.cnblogs.com/winnerREN/p/13407396.html
Copyright © 2011-2022 走看看