zoukankan      html  css  js  c++  java
  • linux 安装 kafka

    @[TOC]


     

    一、安装 JDK

    rpm -qa | grep java
    rpm -qa | grep jdk
    rpm -qa | grep gcj
    rpm -qa | grep java | xargs rpm -e --nodeps #卸载老版本
    yum list java-1.8*
    yum install java-1.8.0-openjdk* -y
    java -version

     

    二、安装 zookeeper

    官方地址:https://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/

    mkdir /data
    #wget http://101.34.22.188/zookeeper/apache-zookeeper-3.7.0-bin.tar.gz -P /data
    wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz -P /data --no-check-certificate
    tar -zxvf /data/apache-zookeeper-3.7.0-bin.tar.gz

    #修改配置文件
    cd /data/apache-zookeeper-3.7.0-bin/conf
    cp /data/apache-zookeeper-3.7.0-bin/conf/zoo_sample.cfg /data/apache-zookeeper-3.7.0-bin/conf/zoo.cfg

    vim /data/apache-zookeeper-3.7.0-bin/conf/zoo.cfg
    dataDir=/data/apache-zookeeper-3.7.0-bin/data #修改数据存放目录
    pidfile /var/run/zookeeper.pid
    ......

    mkdir -p /data/apache-zookeeper-3.7.0-bin/data #创建 data 文件夹

    #启动 zookeeper
    cd /data/apache-zookeeper-3.7.0-bin/bin
    ./zkServer.sh start
    ----------------------------------------------
    nohup /data/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start &
    tail -f /data/apache-zookeeper-3.7.0-bin/bin/nohup.log
    ./zkServer.sh start-foreground #查看启动失败错误信息,大概率是版本问题,要下带 bin 的版本

     

    设置 zookeeper 开机自动启动

    cat > /usr/lib/systemd/system/zookeeper.service <<EOF
    [Unit]
    Description=zookeeperservice
    After=network.target

    [Service]
    Type=forking
    PIDFile=/var/run/zookeeper.pid
    WorkingDirectory=/data/apache-zookeeper-3.7.0-bin
    ExecStart=/data/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start
    User=root
    Group=root
    ExecReload=/bin/kill -s HUP $MAINPID
    ExecStop=/bin/kill -s QUIT $MAINPID
    PrivateTmp=true

    [Install]
    WantedBy=multi-user.target
    EOF
    systemctl daemon-reload
    systemctl enable zookeeper.service && systemctl restart zookeeper.service
    systemctl status zookeeper.service

     

    三、安装 kafka

     

    官网:http://kafka.apache.org/downloads

    #wget http://101.34.22.188/kafka/kafka_2.12-2.8.0.tgz -P /data
    wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz -P /data --no-check-certificate
    tar xf /data/kafka_2.12-2.8.0.tgz

    修改配置

    cd /data/kafka_2.12-2.8.0

    vim /data/kafka_2.12-2.8.0/config/server.properties
    # 31 行
    listeners=PLAINTEXT://192.168.10.20:9092
    # 123 行,修改 zookeeper.connect 为自己的 IP:PORT
    zookeeper.connect=192.168.10.20:2181

    server.properties 配置详解

    //当前机器在集群中的唯一标识,和zookeeper的myid性质一样(broker.id和host.name每个节点都不相同)
    broker.id=0
    //当前kafka对外提供服务的端口默认是9092listeners=PLAINTEXT://192.168.1.202:9092
    //这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
    host.name=hadoop1
    //这个是borker进行网络处理的线程数
    num.network.threads=3
    //这个是borker进行I/O处理的线程数
    num.io.threads=8
    //发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.send.buffer.bytes=102400
    //kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.receive.buffer.bytes=102400
    //这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    socket.request.max.bytes=104857600
    //消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,
    //如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
    log.dirs=/home/hadoop/log/kafka-logs
    //默认的分区数,一个topic默认1个分区数
    num.partitions=1
    //每个数据目录用来日志恢复的线程数目
    num.recovery.threads.per.data.dir=1
    //默认消息的最大持久化时间,168小时,7天
    log.retention.hours=168
    //轮转时间,当需要删除指定小时之前的数据时,该设置项很重要
    log.roll.hours=12
    //这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
    log.segment.bytes=1073741824
    //每隔300000毫秒去检查上面配置的log失效时间
    log.retention.check.interval.ms=300000
    //是否启用log压缩,一般不用启用,启用的话可以提高性能
    log.cleaner.enable=false
    //设置zookeeper的连接端口
    zookeeper.connect=192.168.123.102:2181,192.168.123.103:2181,192.168.123.104:2181
    //设置zookeeper的连接超时时间
    zookeeper.connection.timeout.ms=6000

     

    启动 kafka

    nohup /data/kafka_2.12-2.8.0/bin/kafka-server-start.sh /data/kafka_2.12-2.8.0/config/server.properties & 

     

    设置开机自动启动

    cat > /usr/lib/systemd/system/kafka.service <<EOF
    [Unit]
    Description=kafkaservice
    After=network.target

    [Service]
    WorkingDirectory=/data/kafka_2.12-2.8.0
    ExecStart=/data/kafka_2.12-2.8.0/bin/kafka-server-start.sh /data/kafka_2.12-2.8.0/config/server.properties
    ExecStop=/data/kafka_2.12-2.8.0/bin/kafka-server-stop.sh
    User=root
    Group=root
    Restart=always
    RestartSec=10

    [Install]
    WantedBy=multi-user.target
    EOF
    systemctl daemon-reload
    systemctl enable kafka.service && systemctl restart kafka.service
    systemctl status kafka.service
    netstat -antp | grep 9092
    ps aux | grep kafka
    lsof -i:9092

     

    测试

    # topic 是发布消息的 category,以单节点的配置创建了一个叫 dblab01 的 topic.可以用 list 列出所有创建的 topics,来查看刚才创建的主题是否存在
    /data/kafka_2.12-2.8.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic dblab01

    /data/kafka_2.12-2.8.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic dblab01

    /data/kafka_2.12-2.8.0/bin/kafka-topics.sh --list --zookeeper localhost:2181

    /data/kafka_2.12-2.8.0/bin/kafka-console-producer.sh --broker-list PLAINTEXT://192.168.10.20:9092 --topic dblab01

    /data/kafka_2.12-2.8.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab01 --from-beginning #测试命令可能有些版本问题

     

     

  • 相关阅读:
    Python推导式(Comprehension)
    mysql中文乱码
    入门学习hibernate
    什么是ORM?
    Java网站中的权限管理
    Java的8中基本数据类型
    Python获取文件夹大小
    Python技巧
    Python中取整的方法floor,ceil,round
    Python线程join和setDaemon
  • 原文地址:https://www.cnblogs.com/shenyuanhaojie/p/15522471.html
Copyright © 2011-2022 走看看