zoukankan      html  css  js  c++  java
  • Kafka安装步骤

    基本概念

    1.Producer:消息生产者,就是向 kafka broker 发消息的客户端
    2.Consumer:消息消费者,向 kafka broker 取消息的客户端
    3.Consumer Group(CG ):消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据, 一个分区只能由一个 组内 消费者消费; 消费者组之间互不影响。 所有的消费者都属于某个消费者组,即 消费者组是逻辑上的一个订阅者。
    4.Broker:一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
    5.Topic:可以理解为一个队列, 生产者和消费者面向的都是一个 topic
    6.Partition:为了实现扩展性, 一个非常大的 topic 可以分布到多个 broker (即服务器) 上,一个 topic  可以分为多个 partition,每个 partition 是一个有序的队列;
    7.Replica: 副本, 为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作, kafka 提供了副本机制, 一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
    8.leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
    9.follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。
    

    采用伪集群的方式部署Kafka集群

    # 下载软件
    cd /usr/local/src
    wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz
    tar -xvf kafka_2.12-2.3.0.tgz
    cd kafka_2.12-2.3.0
    
    # 解压缩后进入目录中
    # 方法1:使用同一个程序,但是采用不同的配置文件
    # 方法2:复制两份目录出来,使用不同的目录区分
    # 此处采用的是方法1,在后续步骤采用的是方法2
    
    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties 
    
    # log.dir表示的是队列数据存储的路径,根据实际情况修改
    
    config/server.properties: 
        broker.id=0 
        listeners=PLAINTEXT://:9092
        log.dir=/data/kafka-logs-0
        delete.topic.enable=true #删除 topic 功能
        
    config/server-1.properties: 
        broker.id=1 
        listeners=PLAINTEXT://:9093 
        log.dir=/data/kafka-logs-1
        delete.topic.enable=true #删除 topic 功能
    
    config/server-2.properties: 
        broker.id=2 
        listeners=PLAINTEXT://:9094 
        log.dir=/data/kafka-logs-2
        delete.topic.enable=true
    

    后台启动内置的zookeeper

    # 测试可以使用Kafka自带的zookeeper。生产上建议使用单独的zookeeper集群
    # 若是搭建集群的话,方法同Kafka
    # 此处先启动一个zookeeper,后续采用同Kafka的方法2搭建集群使用
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    

    关闭zookeeper

    bin/zookeeper-server-stop.sh stop
    

    开启kafk集群(伪集群),shell脚本

    bin/kafka-server-start.sh -daemon config/server.properties
    bin/kafka-server-start.sh -daemon config/server-1.properties
    bin/kafka-server-start.sh -daemon config/server-2.properties
    

    关闭kafk集群(伪集群),shell脚本

    bin/zookeeper-server-stop.sh stop
    

    查看当前服务器中的所有 topic

    bin/kafka-topics.sh --list --zookeeper localhost:2181 
    

    创建 topic

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic first
    

    删除 topic(需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除)

    bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic first
    

    发送消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first
    

    消费消息(默认当前,不是从头开始)

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
    

    消费消息,从头开始

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic second --from-beginning
    

    查看某个 Topic 的详情

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic first
    

    修改分区数

    bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic first --partitions 6
    

    自带压测命令

    bin/kafka-producer-perf-test.sh --topic first --num-records 1000 --record-size 100 --throughput 1000  --producer-props bootstrap.servers=localhost:9092
    

    安装Kafka Eagle监控Kafka集群

    1.修改 kafka-server-start.sh 命令
    因为使用的是伪集群,所以,需要把目录复制三份,每个目录下该文件的JMX_PORT值都不一样才行

    正好利用这种情况,每个目录下都可以启动自带的zookeeper,修改端口号,从而构建一个zookeeper伪集群

    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
        export JMX_PORT="9999"
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    fi
    

    2.下载Kafka Eagle软件

    cd /usr/local/src/
    wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.3.9.tar.gz
    tar -zxvf v1.3.9.tar.gz
    cd kafka-eagle-bin-1.3.9/
    cp kafka-eagle-web-1.3.9-bin.tar.gz ../
    tar -zxvf kafka-eagle-web-1.3.9-bin.tar.gz 
    cd kafka-eagle-web-1.3.9/conf
    vim system-config.properties
    

    3.设置
    若是使用zookeeper伪集群,做相应的增加

    vim system-config.properties
    
    kafka.eagle.zk.cluster.alias=cluster1
    cluster1.zk.list=localhost:2181
    
    cluster1.kafka.eagle.offset.storage=kafka
    
    kafka.eagle.metrics.charts=true
    kafka.eagle.sql.fix.error=false
    
    # 使用sqlite数据库,注意保存路径,没有的话需要新建,可以换成MySQL
    kafka.eagle.driver=org.sqlite.JDBC
    kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
    kafka.eagle.username=root
    kafka.eagle.password=www.kafka-eagle.org
    

    4.安装maven

    yum install maven
    

    5.配置环境变量

    vim /etc/profile.d/ke.sh
    export KE_HOME=/usr/local/src/kafka-eagle-web-1.3.9
    export PATH=$PATH:$KE_HOME/bin
    
    vim /etc/profile.d/java.sh
    export JAVA_HOME=//usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64
    export JRE_HOME=$JAVA_HOME/jre
    export CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
    export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
    
    source /etc/profile.d/ke.sh
    soucer /etc/profile.d/ke.sh
    

    6.启动

    cd bin
    bash ke.sh start.sh
    
    *******************************************************************
    * Kafka Eagle system monitor port successful... 
    *******************************************************************
    [2019-09-10 16:41:46] INFO: Status Code[0]
    [2019-09-10 16:41:46] INFO: [Job done!]
    Welcome to
        __ __    ___     ____    __ __    ___            ______    ___    ______    __     ______
       / //_/   /   |   / __/   / //_/   /   |          / ____/   /   |  / ____/   / /    / ____/
      / ,<     / /| |  / /_    / ,<     / /| |         / __/     / /| | / / __    / /    / __/   
     / /| |   / ___ | / __/   / /| |   / ___ |        / /___    / ___ |/ /_/ /   / /___ / /___   
    /_/ |_|  /_/  |_|/_/     /_/ |_|  /_/  |_|       /_____/   /_/  |_|\____/   /_____//_____/   
                                                                                                 
    
    Version 1.3.9
    *******************************************************************
    * Kafka Eagle Service has started success.
    * Welcome, Now you can visit 'http://127.0.0.1:8048/ke'
    * Account:admin ,Password:123456
    *******************************************************************
    * <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
    * <Usage> https://www.kafka-eagle.org/ </Usage>
    *******************************************************************
    

    提示的访问地址是http://127.0.0.1:8048/ke,但是可以用http://192.168.0.145/ke进行访问

  • 相关阅读:
    中国大学排名爬虫
    基于bs4库的HTML内容查找方法和HTML格式化和编码
    自动化提取51啦数据的信息
    简单目录扫描工具
    一个简单音乐播放器
    【原创】C++中对象的序列化
    [android]android开发中的运行错误之:adb.exe
    [转载]十大编程算法助程序员走上高手之路
    [原创]二叉树相关笔试题代码
    [原创]VS2010中创建动态链接库及其调用
  • 原文地址:https://www.cnblogs.com/sanduzxcvbnm/p/11640157.html
Copyright © 2011-2022 走看看