    Kafka Installation and Deployment

    I. Environment

    • OS: CentOS 7

    • Kafka version: 0.9.0.0

    • Kafka download: from an Apache mirror (see the curl command below)

    • JDK version: 1.7.0_51

    • SSH client: XShell 5

    II. Procedure

    1. Download and extract Kafka

    • Download:

    curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
    • Extract:

    tar zxvf kafka_2.10-0.9.0.0.tgz 

    2. Kafka directory layout

    • /bin — executable scripts for operating Kafka, including the Windows versions

    • /config — configuration files

    • /libs — dependency libraries

    • /logs — log data; Kafka splits its server-side logs into five types: server, request, state, log-cleaner, and controller (a quick listing of the extracted directory is sketched below)
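
    A quick sanity check of the layout after extraction (a sketch; the exact file list may differ slightly by version, and logs/ only appears after the first startup):

    cd kafka_2.10-0.9.0.0
    ls
    # bin  config  libs  LICENSE  NOTICE  site-docs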

    3. Configuration

    • Configure ZooKeeper

    Kafka needs a running ZooKeeper instance (default port 2181); refer to a ZooKeeper installation guide for its setup.

    • From the Kafka installation root, edit config/server.properties

    The three most important Kafka settings are, in order, broker.id, log.dirs, and zookeeper.connect; all three appear in the broker configuration files shown later in this article. A quick way to check them in the stock config file is sketched below.
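
    A minimal sketch for inspecting the three key settings (the values shown as comments are the 0.9.0.0 defaults):

    grep -E '^(broker.id|log.dirs|zookeeper.connect)=' config/server.properties
    # broker.id=0
    # log.dirs=/tmp/kafka-logs
    # zookeeper.connect=localhost:2181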

    4. Start Kafka

    • Start

    From the Kafka directory, run:

    bin/kafka-server-start.sh config/server.properties &
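
    Alternatively, instead of backgrounding with &, the startup script accepts a -daemon flag to detach from the shell:

    bin/kafka-server-start.sh -daemon config/server.properties
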
    • Check ports 2181 and 9092

    netstat -tunlp|egrep "(2181|9092)"
    tcp        0      0 :::2181                     :::*                        LISTEN      19787/java          
    tcp        0      0 :::9092                     :::*                        LISTEN      28094/java 

    Notes:

    The Kafka process has PID 28094 and occupies port 9092.

    QuorumPeerMain is the corresponding ZooKeeper instance, with PID 19787, listening on port 2181.
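
    An alternative check, using jps from the JDK to list JVM processes by main class (the PIDs below are taken from the netstat output above):

    jps -l
    # 19787 org.apache.zookeeper.server.quorum.QuorumPeerMain
    # 28094 kafka.Kafka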

    5. Single-node connectivity test

    Open two XShell sessions: one for the producer to send messages, one for the consumer to receive them.

    • Run the producer and type a few characters; each line you type is sent to the queue as a message.

    bin/kafka-console-producer.sh --broker-list 192.168.1.181:9092 --topic test

    Note: with early Kafka versions, --broker-list 192.168.1.181:9092 must be replaced by --zookeeper 192.168.1.181:2181.
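
    The topic test is auto-created on first use here, since auto.create.topics.enable defaults to true in 0.9; to create it explicitly beforehand instead, something like:

    bin/kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.1.181:2181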

    • Run the consumer; you should see the list of messages just sent.

    bin/kafka-console-consumer.sh --zookeeper 192.168.1.181:2181 --topic test --from-beginning  
    • Note:

    The producer is given the socket 192.168.1.181:9092, meaning its messages go straight to Kafka, i.e. to the broker.

    The consumer is given the socket 192.168.1.181:2181, meaning it goes through ZooKeeper, which coordinates the consumer and points it at the right brokers.

    So far this is only a single broker; next we experiment with a multi-broker cluster.

    6. Build a pseudo-cluster with multiple brokers

    We have only started a single broker so far; now we start a cluster of three brokers, all on this same machine.

    (1) Provide a configuration file for each broker

    First, look at the config/server0.properties settings:

    broker.id=0
    listeners=PLAINTEXT://:9092
    port=9092
    host.name=192.168.1.181
    num.network.threads=4
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs
    num.partitions=5
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=192.168.1.181:2181
    zookeeper.connection.timeout.ms=6000
    queued.max.requests=500
    log.cleanup.policy=delete
    • Notes:

    broker.id uniquely identifies a node in the cluster. Because these brokers share one machine, each must be given a different port and log directory so they do not overwrite each other's data.

    This also makes clear why Kafka used port 9092 in the single-broker experiment above.

    How the Kafka cluster talks to ZooKeeper is likewise visible in the configuration (zookeeper.connect).

    Following the file above as a template, we now provide configuration files for two more brokers (a quick way to generate them is sketched after the two listings):

    • server1.properties:

    broker.id=1
    listeners=PLAINTEXT://:9093
    port=9093
    host.name=192.168.1.181
    num.network.threads=4
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs1
    num.partitions=5
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=192.168.1.181:2181
    zookeeper.connection.timeout.ms=6000
    queued.max.requests=500
    log.cleanup.policy=delete
    • server2.properties:

    broker.id=2
    listeners=PLAINTEXT://:9094
    port=9094
    host.name=192.168.1.181
    num.network.threads=4
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs2
    num.partitions=5
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=192.168.1.181:2181
    zookeeper.connection.timeout.ms=6000
    queued.max.requests=500
    log.cleanup.policy=delete
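
    Rather than editing each file by hand, the variants can be derived from server0.properties; a sketch (the sed expressions assume the exact key=value lines shown above):

    cp config/server0.properties config/server1.properties
    sed -i 's/^broker.id=0/broker.id=1/; s/9092/9093/g; s|kafka-logs$|kafka-logs1|' config/server1.properties
    # likewise for server2.properties: broker.id=2, port 9094, log dir /tmp/kafka-logs2
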
    (2) Start all the brokers

    Commands:

    bin/kafka-server-start.sh config/server0.properties &   # start broker0
    bin/kafka-server-start.sh config/server1.properties &   # start broker1
    bin/kafka-server-start.sh config/server2.properties &   # start broker2

    Check ports 2181, 9092, 9093, and 9094:

    netstat -tunlp|egrep "(2181|9092|9093|9094)"
    tcp        0      0 :::9093                     :::*                        LISTEN      29725/java          
    tcp        0      0 :::2181                     :::*                        LISTEN      19787/java          
    tcp        0      0 :::9094                     :::*                        LISTEN      29800/java          
    tcp        0      0 :::9092                     :::*                        LISTEN      29572/java  

    One ZooKeeper instance listens on port 2181, and the three Kafka brokers listen on ports 9092, 9093, and 9094 respectively.

    (3) Create topics
    bin/kafka-topics.sh --create --topic topic_1 --partitions 1 --replication-factor 3 --zookeeper localhost:2181
    bin/kafka-topics.sh --create --topic topic_2 --partitions 1 --replication-factor 3 --zookeeper localhost:2181
    bin/kafka-topics.sh --create --topic topic_3 --partitions 1 --replication-factor 3 --zookeeper localhost:2181

    Check that the topics were created:

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    test
    topic_1
    topic_2
    topic_3
    [root@atman081 kafka_2.10-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181
    Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
    	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
    Topic:topic_1	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: topic_1	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
    Topic:topic_2	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: topic_2	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
    Topic:topic_3	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: topic_3	Partition: 0	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1

    Some of this output may not be clear yet; set it aside and keep experimenting. Note that topic_1 has Leader: 2: broker 2 currently handles all reads and writes for that partition, Replicas lists the brokers assigned a copy, and Isr lists the replicas currently in sync.

    (4) Simulate clients sending and receiving messages
    • Send messages

    bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.1.181:9092,192.168.1.181:9093,192.168.1.181:9094
    • Receive messages

    bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 192.168.1.181:2181 --from-beginning

    Note that the topic the producer publishes to is now replicated across all three brokers; the distributed nature of Kafka starts to show here.

    (5) Kill a broker

    Kill broker 0 (id=0).

    First, from the earlier configuration we know that broker 0 should be listening on port 9092, which lets us determine its PID, as sketched below.
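
    One way to do it (the PID comes from the netstat output shown earlier; substitute your own):

    netstat -tunlp | grep 9092    # ... LISTEN  29572/java
    kill 29572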

    Topic state in the Kafka cluster before broker 0 is killed:

    bin/kafka-topics.sh --describe --zookeeper localhost:2181
    Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
    	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
    Topic:topic_1	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: topic_1	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
    Topic:topic_2	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: topic_2	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
    Topic:topic_3	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: topic_3	Partition: 0	Leader: 2	Replicas: 0,2,1	Isr: 2,1,0
    

    After the kill, observe again and compare. Clearly, the main change is in the Isr column: broker 0 has dropped out of every in-sync replica set. We will analyze this in more detail later.

    bin/kafka-topics.sh --describe --zookeeper localhost:2181
    Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
    	Topic: test	Partition: 0	Leader: -1	Replicas: 0	Isr: 
    Topic:topic_1	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: topic_1	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1
    Topic:topic_2	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: topic_2	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2
    Topic:topic_3	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: topic_3	Partition: 0	Leader: 2	Replicas: 0,2,1	Isr: 2,1
    

    Now test whether sending and receiving messages is affected.

    • Send messages

    bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.1.181:9092,192.168.1.181:9093,192.168.1.181:9094
    • Receive messages

    bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 192.168.1.181:2181 --from-beginning

    As you can see, Kafka's distributed replication gives it good fault tolerance: with broker 0 down, topic_1 remains fully usable.
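
    To round off the experiment, the killed broker can be brought back and should rejoin the in-sync replica sets (a sketch; catching up may take a few seconds):

    bin/kafka-server-start.sh config/server0.properties &
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_1
    # broker 0 should reappear in the Isr list once it has caught up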

    Kafka Overview

    1. What does Kafka consist of?

    • producer — the producer of messages, i.e. it publishes messages

    • consumer — the consumer of messages, i.e. it subscribes to messages

    • broker — Kafka runs as a cluster of one or more servers; each server is called a broker

    • zookeeper — coordination and forwarding

    2. How Kafka works

    Producers send messages over the network to the Kafka cluster, and the cluster serves those messages to consumers.

    Kafka organizes messages into topics: producers publish to a topic, and consumers subscribe to it.

