Kafka Cluster Setup

    I. Initialize the Environment

    1. Install JDK 1.8 and ZooKeeper 3.4.9 first; ZooKeeper 3.4.9 can be downloaded from http://archive.apache.org/dist/zookeeper

    2. Start the ZooKeeper cluster (a minimal sketch follows this list)

    3. Prepare three servers for the Kafka cluster
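
    A minimal sketch of the ZooKeeper ensemble setup, assuming ZooKeeper 3.4.9 is unpacked under /usr/zookeeper-3.4.9 on ip01, ip02 and ip03 (that install path and the dataDir below are assumptions, not from this setup):

    # conf/zoo.cfg -- identical on all three servers (dataDir is an assumed path)
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/usr/zookeeper-3.4.9/data
    clientPort=2181
    server.1=ip01:2888:3888
    server.2=ip02:2888:3888
    server.3=ip03:2888:3888

    # On each server, write its own id into dataDir/myid, then start ZooKeeper
    echo 1 > /usr/zookeeper-3.4.9/data/myid    # use 2 on ip02 and 3 on ip03
    bin/zkServer.sh start
    bin/zkServer.sh status                     # one leader and two followers when healthy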

    II. Download the Kafka Package

    http://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
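
    For example, the package can be downloaded with wget (the /usr directory is an assumption, chosen to match the log.dirs path used later):

    cd /usr
    wget http://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz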

    III. Set Up the Environment

    1. Upload the package to the same path on all three servers

    2. Edit the configuration file kafka/config/server.properties (comments in a .properties file must sit on their own lines)

    Create the logs directory on all three servers (see the sketch below)
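
    A short sketch of distributing the package, unpacking it, and creating the logs directory (the root user and the /usr path are assumptions; the logs path must match log.dirs below):

    # From ip01, copy the package to the other two servers (same path)
    scp /usr/kafka_2.11-0.10.0.0.tgz root@ip02:/usr/
    scp /usr/kafka_2.11-0.10.0.0.tgz root@ip03:/usr/

    # On every server: unpack and create the directory referenced by log.dirs
    tar -zxvf /usr/kafka_2.11-0.10.0.0.tgz -C /usr/
    mkdir -p /usr/kafka_2.11-0.10.0.0/logs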

    First server, ip01:

    # Broker ID; must be unique on each server
    broker.id=1
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    # Log directory; create it beforehand
    log.dirs=/usr/kafka_2.11-0.10.0.0/logs
    # Default number of partitions per topic
    num.partitions=2
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.flush.interval.messages=10000
    log.flush.interval.ms=1000
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    # ZooKeeper cluster addresses
    zookeeper.connect=ip01:2181,ip02:2181,ip03:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    # Allow topics to be deleted
    delete.topic.enable=true
    # This broker's host address
    host.name=ip01

    Second server, ip02:

    broker.id=2
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/usr/kafka_2.11-0.10.0.0/logs
    num.partitions=2
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.flush.interval.messages=10000
    log.flush.interval.ms=1000
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=ip01:2181,ip02:2181,ip03:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    delete.topic.enable=true
    host.name=ip02

    Third server, ip03:

    broker.id=3
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/usr/kafka_2.11-0.10.0.0/logs
    num.partitions=2
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.flush.interval.messages=10000
    log.flush.interval.ms=1000
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=ip01:2181,ip02:2181,ip03:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    delete.topic.enable=true
    host.name=ip03
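
    As the three files show, only broker.id and host.name differ between servers. One way to apply those two changes on ip02 after copying the ip01 file over (same idea for ip03) is:

    cd /usr/kafka_2.11-0.10.0.0
    sed -i 's/^broker.id=.*/broker.id=2/' config/server.properties
    sed -i 's/^host.name=.*/host.name=ip02/' config/server.properties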

    IV. Start the Cluster

    From the Kafka installation directory, start the broker in the background on each of the three servers:

    nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

    Check whether it started successfully: run jps on each server and a Kafka process should be listed.
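
    Besides jps, the broker registrations can be confirmed through ZooKeeper with the zookeeper-shell.sh script shipped in Kafka's bin directory, for example:

    echo "ls /brokers/ids" | bin/zookeeper-shell.sh ip01:2181
    # A healthy cluster lists all three broker ids: [1, 2, 3]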

    V. Test

    Create a topic:

    bin/kafka-topics.sh --create --zookeeper ip01:2181 --replication-factor 2 --partitions 3 --topic test
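
    The partition and replica assignment of the new topic can then be checked with the --describe option:

    bin/kafka-topics.sh --describe --zookeeper ip01:2181 --topic test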

    Produce data with the console producer:

    bin/kafka-console-producer.sh --broker-list ip01:9092,ip02:9092,ip03:9092 --topic test

    Consume data with the console consumer:

    bin/kafka-console-consumer.sh --from-beginning --topic test  --zookeeper ip01:2181,ip02:2181,ip03:2181
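
    Since delete.topic.enable=true is set in server.properties, the test topic can be removed once the round trip works, for example:

    bin/kafka-topics.sh --delete --zookeeper ip01:2181 --topic test
    bin/kafka-topics.sh --list --zookeeper ip01:2181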