zoukankan      html  css  js  c++  java
  • 在Centos 7上安装配置 Apche Kafka 分布式消息系统集群

    Apache Kafka是一种颇受欢迎的分布式消息代理系统,旨在有效地处理大量的实时数据。Kafka集群不仅具有高度可扩展性和容错性,而且与其他消息代理(如ActiveMQ和RabbitMQ)相比,还具有更高的吞吐量。虽然它通常用作pub/sub消息传递系统,但许多组织也将其用于日志聚合,因为它为发布的消息提供持久存储。

    您可以在一台服务器上部署Kafka,也可以构建一个分布式的Kafka集群来提高性能。本文介绍如何在多节点CentOS 7服务器实例上安装Apache Kafka。

    先决条件:

    欲安装kafka集群服务器,首先要安装以下组件:

    Linux JAVA JDK JRE 环境变量安装与配置
    在 Linux 多节点安装配置 Apache Zookeeper 分布式集群

    服务器列表:

    10.10.204.63
    10.10.204.64
    10.10.204.65

    1.安装

    创建用户和组:

     # groupadd kafka
     # useradd -g kafka -s /sbin/nologin kafka

    下载Kafka包:

     # cd /usr/local
     # wget http://apache.fayea.com/kafka/0.10.2.1/kafka_2.10-0.10.2.1.tgz

    解压创建软连接:

     # tar zxvf kafka_2.10-0.10.2.1.tgz
     # ln -s kafka_2.10-0.10.2.1 kafka

    设置权限及创建Kafka日志存放目录:

     # chown -R kafka:kafka kafka_2.10-0.10.2.1 kafka
     # mkdir -p /usr/local/kafka/logs

    添加系统变量:

    编辑:/etc/profile 文件,在最下面添加以下内容:

     export KAFKA_HOME=/usr/local/kafka_2.10-0.10.2.1
     export PATH=$KAFKA_HOME/bin:$PATH

    使变量生效:

     # source /etc/profile

    2.配置

    修改添加Kafka服务器的配置文件:

     # cd /usr/local/kafka/config
     # vim server.properties

    #唯一值,每个server填写不一样。
    broker.id=63
    #允许删除主题。
    delete.topic.enable=true
    #修改;协议、当前broker机器ip、端口,此值可以配置多个,跟SSL等有关系。
    listeners=PLAINTEXT://10.10.204.63:9092
    #修改;kafka数据的存放地址,多个地址的话用逗号分割,例如 /data/kafka-logs-1,/data/kafka-logs-2。
    log.dirs=/usr/local/kafka/logs/kafka-logs
    #每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖。
    num.partitions=3
    #新增;表示消息体的最大大小,单位是字节。
    message.max.bytes=5242880
    #新增;是否允许自动创建topic,若是false,就需要通过命令创建topic。
    default.replication.factor=2
    #新增;replicas每次获取数据的最大大小。
    replica.fetch.max.bytes=5242880
    #新增;配置文件中必须使用以下配置,否则只会标记为删除,而不是真正删除。
    delete.topic.enable=true
    #新增;是否允许 leader 进行自动平衡,boolean 值,默认为 true。
    auto.leader.rebalance.enable=true
    #kafka连接的zk地址,各个broker配置一致。
    zookeeper.connect=10.10.204.63:2181,10.10.204.64:2181,10.10.204.65:2181

    #可选配置
    #是否允许自动创建 topic,boolean 值,默认为 true。
    auto.create.topics.enable=true
    #指定 topic 的压缩方式,string 值,可选有。
    #compression.type=high
    #会把所有的日志同步到磁盘上,避免重启之后的日志恢复,减少重启时间。
    controlled.shutdown.enable=true

    注:broker的配置文件中有zookeeper的地址,也有自己的broker ID, 当broker启动后,会在zookeeper中新建一个znode。

    修改其他配置文件:

     # vim zookeeper.properties

    修改为:

     dataDir=/usr/local/zookeeper/data
    新增:
    
     server.1=10.10.204.63:2888:3888
     server.2=10.10.204.64:2888:3888
     server.3=10.10.204.65:2888:3888

    修改以下配置文件:

    # vim producer.properties
    
    bootstrap.servers=10.10.204.63:9092,10.10.204.64:9092,10.10.204.65:9092
    
    # vim consumer.properties
    
    zookeeper.connect=10.10.204.63:2181,10.10.204.64:2181,10.10.204.65:2181

    3.启动

    启动所有节点kafka服务(可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功):

    # /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties

    执行上述命令后,会出来滚动的启动信息,直至窗口静止,此时需重开终端检查是否启动成功;

    # jps
     9939 Jps
     2201 QuorumPeerMain
     2303 Kafka

    4.使用测试

    下面操作可以在任意节点,重新打开一个终端操作:

    执行以下命令,建立一个名为 renwole 的topic。

     # cd /usr/local/kafka/bin
     # ./kafka-topics.sh --create --zookeeper 10.10.204.63:2181,10.10.204.64:2181,10.10.204.65:2181 --replication-factor 1 --partitions 1 --topic renwole
     Created topic "renwole".

    解释:

     --replication-factor 1 复制1份
     --partitions 1 创建1个分区
     --topic 主题为renwole

    查看已创建的topic:

    # ./kafka-topics.sh --list --zookeeper 10.10.204.63:2181
     _consumer_offsets
     renwole

    注:可以配置 broker 自动创建 topic。

    发送消息(Kafka 使用一个简单的命令行producer(然后可以随意输入内容,回车可以发送,ctrl+c 退出)默认的每条命令将发送一条消息。):

    # ./kafka-console-producer.sh --broker-list 10.10.204.64:9092 --topic renwole

    在消息接收端,执行以下命令查看收到的消息:

    # ./kafka-console-consumer.sh --bootstrap-server 10.10.204.63:9092 --topic renwole --from-beginning

    执行以下命令删除topic:

    # ./kafka-topics.sh --delete --zookeeper 10.10.204.63:2181,10.10.204.64:2181,10.10.204.65:2181 --topic renwole

    5.查看集群状态

    kafka已经成功完成安装,查看kafka集群节点ID状态:

    注:可在任意节点连接zookeeper客户端。

     # cd /usr/local/zookeeper/bin
     # ./zkCli.sh
     Connecting to localhost:2181
     [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
     [myid:] - INFO [main:Environment@100] - Client environment:host.name=10-10-204-63.10.10.204.63
     [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_144
     [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
     [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/java/jdk1.8.0_144/jre
     [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/zookeeper/bin/../lib/netty-3.10.5.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/bin/../zookeeper-3.4.10.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:.:/usr/java/jdk1.8.0_144/jre/lib/rt.jar:/usr/java/jdk1.8.0_144/lib/dt.jar:/usr/java/jdk1.8.0_144/lib/tools.jar:/usr/java/jdk1.8.0_144/jre/lib
     [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
     [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
     [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=
     [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux
     [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64
     [myid:] - INFO [main:Environment@100] - Client environment:os.version=3.10.0-514.21.2.el7.x86_64
     [myid:] - INFO [main:Environment@100] - Client environment:user.name=root
     [myid:] - INFO [main:Environment@100] - Client environment:user.home=/root
     [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/usr/local/zookeeper-3.4.10/bin
     [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@69d0a921
     Welcome to ZooKeeper!
     [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1032] - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
     JLine support is enabled
     [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@876] - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
     [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x35ddf80430b0008, negotiated timeout = 30000
     WATCHER::
     WatchedEvent state:SyncConnected type:None path:null
     [zk: localhost:2181(CONNECTED) 0] ls /brokers/ids #查看ID
     [63, 64, 65]

    可以看到三台kafka实例ID都在线,如果模拟任意一台节点掉线,查看结果就会不同(这个ID号是前面设置的 broker.id )。

    开放端口加入防火墙:

    # firewall-cmd --permanent --add-port=9092/tcp
    # firewall-cmd --reload

    6.开机启动

    创建system单元文件:

    /usr/lib/systemd/system 目录下创建 kafka.service 填写以下内容:

    [Unit]
     Description=Apache Kafka server (broker)
     Documentation=http://kafka.apache.org/documentation/
     Requires=network.target remote-fs.target
     After=network.target remote-fs.target
    
    [Service]
     Type=simple
     Environment="LOG_DIR=/usr/local/kafka/logs"
     User=kafka
     Group=kafka
     #Environment=JAVA_HOME=/usr/java/jdk1.8.0_144
     ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
     ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
     Restart=on-failure
     SyslogIdentifier=kafka
    
    [Install]
     WantedBy=multi-user.target

    启动kafka实例服务器:

    # systemctl daemon-reload
    # systemctl enable kafka.service
    # systemctl start kafka.service
    # systemctl status kafka.service
    

    7.此外Kafka也可以通过 Web UI 界面进行集群的管理,你可以参阅:

    安装配置 Kafka Manager 分布式管理工具

    滴滴,好了,到目前为止,kafka分布式消息队列已经部署完成,其中包括必要的优化信息。目前,您可以用于大多数编程语言的Kafka客户端去创建Kafka生产者和消费者,轻松地将其用于您的项目中。

    参考资料:
    http://kafka.apache.org/documentation.html#quickstart
    https://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/
    https://tech.meituan.com/kafka-fs-design-theory.html
    http://blog.jobbole.com/99195/

    版权声明:本站原创文章,欢迎任何形式的转载。 
    转载请注明:在Centos 7上安装配置 Apche Kafka 分布式消息系统集群 | 任我乐

  • 相关阅读:
    函数配接器
    函数对象和函数指针
    unary_function 和 binary_function
    堆排序
    Shell排序
    volatile理解
    死锁
    进程间通信
    优化循环的方法-循环展开
    如何学习编译原理
  • 原文地址:https://www.cnblogs.com/weifeng1463/p/8875441.html
Copyright © 2011-2022 走看看