使用docker部署
1 下载
# wget https://github.com/doujiang24/lua-resty-kafka/archive/v0.06.tar.gz
# tar xvf v0.06.tar.gz
2 准备配置文件testkafka.conf
# vi testkafka.conf
# Demo configuration: GET/POST /test sends one message to Kafka synchronously
# and once more asynchronously, appends the message to a local log file and
# echoes some details of the incoming request.

# "?" is replaced by the module name with dots turned into slashes, so the
# search root has to be the lualib directory for "resty.kafka.*" to resolve.
lua_package_path "/usr/local/openresty/lualib/?.lua;;";

# Read the request body before the content handler runs; the handler prints it
# via ngx.req.get_body_data().
lua_need_request_body on;

server {
    listen 80;
    server_name testkafka;

    location /test {
        # The *_block form hands the Lua source over verbatim. With the quoted
        # content_by_lua '...' form nginx would process escapes such as "\n"
        # itself before Lua ever sees the code.
        content_by_lua_block {
            local testfile = "/tmp/test.log"
            -- cjson is only needed by the optional debug output further down
            local cjson = require "cjson"
            local client = require "resty.kafka.client"
            local producer = require "resty.kafka.producer"

            -- adjust broker address and topic to your environment
            local broker_list = {
                { host = "127.0.0.1", port = 9092 },
            }
            local topic = "test"
            local key = "key"
            local message = "halo world"

            -- usually we do not use this library directly
            local cli = client:new(broker_list)
            local brokers, partitions = cli:fetch_metadata(topic)
            if not brokers then
                -- on failure the second return value is printed as the error;
                -- not fatal here: the producers below are still attempted
                ngx.say("fetch_metadata failed, err:", partitions)
            end
            --ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))

            -- sync producer_type
            local p = producer:new(broker_list)

            -- append the message to the local log file; a failure to open the
            -- file is logged instead of aborting the request
            local f, open_err = io.open(testfile, "a+")
            if f then
                f:write(topic .. ":" .. key .. ":" .. message .. "\n")
                f:close()
            else
                ngx.log(ngx.ERR, "failed to open ", testfile, ": ", open_err)
            end

            local offset, send_err = p:send(topic, key, message)
            if not offset then
                ngx.say("send err:", send_err)
                return
            end
            ngx.say("send success, offset: ", tonumber(offset))

            -- this is async producer_type and bp will be reused in the whole nginx worker
            local bp = producer:new(broker_list, { producer_type = "async" })

            local ok, async_err = bp:send(topic, key, message)
            if not ok then
                ngx.say("send err:", async_err)
                return
            end

            -- echo request details back to the caller
            ngx.say("host : ", ngx.var.host)
            ngx.say("uri : ", ngx.var.uri)
            ngx.say("args : ", ngx.var.args)
            ngx.say("body : ", ngx.req.get_body_data())
            ngx.say("client ip : ", ngx.var.remote_addr)
            ngx.say("time : ", ngx.var.time_local)
            ngx.say("send success, ok:", ok)
        }
    }
}
功能:发送kafka、写日志到/tmp/test.log,打印请求信息
修改其中broker的ip和端口,以及topic名;
3 启动docker
$ docker run -d -p 80:80 -v /path/to/testkafka.conf:/etc/nginx/conf.d/testkafka.conf -v /path/to/lua-resty-kafka-0.06/lib/resty/kafka:/usr/local/openresty/lualib/resty/kafka openresty/openresty
挂载testkafka.conf以及kafka lib目录
4 测试
# curl 'http://testkafka/test?a=1&b=2' -d 'hello' -x 127.0.0.1:80
send success, offset: 13
host : testkafka
uri : /test
args : a=1&b=2
body : hello
client ip : 172.17.0.1
time : 08/Mar/2019:14:26:20 +0000
send success, ok:true
5 更多
1)可以将nginx访问日志发送到kafka
2)可以将请求数据作为消息发送到kafka(从uri中的path解析出topic)
6 报错
有可能报错:no resolver defined to resolve
这是因为kafka broker配置的是hostname,而不是ip,而nginx遇到hostname必须通过dns解析,而不能依靠/etc/hosts来解析,所以会报以上错误,这时有两种解决方法:
1)安装dnsmasq,并在nginx配置中用resolver指令指向该DNS服务;
2)修改kafka配置中的advertised.host.name,将其修改为ip即可;
参考:https://github.com/doujiang24/lua-resty-kafka