zoukankan      html  css  js  c++  java
  • centos7安装kafka

    1.官网或 wget 下载 kafka_2.12-2.2.0.tgz 二进制代码包

      cd /home/tar

      wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz

    2.解压安装

      tar -zxvf kafka_2.12-2.2.0.tgz

      mv kafka_2.12-2.2.0 /usr/local/

    3.编辑 server.propertie 配置文件(配置修改待完善)

      备份文件

        cd /usr/local/kafka_2.12-2.2.0/config

        cp server.propertie server.propertie.bak

      编辑文件

        vi server.propertie

      主要修改一下几个地方:

    broker.id=123  #每台服务器的broker.id都不能相同
    host.name=xxx-xxx-xxx-xxx #主机名
    listeners=PLAINTEXT://ip:9092 #监听地址
    log.dirs=/usr/local/kafka_2.12-2.2.0/logs #在log.retention.hours
    =168 下追加 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #设置zookeeper的连接端口 zookeeper.connect=ip:12181

    4.启动服务

      启动命令

        /usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh -daemon ../config/server.propertie

      在终端输入“jps”,若启动成功会得到如下结果:

    7253 Jps
    5850 ZooKeeperMain
    6076 QuorumPeerMain
    6093 Kafka

       其中QuorumPeerMain是 zookeeper 的守护进程,kafka 是 kafka 的守护进程

    5.常用命令

      停止服务

        /usr/local/kafka_2.12-2.2.0/bin/kafka-server-stop.sh

      创建主题

        /usr/local/kafka_2.12-2.2.0/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test-topic

      列出主题

        /usr/local/kafka_2.12-2.2.0/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

      运行producer并发送消息

        /usr/local/kafka_2.12-2.2.0/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test-topic

      运行consumer并接收消息

        /usr/local/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic --from-beginning

    6.集群配置

      待续

    附:

    1. server.propertie配置释义

    broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样,但是不管你怎么配,别配0就是,不然创建Topic的时候回报错。
    port=19092 #当前kafka对外提供服务的端口默认是9092
    host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
    num.network.threads=3 #这个是borker进行网络处理的线程数
    num.io.threads=8 #这个是borker进行I/O处理的线程数
    log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
    socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    num.partitions=1 #默认的分区数,一个topic默认1个分区数
    log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
    message.max.byte=5242880  #消息保存的最大值5M
    default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
    replica.fetch.max.bytes=5242880  #取消息的最大直接数
    log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
    log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
    log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
    zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口

    2.启动时内存不足异常解决

    ## There is insufficient memory for the Java Runtime Environment to continue.
    
    # Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory.
    
    # An error report file with more information is saved as:
    
    # //hs_err_pid6500.logOpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000bad30000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)

      原因:kafka 启动脚本 kafka-server-start.sh 中指定了启动时需要的最小内存,默认为1G

      解决方法:

        修改启动脚本

          cd /usr/local/kafka_2.12-2.2.0/bin/

          cp kafka-server-start.sh kafka-server-start.sh.bak

          vi kafka-server-start.sh

        修改

          export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

        为

          export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

    3.启动时出现oom异常解决

    [ FATAL ] Fatal error during KafkaServerStable startup. Prepare to shutdown
     java.lang.OutOfMemoryError: Java heap space
            at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
            at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
            at kafka.log.SkimpyOffsetMap.(OffsetMap.scala:42)

      原因:kafka启动时分配的内存国小导致

      解决方法:同上

    参考文章:

      https://www.w3cschool.cn/apache_kafka/apache_kafka_installation_steps.html w3cschool

      https://www.cnblogs.com/krockey/p/9068129.html kafka安装

      https://blog.csdn.net/qq982782662/article/details/82810415 常用命令

      https://blog.csdn.net/yundanfengqingfeng/article/details/84781852 常用命令

      https://blog.csdn.net/r02221/article/details/55225036 启动异常解决

      https://blog.csdn.net/wqh8522/article/details/79163467 集群配置

      https://blog.csdn.net/zzq900503/article/details/83348419 集群配置

  • 相关阅读:
    混合装置实现了24/7的能量收集和储存
    2020年人工智能汽车将出台多项标准
    自动驾驶汽车事故的责任追究
    多核处理器集成了神经处理单元
    广泛的信号处理链如何让语音助理“正常工作”
    先进机器人系统中的关键技术
    模拟内存计算如何解决边缘人工智能推理的功耗挑战
    TinyML设备设计的Arm内核
    获取url指定参数值(js/vue)
    js 实时监听textarea输入
  • 原文地址:https://www.cnblogs.com/tarencez/p/10887931.html
Copyright © 2011-2022 走看看