zoukankan      html  css  js  c++  java
  • nginx+lua+kafka 编写 在线日志上报系统

    案例一

    rewrite_by_lua '
    --引入openresty自带的json处理对象
    local cjson = require("cjson")
    local producer = require "resty.kafka.producer"
    -- 定义kafka broker地址,ip需要和kafka的host.name配置一致
    local broker_list = {
    { host = "192.168.115.28", port = 9092 },
    { host = "192.168.115.29", port = 9092 },
    { host = "192.168.115.30", port = 9092 }
    }
    -- 定义json便于日志数据整理收集
    local log_json = {}

    local request_method = ngx.req.get_method
    if "GET" == request_method then
    -- 普通get请求
    ngx.log(ngx.ERR,"不支持GET请求")
    log_json["start_time"]=ngx.req.start_time()
    log_json["header"]=ngx.req.raw_header()
    log_json["uri"]=ngx.req.get_uri_args()
    log_json["headers"]=ngx.req.get_headers()
    log_json["body"]=ngx.req.read_body()
    log_json["body_data"]=ngx.req.get_body_data()
    elseif "POST" == request_method then
    ngx.req.read_body()
    local body_data = ngx.req.get_body_data() --body_data可是符合http协议的请求体,不是普通的字符串
    --请求体的size大于nginx配置里的client_body_buffer_size,则会导致请求体被缓冲到磁盘临时文件里,client_body_buffer_size默认是8k或者16k
    if not body_data then
    ngx.log(ngx.WARN,"no request body found" )
    end
    log_json["start_time"]=ngx.req.start_time()
    log_json["header"]=ngx.req.raw_header()
    log_json["uri"]=ngx.req.get_uri_args()
    log_json["post"]=ngx.req.get_post_args()
    log_json["headers"]=ngx.req.get_headers()
    log_json["body"]=ngx.req.read_body()
    log_json["body_data"]=ngx.req.get_body_data()
    end
    ngx.log(ngx.INFO,"log_json",cjson.encode(log_json))
    -- 转换json为字符串
    local message = cjson.encode(log_json);
    -- 定义kafka异步生产者
    local bp = producer:new(broker_list, { producer_type = "async" })
    -- 发送日志消息,send第二个参数key,用于kafka路由控制:
    -- key为nill(空)时,一段时间向同一partition写入数据
    -- 指定key,按照key的hash写入到对应的partition
    local ok, err = bp:send("postman1", nil, message)

    if not ok then
    ngx.log(ngx.ERR, "kafka send err:", err)
    return
    end

    ';

    案例二

    server {
    listen 80;
    server_name localhost;
    location /favicon.ico {
    root html;
    index index.html index.htm;
    }
    location / {
    proxy_connect_timeout 8;
    proxy_send_timeout 8;
    proxy_read_timeout 8;
    proxy_buffer_size 4k;
    proxy_buffers 512 8k;
    proxy_busy_buffers_size 8k;
    proxy_temp_file_write_size 64k;
    proxy_next_upstream http_500 http_502 http_503 http_504 error timeout invalid_header;
    root html;
    index index.html index.htm;
    proxy_pass http://rc;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    # 使用log_by_lua 包含lua代码,因为log_by_lua指令运行在请求最后且不影响proxy_pass机制
    rewrite_by_lua '
    -- 引入lua所有api
    local cjson = require "cjson"
    local producer = require "resty.kafka.producer"
    -- 定义kafka broker地址,ip需要和kafka的host.name配置一致
    local broker_list = {
    { host = "10.10.78.52", port = 9092 },
    }
    -- 定义json便于日志数据整理收集
    local log_json = {}
    log_json["uri"]=ngx.var.uri
    log_json["args"]=ngx.var.args
    log_json["host"]=ngx.var.host
    log_json["request_body"]=ngx.var.request_body
    log_json["remote_addr"] = ngx.var.remote_addr
    log_json["remote_user"] = ngx.var.remote_user
    log_json["time_local"] = ngx.var.time_local
    log_json["status"] = ngx.var.status
    log_json["body_bytes_sent"] = ngx.var.body_bytes_sent
    log_json["http_referer"] = ngx.var.http_referer
    log_json["http_user_agent"] = ngx.var.http_user_agent
    log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for
    log_json["upstream_response_time"] = ngx.var.upstream_response_time
    log_json["request_time"] = ngx.var.request_time
    -- 转换json为字符串
    local message = cjson.encode(log_json);
    -- 定义kafka异步生产者
    local bp = producer:new(broker_list, { producer_type = "async" })
    -- 发送日志消息,send第二个参数key,用于kafka路由控制:
    -- key为nill(空)时,一段时间向同一partition写入数据
    -- 指定key,按照key的hash写入到对应的partition
    local ok, err = bp:send("test1", nil, message)

    if not ok then
    ngx.log(ngx.ERR, "kafka send err:", err)
    return
    end
    ';
    }
    error_page 500 502 503 504 /50x.html;
    location = /50x.html {
    root html;
    }
    }

    参考:https://blog.csdn.net/u011239989/article/details/52239785

    #user nobody;
    worker_processes 1;

    #error_log logs/error.log;
    #error_log logs/error.log notice;
    error_log logs/error.log info;

    pid logs/nginx.pid;


    events {
    use epoll;
    worker_connections 1024;
    }


    http {
    include mime.types;
    default_type application/octet-stream;

    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
    '$status $body_bytes_sent "$http_referer" '
    '"$http_user_agent" "$http_x_forwarded_for"';

    access_log logs/access.log main;

    sendfile on;
    tcp_nopush on;

    #keepalive_timeout 0;
    keepalive_timeout 65;

    #gzip on;
    lua_package_path "lua-resty-kafka/lib/?.lua;;";
    client_body_buffer_size 128m;
    server {
    listen 80;
    server_name localhost;
    location / {
    root html;
    index index.html index.htm;
    }
    location ^~ /api/getuserinfo {
    rewrite_by_lua '
    -- 引入lua所有api
    local cjson = require("cjson")
    local producer = require "resty.kafka.producer"
    -- 定义kafka broker地址,ip需要和kafka的host.name配置一致
    local broker_list = {
    { host = "192.168.115.28", port = 9092 },
    { host = "192.168.115.29", port = 9092 },
    { host = "192.168.115.30", port = 9092 }
    }
    -- 定义json便于日志数据整理收集
    local log_json = {}
    ngx.req.read_body()
    log_json["start_time"]=ngx.req.start_time()
    log_json["header"]=ngx.req.raw_header()
    log_json["uri"]=ngx.req.get_uri_args()
    log_json["post"]=ngx.req.get_post_args()
    log_json["headers"]=ngx.req.get_headers()
    log_json["body_data"]=ngx.req.get_body_data()
    log_json["uri"]=ngx.var.uri
    log_json["args"]=ngx.var.args
    log_json["host"]=ngx.var.host
    log_json["request_body"]=ngx.var.request_body
    log_json["remote_addr"] = ngx.var.remote_addr
    log_json["remote_user"] = ngx.var.remote_user
    log_json["time_local"] = ngx.var.time_local
    log_json["status"] = ngx.var.status
    log_json["body_bytes_sent"] = ngx.var.body_bytes_sent
    log_json["http_referer"] = ngx.var.http_referer
    log_json["http_user_agent"] = ngx.var.http_user_agent
    log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for
    log_json["upstream_response_time"] = ngx.var.upstream_response_time
    log_json["request_time"] = ngx.var.request_time
    -- 转换json为字符串
    local message = cjson.encode(log_json);
    -- 定义kafka异步生产者
    local bp = producer:new(broker_list, { producer_type = "async" })
    -- 发送日志消息,send第二个参数key,用于kafka路由控制:
    -- key为nill(空)时,一段时间向同一partition写入数据
    -- 指定key,按照key的hash写入到对应的partition
    local ok, err = bp:send("postman1", nil, message)

    if not ok then
    ngx.log(ngx.ERR, "kafka send err:", err)
    return
    end
    ';
    }
    error_page 500 502 503 504 /50x.html;
    location = /50x.html {
    root html;
    }
    }
    }

  • 相关阅读:
    Linux学习(2)
    Linux学习(1)
    Sklearn_决策树(1)
    graphviz 包的安装问题
    函数的一些使用方法
    在装Scrapy库时出现了错误 ,如下图:
    跨域问题及解决
    pip永久换源及基本设置(新手必看)
    drf☞jwt自动签发与手动签发
    drf频率源码、自动生成接口文档、JWT
  • 原文地址:https://www.cnblogs.com/net2817/p/10601824.html
Copyright © 2011-2022 走看看