zoukankan      html  css  js  c++  java
  • 使用log4j将数据流入flume

    最近做了一个log抽取的项目,采用log4j+flume实现,在此分享记录一下。

    准备

    什么是flume?

    flume是一个提供高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。

    flume提供了source、channel、sink三个组件,实现数据的抽取加载。一组source、channel、sink组成一个agent同步数据,可以通过并联、串联agent的方式来灵活的实现数据抽取。

    更多flume的文章可参考:Flume系列文章

    log4j+flume

    log4j和flume整合,官方提供了两种appender将log4j的日志写入flume,分别是Log4J AppenderLoad Balancing Log4J Appender

    Log4J Appender

    Log4J Appender将log数据发送到flume的一个avro source中,在flume中可以根据需求在下游接不同的sink。

    Log4j Appender使用时,有以下的配置参数(加粗的是必须的):

    参数名 默认值 描述
    Hostname source的host地址,如:110.110.110.100
    Port source的监听端口,如:9999
    UnsafeMode false 如果为true,则添加程序不会在发送事件失败时引发异常。
    AvroReflectionEnabled false 使用Avro Reflection序列化Log4j事件。
    AvroSchemaUrl avro schema的url地址

    Load Balancing Log4J Appender

    将log数据发送到flume的多个avro source中。实现负载均衡。

    使用时,有以下的配置参数(加粗的是必须的):

    参数名 默认值 描述
    Hosts sources的host:port。是以空格分隔的。如:10.10.10.10:9999 10.10.10.11:9999
    Selector ROUND_ROBIN 选择机制。必须为ROUND_ROBIN,RANDOM或自定义FQDN。
    MaxBackoff 表示负载均衡客户端将从未能消耗事件的节点退出的最长时间(以毫秒为单位)。
    UnsafeMode false 如果为true,则添加程序不会在发送事件失败时引发异常。
    AvroReflectionEnabled false 使用Avro Reflection序列化Log4j事件。.
    AvroSchemaUrl avro schema的url地址

    Load Balancing Log4J Appender相当于是实现了多个Log4J Appender来实现负载均衡。在flume端,
    Load Balancing Log4J Appender需要配置多个avro source来监听输入。

    具体实现

    pom依赖

    <!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-clients/flume-ng-log4jappender -->
    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>1.9.0</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
        <version>1.9.0</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
        <scope>test</scope>
    </dependency>
    

    测试类

    package com.upupfeng;
    
    import org.apache.log4j.Logger;
    
    public class Log4j2Flume {
        public static void main(String[] args) {
            Logger logger = Logger.getLogger(Log4j2Flume.class);
            logger.info("test");
        }
    }
    

    log4j.properties

    log4j.rootLogger=debug,stdout,flume
    
    # 输出到控制台
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target=System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout 
    log4j.appender.stdout.layout.ConversionPattern =[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %-5p %l - %m%n
    
    # Log4j Appender
    log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname=192.168.168.200
    log4j.appender.flume.Port=41414
    log4j.appender.flume.UnsafeMode=true
    log4j.appender.flume.layout=org.apache.log4j.PatternLayout
    log4j.appender.flume.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %-5p %l - %m%n
    
    # Load Balancing Log4J Appender
    log4j.appender.flume2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
    log4j.appender.flume2.Hosts = 192.168.168.200:9001 192.168.168.200:9002 192.168.168.200:9003
    log4j.appender.flume2.Selector = ROUND_ROBIN
    log4j.appender.flume2.MaxBackoff = 30000
    log4j.appender.flume2.UnsafeMode = true
    log4j.appender.flume2.Threshold=ERROR
    log4j.appender.flume2.layout=org.apache.log4j.PatternLayout
    log4j.appender.flume2.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %-5p %l - %m%n
    

    flume-conf.properties

    agent的配置。Log4J Appender只需要配置一个agent;Load Balancing Log4J Appender要配置多个。

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 41414
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    ## 事件容量
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    

    对于Log4j Appender的方式,启动一个agent等待接收,运行代码即可在flume的sink端获得数据。

    对于Load Balancing Log4J Appender的方式,启动多个agent等待接收,进行负载均衡的接收数据。

    参考

    http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#log4j-appender

  • 相关阅读:
    H5调用Android播放视频
    JavaScript调Java
    Java调用JavaScript
    python的下载和安装
    s5_day1作业
    s5_day2作业
    pycharm激活(转)
    for…else和while…else
    小练习
    09 grep、正则表达式和sed
  • 原文地址:https://www.cnblogs.com/upupfeng/p/13550914.html
Copyright © 2011-2022 走看看