  • logback KafkaAppender: writing to a Kafka queue for centralized log output

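    To cut down on disk reads and writes on the application servers, and to gather logs on a single machine where ELK can collect them easily, I decided to build a jar that lets applications ship their logs to one central place.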

    Redis custom RedisAppender plugin: a log buffer queue for centralized log output.

     I searched around online and found only one program that someone had published on GitHub.

    Address: https://github.com/johnmpage/logback-kafka

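    I had planned to simply reference that jar and be done, but it could not be downloaded through the pom, so I downloaded the source, copied the code over, and modified it myself.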

    First, install Kafka. Being a supremely lazy programmer, I chose to install it with Docker.

    Start the zookeeper container

    docker run -d --name zookeeper --net=host  -t wurstmeister/zookeeper

    Start the kafka container

    docker run --name kafka -d -e HOST_IP=192.168.1.7 --net=host -v /usr/local/docker/kafka/conf/server.properties:/opt/kafka_2.12-1.0.0/config/server.properties  -v /etc/localtime:/etc/localtime:ro -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -t wurstmeister/kafka

    The zookeeper setting in Kafka's server.properties needs to be changed.

    The configuration file is as follows

    listeners=PLAINTEXT://192.168.1.7:9092
    delete.topic.enable=true
    advertised.listeners=PLAINTEXT://192.168.1.7:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    log.dirs=/kafka/kafka-logs-92cfb0bbd88c
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.retention.bytes=10737418240
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.1.7:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    version=1.0.0
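
    As a quick sanity check on the retention settings above, the following sketch loads a few of the values with java.util.Properties and converts them to friendlier units. The class and method names are made up for illustration; only the property keys and values come from the config above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: converts a few broker settings to readable units.
final class RetentionSummary {

    // Subset of the server.properties shown above
    static final String CONFIG =
            "log.retention.hours=168\n"
            + "log.retention.bytes=10737418240\n"
            + "log.segment.bytes=1073741824\n";

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Retention period in whole days
    static long retentionDays(Properties props) {
        return Long.parseLong(props.getProperty("log.retention.hours")) / 24;
    }

    // Value of a byte-sized setting in whole GiB
    static long toGiB(Properties props, String key) {
        return Long.parseLong(props.getProperty(key)) / (1024L * 1024L * 1024L);
    }

    public static void main(String[] args) {
        Properties props = load(CONFIG);
        System.out.println("retention days: " + retentionDays(props));
        System.out.println("retention GiB per partition: " + toGiB(props, "log.retention.bytes"));
        System.out.println("segment GiB: " + toGiB(props, "log.segment.bytes"));
    }
}
```

    With these values, log segments are kept for 7 days or until a partition reaches 10 GiB, whichever limit is hit first, and each segment file is 1 GiB.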

    Once that is up, create a new Spring Boot project, starting with the one that consumes the queue.

    pom file

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.lzw</groupId>
        <artifactId>kafkalog</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>kafkalog</name>
        <description>Demo project for Spring Boot</description>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.0.M6</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
        <repositories>
            <repository>
                <id>spring-snapshots</id>
                <name>Spring Snapshots</name>
                <url>https://repo.spring.io/snapshot</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </repository>
            <repository>
                <id>spring-milestones</id>
                <name>Spring Milestones</name>
                <url>https://repo.spring.io/milestone</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>
        <pluginRepositories>
            <pluginRepository>
                <id>spring-snapshots</id>
                <name>Spring Snapshots</name>
                <url>https://repo.spring.io/snapshot</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </pluginRepository>
            <pluginRepository>
                <id>spring-milestones</id>
                <name>Spring Milestones</name>
                <url>https://repo.spring.io/milestone</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </pluginRepository>
        </pluginRepositories>
    </project>

     Program structure

     

    KafkaConfig

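    package com.lzw.kafkalog.config;
    
    import com.lzw.kafkalog.a.Areceiver; // package assumed from the logger name; the class is not shown in this post
    import com.lzw.kafkalog.b.Breceiver;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Created by laizhenwei on 2017/11/28
     */
    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
        @Value("${spring.kafka.consumer.bootstrap-servers}")
        private String consumerBootstrapServers;
    
        @Value("${spring.kafka.producer.bootstrap-servers}")
        private String producerBootstrapServers;
    
        // Keys are typed as String to match the StringDeserializer configured below
        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
        kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Up to three consumer threads per listener; a single-partition topic keeps only one of them busy
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
            // Start from the oldest available record when the group has no committed offset
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
    
        @Bean
        public Areceiver areceiver() {
            return new Areceiver();
        }
    
        @Bean
        public Breceiver breceiver() {
            return new Breceiver();
        }
    }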
    KafkaAdminConfig
    package com.lzw.kafkalog.config;
    
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.KafkaAdmin;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Created by laizhenwei on 2017/11/28
     */
    @Configuration
    public class KafkaAdminConfig {
    
        @Value("${spring.kafka.producer.bootstrap-servers}")
        private String producerBootstrapServers;
    
        @Bean
        public KafkaAdmin admin() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,producerBootstrapServers);
            return new KafkaAdmin(configs);
        }
    
        /**
         * Create topic A with 1 partition and a replication factor of 1
         */
        @Bean
        public NewTopic a() {
            return new NewTopic("A", 1, (short) 1);
        }
    
        /**
         * Create topic B with 1 partition and a replication factor of 1
         */
        @Bean
        public NewTopic b() {
            return new NewTopic("B", 1, (short) 1);
        }
    }

    Consumer for topic B

    package com.lzw.kafkalog.b;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    
    /**
     * Created by laizhenwei on 2017/11/28
     */
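    public class Breceiver {
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        // Receives every record published to topic B and writes its value to the local log
        @KafkaListener(topics = {"B"})
        public void listen(ConsumerRecord<?, ?> data) {
            logger.info(String.valueOf(data.value()));
        }
    }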

    application.yml

    spring:
      kafka:
        consumer:
          bootstrap-servers: 192.168.1.7:9092
        producer: 
          bootstrap-servers: 192.168.1.7:9092

    logback-test.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration debug="true">
        <contextName>logback</contextName>
        <property name="LOG_HOME" value="F:/log" />
        <appender name="aAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>${LOG_HOME}/a/a.log</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
                <fileNamePattern>${LOG_HOME}/a/a-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
                <!--<fileNamePattern>${LOG_HOME}/a/a-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>-->
                <!-- Number of days to keep log files -->
                <MaxHistory>30</MaxHistory>
                <!-- File size that triggers rolling over to a new file -->
                <MaxFileSize>100MB</MaxFileSize>
                <totalSizeCap>10GB</totalSizeCap>
            </rollingPolicy>
            <encoder>
                <pattern>%msg%n</pattern>
            </encoder>
        </appender>
    
        <appender name="bAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>${LOG_HOME}/b/b.log</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
                <fileNamePattern>${LOG_HOME}/b/b-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
                <!--<fileNamePattern>${LOG_HOME}/b/b-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>-->
                <!-- Number of days to keep log files -->
                <MaxHistory>30</MaxHistory>
                <!-- File size that triggers rolling over to a new file -->
                <MaxFileSize>100MB</MaxFileSize>
                <totalSizeCap>10GB</totalSizeCap>
            </rollingPolicy>
    
            <encoder>
                <pattern>%msg%n</pattern>
            </encoder>
        </appender>
    
        <!-- Asynchronous output -->
        <appender name="aAsyncFile" class="ch.qos.logback.classic.AsyncAppender">
            <discardingThreshold>0</discardingThreshold>
            <queueSize>2048</queueSize>
            <appender-ref ref="aAppender" />
        </appender>
    
        <logger name="com.lzw.kafkalog.a" level="INFO" additivity="false">
            <appender-ref ref="aAsyncFile" />
        </logger>
    
    
        <!-- Asynchronous output -->
        <appender name="bAsyncFile" class="ch.qos.logback.classic.AsyncAppender">
            <discardingThreshold>0</discardingThreshold>
            <queueSize>2048</queueSize>
            <appender-ref ref="bAppender" />
        </appender>
        <logger name="com.lzw.kafkalog.b" level="INFO" additivity="false">
            <appender-ref ref="bAsyncFile" />
        </logger>
    
    </configuration>

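    The producer program; the key part is the one outlined in red in the screenshot.

    Here is the source of the red-boxed part. I originally wanted to add some fault tolerance, but it turned out not to work; I will explain why later.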

    package com.lzw.project_b.kafka;
    
    import ch.qos.logback.core.AppenderBase;
    import ch.qos.logback.core.Layout;
    import ch.qos.logback.core.status.ErrorStatus;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.StringReader;
    import java.util.Properties;
    
    public class KafkaAppender<E> extends AppenderBase<E> {
    
        protected Layout<E> layout;
        private static final Logger LOGGER = LoggerFactory.getLogger("local");
        private boolean logToLocal = false;
        private String kafkaProducerProperties;
        private String topic;
        private KafkaProducer<String, String> producer;
    
        @Override
        public void start() {
            int errors = 0;
            if (this.layout == null) {
                this.addStatus(new ErrorStatus("No layout set for the appender named \"" + this.name + "\".", this));
                ++errors;
            }
    
            // Mark the appender as started once, and only if it is configured correctly
            if (errors == 0) {
                super.start();
            }
    
            LOGGER.info("Starting KafkaAppender...");
            final Properties properties = new Properties();
            try {
                properties.load(new StringReader(kafkaProducerProperties));
                producer = new KafkaProducer<>(properties);
            } catch (Exception exception) {
                System.out.println("KafkaAppender: Exception initializing Producer. " + exception + " : " + exception.getMessage());
            }
            System.out.println("KafkaAppender: Producer initialized: " + producer);
            if (topic == null) {
                System.out.println("KafkaAppender requires a topic. Add this to the appender configuration.");
            } else {
                System.out.println("KafkaAppender will publish messages to the '" + topic + "' topic.");
            }
            LOGGER.info("kafkaProducerProperties = {}", kafkaProducerProperties);
            LOGGER.info("Kafka Producer Properties = {}", properties);
            if (logToLocal) {
                LOGGER.info("KafkaAppender: kafkaProducerProperties = '" + kafkaProducerProperties + "'.");
                LOGGER.info("KafkaAppender: properties = '" + properties + "'.");
            }
        }
    
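        @Override
        public void stop() {
            super.stop();
            LOGGER.info("Stopping KafkaAppender...");
            // The producer is null if initialization failed in start()
            if (producer != null) {
                producer.close();
            }
        }
    
        @Override
        protected void append(E event) {
            if (producer == null) {
                return; // the producer failed to initialize, so there is nothing to send to
            }
            // The original source uses a Formatter class here to convert the event to JSON
            String msg = layout.doLayout(event);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg);
            producer.send(producerRecord);
        }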
    
        public String getTopic() {
            return topic;
        }
    
        public void setTopic(String topic) {
            this.topic = topic;
        }
    
        public boolean getLogToLocal() {
            return logToLocal;
        }
    
        public void setLogToLocal(String logToLocal) {
            if (Boolean.valueOf(logToLocal)) {
                this.logToLocal = true;
            }
        }
    
        public void setLayout(Layout<E> layout) {
            this.layout = layout;
        }
    
        public String getKafkaProducerProperties() {
            return kafkaProducerProperties;
        }
    
        public void setKafkaProducerProperties(String kafkaProducerProperties) {
            this.kafkaProducerProperties = kafkaProducerProperties;
        }
    }
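
    The appender receives the body of <kafkaProducerProperties> as one multi-line string and hands it to Properties.load. A minimal sketch of that step in isolation, assuming the XML parser has already stripped the comments; the class name is hypothetical and the values are the ones used in this post:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo class: shows how an indented, multi-line element body becomes producer settings.
final class ProducerPropertiesDemo {

    static Properties parse(String body) {
        Properties props = new Properties();
        try {
            // Blank lines and leading whitespace on each line are skipped by Properties.load
            props.load(new StringReader(body));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String body = "\n"
                + "            bootstrap.servers=192.168.1.7:9092\n"
                + "            retries=0\n"
                + "            key.serializer=org.apache.kafka.common.serialization.StringSerializer\n"
                + "            value.serializer=org.apache.kafka.common.serialization.StringSerializer\n";
        Properties props = parse(body);
        props.forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

    Properties.load skips leading whitespace but keeps trailing whitespace in values, so a stray space after a serializer class name in the XML would end up in the setting.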
    LogService just logs one long junk message
    package com.lzw.project_b.service;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    /**
     * Created by laizhenwei on 2017/12/1
     */
    @Component
    public class LogService {
        Logger logger = LoggerFactory.getLogger(this.getClass());
    
        private static final String msg = "asdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa" +
                "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdf" +
                "sadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa" +
                "sdfsadfasdfsadfasdfsaasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa" +
                "dfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadf" +
                "asdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfas" +
                "dfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa" +
                "dfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfas" +
                "dfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadf" +
                "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa" +
                "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa";
    
        public void dolog() {
            logger.info(msg, new RuntimeException(msg));
        }
    
    }
    KafkaLogController is just a trivial endpoint that emits the logs and records how long enqueueing takes
    package com.lzw.project_b.controller;
    
    import com.lzw.project_b.service.LogService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * Created by laizhenwei on 2017/11/29
     */
    @RestController
    @RequestMapping(path = "/kafka")
    public class KafkaLogController {
    
        @Autowired
        private LogService logService;
    
        @GetMapping(path = "/aa")
        public void aa() {
            long begin = System.nanoTime();
            for (int i = 0; i < 100000; i++) {
                logService.dolog();
            }
            long end = System.nanoTime();
    
            System.out.println((end - begin) / 1000000);
        }
    
    }
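
    The controller prints only the elapsed milliseconds. To compare runs it can help to turn that into a rate. A small sketch with a hypothetical helper that is not part of the original project; the loop body is a stand-in for logService.dolog():

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: converts a message count and an elapsed time into messages per second.
final class Throughput {

    static double perSecond(long count, long elapsedNanos) {
        double elapsedSeconds = elapsedNanos / (double) TimeUnit.SECONDS.toNanos(1);
        return count / elapsedSeconds;
    }

    public static void main(String[] args) {
        long begin = System.nanoTime();
        for (int i = 0; i < 100_000; i++) {
            Math.sqrt(i); // stand-in for logService.dolog()
        }
        long elapsed = System.nanoTime() - begin;
        System.out.println(TimeUnit.NANOSECONDS.toMillis(elapsed) + " ms, "
                + perSecond(100_000, elapsed) + " msg/s");
    }
}
```

    For example, 100000 messages enqueued in 2 seconds works out to 50000 messages per second.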

    Start both programs and send one request.

     Check the elapsed time.

    The producer's logback-test.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration debug="true">
        <appender name="KAFKA" class="com.lzw.project_b.kafka.KafkaAppender">
            <topic>B</topic>
            <kafkaProducerProperties>
                bootstrap.servers=192.168.1.7:9092
                retries=0
                value.serializer=org.apache.kafka.common.serialization.StringSerializer
                key.serializer=org.apache.kafka.common.serialization.StringSerializer
                <!--reconnect.backoff.ms=1-->
                <!-- The Java KafkaProducer does not recognize the legacy producer.type and request.required.acks keys; acks=0 is its fire-and-forget setting -->
                acks=0
                <!--producer.type=async -->
                <!--request.required.acks=1 -->
                <!--queue.buffering.max.ms=20000 -->
                <!--queue.buffering.max.messages=1000-->
                <!--queue.enqueue.timeout.ms = -1 -->
                <!--batch.num.messages=8-->
                <!--metadata.fetch.timeout.ms=3000-->
                <!--producer.type=sync-->
                <!--request.required.acks=1-->
                <!--reconnect.backoff.ms=3000-->
                <!--retry.backoff.ms=3000-->
            </kafkaProducerProperties>
            <logToLocal>true</logToLocal>
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
            </layout>
        </appender>
    
        <!-- Time-based rolling output for "monitor" level logs -->
        <appender name="localAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>F:/localLog/b/b.log</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
                <fileNamePattern>F:/localLog/b/b-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>
                <!-- Number of days to keep log files -->
                <MaxHistory>30</MaxHistory>
                <!-- File size that triggers rolling over to a new file -->
                <MaxFileSize>200MB</MaxFileSize>
                <totalSizeCap>10GB</totalSizeCap>
            </rollingPolicy>
    
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
                <charset>UTF-8</charset>
            </encoder>
        </appender>
    
        <appender name="asyncLocal" class="ch.qos.logback.classic.AsyncAppender">
            <!-- Do not drop logs. By default, once the queue is 80% full, TRACE, DEBUG and INFO events are discarded -->
            <discardingThreshold>0</discardingThreshold>
            <queueSize>2048</queueSize>
            <appender-ref ref="localAppender"/>
        </appender>
    
        <!-- If the Kafka queue is unreachable, log to a local file -->
        <logger name="local" additivity="false">
            <appender-ref ref="asyncLocal"/>
        </logger>
    
        <!--<appender name="asyncKafka" class="ch.qos.logback.classic.AsyncAppender">-->
            <!--&lt;!&ndash; Do not drop logs. By default, once the queue is 80% full, TRACE, DEBUG and INFO events are discarded &ndash;&gt;-->
            <!--<discardingThreshold>0</discardingThreshold>-->
            <!--<queueSize>2048</queueSize>-->
            <!--<appender-ref ref="KAFKA"/>-->
        <!--</appender>-->
    
        <root level="INFO">
            <appender-ref ref="KAFKA"/>
        </root>
    
    </configuration>

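    As for why I did not use the JSON Formatter from the original source: converting to JSON takes extra time and lowers performance. The original uses json-simple; I switched to Gson, which was much faster, but there was still a performance hit. If the logs really must be JSON, I would rather do the conversion in ELK than spend application time on it.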

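    On the producer side I used the most extreme one-way (fire-and-forget) mode. It gives the highest throughput, but there is no way to know whether a message was actually enqueued.

    The producer program must use synchronous Logback logging here; otherwise the measured enqueue time is not objective.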

    Summary

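    Fault tolerance: I tried to write some fallback code in the producer so that, once the connection to Kafka fails or the queue becomes unwritable, logs are written to a local file instead. When I shut Kafka down to test it, however, the producer blocked and kept reconnecting, and the application was basically unusable.

    I tried many approaches and did not find a way to turn off the reconnecting.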
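
    One way to keep the application from stalling, sketched here under assumptions rather than taken from the original code: put a bounded in-memory queue between the logging call and whatever sends to Kafka, and divert to a local fallback whenever the queue is full. All names below are hypothetical, the sender thread is left out, and a plain list stands in for the local file appender. Kafka's producer also has a max.block.ms setting that bounds how long send() may block; whether that alone is enough for this scenario was not tested here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a hand-off buffer that never blocks the thread doing the logging.
final class NonBlockingLogBuffer {

    private final BlockingQueue<String> pending;
    // Stand-in for a local file appender
    private final List<String> localFallback = new ArrayList<>();

    NonBlockingLogBuffer(int capacity) {
        this.pending = new ArrayBlockingQueue<>(capacity);
    }

    // Never blocks: returns true if queued for the remote sender, false if diverted to the local fallback
    synchronized boolean append(String message) {
        if (pending.offer(message)) {
            return true;
        }
        localFallback.add(message);
        return false;
    }

    // Called by the (omitted) sender thread; returns null when nothing is queued
    String poll() {
        return pending.poll();
    }

    synchronized List<String> fallbackMessages() {
        return new ArrayList<>(localFallback);
    }

    public static void main(String[] args) {
        NonBlockingLogBuffer buffer = new NonBlockingLogBuffer(2);
        for (int i = 1; i <= 3; i++) {
            System.out.println("msg-" + i + " queued=" + buffer.append("msg-" + i));
        }
        System.out.println("fallback=" + buffer.fallbackMessages());
    }
}
```

    If the sender is stuck reconnecting, the queue simply fills up and later messages go to the fallback, so the caller's latency stays bounded. Logback's AsyncAppender has a neverBlock option with a similar effect, except that it drops events when the queue is full instead of diverting them.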

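    Flexibility: compared with a Redis queue, Kafka is rather awkward (in my scenario, for example, I also have to keep the Kafka queue available; performance barely improves, while maintenance cost goes up).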

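    Performance: I tested on both an SSD and a mechanical hard disk. Because Kafka is designed around mechanical disks and is heavily optimized for sequential writes, it ran roughly 30% faster on the mechanical disk than on the SSD in my tests. Is low cost the selling point?

            Enqueue performance is not that high; it is actually slower than writing straight to a local file (and don't forget that after enqueueing, the consumer still has to write to disk, and the queue itself is persisted to disk, so everything is written twice).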

    User experience: the Java driver is said to be one of the better ones.

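    Finally: it does not fit my business scenario, and I have not used it in much depth. In the end I chose Redis as the queue.

            I also found no way to turn off Kafka's persistence, so everything is written to disk twice, and in some cases the logs are not so critical that they can never be lost (Redis as a queue is flexible: when the queue cannot be written, I can write to the local disk instead). Redis takes messages in quickly and they are consumed quickly, so memory is rarely under much pressure and CPU usage is low. In my opinion, when the data is not especially important, it costs even less than Kafka, and the performance gain is dramatic.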

  • Original post: https://www.cnblogs.com/sweetchildomine/p/8025336.html