zoukankan      html  css  js  c++  java
  • Kafka安装

    1. 主机规划

    序号 主机名 IP zookeeper端口 kafka端口      
    1 node01 10.0.0.21 2181 9022      
    2 node02 10.0.0.22 2181 9022      
    3 node03 10.0.0.23 2181 9022      

    2. 下载kafka安装包

    wget  https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz

    3. 解压

    tar -zxf kafka_2.12-2.1.0.tgz 
    mv kafka_2.12-2.1.0 /opt/myprogram/kafka
    cd /opt/myprogram/kafka/

    4. 修改/kafka/config/server.properties配置

    cd /opt/myprogram/kafka/config

    vi server.properties

    配置文件内容修改以下几项:

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    
    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    listeners=PLAINTEXT://10.0.0.21:9092
    
    # A comma separated list of directories under which to store log files
    log.dirs=/var/applogs/kafka-logs
    
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=10.0.0.21:2181,10.0.0.22:2181,10.0.0.23:2181/kafka

    5. 修改/etc/profile,添加以下两行

    export KAFKA_HOME=/opt/myprogram/kafka
    export PATH=$PATH:$KAFKA_HOME/bin

    6.远程拷贝kafka至node02,node03

    scp -r ./kafka root@10.0.0.22:/opt/myprogram/kafka
    scp -r ./kafka root@10.0.0.23:/opt/myprogram/kafka

    7. 修改node02的/kafka/config/server.properties配置

    broker.id=1
    listeners=PLAINTEXT://10.0.0.22:9092

    8. 修改node03的/kafka/config/server.properties配置

    broker.id=2
    listeners=PLAINTEXT://10.0.0.23:9092

    9. 修改node02,node03的/etc/profile

    10. 重新生效/etc/profile

    source /etc/profile

    11.启动命令

    kafka-server-start.sh /opt/myprogram/kafka/config/server.properties

    12 创建topic

    [root@zk-node01 ~]# kafka-topics.sh --zookeeper 10.0.0.21:2181/kafka --create --topic topicmsg --partitions 2 --replication-factor 2
    Created topic "topicmsg".

    查看zookeeper

    [zk: localhost:2181(CONNECTED) 30] ls /kafka/brokers/topics
    [topicmsg]

    查看已创建的topic

    [root@zk-node01 ~]# kafka-topics.sh --zookeeper 10.0.0.21:2181/kafka --describe topicmsg
    Topic:topicmsg    PartitionCount:2    ReplicationFactor:2    Configs:
        Topic: topicmsg    Partition: 0    Leader: 0    Replicas: 0,1    Isr: 1,0
        Topic: topicmsg    Partition: 1    Leader: 1    Replicas: 1,2    Isr: 1,2

     13. 发送测试消息

    [root@zk-node01 ~]# kafka-console-producer.sh --broker-list 10.0.0.21:9092 --topic topicmsg
    >1
    >2
    >3
    >4
    >5
    >6

    14.消费测试消息

    [root@zk-node01 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.21:9092 --topic topicmsg --group testgroup
    1
    2
    3
    4
    5
    6
  • 相关阅读:
    基于Maven的MyBatis Generator逆向工程
    JQuery对象调用reset方法:Uncaught TypeError: $(...).reset is not a function
    死锁编码及定位分析
    线程池的简介及底层原理
    转载:Mysql8.0忘记 root 密码, 如何修改?
    synchronized 和 Lock 有什么区别?
    java 中的阻塞队列及生产者-消费者中的简单应用
    java 中 CountDownLatch、CyclicBarrier 和 Semaphore 的简单使用
    java中的公平锁、非公平锁、可重入锁、递归锁、自旋锁、独占锁和共享锁
    Java 集合类的线程安全问题及解决方法
  • 原文地址:https://www.cnblogs.com/datangguott/p/14159176.html
Copyright © 2011-2022 走看看