一、准备
1、启动zookeeper
2、启动kafka
3、kafka创建主题。主题名称为:couponTopic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic couponTopic
二、生产者工程
1、增加引用
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
2、增加配置
spring:
  kafka:
    bootstrap-servers: 47.xx.xx.120:9092
    consumer:
      group-id: mygroup
    listener:
      concurrency: 4
3、服务层增加kafka调用
注入kafkaTemplate
/**
 * Creates the service with its collaborators supplied through constructor injection.
 *
 * @param merchantsDao  DAO stored in this service's {@code merchantsDao} field
 * @param kafkaTemplate Kafka template (String key, String value) stored in the
 *                      {@code kafkaTemplate} field
 */
@Autowired
public MerchantsServImpl(MerchantsDao merchantsDao, KafkaTemplate<String, String> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
    this.merchantsDao = merchantsDao;
}
通过kafkaTemplate发送消息。Constants.TEMPLATE_TOPIC为couponTopic
// Serialize the template to JSON, then publish it to the template topic.
// The topic constant is passed twice: first as the topic, then as the record key.
String passTemplate = JSON.toJSONString(template);
kafkaTemplate.send(Constants.TEMPLATE_TOPIC, Constants.TEMPLATE_TOPIC, passTemplate);
三、消费者工程
1、增加Kafka依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency>
2、配置连接kafka
spring:
  application:
    name: kafkaConsume
  kafka:
    bootstrap-servers: 4x.xx.xx.xx:9092
    consumer:
      group-id: mygroup
    listener:
      concurrency: 4
server:
  port: 9527
3、接收Kafka消息
主题名称:
/** Name of the Kafka topic that the consumer's {@code @KafkaListener} subscribes to. */
public static final String TEMPLATE_TOPIC = "couponTopic";
/**
 * Kafka consumer component that listens on {@code Constants.TEMPLATE_TOPIC} and
 * parses each incoming message into a {@code PassTemplate}.
 */
@Slf4j
@Component
public class ConsumeTemplate {

    /**
     * Receives one record from the template topic and deserializes its JSON payload.
     * If the payload cannot be parsed, the failure is logged and the record is dropped.
     *
     * @param passTemplate raw message payload, expected to be a JSON-encoded PassTemplate
     * @param key          key of the received record
     * @param partition    id of the partition the record was read from
     * @param topic        name of the topic the record was read from
     */
    @KafkaListener(topics = {Constants.TEMPLATE_TOPIC})
    public void receive(@Payload String passTemplate,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Consumer Receive PassTemplate: {}", passTemplate);
        PassTemplate pt;
        try {
            pt = JSON.parseObject(passTemplate, PassTemplate.class);
        } catch (Exception ex) {
            // Pass the exception as the trailing argument so the stack trace and cause
            // are logged as well, not just the message text.
            log.error("Parse PassTemplate Error: {}", ex.getMessage(), ex);
            return;
        }
        ...
    }
}