Messages are serialized as JSON strings.
Tracking (user footprint) data class:
@Id
@Column(name = "user_foot_id")
private int user_foot_id;

@Column(name = "user_id")
private int user_id;

@Column(name = "target_id")
private Long target_id;

@Column(name = "section")
private String section;

@Column(name = "flag")
private String flag;

@Temporal(TemporalType.TIMESTAMP)
@Column(name = "create_date")
private Date createDate;
Producer:
import net.sf.json.JSONObject;

public void sendUserFoot(UserFooter userFooter) {
    // Convert the bean to a JSON string.
    JSONObject json = JSONObject.fromObject(userFooter);
    String strJson = json.toString();
    // Publish the string to the footprint topic.
    kafkaTemplate.send(footTopic, strJson);
}
Consumer:
@KafkaListener(topics = {"${kafka.footTopic}"})
public void consumerUserFoot(String message) {
    try {
        // Parse the JSON string back into a UserFooter bean.
        JSONObject jsonObject = JSONObject.fromObject(message);
        UserFooter userFooter = (UserFooter) JSONObject.toBean(jsonObject, UserFooter.class);
        // Look up the DAO from the Spring context and persist the record.
        UserFootDao userFootDao = (UserFootDao) SpringUtil.getBean(UserFootDao.class);
        userFootDao.save(userFooter);
    } catch (Exception e) {
        logger.error("Failed to consume user footprint message", e);
    }
}
application.properties:
kafka.footTopic=userfootDev
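
The producer and consumer above exchange the record as a plain JSON string. The following is a minimal, dependency-free sketch of that round trip, meant only to make the data flow concrete. Everything in it is an assumption made for illustration: `FootprintRoundTrip` and `Footprint` are hypothetical names, `Footprint` is a simplified stand-in for `UserFooter`, `create_date` is carried as epoch milliseconds, an in-memory queue stands in for the Kafka topic, and the hand-written `toJson`/`fromJson` helpers stand in for json-lib. They only handle flat objects whose string values contain no quotes or backslashes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Dependency-free sketch of the JSON-string round trip (no Kafka, Spring, or json-lib).
final class FootprintRoundTrip {

    // Hypothetical, simplified stand-in for UserFooter; createDate is carried as epoch millis.
    static final class Footprint {
        final int userFootId;
        final int userId;
        final long targetId;
        final String section;
        final String flag;
        final long createDateMillis;

        Footprint(int userFootId, int userId, long targetId,
                  String section, String flag, long createDateMillis) {
            this.userFootId = userFootId;
            this.userId = userId;
            this.targetId = targetId;
            this.section = section;
            this.flag = flag;
            this.createDateMillis = createDateMillis;
        }
    }

    // Matches one "name":value pair whose value is a quoted string or an integer.
    private static final Pattern FIELD =
            Pattern.compile("\"(\\w+)\":(?:\"([^\"]*)\"|(-?\\d+))");

    // Producer side: object -> JSON string.
    static String toJson(Footprint f) {
        return "{\"user_foot_id\":" + f.userFootId
                + ",\"user_id\":" + f.userId
                + ",\"target_id\":" + f.targetId
                + ",\"section\":\"" + plain(f.section) + "\""
                + ",\"flag\":\"" + plain(f.flag) + "\""
                + ",\"create_date\":" + f.createDateMillis + "}";
    }

    // Consumer side: JSON string -> object.
    static Footprint fromJson(String json) {
        Map<String, String> fields = new HashMap<>();
        Matcher m = FIELD.matcher(json);
        while (m.find()) {
            fields.put(m.group(1), m.group(2) != null ? m.group(2) : m.group(3));
        }
        return new Footprint(
                Integer.parseInt(fields.get("user_foot_id")),
                Integer.parseInt(fields.get("user_id")),
                Long.parseLong(fields.get("target_id")),
                fields.get("section"),
                fields.get("flag"),
                Long.parseLong(fields.get("create_date")));
    }

    // This sketch does no escaping, so it rejects values that would need it.
    private static String plain(String s) {
        if (s == null || s.contains("\"") || s.contains("\\")) {
            throw new IllegalArgumentException("unsupported string value: " + s);
        }
        return s;
    }

    public static void main(String[] args) throws InterruptedException {
        // The queue stands in for the Kafka topic: strings go in, strings come out.
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        topic.put(toJson(new Footprint(1, 42, 9000000000L, "home", "view", 1700000000000L)));
        Footprint received = fromJson(topic.take());
        System.out.println(received.userId + " " + received.section + " " + received.targetId);
    }
}
```

In the real application, json-lib does the conversion and Kafka carries the string. The point of the sketch is that the topic only ever sees a `String`, so the producer and consumer must agree on the field names and on how each value (the timestamp in particular) is represented.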