Environment: Tencent Cloud, CentOS 7
1. Download
http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
2. Extract
tar -xvf kafka_2.11-2.3.0.tgz
mv kafka_2.11-2.3.0 /usr/java/kafka2.11
cd /usr/java/kafka2.11
3. Start and test
(a) Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
(b) Start the Kafka broker
bin/kafka-server-start.sh config/server.properties
(c) List topics
bin/kafka-topics.sh --zookeeper localhost:2181 --list
(d) Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Demo1
(e) Describe the topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Demo1
(f) Publish messages to the topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Demo1
(g) Consume messages from the topic
The --zookeeper form below is deprecated and only works on older versions; newer versions reject it with "zookeeper is not a recognized option":
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic Demo1
On newer versions, use --bootstrap-server instead:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Demo1 --from-beginning
4. Install a Kafka web UI
a) Download: https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar
b) Run
mkdir -p /mydata/kafkamonitorlogs
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m \
  -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
  --zk 132.232.44.82:2181 --port 8787 --refresh 10.seconds --retain 7.days \
  1>/mydata/kafkamonitorlogs/stdout.log 2>/mydata/kafkamonitorlogs/stderr.log &
c) Open the web UI
http://ip:8787 (replace ip with the server's address)
My virtual machine has too little memory, so I could not view the message list, but the web UI itself does work.
Done!
######## Spring Boot integration in practice ###########
1. Add the dependency to pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. Add the configuration to the yml file
spring:
  profiles:
    active: @activatedProperties@
  kafka:
    bootstrap-servers: 132.232.44.82:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
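For orientation, the spring.kafka.* keys above map onto the plain Kafka client property names. The following is only a sketch of that mapping: the class name KafkaClientProps is hypothetical, and it uses java.util.Properties alone so that it runs without Kafka on the classpath. It assumes that a bare auto-commit-interval value of 1000 is read as milliseconds.

```java
import java.util.Properties;

// Hypothetical helper: shows which Kafka client properties the yml keys correspond to.
public class KafkaClientProps {

    static final String BOOTSTRAP = "132.232.44.82:9092";
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // spring.kafka.producer.* -> producer client properties
    public static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP);
        props.setProperty("key.serializer", STRING_SERIALIZER);
        props.setProperty("value.serializer", STRING_SERIALIZER);
        return props;
    }

    // spring.kafka.consumer.* -> consumer client properties
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP);
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        // Assumption: the yml value 1000 is interpreted as milliseconds.
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // Print the consumer properties; iteration order is not guaranteed.
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With Spring Boot these properties are assembled automatically from the yml file, so a class like this is only useful for checking the mapping or for configuring a plain Kafka client by hand.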
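3. Add the following to Kafka's config/server.properties, using the address that clients use to reach the broker (the same host as bootstrap-servers in the yml file)
advertised.listeners=PLAINTEXT://132.232.44.82:9092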
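If the advertised address cannot be reached from the machine running the Spring Boot application, the client usually fails with connection timeouts. A quick TCP check before starting the application can rule that out. This is a minimal sketch: the class name PortCheck, the 2000 ms timeout, and the localhost default are assumptions. It only confirms that the ports accept TCP connections, not that Kafka or ZooKeeper are healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
public class PortCheck {

    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws if the port is closed, filtered, or the timeout expires
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the broker's public address as the first argument; defaults to localhost.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("zookeeper 2181 reachable: " + isReachable(host, 2181, 2000));
        System.out.println("kafka 9092 reachable: " + isReachable(host, 9092, 2000));
    }
}
```

Run it from the client machine with the host from advertised.listeners as the argument. If port 9092 is not reachable, check the cloud security group and firewall rules before looking at the Spring configuration.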
4. KafkaConsumer.java
package com.cn.commodity.controller;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer test
 */
@Component
public class KafkaConsumer {

    // Called for every record that arrives on test_topic1
    @KafkaListener(topics = "test_topic1")
    public void listen(ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic = %s, offset = %d, value = %s%n",
                record.topic(), record.offset(), record.value());
    }
}
5. KafkaProducer.java
package com.cn.commodity.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Kafka producer test
 */
@RestController
@RequestMapping("kafka")
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sends the msg request parameter to test_topic1
    @RequestMapping("send")
    public String send(String msg) {
        kafkaTemplate.send("test_topic1", msg);
        return "success";
    }
}
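To exercise the send endpoint from code rather than a browser, a small HTTP client is enough. The sketch below assumes Java 11 or later and that the application listens on http://localhost:8080 (Spring Boot's default port, which the original does not state). The class name SendMessageClient is hypothetical.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /kafka/send endpoint defined by the controller.
public class SendMessageClient {

    // Builds the request URI, URL-encoding the message so spaces and '&' are safe.
    public static URI buildSendUri(String baseUrl, String msg) {
        String encoded = URLEncoder.encode(msg, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/kafka/send?msg=" + encoded);
    }

    // Issues a GET request and returns the response body.
    public static String send(String baseUrl, String msg) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildSendUri(baseUrl, msg)).GET().build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the application runs locally on port 8080.
        System.out.println(send("http://localhost:8080", "hello kafka"));
    }
}
```

If the application is running and the broker is reachable, the call should return the controller's "success" string, and the consumer should print the record it receives from test_topic1.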
Start the application and run it. Done!