spring-kafka-provider.xml配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Resolves the ${...} placeholders below from kafka.properties on the classpath. -->
    <context:property-placeholder location="classpath*:kafka.properties" />

    <!--
        Producer parameters.
        group.id is a consumer-only setting and is intentionally not configured here:
        the Kafka producer does not recognise it and reports it as an unknown config.
    -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                <entry key="retries" value="${retries}" />
                <entry key="batch.size" value="${batch.size}" />
                <entry key="linger.ms" value="${linger.ms}" />
                <entry key="buffer.memory" value="${buffer.memory}" />
                <entry key="acks" value="${acks}" />
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- ProducerFactory bean required to create the KafkaTemplate. -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>

    <!--
        KafkaTemplate bean: inject this bean to use the template's send methods.
        Messages sent without an explicit topic go to the default topic "jq-test".
        NOTE(review): autoFlush=true asks the template to flush after each send, which may
        limit the benefit of batch.size / linger.ms; confirm this is intended.
    -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="jq-test" />
    </bean>
</beans>
其中 ${xxxx} 形式的占位符的值是从配置文件 kafka.properties 中读取的,用于配置与 Kafka 集群连接的相关属性。
import java.util.HashMap; import java.util.Map; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.FailureCallback; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.SuccessCallback; import club.codeapes.common.date.DateUtil; import net.sf.json.JSONObject; public class KafkaSendMsgUtils { public static final ClassPathXmlApplicationContext CONTEXT = new ClassPathXmlApplicationContext("/spring-kafka-provider.xml"); @SuppressWarnings("unchecked") public static <K,T>void sendMessage(String topic, Integer partition, Long timestamp, K key, T data) { KafkaTemplate<K, T> kafkaTemplate = (KafkaTemplate<K, T>) CONTEXT.getBean("kafkaTemplate"); ListenableFuture<SendResult<K, T>> listenableFuture = null; if (kafkaTemplate.getDefaultTopic().equals(topic)) { listenableFuture = kafkaTemplate.sendDefault(partition,timestamp,key,data); }else { listenableFuture = kafkaTemplate.send(topic,partition,timestamp,key,data); } //发送成功回调 SuccessCallback<SendResult<K, T>> successCallback = new SuccessCallback<SendResult<K, T>>() { @Override public void onSuccess(SendResult<K, T> result) { System.out.println("成功"); } }; //发送失败回调 FailureCallback failureCallback = new FailureCallback() { @Override public void onFailure(Throwable ex) { throw new RuntimeException(ex); } }; listenableFuture.addCallback(successCallback, failureCallback); } }
其中 KafkaTemplate 的 send 方法提供了多个重载版本,参数各不相同,可以根据自己的需要选择合适的重载进行调用。
返回值是:ListenableFuture<SendResult<K, T>> listenableFuture
可以通过以下代码处理失败或成功后的情况:
//发送成功回调 SuccessCallback<SendResult<K, T>> successCallback = new SuccessCallback<SendResult<K, T>>() { @Override public void onSuccess(SendResult<K, T> result) { //成功业务逻辑 System.out.println("成功"); } }; //发送失败回调 FailureCallback failureCallback = new FailureCallback() { @Override public void onFailure(Throwable ex) { System.out.println("失败"); //失败业务逻辑 throw new RuntimeException(ex); } }; listenableFuture.addCallback(successCallback, failureCallback);