  • Kafka + ZooKeeper Cluster Setup

    Last time I covered how to build an Elasticsearch (ES) cluster, and I hope it helped. This post continues from that setup with a Kafka cluster.

    First, a quick look at what Kafka and ZooKeeper are.

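    Apache Kafka is a distributed publish-subscribe messaging system that is fast, scalable and durable. It is an open-source project of the Apache Software Foundation, used as part of the Hadoop ecosystem, and widely adopted by commercial companies. Its biggest strength is processing large volumes of data in real time, which covers many scenarios: Hadoop-based batch processing, low-latency real-time systems, and Storm/Spark stream-processing engines.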

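    Features:

    • High throughput, low latency: Kafka can handle hundreds of thousands of messages per second, with latency as low as a few milliseconds
    • Scalability: the cluster supports hot expansion (brokers can be added while it runs)
    • Durability and reliability: messages are persisted to local disk and replicated to prevent data loss
    • Fault tolerance: nodes in the cluster may fail (with n replicas, up to n-1 nodes can fail)
    • High concurrency: thousands of clients can read and write at the same time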

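    ZooKeeper is a distributed, open-source coordination service for distributed applications. It exposes a simple set of primitives on which distributed applications can build synchronization, configuration maintenance, naming services and more.

    Cluster roles:

    • Leader: the core of the ensemble; it handles all write requests and coordinates the other servers
    • Follower: follows the Leader's state, serves client reads, forwards writes to the Leader, and votes on proposals and in Leader elections
    • Observer: acts as an observer; it serves reads like a Follower but does not vote, so it adds read capacity without enlarging the quorum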
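    Only the Leader and Followers vote, and the ensemble keeps serving only while a strict majority of the voters is up. The sketch below (plain shell; the function names are made up for illustration) does that arithmetic. For the two-node ensemble built in this article it gives a quorum of 2 and zero tolerated failures: if either node stops, ZooKeeper stops serving. This is why odd ensemble sizes such as 3 or 5 are the usual recommendation.

```shell
# ZooKeeper quorum arithmetic for n voting members (Leader + Followers).
# Observers do not vote, so do not count them in n.

# Smallest strict majority: floor(n/2) + 1
zk_quorum_size() {
  echo $(( $1 / 2 + 1 ))
}

# Voters that can fail while a majority is still available
zk_tolerated_failures() {
  echo $(( $1 - ($1 / 2 + 1) ))
}

# Print a small table for common ensemble sizes
for n in 1 2 3 4 5; do
  printf 'voters=%d quorum=%d tolerated_failures=%d\n' \
    "$n" "$(zk_quorum_size "$n")" "$(zk_tolerated_failures "$n")"
done
```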

    Now to the main topic: how to set up the Kafka and ZooKeeper clusters correctly.

     I. ZooKeeper cluster configuration

    1. Set the hostnames

    kafka1.example.com --> 172.16.81.131
    kafka2.example.com --> 172.16.81.132
    

    2. Edit the hosts file

    [root@kafka1 opt]# cat /etc/hosts
      127.0.0.1   kafka1.example.com localhost localhost.localdomain localhost4 localhost4.localdomain4
      ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    [root@kafka2 opt]# cat /etc/hosts
      127.0.0.1   kafka2.example.com localhost localhost.localdomain localhost4 localhost4.localdomain4
      ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
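    As shown, each hosts file only maps the machine's own name to 127.0.0.1. The rest of this article uses IP addresses everywhere, so that is enough here. If you also want the nodes to resolve each other by name (an assumption, not part of the original setup), a small helper can print the entries to append. The short aliases kafka1/kafka2 and the function name are illustrative.

```shell
# Print /etc/hosts lines for the cluster members.
# Each argument is "fqdn=ip"; the short alias (text before the first dot)
# is a convenience and not part of the original setup.
cluster_hosts() {
  local entry fqdn ip
  for entry in "$@"; do
    fqdn=${entry%%=*}   # text before '='
    ip=${entry#*=}      # text after '='
    printf '%s\t%s %s\n' "$ip" "$fqdn" "${fqdn%%.*}"
  done
}

# Review the output, then append it on each node by re-running the same
# command with ">> /etc/hosts" added.
cluster_hosts kafka1.example.com=172.16.81.131 kafka2.example.com=172.16.81.132
```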
    

    3. Install the JDK

    cd /opt
    # upload jdk-8u131-linux-x64.tar.gz to /opt first
    tar -zxvf jdk-8u131-linux-x64.tar.gz
    mv jdk1.8.0_131 /usr/local/
    

    4. Configure the JDK environment variables

    [root@kafka1 opt]# tail -10 /etc/profile
     # Java environment variables
     export JAVA_HOME=/usr/local/jdk1.8.0_131
     export JAVA_BIN=$JAVA_HOME/bin
     export JAVA_LIB=$JAVA_HOME/lib
     export CLASSPATH=.:$JAVA_LIB/tools.jar:$JAVA_LIB/dt.jar
     export PATH=$JAVA_BIN:$PATH
     export _JAVA_SR_SIGNUM=12
     # ZooKeeper environment variables
     export ZOOKEEPER_HOME=/opt/zookeeper/
     export PATH=$ZOOKEEPER_HOME/bin:$PATH
     export PATH
    [root@kafka2 opt]# tail -10 /etc/profile
     # Java environment variables
     export JAVA_HOME=/usr/local/jdk1.8.0_131
     export JAVA_BIN=$JAVA_HOME/bin
     export JAVA_LIB=$JAVA_HOME/lib
     export CLASSPATH=.:$JAVA_LIB/tools.jar:$JAVA_LIB/dt.jar
     export PATH=$JAVA_BIN:$PATH
     export _JAVA_SR_SIGNUM=12
     # ZooKeeper environment variables
     export ZOOKEEPER_HOME=/opt/zookeeper/
     export PATH=$ZOOKEEPER_HOME/bin:$PATH
     export PATH
    # Apply the environment variables (on both nodes)
    source /etc/profile
    

    5. Download the package

    zookeeper-3.4.10.tar.gz

      # Extract it in /opt
      tar -zxvf zookeeper-3.4.10.tar.gz
      mv zookeeper-3.4.10 zookeeper
      cd /opt/zookeeper/conf/
      cp zoo_sample.cfg zoo.cfg
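    In step 6 both nodes carry the same server.N lines, where N must equal that host's myid. With more nodes, generating the lines avoids typos. A minimal sketch, assuming the node IPs are given in myid order and the 2888/3888 ports used in this article (the function name is hypothetical):

```shell
# Generate the server.N lines of zoo.cfg from node IPs given in myid order.
# Port 2888: followers talk to the leader; port 3888: leader election.
zk_server_lines() {
  local id=1 ip
  for ip in "$@"; do
    echo "server.${id}=${ip}:2888:3888"
    id=$(( id + 1 ))
  done
}

# The output goes into conf/zoo.cfg on every node (identical everywhere)
zk_server_lines 172.16.81.131 172.16.81.132
```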

    6. Edit the ZooKeeper configuration file

    [root@kafka1 opt]# cat /opt/zookeeper/conf/zoo.cfg | grep -v '^#' | grep -v '^$'
     tickTime=2000
     initLimit=20
     syncLimit=10
     dataDir=/opt/data/zookeeper/data
     dataLogDir=/opt/data/zookeeper/logs
     clientPort=2181
     server.1=172.16.81.131:2888:3888
     server.2=172.16.81.132:2888:3888
    [root@kafka2 opt]# cat /opt/zookeeper/conf/zoo.cfg | grep -v '^#' | grep -v '^$'
     tickTime=2000
     initLimit=20
     syncLimit=10
     dataDir=/opt/data/zookeeper/data
     dataLogDir=/opt/data/zookeeper/logs
     clientPort=2181
     server.1=172.16.81.131:2888:3888
     server.2=172.16.81.132:2888:3888

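    # Note: do not put a comment on the same line after a setting in zoo.cfg, otherwise ZooKeeper reports an error!
    # Explanation:
    tickTime: ZooKeeper's basic time unit, in milliseconds. It is the heartbeat interval between ZooKeeper servers and between clients and servers: one heartbeat is sent every tickTime.
    Port 2888: the port this server uses to exchange information with the cluster's Leader;
    Port 3888: if the Leader goes down, the servers need a port to elect a new Leader; this is the port they use to talk to each other during the election.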
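    initLimit and syncLimit are counted in ticks, not milliseconds: initLimit is how long a Follower may take to connect to and sync with the Leader at startup, and syncLimit is how far a Follower may lag before the Leader drops it. Multiplying by tickTime gives the wall-clock limits for the values used above. The session-timeout range printed last is ZooKeeper's default (minSessionTimeout = 2 × tickTime, maxSessionTimeout = 20 × tickTime) and only applies when those settings are not set explicitly.

```shell
# Values from the zoo.cfg above
tick_time_ms=2000   # tickTime: basic time unit (ms)
init_limit=20       # initLimit, in ticks
sync_limit=10       # syncLimit, in ticks

# Convert a number of ticks to milliseconds
ticks_to_ms() {
  echo $(( $1 * tick_time_ms ))
}

echo "initLimit: $(ticks_to_ms "$init_limit") ms"   # follower connect + initial sync
echo "syncLimit: $(ticks_to_ms "$sync_limit") ms"   # max follower lag before it is dropped
echo "default session timeout range: $(ticks_to_ms 2)-$(ticks_to_ms 20) ms"
```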

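    7. Create the data and log directories from zoo.cfg on both kafka1 and kafka2

    mkdir -p /opt/data/zookeeper/data
    mkdir -p /opt/data/zookeeper/logs


    8. Write the myid file on each node

    [root@kafka1 opt]# echo "1" > /opt/data/zookeeper/data/myid
    [root@kafka2 ~]# echo "2" > /opt/data/zookeeper/data/myid
    # Note: the IDs must differ; each must match that host's server.N line and sit in the dataDir set in zoo.cfg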
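    Each myid has to match the N of that host's server.N line in zoo.cfg and has to live in the dataDir. If the hostnames follow the kafkaN.example.com pattern used here, the id can be derived instead of typed by hand. This is a sketch under that naming assumption; the function names are hypothetical.

```shell
# Derive a ZooKeeper myid from a hostname such as kafka1.example.com -> 1.
# Assumes the kafkaN naming convention; N must match this host's server.N.
myid_from_hostname() {
  local short id
  short=${1%%.*}              # kafka1.example.com -> kafka1
  id=${short##*[!0-9]}        # keep only the trailing digits -> 1
  if [ -z "$id" ]; then
    echo "cannot derive myid from '$1'" >&2
    return 1
  fi
  echo "$id"
}

# Write the id into the given directory (use the dataDir from zoo.cfg)
write_myid() {
  local data_dir id
  data_dir=$1
  id=$(myid_from_hostname "$2") || return 1
  mkdir -p "$data_dir" && echo "$id" > "$data_dir/myid"
}

# Usage on each node:
#   write_myid /opt/data/zookeeper/data "$(hostname)"
```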
    

    9. Start the ZooKeeper cluster (on both nodes)

    cd /opt/zookeeper/
    bin/zkServer.sh start
    

    10. Check the result

    [root@kafka1 ~]#   netstat -nlpt | grep -E "2181|2888|3888"
    	tcp        0      0 :::2181                     :::*                        LISTEN      33644/java
    	tcp        0      0 ::ffff:10.1.1.247:3888      :::*                        LISTEN      33644/java
    [root@kafka2 ~]#   netstat -nlpt | grep -E "2181|2888|3888"
    	tcp        0      0 :::2181                     :::*                        LISTEN      35016/java
    	tcp        0      0 ::ffff:10.1.1.248:2888      :::*                        LISTEN      35016/java # whichever node is the leader listens on port 2888
    	tcp        0      0 ::ffff:10.1.1.248:3888      :::*                        LISTEN      35016/java
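    Reading netstat works, but zkServer.sh can also report the role directly: zkServer.sh status ends with a line such as "Mode: leader" or "Mode: follower". A small filter (hypothetical name) pulls that value out, so each node can be checked and exactly one leader confirmed:

```shell
# Print the role reported by "zkServer.sh status" (leader, follower, ...).
# Reads the status output on stdin and prints the value after "Mode:".
zk_mode() {
  awk -F': *' '/^Mode:/ { print $2 }'
}

# Usage on each node; exactly one node should print "leader":
#   /opt/zookeeper/bin/zkServer.sh status 2>&1 | zk_mode
```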

    II. Kafka cluster setup

     1. Configuration file

    [root@kafka1 opt]# cat /opt/kafka/config/server.properties | grep -v '^#'|grep -v '^$'
    	broker.id=1
    	listeners=PLAINTEXT://172.16.81.131:9092
    	num.network.threads=3
    	num.io.threads=8
    	socket.send.buffer.bytes=102400
    	socket.receive.buffer.bytes=102400
    	socket.request.max.bytes=104857600
    	log.dirs=/opt/kafka/data/kafka-logs
    	num.partitions=10
    	num.recovery.threads.per.data.dir=1
    	offsets.topic.replication.factor=1
    	transaction.state.log.replication.factor=1
    	transaction.state.log.min.isr=1
    	log.retention.hours=168
    	log.segment.bytes=1073741824
    	log.retention.check.interval.ms=300000
    	zookeeper.connect=172.16.81.131:2181,172.16.81.132:2181
    	zookeeper.connection.timeout.ms=6000
    	group.initial.rebalance.delay.ms=0
    [root@kafka2 ~]# cat /opt/kafka/config/server.properties | grep -v '^#'|grep -v '^$'
    	broker.id=2
    	listeners=PLAINTEXT://172.16.81.132:9092
    	num.network.threads=3
    	num.io.threads=8
    	socket.send.buffer.bytes=102400
    	socket.receive.buffer.bytes=102400
    	socket.request.max.bytes=104857600
    	log.dirs=/opt/kafka/data/kafka-logs
    	num.partitions=10
    	num.recovery.threads.per.data.dir=1
    	offsets.topic.replication.factor=1
    	transaction.state.log.replication.factor=1
    	transaction.state.log.min.isr=1
    	log.retention.hours=168
    	log.segment.bytes=1073741824
    	log.retention.check.interval.ms=300000
    	zookeeper.connect=172.16.81.131:2181,172.16.81.132:2181
    	zookeeper.connection.timeout.ms=6000
    	group.initial.rebalance.delay.ms=0
    	# Note: broker.id must be different on each broker
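    The two files differ only in broker.id and listeners, so keeping one template and rendering it per broker avoids duplicate broker.id mistakes. A sketch, assuming the template already contains uncommented broker.id= and listeners= lines (in the stock server.properties, listeners is commented out) and port 9092 as above; the function and template names are illustrative:

```shell
# Render a broker-specific server.properties from a shared template.
# Only broker.id and listeners differ between the brokers in this article.
# Reads the template on stdin and writes the result to stdout.
render_broker_config() {
  local id=$1 ip=$2
  sed -e "s/^broker\.id=.*/broker.id=${id}/" \
      -e "s#^listeners=.*#listeners=PLAINTEXT://${ip}:9092#"
}

# Usage on kafka2 (template file name is hypothetical):
#   render_broker_config 2 172.16.81.132 < server.properties.template \
#     > /opt/kafka/config/server.properties
```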
    

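    2. Start the Kafka cluster (on both nodes)

    /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &
    # or let the script detach the broker itself:
    # /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties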
    

    3. Check the result

    [root@kafka1 opt]# netstat -lntp
    	Active Internet connections (only servers)
    	Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name          
    	tcp6       0      0 :::47457                :::*                    LISTEN      6582/java           
    	tcp6       0      0 172.16.81.131:9092      :::*                    LISTEN      9260/java           
    	tcp6       0      0 :::2181                 :::*                    LISTEN      6582/java           
    	tcp6       0      0 :::33230                :::*                    LISTEN      9260/java           
    	tcp6       0      0 172.16.81.131:3888      :::*                    LISTEN      6582/java           
    [root@kafka2 ~]# netstat -lntp
    	Active Internet connections (only servers)
    	Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name          
    	tcp6       0      0 172.16.81.132:9092      :::*                    LISTEN      9395/java           
    	tcp6       0      0 :::42884                :::*                    LISTEN      6779/java           
    	tcp6       0      0 :::2181                 :::*                    LISTEN      6779/java           
    	tcp6       0      0 172.16.81.132:2888      :::*                    LISTEN      6779/java           
    	tcp6       0      0 172.16.81.132:3888      :::*                    LISTEN      6779/java                 
    	tcp6       0      0 :::38557                :::*                    LISTEN      9395/java
    

    4. Test whether ZooKeeper and Kafka work correctly

    (1) Create a topic
    [root@kafka2 ~]# /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic summer
    	Created topic "summer".
    	# Note: the replication factor cannot exceed the number of brokers, otherwise an error is reported; this cluster has 2 brokers
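    That rule can be checked before calling kafka-topics.sh. With replication factor n, committed messages survive up to n-1 broker failures, which matches the fault-tolerance point at the start of this article. A minimal guard (hypothetical function name; the broker count is passed in by hand):

```shell
# Check a replication factor against the number of brokers before creating
# a topic; with factor rf, committed data survives up to rf-1 broker failures.
check_replication_factor() {
  local rf=$1 brokers=$2
  if [ "$rf" -lt 1 ] || [ "$rf" -gt "$brokers" ]; then
    echo "replication factor $rf is invalid for $brokers broker(s)" >&2
    return 1
  fi
  echo "ok: tolerates $(( rf - 1 )) broker failure(s)"
}

# Usage with the two brokers of this cluster:
#   check_replication_factor 2 2 && /opt/kafka/bin/kafka-topics.sh --create \
#     --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic summer
```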
    (2) List the topics that have been created
    [root@kafka2 ~]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 172.16.81.132:2181
    	summer
    [root@kafka1 ~]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 172.16.81.131:2181
    	summer
    (3) Show the topic details
    [root@kafka2 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic summer
    	Topic:summer	PartitionCount:1	ReplicationFactor:2	Configs:
    	Topic: summer	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 2,1
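    	# Topic name: summer
    	# Partition: only one, numbered from 0
    	# Leader: the broker with id 2
    	# Replicas: the replicas live on brokers 2 and 1
    	# Isr: the in-sync replicas, i.e. the replicas currently caught up with the leader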
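    A partition is fully replicated when its Isr list contains every broker in Replicas. kafka-topics.sh --describe also accepts --under-replicated-partitions to list only the lagging ones; the awk sketch below just makes the comparison explicit on the describe output shown above (function name hypothetical):

```shell
# Report partitions whose ISR is shorter than the replica list.
# Input: the output of "kafka-topics.sh --describe" on stdin.
under_replicated() {
  awk '
    /Replicas:/ && /Isr:/ {
      p = ""; r = ""; s = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Partition:") p = $(i + 1)
        if ($i == "Replicas:")  r = $(i + 1)
        if ($i == "Isr:")       s = $(i + 1)
      }
      # Compare how many broker ids each comma-separated list holds
      if (split(s, isr, ",") < split(r, reps, ","))
        print "partition " p ": replicas=" r " isr=" s
    }'
}

# Usage:
#   /opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic summer | under_replicated
```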
    (4) Send messages (acting as the producer)
    [root@kafka2 ~]# /bin/bash /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092  --topic summer
    	>Hello,wangyanlin
    	>I am from china.
    	>
    	>
    	>;
    	>^C[root@kafka2 ~]# 
    (5) Receive messages (acting as the consumer)
    [root@kafka2 ~]# /opt/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic summer --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	Hello,wangyanlin
    	I am from china.
    
    
    	;
    
    	^CProcessed a total of 5 messages
    [root@kafka1 kafka]# /opt/kafka/bin/kafka-console-consumer.sh --zookeeper 172.16.81.132:2181 --topic summer --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	Hello,wangyanlin
    	I am from china.
    
    
    	;
    	^CProcessed a total of 5 messages
    (6) Delete the topic
    	/opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic summer
    	For the deletion to take effect, set delete.topic.enable=true in the broker config (config/server.properties)
    # Test passed. Done!
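    For step (6): delete.topic.enable is a broker setting read at startup, so it has to be set in /opt/kafka/config/server.properties on every broker, followed by a broker restart. Older versions without it only mark the topic for deletion; from Kafka 1.0 on, the default is already true. An idempotent sketch for setting it (hypothetical function name):

```shell
# Set delete.topic.enable=true in a server.properties file (idempotent).
# Restart the broker afterwards; the setting is only read at startup.
enable_topic_deletion() {
  local conf=$1
  if grep -q '^delete\.topic\.enable=' "$conf"; then
    # Rewrite the existing line through a temp file (portable, no sed -i)
    sed 's/^delete\.topic\.enable=.*/delete.topic.enable=true/' "$conf" > "$conf.tmp" &&
      mv "$conf.tmp" "$conf"
  else
    # Make sure the file ends with a newline before appending
    [ -n "$(tail -c 1 "$conf")" ] && echo >> "$conf"
    echo 'delete.topic.enable=true' >> "$conf"
  fi
}

# Usage on every broker, then restart it:
#   enable_topic_deletion /opt/kafka/config/server.properties
```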

    The test shows that the Kafka cluster can produce and consume messages normally!

    In upcoming posts I will cover configuring Logstash for log collection and filtering, and Kibana for graphical display.

  • Original article: https://www.cnblogs.com/JeremyWYL/p/8183103.html