上一篇,我们搭建了 kafka 单节点 回顾,现在我们要搭建集群。
在开始之前,先把单节点那套用
docker-compose -f docker-compose-single-broker.yml down
清理掉
1.定义 docker-compose.yml
KAFKA_ADVERTISED_HOST_NAME 不建议使用了,因为它对应 server.properties 中的 advertised.host.name,而这个属性已经是 DEPRECATED
参考自 http://kafka.apache.org/0100/documentation.html#brokerconfigs
作为替代可以使用 KAFKA_ADVERTISED_LISTENERS,该环境变量对应 server.properties 中的 advertised.listeners.
相信你们和我有一样的疑惑, 戳-> kafka listeners 和 advertised.listeners 的区别及应用
我的宿主机的IP地址是 10.24.99.195,我的 docker-compose.yml
文件内容如下:
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
restart: always
kafka1:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
container_name: kafka1
ports:
- "9091:9091"
environment:
HOSTNAME: kafka1
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9091
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9091
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
extra_hosts:
kafka1: 10.24.99.195
kafka2:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
container_name: kafka2
ports:
- "9092:9092"
environment:
HOSTNAME: kafka2
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
extra_hosts:
kafka2: 10.24.99.195
kafka3:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
container_name: kafka3
ports:
- "9093:9093"
environment:
HOSTNAME: kafka3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
extra_hosts:
kafka3: 10.24.99.195
环境变量 KAFKA_LISTENERS 的 INSIDE 和 OUTSIDE 的端口必须不同。
接着,cd进入docker-compose.yml所在的工作目录,运行命令 docker-compose up -d
,此时默认就是使用该文件。
- container_name 属性:使用
docker ps
命令看到 NAMES 一列,将以你的命名相同。 - HOSTNAME 和 extra_hosts 的组合使用:在容器的 /etc/hosts 中增加一条记录
例如,docker exec -it kafka1 cat /etc/hosts
命令查看容器 kafka1 的 /etc/hosts
文件,多出一条映射:
10.24.99.195 kafka1
2.容器内验证
进入容器的方法,就不啰嗦了,不了解的可以百度。(docker ps
和 docker exec -it CONTAINER_ID bash
)
1、创建一个主题 mytopic
:
kafka-topics.sh --create --topic mytopic --partitions 2 --zookeeper kafka_zookeeper_1:2181 --replication-factor 2
2、打开一个窗口,进入容器作生产者:
kafka-console-producer.sh --topic=mytopic --broker-list kafka1:9091,kafka2:9092,kafka3:9093
3、再打开一个窗口,进入容器作消费者:
kafka-console-consumer.sh --bootstrap-server kafka1:9091,kafka2:9092,kafka3:9093 --from-beginning --topic mytopic
在kafka集群内部,我们使用的集群字符串都是 kafka1:9091,kafka2:9092,kafka3:9093
3. SpringBoot应用
在网站 start.springboot.io 初始化项目:
- Dependencies:选择 Spring Web,以及 Spring For Apache Kafka
- 分别生成一个 kafka-producer 项目和一个 kafka-consumer 项目
3.1 消费端:
ReceiverService.java:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Service
public class ReceiverService {
@KafkaListener(topics = {"news"}, groupId = "agent")
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
System.out.println("receiver record = " + record);
System.out.println("receiver message = " + message.get());
}
}
}
application.properties:
spring.kafka.bootstrap-servers=kafka2:9090,kafka1:9091,kafka3:9093
3.2 生产端:
SendController.java
import com.alibaba.fastjson.JSON;
import com.example.kafka.producer.dto.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.Date;
@Controller
@RequestMapping
public class SendController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
@ResponseBody
public String send(@RequestParam String msg) {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMessage(msg);
message.setSendAt(new Date());
kafkaTemplate.send("news", JSON.toJSONString(message));
return JSON.toJSONString(message);
}
}
application.properties:
spring.kafka.bootstrap-servers=kafka2:9090,kafka1:9091,kafka3:9093
另外,pom.xml 需要多一个 fastjson 的依赖:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
3.3 测试中的问题
先启动生产端,然后访问 http://localhost:8080/send?msg=hello
,你会遇到这个异常:
org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
在你的宿主机上修改 hosts 文件,在末尾追加
10.24.99.195 kafka1
10.24.99.195 kafka2
10.24.99.195 kafka3
10.24.99.195 是我当前的主机IP,你需要改成你的主机IP
参考文档
-
Kafka Document -- Broker Config官方文档
-
docker快速搭建kafka集群 阅读
-
docker启动容器出现问题 进行日志查看 阅读
docker logs 命令可以帮助寻找容器Exited的异常原因 -
如果你不会用 kafka-docker,看这里 阅读
这篇文章结尾的答疑,画的kafka网络拓扑还不错。 -
kafka listeners 和 advertised.listeners 的区别及应用 阅读
-
用 Docker 快速搭建 Kafka 集群 阅读
这篇中,也和我的想法一样,提到了使用 SpringBoot 应用 -
解决Docker容器连接 Kafka 连接失败问题 阅读