zoukankan      html  css  js  c++  java
  • linux运维、架构之路-Kafka集群部署

    一、Kafka介绍

          Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(Elasticsearch、Hadoop等)。Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。

    二、Kafka架构图

           Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。

    名称

    说明

    Broker

    Kafka节点

    Topic

    主题,用来承载消息

    Partition

    分区,用于主题分片存储

    Producer

    生产者,向主题发布消息的应用

    Consumer

    消费者,从主题订阅消息的应用

    Consumer Group

    消费者组,由多个消费者组成

    三、Kafka集群部署

    Kafka 官网文档:http://kafka.apache.org/
    Kafka 下载地址:http://kafka.apache.org/downloads

    1、服务器规划

    系统

    IP

    软件

    JAVA

    主机名

    CentOS7.5

    192.168.56.11

    kafka_2.11-2.0.0.tgz

    jdk1.8.0_181

    kafka1

    CentOS7.5

    192.168.56.12

    kafka_2.11-2.0.0.tgz

    jdk1.8.0_181

    kafka2

    CentOS7.5

    192.168.56.13

    kafka_2.11-2.0.0.tgz

    jdk1.8.0_181

    kafka3

    2、系统基础环境准备

    #参照: https://www.cnblogs.com/yanxinjiang/p/12752871.html
    #安装好JDK和Zookeeper集群 

      Kakfa集群需要依赖ZooKeeper存储Broker、Topic等信息,这里我们部署三台ZK。

    3、创建软件目录并解压

    mkdir /app
    tar xf kafka_2.11-2.0.0.tgz -C /app/
    ln -s /app/kafka_2.11-2.0.0/ /app/kafka

    4、创建日志路径

    mkdir /app/kafka/logs -p

    5、修改配置文件

    #[root@moban config]# egrep -v "#|$^" server.properties 
    broker.id=0   #节点配置,此处需要修改
    listeners=PLAINTEXT://192.168.56.11:9092    #节点IP配置
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/app/kafka/logs                   #配置日志路径
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=2
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181   #指定Zookeeper集群地址
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0

    6、如果需要修改jdk路径,可在启动脚本中进行配置

    vim /app/kafka/bin/kafka-server-start.sh
    export JAVA_HOME=/app/jdk1.8.0_181

    7、分发Kafka文件

    scp -rp /app/kafka_2.11-2.0.0/ 192.168.56.12:/app/
    scp -rp /app/kafka_2.11-2.0.0/ 192.168.56.13:/app/
    
    #创建Kafka软链接
    ln -s /app/kafka_2.11-2.0.0/ /app/kafka

    8、修改Kafka节点配置文件

    ①节点kafka1配置

    broker.id=0
    
    listeners=PLAINTEXT://192.168.56.11:9092

    ②节点kafka1配置

    broker.id=1
    
    listeners=PLAINTEXT://192.168.56.12:9092

    ③节点kafka1配置

    broker.id=2
    
    listeners=PLAINTEXT://192.168.56.13:9092

    9、启动Kafka服务

    nohup /app/kafka/bin/kafka-server-start.sh  /app/kafka/config/server.properties &   #启动
    /app/kafka/bin/zookeeper-server-stop.sh    #停止

    查看进程

    [root@moban ~]# jps
    41425 Jps
    41080 Kafka
    40938 QuorumPeerMain

    10、Kafka测试

    ①创建Topic

         在kafka1(Broker)上创建测试Tpoic:test-ken-io,这里我们指定了3个副本、1个分区

    /app/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.56.11:9092 --replication-factor 3 --partitions 1 --topic test-ken-io

    报错2.2以上版本无报错

    Exception in thread "main" joptsimple.UnrecognizedOptionException: bootstrap-server is not a recognized option
        at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
        at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
        at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
        at joptsimple.OptionParser.parse(OptionParser.java:396)
        at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:358)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

    注:如果使用的Kafka版本低于2.2,则应使用--zookeeperoption并将连接字符串传递给zookeeper

    https://stackoverflow.com/questions/55494988/why-is-kafka-not-creating-a-topic-bootstrap-server-is-not-a-recognized-option
    /app/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.56.11:2181 --replication-factor 3 --partitions 1 --topic test-ken-io
    Created topic "test-ken-io".

    ②其他节点查看Topic

     Topic在kafka01上创建后也会同步到集群中另外两个Broker:kafka2、kafka3

    [root@kafka2 ~]# /app/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.56.12:2181
    test-ken-io
    
    [root@kafka3 ~]# /app/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.56.13:2181
    test-ken-io

    ③发送消息

    这里我们向Broker(id=0)的Topic=test-ken-io发送消息

    [root@kafka1 ~]# /app/kafka/bin/kafka-console-producer.sh --broker-list  192.168.56.11:9092  --topic test-ken-io
    >test by ken.io

    ④消费消息

    在Kafka2上消费Broker03的消息

    [root@moban ~]# /app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.13:9092 --topic test-ken-io --from-beginning
    test by ken.io

    在Kafka3上消费Broker02的消息

    [root@kafka3 ~]# /app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.12:9092 --topic test-ken-io --from-beginning
    test by ken.io

    注:以上测试都可以收到消息

    这是因为这两个消费消息的命令是建立了两个不同的Consumer
    如果我们启动Consumer指定Consumer Group Id就可以作为一个消费组协同工,1个消息同时只会被一个Consumer消费到

  • 相关阅读:
    05 css继承性
    04 选择器权重
    03 css三种引入的方式
    02 css实现举例
    01 css介绍
    05 dl-添加定义列表
    04 ol-热门点击排行榜
    02 h1 p hr br 特殊符号
    01html简介
    函数内置方法
  • 原文地址:https://www.cnblogs.com/yanxinjiang/p/12759547.html
Copyright © 2011-2022 走看看