zoukankan      html  css  js  c++  java
  • linux运维、架构之路-Kafka集群部署

    一、Kafka介绍

          Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(Elasticsearch、Hadoop等)。Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。

    二、Kafka架构图

           Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。

    名称

    说明

    Broker

    Kafka节点

    Topic

    主题,用来承载消息

    Partition

    分区,用于主题分片存储

    Producer

    生产者,向主题发布消息的应用

    Consumer

    消费者,从主题订阅消息的应用

    Consumer Group

    消费者组,由多个消费者组成

    三、Kafka集群部署

    Kafka 官网文档:http://kafka.apache.org/
    Kafka 下载地址:http://kafka.apache.org/downloads

    1、服务器规划

    系统

    IP

    软件

    JAVA

    主机名

    CentOS7.5

    192.168.56.11

    kafka_2.11-2.0.0.tgz

    jdk1.8.0_181

    kafka1

    CentOS7.5

    192.168.56.12

    kafka_2.11-2.0.0.tgz

    jdk1.8.0_181

    kafka2

    CentOS7.5

    192.168.56.13

    kafka_2.11-2.0.0.tgz

    jdk1.8.0_181

    kafka3

    2、系统基础环境准备

    #参照: https://www.cnblogs.com/yanxinjiang/p/12752871.html
    #安装好JDK和Zookeeper集群 

      Kakfa集群需要依赖ZooKeeper存储Broker、Topic等信息,这里我们部署三台ZK。

    3、创建软件目录并解压

    mkdir /app
    tar xf kafka_2.11-2.0.0.tgz -C /app/
    ln -s /app/kafka_2.11-2.0.0/ /app/kafka

    4、创建日志路径

    mkdir /app/kafka/logs -p

    5、修改配置文件

    #[root@moban config]# egrep -v "#|$^" server.properties 
    broker.id=0   #节点配置,此处需要修改
    listeners=PLAINTEXT://192.168.56.11:9092    #节点IP配置
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/app/kafka/logs                   #配置日志路径
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=2
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181   #指定Zookeeper集群地址
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0

    6、如果需要修改jdk路径,可在启动脚本中进行配置

    vim /app/kafka/bin/kafka-server-start.sh
    export JAVA_HOME=/app/jdk1.8.0_181

    7、分发Kafka文件

    scp -rp /app/kafka_2.11-2.0.0/ 192.168.56.12:/app/
    scp -rp /app/kafka_2.11-2.0.0/ 192.168.56.13:/app/
    
    #创建Kafka软链接
    ln -s /app/kafka_2.11-2.0.0/ /app/kafka

    8、修改Kafka节点配置文件

    ①节点kafka1配置

    broker.id=0
    
    listeners=PLAINTEXT://192.168.56.11:9092

    ②节点kafka1配置

    broker.id=1
    
    listeners=PLAINTEXT://192.168.56.12:9092

    ③节点kafka1配置

    broker.id=2
    
    listeners=PLAINTEXT://192.168.56.13:9092

    9、启动Kafka服务

    nohup /app/kafka/bin/kafka-server-start.sh  /app/kafka/config/server.properties &   #启动
    /app/kafka/bin/zookeeper-server-stop.sh    #停止

    查看进程

    [root@moban ~]# jps
    41425 Jps
    41080 Kafka
    40938 QuorumPeerMain

    10、Kafka测试

    ①创建Topic

         在kafka1(Broker)上创建测试Tpoic:test-ken-io,这里我们指定了3个副本、1个分区

    /app/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.56.11:9092 --replication-factor 3 --partitions 1 --topic test-ken-io

    报错2.2以上版本无报错

    Exception in thread "main" joptsimple.UnrecognizedOptionException: bootstrap-server is not a recognized option
        at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
        at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
        at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
        at joptsimple.OptionParser.parse(OptionParser.java:396)
        at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:358)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

    注:如果使用的Kafka版本低于2.2,则应使用--zookeeperoption并将连接字符串传递给zookeeper

    https://stackoverflow.com/questions/55494988/why-is-kafka-not-creating-a-topic-bootstrap-server-is-not-a-recognized-option
    /app/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.56.11:2181 --replication-factor 3 --partitions 1 --topic test-ken-io
    Created topic "test-ken-io".

    ②其他节点查看Topic

     Topic在kafka01上创建后也会同步到集群中另外两个Broker:kafka2、kafka3

    [root@kafka2 ~]# /app/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.56.12:2181
    test-ken-io
    
    [root@kafka3 ~]# /app/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.56.13:2181
    test-ken-io

    ③发送消息

    这里我们向Broker(id=0)的Topic=test-ken-io发送消息

    [root@kafka1 ~]# /app/kafka/bin/kafka-console-producer.sh --broker-list  192.168.56.11:9092  --topic test-ken-io
    >test by ken.io

    ④消费消息

    在Kafka2上消费Broker03的消息

    [root@moban ~]# /app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.13:9092 --topic test-ken-io --from-beginning
    test by ken.io

    在Kafka3上消费Broker02的消息

    [root@kafka3 ~]# /app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.12:9092 --topic test-ken-io --from-beginning
    test by ken.io

    注:以上测试都可以收到消息

    这是因为这两个消费消息的命令是建立了两个不同的Consumer
    如果我们启动Consumer指定Consumer Group Id就可以作为一个消费组协同工,1个消息同时只会被一个Consumer消费到

  • 相关阅读:
    通讯录封装实现
    简单通讯录的实现 main..h .m文件全部
    iOS 开发 OC编程 字典和集合 排序方法
    iOS 开发 OC编程 数组冒泡排序.图书管理
    iOS 开发 OC编程 属性和字符串练习
    iOS 开发 OC编程 属性和字符串
    iOS 开发 OC编程 便利构造器以及初始化方法
    iOS 开发 OC编程 方法的书写
    IOS 开发 OC编程 类和对象
    iOS 开发 c语言阶段考试题
  • 原文地址:https://www.cnblogs.com/yanxinjiang/p/12759547.html
Copyright © 2011-2022 走看看