最近项目上因为遇到数据量过大导致查询统计性能问题(oracle数据库、单表每月1亿多条车辆定位记录,由一个windows环境下的链路服务程序来接收车辆上传的定位数据写入oracle),急需使用大数据架构来解决。前期同事已经提出整体解决思路(修改链路服务程序,在写oracle的基础上同时写入kafka,之后再用etl工具从kafka保存到hive下)并从技术上初步验证了hive的性能。现在进入工程实操环节,我们需要解决的第一个问题就是先实现链路服务程序,作为kafka的producer产生消息写入kafka。
由于这个链路服务程序是c#写的,所以我们需要Confluent.Kafka来实现对kafka的操作。为了验证技术可走通,先做一个测试工程:
通过nuget引入 Confluent.Kafka,当前是1.5.0的版本,需要对应的framework版本是4.5。brokerlist是kafka的对外服务地址,topic是kafka上创建的主题(类比理解的话,kafka的主题就相当于是传统数据库的表,它把一类消息放在一个主题下,主题下可以有多个分区)。
测试代码很简单,两个按钮,一个按钮开始测试,构建消息、发送消息;一个按钮停止发送消息:
说一下我们遇到的坑:
测试程序在kafka所在机器上运行很正常,通过命令行可以正常看到发送过来的测试数据。但是当我们在正式服务器上测试的时候(测试程序与kafka服务不在一起,分别在两台服务器上),却死活发送不成功,报超时错误:Local:Message timed out
刚开始一头雾水,搞防火墙、重启kafka、修改hosts等等各种配置不好使。后来搜到网上有帖子说,需要配置一下advertised.listeners 详见: https://www.cnblogs.com/wangxinblog/p/7623419.html 配置完成后,消息发送成功!