- 安装JDK
- 下载jdk-8u202-ea-bin-b03-linux-x64-07_nov_2018.tar.gz
- 解压
- 配置
- $ vi /etc/profile,在最后加入下面两行
export JAVA_HOME=/usr/local/bigdata/jdk1.8.0_202
export PATH=$JAVA_HOME/bin:$PATH - 重新登录执行 java,验证JDK配置成功
- 安装Kafka
- 下载kafka_2.11-1.0.2.tgz,这里主要1.0.2这个Kafka Server的 版本需要和客户端Spring-Kafka的版本对应,具体对应关系请看https://spring.io/projects/spring-kafka
- 解压 tar -xzvf kafka_2.11-1.0.2.tgz
- 配置 vi config/server.properties,host.name 非常非常关键,否则你讲无法远程连接到Kafka
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
host.name=192.168.198.128 - 启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
- 启动 Kafka
bin/kafka-server-start.sh config/server.properties
- 创建一个Topic并查看是否创建成功
bin/kafka-topics.sh --create --zookeeper 192.168.198.128:2181 --replication-factor 1 --partitions 1 --topic
test
- 查看Topic是否创建成功
bin/kafka-topics.sh --list --zookeeper
192.168.198.128
:2181- 本地Producer及Consumer演示
-
创建一个本地消息消费者
bin/kafka-console-consumer.sh --bootstrap-server
:9092 --topic192.168.198.128
test
--from-beginning
- 创建一个本地消息生产者
bin/kafka-console-producer.sh --broker-list
:9092 --topic192.168.198.128
test
- 在生产者端输入消息,消费者端将会打印此消息
- 远程Consumer演示(Windows 平台)
- 下载kafka_2.11-1.0.2.tgz解压并进入bin/windows
- 创建一个远程消费者
kafka-console-consumer.bat --bootstrap-server
:9092 --topic192.168.198.128
test
--from-beginning
- 在Linux平台生成者端输入消息,检查windows端远程消费者是否打印此消息
- Spring Boot集成配置
在Maven中添加依赖
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.8.RELEASE</version>
</dependency>
关于依赖版本及Kafka Server的版本的对应关系,查看https://spring.io/projects/spring-kafka
- KafkaProducer.java
@Component public class KafkaProducer { Logger logger = LoggerFactory.getLogger(KafkaProducer.class); @Autowired private KafkaTemplate kafkaTemplate; @Scheduled(fixedRateString = "1000") //per second public void send(){ logger.info("Start to send message to Kakfa with topic name 'test'"); String message = UUID.randomUUID().toString(); ListenableFuture future = kafkaTemplate.send("test", message); future.addCallback(o -> System.out.println("message sent successfully: " + message), throwable -> System.out.println("message sent failed: " + message)); } }
- KafkaConsumer.java
@Component public class KafkaConsumer { @KafkaListener(topics = {"test"}) public void receive(String message){ System.out.println("test-message:" + message); } }