zoukankan      html  css  js  c++  java
  • kafka集群搭建

     

    环境准备

    1.服务器概览

    ip操作系统说明安装内容
    192.168.0.113 centos 7 master节点 jdk1.8, kafka_2.11-0.10.1.1, zookeeper-3.4.8.tar
    192.168.0.114 centos 7 master节点 jdk1.8, kafka_2.11-0.10.1.1, zookeeper-3.4.8.tar
    192.168.0.115 centos 7 master节点 jdk1.8, kafka_2.11-0.10.1.1, zookeeper-3.4.8.tar

    2.服务器环境安装

    jdk1.8 安装 

    参考:centos7 安装jdk 1.8

    注意:三台服务均执行

    #添加host
    
    192.168.0.112 master.kafka
    192.168.0.114 worker1.kafka
    192.168.0.115 worker2.kafka
    
    
    #执行以下命令关闭防火墙
    [root@node1 ~]systemctl stop firewalld && systemctl disable firewalld
    [root@node1 ~]setenforce 0
    
    #将SELINUX的值改成disabled
    [root@node1 ~]vim /etc/selinux/config
    
    SELINUX=disabled
    
    #修改系统参数
    sudo vim /etc/security/limits.conf
    *               soft    nproc           65536
    *               hard    nproc           65536
    *               soft    nofile          65536
    *               hard    nofile          65536
    sudo vim /etc/sysctl.conf
    vm.max_map_count= 262144
    sudo sysctl -p
    
    
    #重启服务器
    [root@node1 ~]reboot

    3.Zookeeper集群安装

    Zookeeper安装和运行

    4.Kafka 安装

    下载Kafka安装包:http://kafka.apache.org/downloads 

     下载之后解压即可。(由于Kafka是用Scala语言开发,运行在JVM上,所以要先安装JDK)

    创建Kafka目录并安装:

    // 解压下载好的kafka压缩包并重命名
    cd /usr/local
    tar -zxvf /usr/local/kafka_2.11-0.10.1.1.tgz
    mv kafka_2.11-0.10.1.1 kafka
    // 修改配置文件
    vi ./kafka/config/server.properties

    修改配置文件: 进入到config目录

    cd /opt/kafka/kafka/config/

    主要关注:server.properties 这个文件即可,我们可以发现在目录下:

    有很多文件,这里可以发现有Zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群

     修改配置文件:

    broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
    port=19092 #当前kafka对外提供服务的端口默认是9092
    host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
    num.network.threads=3 #这个是borker进行网络处理的线程数
    num.io.threads=8 #这个是borker进行I/O处理的线程数
    log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
    socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    num.partitions=1 #默认的分区数,一个topic默认1个分区数
    log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
    message.max.byte=5242880  #消息保存的最大值5M
    default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
    replica.fetch.max.bytes=5242880  #取消息的最大直接数
    log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
    log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
    log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
    zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
    #Server Basics
    broker.id=1  每台服务器的broker.id都不能相同
    
    #Socket Server Settings
    listeners=PLAINTEXT://192.168.0.113:9092
    advertised.listeners=PLAINTEXT://192.168.0.113:9092
    
    #hostname
    host.name=192.168.0.113
    
    #Log Basics
    log.dirs=/usr/local/kafka/logs

    num.partitions
    =3 #在log.retention.hours=168 下面新增下面三项 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #Zookeeper zookeeper.connect=192.168.0.113:2181,192.168.0.114:2181,192.168.0.115:2181

    启动Kafka,进入kafka目录运行下面的命令,有两种方式

    # 第一种方式(推荐)
    bin/kafka-server-start.sh -daemon config/server.properties
    # 第二种方式 
    bin/kafka-server-start.sh -daemon config/server.properties
    # 停止 
    bin/kafka-server-stop.sh

    常用命令

    #启动
    bin/kafka-server-start.sh config/server.properties &
    
    #创建 topic 
    bin/kafka-topics.sh --create --zookeeper 192.168.0.113:2181 --replication-factor 2 --partitions 3 --topic jeremytopic
    
    #显示 topic 信息
    bin/kafka-topics.sh --describe --zookeeper 192.168.0.113:2181 --topic test
    
    #列出topic
    bin/kafka-topics.sh --list --zookeeper 192.168.0.113:2181
    
    #删除 topic
    bin/kafka-topics.sh --delete --zookeeper 192.168.0.113:2181 --topic test
    
    #关闭服务
    bin/kafka-server-stop.sh

    查看kafka的group.id

    kafka/config目录下的consumer.properties中可以看到

    注意事项:
    1、确保/etc/profile中环境配置是正确的,并在更新后启用(source /etc/profile)
    2、开启kafka前必须先开启所有zookeeper
    3、server.properties中broker.id不能重复
    4、server.properties中listeners参数后面的ip地址最好写上
    5、若server.properties中listeners,zookeeper.connect参数后填写的是ip地址(192.168.131.130等),则在运行命令时也应该使用ip地址代替localhost(如/usr/local/kafka/bin/kafka-console-producer.sh –broker-list 192.168.131.130:9092 –topic my-replicated-topic避免写为/usr/local/kafka/bin/kafka-console-producer.sh –broker-list localhost:9092 –topic my-replicated-topic)
    6、出现内存不足的错误时,尝试关闭多余的程序,还是不行就重启了。

    参考:

    https://my.oschina.net/orrin/blog/1834218

    https://blog.51cto.com/littledevil/2134694?source=dra

    https://blog.csdn.net/a807557328/article/details/78272989

    https://www.cnblogs.com/freeweb/p/7380492.html

    https://www.cnblogs.com/luotianshuai/p/5206662.html

  • 相关阅读:
    13.numpy线性代数和绘图
    12-numpy矩阵
    11-numpy视图与副本
    10-numpy排序搜索
    day12 异常 模块 单例
    day11面向对象 多态 静态方法 (三)
    day 10 面向对象(=)
    day9 面向对象
    day8 文件
    day7 地址 名片管理系统
  • 原文地址:https://www.cnblogs.com/Jeremy2001/p/10707532.html
Copyright © 2011-2022 走看看