一、硬件环境
假设有4台机,IP及主机名如下:
192.168.100.105 c1 192.168.100.110 c2 192.168.100.115 c3 192.168.100.120 c4
二、软件环境
1.安装JDK
https://www.cnblogs.com/live41/p/14235891.html
2.安装ZooKeeper
https://www.cnblogs.com/live41/p/15522363.html
三、搭建分布式Kafka
1.下载安装包
http://kafka.apache.org/downloads
这里下载的是kafka_2.12-3.0.0.tgz。
* 以下步骤在每台机都要执行
2.上传安装包到服务器
假设安装在home目录
cd /home
rz
3.解压
tar -xvf kafka_2.12-3.0.0.tgz mv kafka_2.12-3.0.0 kafka
4.配置系统环境变量
vim ~/.bashrc
添加以下内容:
export PATH=$PATH:/home/kafka/bin
保存退出后,更新环境变量:
source ~/.bashrc
5.编辑Kafka配置文件
cd /home/kafka/config
vim server.properties
添加以下内容:
vim server.properties broker.id=0 listeners=PLAINTEXT://0.0.0.0:9092 zookeeper.connect=c1:2181,c2:2181,c3:2181,c4:2181
* 其中0.0.0.0是同时监听localhost(127.0.0.1)和内网IP(例如c1或192.168.100.105),也可以改为localhost或c1或192.168.100.105。
6.启动
zkServer.sh start kafka-server-start.sh -daemon home/kafka/config/server.properties
7.检查
jps
会看到jps、QuorumPeerMain、Kafka
8.Kafka命令测试
#创建topic kafka-topics.sh --create --bootstrap-server c1:9092 --topic topic1 --partitions 8 --replication-factor 2 #列出所有topic kafka-topics.sh --list --bootstrap-server c1:9092 #列出所有topic的信息 kafka-topics.sh --bootstrap-server c1:9092 --describe #列出指定topic的信息 kafka-topics.sh --bootstrap-server c1:9092 --describe --topic topic1 #生产者(消息发送程序) kafka-console-producer.sh --broker-list c1:9092 --topic topic1 #消费者(消息接收程序) kafka-console-consumer.sh --bootstrap-server c1:9092 --topic topic1
其中,topic1是topic名,可自定义。
* 由于Apache开发团队的版本升级原因,不同版本的命令会有所区别。
https://www.cnblogs.com/live41/p/15522207.html
9.Java代码测试
(1) 配置maven
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency>
(2) 调用代码
public class KafkaHandler { public static void main(String[] args) { try { // 先监听,再发送消息 consume(); produce(); } catch (Exception e) { System.out.println(e); } } private static void produce() throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "c1:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props); try { kafkaProducer.send(new ProducerRecord<String, String>("topic1", "这是测试文本")); } finally { kafkaProducer.close(); } } private static void consume() throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "c1:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer consumer = new KafkaConsumer<>(props); try { consumer.subscribe(Arrays.asList("topic1")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } finally { consumer.close(); } } }
10.停止
kafka-server-stop.sh
附录
1.acks参数
acks = 1 只保证leader保存成功,如果刚好leader挂了,数据丢失 acks = 0 使用异步模式,该模式下kafka无法保证消息,可能会丢失 acks = all 所有副本都写入成功并确认
2.数据丢失问题的相关参数
acks = all 所有副本都写入成功并确认 retries = n 重试次数,设置为3或以上 min.insync.replicas = 2 消息至少要被写入到2个副本才算成功 unclean.leader.election.enable = false 关闭ubclean leader选举,不允许非ISR中的副本被选举为leader,防止数据不一致的情况
unclean.leader.election.enable参数的资料:
https://honeypps.com/mq/kafka-params-analysis-of-unclean-leader-election-enable/