最近在做把elk告警日志发送到kinesis 流,供后续数据分析处理使用。。。。。。。。
基于尽量不修改elastalert ,把修改工作放到接收端服务的原则。计划把elk的告警数据通过远程api的接口的形式发送到接收端,然后由接收端处理接收到的数据,并传送保存到kinesis 中。
从网上搜索了下elastalert 相关告警配置,搜到的文章大多以邮件告警为主,从官网扒拉了下资源,简单实现方式如下:
1、elastalert 配置(可以本地测试)
1)启动配置config.yaml
rules_folder: rules run_every: minutes: 1 buffer_time: minutes: 5 es_host: es_endpoint es_port: 9200 es_username: username es_password: password use_ssl: False verify_certs: False writeback_index: elastalert_status alert_time_limit: hours: 2
2)告警规则kinesis.yaml
name: alertfortest type: frequency num_events: 1 timeframe: minutes: 1 index: debug-* filter: - terms: fields.app: ["app1","app2"] - query: query_string: default_field: "message" query: "error NOT INFO" alert: - "email" - "post" http_post_url: "http://localhost:8000/elastalert/" http_post_static_payload: rule_name: alertfortest smtp_host: "smtp.163.com" smtp_port: 25 from_addr: "elastalert@163.com" smtp_auth_file: /tmp/smtp_auth.yaml email: - "youremail@qq.com"
2、数据接收端
def elastalert2kinesis(request): if request.method == 'GET':return HttpResponse(status=400) elif request.method == 'POST': data_dict = { "region":"", "env":"","service":"", "ip":"","endpoint":"", "metric":"", "value":"",
"timestamp":0,
"dataSource":"",
"status":"" } alertbody = json.loads(bytes.decode(request.body)) endpoint_list = alertbody['beat']['hostname'].split('-') data_dict['env'] = endpoint_list[0] data_dict['region'] = endpoint_list[1] data_dict['service'] = "-".join(endpoint_list[2:-1]) data_dict['ip'] = endpoint_list[-1] data_dict['endpoint'] = alertbody['beat']['hostname'] data_dict['dataSource'] = "elk" data_dict['metric'] = alertbody['source'] data_dict['value'] = alertbody['message'] data_dict['timestamp'] = utc_to_local(alertbody['@timestamp'].split('.')[0]+"Z")
Stream().put_to_stream(data_dict['service'],**data_dict) print("data_dict.....................:",data_dict) return HttpResponse(status=200)