zoukankan      html  css  js  c++  java
  • Logstash+ Kafka基于AOP 实时同步日志到es

    Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地,logstash丰富的插件(logstash-input-jdbc,logstash-input-kafka,logstash-input-rabbitmq,logstash-input-flie,logstash-input-syslog等,github地址: https://github.com/logstash-plugins)

    1.logstash-input-kafka将微服务日志同步到 elasticsearch

    1.1 基于AOP+Kafka同步数据原理

    原理其实很简单,就是基于JAVA的AOP技术拦截方法收集请求日志和异常日志发送到Kafka,然后通过logstash订阅相应的topic来消费消息(即发布订阅模式)output到es来实现日志收集

    1.2 日志收集配置文件

    szhuangl_goods_log.conf

    input {
      kafka {
        #kafaka服务地址
        bootstrap_servers => "server.natappfree.cc:33402"
        topics => ["szhuangl_goods_log"]
      }
    }
    output {
        stdout { codec => rubydebug }
        elasticsearch {
           #es服务地址  
           hosts => ["127.0.0.1:9200"]
           index => "szhuangl_goods_log"
        }
    }

    Kafka消息提供者代码 

    package com.szhuangl.basic.elk.kafka;
    
     
    
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
     
    
    /**
     * @program: szhunagl-shop-parent
     * @author: Brian Huang
     * @create: 2019-10-19 16
     **/
    @Component
    @Slf4j
    public class KafkaSender<T> {
    
        @Value("${szhuangl.log.topic: szhuangl_log}")
        private String log_topic;
    
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
     
    
        /**
         * kafka 发送消息
         *
         * @param obj
         *            消息对象
         */
    
        public void send(T obj) {
            String jsonObj = JSON.toJSONString(obj);
    
    
            // 发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(log_topic, jsonObj);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
    
                @Override
                public void onFailure(Throwable throwable) {
                    log.info("Produce: The message failed to be sent:" + throwable.getMessage());
                }
    
     
                @Override
                public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                    // TODO 业务处理
                    log.info("Produce: The message was sent successfully:");
                    log.info("Produce: >>>>>>>>>>>>>>>>>>> result: " + stringObjectSendResult.toString());
                }
            });
        }
    }

    AOP切面收集日志代码

    package com.szhuangl.basic.elk.aop;
     
    
    import com.alibaba.fastjson.JSONObject;
    import com.szhuangl.basic.elk.kafka.KafkaSender;
    import com.szhuangl.common.web.util.IpUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.annotation.AfterReturning;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Before;
    import org.aspectj.lang.annotation.Pointcut;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.web.context.request.RequestContextHolder;
    import org.springframework.web.context.request.ServletRequestAttributes;
    import javax.servlet.http.HttpServletRequest;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Arrays;
    
     
    
    /**
     * @program: kafka日志收集切面类
     * @author: Brian Huang
     * @create: 2019-10-19 16
     **/
    @Aspect
    @Component
    @Slf4j
    public class AopLogAspect {
    
        @Autowired
        private KafkaSender<JSONObject> kafkaSender;
    
    
        // 申明一个切点 里面是 execution表达式
        @Pointcut("execution(* com.szhuangl.impl.*.service.*.*(..))")
        private void serviceAspect() {
        }
    
    
        // 请求method前打印内容
        @Before(value = "serviceAspect()")
        public void methodBefore(JoinPoint joinPoint) {
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                    .getRequestAttributes();
            HttpServletRequest request = requestAttributes.getRequest();
    
            String ip = IpUtils.getIpAddress(request);
            int localPort = request.getLocalPort();
            log.info("---localPort---:" + localPort);
            int serverPort = request.getServerPort();
            log.info("---serverPort---:" + serverPort);
            int remotePort = request.getRemotePort();
            log.info("---remotePort---:" + remotePort);
            JSONObject jsonObject = new JSONObject();
    
            DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    
            jsonObject.put("request_time", LocalDateTime.now().format(dateTimeFormatter));
            jsonObject.put("request_ip_port", ip);
            jsonObject.put("request_url", request.getRequestURL().toString());
            jsonObject.put("request_method", request.getMethod());
            jsonObject.put("signature", joinPoint.getSignature());
            jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
            JSONObject requestJsonObject = new JSONObject();
            requestJsonObject.put("szhuangl_request", jsonObject);
            kafkaSender.send(requestJsonObject);
        }
    
        // 在方法执行完结后打印返回内容
        @AfterReturning(returning = "o", pointcut = "serviceAspect()")
        public void methodAfterReturing(Object o) {
            JSONObject respJSONObject = new JSONObject();
            JSONObject jsonObject = new JSONObject();
            DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            jsonObject.put("response_time", LocalDateTime.now().format(dateTimeFormatter));
            jsonObject.put("response_content", JSONObject.toJSONString(o));
            respJSONObject.put("szhuangl_response", jsonObject);
            kafkaSender.send(respJSONObject);
        }
    }

    异常日志收集代码 

    package com.szhuangl.basic.elk.aop.error;
    
     
    
    import java.text.SimpleDateFormat;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Date;
    import com.szhuangl.basic.elk.kafka.KafkaSender;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.ControllerAdvice;
    import org.springframework.web.bind.annotation.ExceptionHandler;
    import org.springframework.web.bind.annotation.ResponseBody;
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.context.request.RequestContextHolder;
    import org.springframework.web.context.request.ServletRequestAttributes;
    import javax.servlet.http.HttpServletRequest;
    
     
    
    /**
     *
     * @description: 全局捕获异常
     */
    @ControllerAdvice
    @Slf4j
    public class GlobalExceptionHandler {
    
        @Autowired
        private KafkaSender<JSONObject> kafkaSender;
     
    
        @ExceptionHandler(RuntimeException.class)
        @ResponseBody
        public JSONObject exceptionHandler(Exception e) {
            log.info("<<<<<<<<<<<<<<<全局捕获异常>>>>>>>>>>>>>>>>>,error:{}", e);
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                    .getRequestAttributes();
            HttpServletRequest request = requestAttributes.getRequest();
     
    
            // 1.封装异常日志信息
            JSONObject errorJson = new JSONObject();
            JSONObject logJson = new JSONObject();
            DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            logJson.put("request_time", LocalDateTime.now().format(dateTimeFormatter));
            logJson.put("error_info", e);
            errorJson.put("szhuangl_request_error", logJson);
            kafkaSender.send(errorJson);
            // 2. 返回错误信息
            JSONObject result = new JSONObject();
            result.put("code", 500);
            result.put("msg", "系统错误");
            return result;
        }
    }

    application.yml kafka配置信息

    ###服务端口
    server:
      port: 8700
    ###eurake
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:8100/eureka
    spring:
      application:
        name:  szhuangl-server-goods
      data:
        elasticsearch:
          cluster-name: szhuangl_es
          cluster-nodes: j1ekxg71oe.52http.tech:51267
          repositories:
            enable: true
      kafka:
        #kafka配置信息
        bootstrap-servers: server.natappfree.cc:33402
    
    
    #配置kafka的־topic
    szhuangl:
      log:
        topic: szhuangl_goods_log
  • 相关阅读:
    php生成xml文件头
    常用正则表达式
    数值数组排序
    《那些年啊,那些事——一个程序员的奋斗史》——97
    《那些年啊,那些事——一个程序员的奋斗史》——98
    《那些年啊,那些事——一个程序员的奋斗史》——99
    《那些年啊,那些事——一个程序员的奋斗史》——100
    《那些年啊,那些事——一个程序员的奋斗史》——100
    《那些年啊,那些事——一个程序员的奋斗史》——99
    《那些年啊,那些事——一个程序员的奋斗史》——98
  • 原文地址:https://www.cnblogs.com/hlkawa/p/11921485.html
Copyright © 2011-2022 走看看