zoukankan      html  css  js  c++  java
  • Logstash+ Kafka基于AOP 实时同步日志到es

    Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化后输出到你所选择的目的地。Logstash拥有丰富的插件(logstash-input-jdbc、logstash-input-kafka、logstash-input-rabbitmq、logstash-input-file、logstash-input-syslog等),GitHub地址: https://github.com/logstash-plugins

    1.logstash-input-kafka将微服务日志同步到 elasticsearch

    1.1 基于AOP+Kafka同步数据原理

    原理其实很简单,就是基于JAVA的AOP技术拦截方法收集请求日志和异常日志发送到Kafka,然后通过logstash订阅相应的topic来消费消息(即发布订阅模式)output到es来实现日志收集

    1.2 日志收集配置文件

    szhuangl_goods_log.conf

    # Pipeline: consume log messages from one Kafka topic and index them into Elasticsearch.
    input {
      kafka {
        # Kafka broker address
        bootstrap_servers => "server.natappfree.cc:33402"
        # Topic to subscribe to
        topics => ["szhuangl_goods_log"]
      }
    }
    output {
        # Also print every event to the console using the rubydebug codec
        stdout { codec => rubydebug }
        elasticsearch {
           # Elasticsearch node address
           hosts => ["127.0.0.1:9200"]
           # Index that receives the events
           index => "szhuangl_goods_log"
        }
    }

    Kafka消息提供者代码 

    package com.szhuangl.basic.elk.kafka;
    
     
    
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
     
    
    /**
     * @program: szhunagl-shop-parent
     * @author: Brian Huang
     * @create: 2019-10-19 16
     **/
    @Component
    @Slf4j
    public class KafkaSender<T> {
    
        @Value("${szhuangl.log.topic: szhuangl_log}")
        private String log_topic;
    
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
     
    
        /**
         * kafka 发送消息
         *
         * @param obj
         *            消息对象
         */
    
        public void send(T obj) {
            String jsonObj = JSON.toJSONString(obj);
    
    
            // 发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(log_topic, jsonObj);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
    
                @Override
                public void onFailure(Throwable throwable) {
                    log.info("Produce: The message failed to be sent:" + throwable.getMessage());
                }
    
     
                @Override
                public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                    // TODO 业务处理
                    log.info("Produce: The message was sent successfully:");
                    log.info("Produce: >>>>>>>>>>>>>>>>>>> result: " + stringObjectSendResult.toString());
                }
            });
        }
    }

    AOP切面收集日志代码

    package com.szhuangl.basic.elk.aop;
     
    
    import com.alibaba.fastjson.JSONObject;
    import com.szhuangl.basic.elk.kafka.KafkaSender;
    import com.szhuangl.common.web.util.IpUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.annotation.AfterReturning;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Before;
    import org.aspectj.lang.annotation.Pointcut;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.web.context.request.RequestContextHolder;
    import org.springframework.web.context.request.ServletRequestAttributes;
    import javax.servlet.http.HttpServletRequest;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Arrays;
    
     
    
    /**
     * @program: kafka日志收集切面类
     * @author: Brian Huang
     * @create: 2019-10-19 16
     **/
    @Aspect
    @Component
    @Slf4j
    public class AopLogAspect {
    
        @Autowired
        private KafkaSender<JSONObject> kafkaSender;
    
    
        // 申明一个切点 里面是 execution表达式
        @Pointcut("execution(* com.szhuangl.impl.*.service.*.*(..))")
        private void serviceAspect() {
        }
    
    
        // 请求method前打印内容
        @Before(value = "serviceAspect()")
        public void methodBefore(JoinPoint joinPoint) {
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                    .getRequestAttributes();
            HttpServletRequest request = requestAttributes.getRequest();
    
            String ip = IpUtils.getIpAddress(request);
            int localPort = request.getLocalPort();
            log.info("---localPort---:" + localPort);
            int serverPort = request.getServerPort();
            log.info("---serverPort---:" + serverPort);
            int remotePort = request.getRemotePort();
            log.info("---remotePort---:" + remotePort);
            JSONObject jsonObject = new JSONObject();
    
            DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    
            jsonObject.put("request_time", LocalDateTime.now().format(dateTimeFormatter));
            jsonObject.put("request_ip_port", ip);
            jsonObject.put("request_url", request.getRequestURL().toString());
            jsonObject.put("request_method", request.getMethod());
            jsonObject.put("signature", joinPoint.getSignature());
            jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
            JSONObject requestJsonObject = new JSONObject();
            requestJsonObject.put("szhuangl_request", jsonObject);
            kafkaSender.send(requestJsonObject);
        }
    
        // 在方法执行完结后打印返回内容
        @AfterReturning(returning = "o", pointcut = "serviceAspect()")
        public void methodAfterReturing(Object o) {
            JSONObject respJSONObject = new JSONObject();
            JSONObject jsonObject = new JSONObject();
            DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            jsonObject.put("response_time", LocalDateTime.now().format(dateTimeFormatter));
            jsonObject.put("response_content", JSONObject.toJSONString(o));
            respJSONObject.put("szhuangl_response", jsonObject);
            kafkaSender.send(respJSONObject);
        }
    }

    异常日志收集代码 

    package com.szhuangl.basic.elk.aop.error;
    
     
    
    import java.text.SimpleDateFormat;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Date;
    import com.szhuangl.basic.elk.kafka.KafkaSender;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.ControllerAdvice;
    import org.springframework.web.bind.annotation.ExceptionHandler;
    import org.springframework.web.bind.annotation.ResponseBody;
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.context.request.RequestContextHolder;
    import org.springframework.web.context.request.ServletRequestAttributes;
    import javax.servlet.http.HttpServletRequest;
    
     
    
    /**
     *
     * @description: 全局捕获异常
     */
    @ControllerAdvice
    @Slf4j
    public class GlobalExceptionHandler {
    
        @Autowired
        private KafkaSender<JSONObject> kafkaSender;
     
    
        @ExceptionHandler(RuntimeException.class)
        @ResponseBody
        public JSONObject exceptionHandler(Exception e) {
            log.info("<<<<<<<<<<<<<<<全局捕获异常>>>>>>>>>>>>>>>>>,error:{}", e);
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                    .getRequestAttributes();
            HttpServletRequest request = requestAttributes.getRequest();
     
    
            // 1.封装异常日志信息
            JSONObject errorJson = new JSONObject();
            JSONObject logJson = new JSONObject();
            DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            logJson.put("request_time", LocalDateTime.now().format(dateTimeFormatter));
            logJson.put("error_info", e);
            errorJson.put("szhuangl_request_error", logJson);
            kafkaSender.send(errorJson);
            // 2. 返回错误信息
            JSONObject result = new JSONObject();
            result.put("code", 500);
            result.put("msg", "系统错误");
            return result;
        }
    }

    application.yml kafka配置信息

    ### Server port
    server:
      port: 8700
    ### Eureka
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:8100/eureka
    spring:
      application:
        name:  szhuangl-server-goods
      data:
        elasticsearch:
          cluster-name: szhuangl_es
          cluster-nodes: j1ekxg71oe.52http.tech:51267
          repositories:
            # The property key is "enabled"; "enable" is not bound and would be ignored
            enabled: true
      kafka:
        # Kafka broker address
        bootstrap-servers: server.natappfree.cc:33402


    # Kafka topic that log messages are published to (read via szhuangl.log.topic)
    szhuangl:
      log:
        topic: szhuangl_goods_log
  • 相关阅读:
    JavaScript Patterns 5.7 Object Constants
    JavaScript Patterns 5.6 Static Members
    JavaScript Patterns 5.5 Sandbox Pattern
    JavaScript Patterns 5.4 Module Pattern
    JavaScript Patterns 5.3 Private Properties and Methods
    JavaScript Patterns 5.2 Declaring Dependencies
    JavaScript Patterns 5.1 Namespace Pattern
    JavaScript Patterns 4.10 Curry
    【Android】如何快速构建Android Demo
    【Android】如何实现ButterKnife
  • 原文地址:https://www.cnblogs.com/hlkawa/p/11921485.html
Copyright © 2011-2022 走看看