zoukankan      html  css  js  c++  java
  • Flume + HDFS + Hive日志收集系统

    最近一段时间,负责公司的产品日志埋点与收集工作,搭建了基于Flume+HDFS+Hive日志搜集系统。

    一、日志搜集系统架构:

    简单画了一下日志搜集系统的架构图,可以看出,flume承担了agent与collector角色,HDFS承担了数据持久化存储的角色。

    作者搭建的服务器是个demo版,只用到了一个flume_collector,数据只存储在HDFS。当然高可用的日志搜集处理系统架构是需要多台flume collector做负载均衡与容错处理的。

    clipboard

    二、日志产生:

    1、log4j配置,每隔1分钟roll一个文件,如果1分钟之内文件大于5M,则再生成一个文件。

    <!-- 产品数据分析日志 按分钟分 -->
            <RollingRandomAccessFile name="RollingFile_product_minute"
                fileName="${STAT_LOG_HOME}/${SERVER_NAME}_product.log"
                filePattern="${STAT_LOG_HOME}/${SERVER_NAME}_product.log.%d{yyyy-MM-dd-HH-mm}-%i">
                <PatternLayout charset="UTF-8"
                    pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %level - %msg%xEx%n" />
                <Policies>
                    <TimeBasedTriggeringPolicy interval="1"
                        modulate="true" />
                    <SizeBasedTriggeringPolicy size="${EVERY_FILE_SIZE}" />
                </Policies>
                <Filters>
                    <ThresholdFilter level="INFO" onMatch="ACCEPT"
                        onMismatch="NEUTRAL" />
                </Filters>
            </RollingRandomAccessFile>

    roll后的文件格式如下

    clipboard[1]

    2、日志内容

    json格式文件,最外层json按顺序为:tableName,logRequest,timestamp,statBody,logResponse,resultCode,resultMsg

    2016-11-30 09:18:21.916 INFO - {
    
        "tableName": "ReportView",
    
        "logRequest": {
    
             ***
    
        },
    
        "timestamp": 1480468701432,
    
        "statBody": {
    
            ***
    
        },
    
        "logResponse": {
    
            ***
    
        },
    
        "resultCode": 1,
    
        "resultFailMsg": ""
    
    }

    三、flume配置

     虚拟机环境,请见我的博客http://www.cnblogs.com/xckk/p/6000881.html

    hadoop环境,请见我的另一篇博客http://www.cnblogs.com/xckk/p/6124553.html

    此处flume环境是

    centos1:flume-agent

    centos2:flume-collector

    1、flume agent配置,conf文件

    a1.sources = skydataSource
    
    a1.channels = skydataChannel
    
    a1.sinks = skydataSink
    
    a1.sources.skydataSource.type = spooldir
    
    a1.sources.skydataSource.channels = skydataChannel
    
    #日志目录
    
    a1.sources.skydataSource.spoolDir = /opt/flumeSpool
    
    a1.sources.skydataSource.fileHeader = true
    
    #日志内容处理完后,会生成.COMPLETED后缀的文件,同时.log文件每一分钟roll一个,此处忽略.log文件与.COMPLETED文件
    
    a1.sources.skydataSource.ignorePattern=([^_]+)|(.*(.log)$)|(.*(.COMPLETED)$)
    
    a1.sources.skydataSource.basenameHeader=true
    
    a1.sources.skydataSource.deserializer.maxLineLength=102400
    
    #自定义拦截器,对json格式的源日志进行字段分隔,并添加timestamp,为后面的hdfsSink做处理,拦截器代码见后面
    
    a1.sources.skydataSource.interceptors=i1
    
    a1.sources.skydataSource.interceptors.i1.type=com.skydata.flume_interceptor.HiveLogInterceptor2$Builder
    
    a1.sinks.skydataSink.type = avro
    
    a1.sinks.skydataSink.channel = skydataChannel
    
    a1.sinks.skydataSink.hostname = centos2
    
    a1.sinks.skydataSink.port = 4545
    
    #此处配置deflate压缩后,hive collector那边一定也要相应配置解压缩
    
    a1.sinks.skydataSink.compression-type=deflate
    
    a1.channels.skydataChannel.type=memory
    
    a1.channels.skydataChannel.capacity=10000
    
    a1.channels.skydataChannel.transactionCapacity=1000

    2、flume collector配置

    a1.sources = avroSource
    
    a1.channels = memChannel
    
    a1.sinks = hdfsSink
    
    a1.sources.avroSource.type = avro
    
    a1.sources.avroSource.channels = memChannel
    
    a1.sources.avroSource.bind=centos2
    
    a1.sources.avroSource.port=4545
    
    #与flume agent配置对应
    
    a1.sources.avroSource.compression-type=deflate
    
    a1.sinks.hdfsSink.type = hdfs
    
    a1.sinks.hdfsSink.channel = memChannel
    
    # skydata_hive_log为hive表,按年-月-日分区存储,
    
    a1.sinks.hdfsSink.hdfs.path=hdfs://centos1:9000/flume/skydata_hive_log/dt=%Y-%m-%d
    
    a1.sinks.hdfsSink.hdfs.batchSize=10000
    
    a1.sinks.hdfsSink.hdfs.fileType=DataStream
    
    a1.sinks.hdfsSink.hdfs.writeFormat=Text
    
    a1.sinks.hdfsSink.hdfs.rollSize=10240000
    
    a1.sinks.hdfsSink.hdfs.rollCount=0
    
    a1.sinks.hdfsSink.hdfs.rollInterval=300
    
    a1.channels.memChannel.type=memory
    
    a1.channels.memChannel.capacity=100000
    
    a1.channels.memChannel.transactionCapacity=10000

    四、hive表创建与分区

    1、hive表创建

    在hive中执行建表语句后,hdfs://centos1:9000/flume/目录下新生成了skydata_hive_log目录。(建表语句里面有location关键字)

    u0001表示hive通过该分隔符进行字段分离,该字符在linux用vim编辑器打开是^A。

    由于日志格式是JSON格式,因为需要将JSON格式转换成u0001字符分隔,并通过dt进行分区。这一步通过flume自定义拦截器来完成。

    CREATE TABLE `skydata_hive_log`(
    
    `tableNmae` string,
    
    `logRequest` string,
    
    `timestamp` bigint,
    
    `statBody` string,
    
    `logResponse` string,
    
    `resultCode` int,
    
    `resultFailMsg` string
    
    )
    
    PARTITIONED BY (
    
    `dt` string)
    
    ROW FORMAT DELIMITED
    
    FIELDS TERMINATED BY 'u0001'
    
    STORED AS INPUTFORMAT
    
    'org.apache.hadoop.mapred.TextInputFormat'
    
    OUTPUTFORMAT
    
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    
    LOCATION
    
    'hdfs://centos1:9000/flume/skydata_hive_log';

    2、hive表分区

    日志flume sink到hdfs上时,如果没有对hive表预先进行分区,会出现日志已经上传到hdfs目录,但是hive表却无法加载数据的情况。
    这是因为hive表的分区没有创建。因此要对表进行分区添加,这里对最近一年左右时间进行分区添加
    分区脚本 init_flume_hive_table.sh
    for ((i=-1;i<=365;i++))
    do
    
            dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
    
            echo date=$dt
    
            hive -e "ALTER TABLE skydata_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_skydata_hive_log.out 2>>logs/init_skydata_hive_log.err
    
    done

    五、自定义flume拦截器

    新建maven工程,拦截器HiveInterceptor2代码如下。

    package com.skydata.flume_interceptor;
    
    import java.util.ArrayList;
    
    import java.util.List;
    
    import java.util.Map;
    
    import org.apache.flume.Context;
    
    import org.apache.flume.Event;
    
    import org.apache.flume.interceptor.Interceptor;
    
    import org.apache.flume.interceptor.TimestampInterceptor.Constants;
    
    import org.slf4j.Logger;
    
    import org.slf4j.LoggerFactory;
    
    import com.alibaba.fastjson.JSONObject;
    
    import com.google.common.base.Charsets;
    
    import com.google.common.base.Joiner;
    
    public class HiveLogInterceptor2 implements Interceptor
    
    {
    
        private static Logger logger = LoggerFactory.getLogger(HiveLogInterceptor2.class);
    
        public static final String HIVE_SEPARATOR = "01";
    
        public void close()
    
        {
    
            // TODO Auto-generated method stub
    
        }
    
        public void initialize()
    
        {
    
            // TODO Auto-generated method stub
    
        }
    
        public Event intercept(Event event)
    
        {
    
            String orginalLog = new String(event.getBody(), Charsets.UTF_8);
    
            try
    
            {
    
                String log = this.parseLog(orginalLog);
    
                // 设置时间,用于hdfsSink
    
                long now = System.currentTimeMillis();
    
                Map headers = event.getHeaders();
    
                headers.put(Constants.TIMESTAMP, Long.toString(now));
    
                event.setBody(log.getBytes());
    
            } catch (Throwable throwable)
    
            {
    
                logger.error(("errror when intercept,log [ " + orginalLog + " ] "), throwable);
    
                return null;
    
            }
    
            return event;
    
        }
    
        public List<Event> intercept(List<Event> list)
    
        {
    
            List<Event> events = new ArrayList<Event>();
    
            for (Event event : list)
    
            {
    
                Event interceptedEvent = this.intercept(event);
    
                if (interceptedEvent != null)
    
                {
    
                    events.add(interceptedEvent);
    
                }
    
            }
    
            return events;
    
        }
    
        private static String parseLog(String log)
    
        {
    
            List<String> logFileds = new ArrayList<String>();
    
            String dt = log.substring(0, 10);
    
            String keyStr = "INFO - ";
    
            int index = log.indexOf(keyStr);
    
            String content = "";
    
            if (index != -1)
    
            {
    
                content = log.substring(index + keyStr.length(), log.length());
    
            }
    
            //针对不同OS,使用不同回车换行符号
    
            content = content.replaceAll("
    ", "");
    
            content = content.replaceAll("
    ", "\\" + System.getProperty("line.separator"));
    
            JSONObject jsonObj = JSONObject.parseObject(content);
    
            String tableName = jsonObj.getString("tableName");
    
            String logRequest = jsonObj.getString("logRequest");
    
            String timestamp = jsonObj.getString("timestamp");
    
            String statBody = jsonObj.getString("statBody");
    
            String logResponse = jsonObj.getString("logResponse");
    
            String resultCode = jsonObj.getString("resultCode");
    
            String resultFailMsg = jsonObj.getString("resultFailMsg");
    
            //字段分离
    
            logFileds.add(tableName);
    
            logFileds.add(logRequest);
    
            logFileds.add(timestamp);
    
            logFileds.add(statBody);
    
            logFileds.add(logResponse);
    
            logFileds.add(resultCode);
    
            logFileds.add(resultFailMsg);
    
            logFileds.add(dt);
    
            return Joiner.on(HIVE_SEPARATOR).join(logFileds);
    
        }
    
        public static class Builder implements Interceptor.Builder
    
        {
    
            public Interceptor build()
    
            {
    
                return new HiveLogInterceptor2();
    
            }
    
            public void configure(Context arg0)
    
            {
    
            }
    
        }
    
    }

    pom.xml增加如下配置,将flume拦截器工程进行maven打包,jar包与依赖包均拷到${flume-agent}/lib目录

    <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <configuration>
                        <outputDirectory>
                            ${project.build.directory}
                        </outputDirectory>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>copy-dependencies</id>
                            <phase>prepare-package</phase>
                            <goals>
                                <goal>copy-dependencies</goal>
                            </goals>
                            <configuration>
                                <outputDirectory>${project.build.directory}/lib</outputDirectory>
                                <overWriteReleases>true</overWriteReleases>
                                <overWriteSnapshots>true</overWriteSnapshots>
                                <overWriteIfNewer>true</overWriteIfNewer>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>

    对日志用分隔符"01"进行分隔,。经拦截器处理后的日志格式如下,^A即是"01"

    ReportView^A{"request":{},"requestBody":{"detailInfos":[],"flag":"","reportId":7092,"pageSize":0,"searchs":[],"orders":[],"pageNum":1}}^A1480468701432^A{"sourceId":22745,"reportId":7092,"projectId":29355,"userId":2532}^A{"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":"请求成功","httpCode":200,"timestamp":1480468701849},"statusCode":"OK"},"response":{}}^A1^A^A2016-11-30

    至此,flume+Hdfs+Hive的配置均已完成。

    后续可以通过mapreduce或者HQL对数据进行分析。

    六、启动运行与结果

    1、启动hadoop hdfs

    参考我的前一篇文章:hadoop 1.2 集群搭建与环境配置  http://www.cnblogs.com/xckk/p/6124553.html

    2、启动flume_collector和flume_agent,由于flume启动命令参数太多,自己写了一个启动脚本

    start-Flume.sh

    #!/bin/bash
    jps -l|grep org.apache.flume.node.Application|awk '{print $1}'|xargs kill -9 2>&1 >/dev/null
    cd "$(dirname "$0")"
    cd ..
    nohup bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 2>&1 > /dev/null &

    3、hdfs查看数据

    可以看到搜集的日志已经上传到HDFS上

    [root@centos1 bin]# rm -rf FlumeData.1480587273016.tmp 
    [root@centos1 bin]# hadoop fs -ls /flume/skydata_hive_log/dt=2016-12-01/
    Found 3 items
    -rw-r--r--   3 root supergroup       5517 2016-12-01 08:12 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480608753042.tmp
    -rw-r--r--   3 root supergroup       5517 2016-12-01 08:40 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480610453116
    -rw-r--r--   3 root supergroup       5517 2016-12-01 08:44 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480610453117
    [root@centos1 bin]# 

    4、启动hive,查看数据,可以看到hive已经可以加载hdfs数据

    [root@centos1 lib]# hive
    
    Logging initialized using configuration in file:/root/apache-hive-1.2.1-bin/conf/hive-log4j.properties
    hive> select * from skydata_hive_log limit 2;
    OK
    ReportView    {"request":{},"requestBody":{"detailInfos":[],"flag":"","reportId":7092,"pageSize":0,"searchs":[],"orders":[],"pageNum":1}}    1480468701432    {"sourceId":22745,"reportId":7092,"projectId":29355,"userId":2532}    {"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":"请求成功","httpCode":200,"timestamp":1480468701849},"statusCode":"OK"},"response":{}}    1        2016-12-01
    ReportDesignResult    {"request":{},"requestBody":{"sourceId":22745,"detailInfos":[{"colName":"月份","flag":"0","reportId":7092,"colCode":"col_2_22745","pageSize":20,"type":"1","pageNum":1,"rcolCode":"col_25538","colType":"string","formula":"","id":25538,"position":"row","colId":181664,"dorder":1,"pColName":"月份","pRcolCode":"col_25538"},{"colName":"综合利率(合计)","flag":"1","reportId":7092,"colCode":"col_11_22745","pageSize":20,"type":"1","pageNum":1,"rcolCode":"sum_col_25539","colType":"number","formula":"sum","id":25539,"position":"group","colId":181673,"dorder":1,"pColName":"综合利率","pRcolCode":"col_25539"}],"flag":"bar1","reportId":7092,"reportName":"iiiissszzzV","pageSize":100,"searchs":[],"orders":[],"pageNum":1,"projectId":29355}}    1480468703586{"reportType":"bar1","sourceId":22745,"reportId":7092,"num":5,"usedFields":"月份$$综合利率(合计)$$","projectId":29355,"userId":2532}    {"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":"请求成功","reportId":7092,"httpCode":200,"timestamp":1480468703774},"statusCode":"OK"},"response":{}}    1        2016-12-01
    Time taken: 2.212 seconds, Fetched: 2 row(s)
    hive> 

    七、常见问题与处理方法

    1、FATAL: Spool Directory source skydataSource: { spoolDir: /opt/flumeSpool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.

    java.nio.charset.MalformedInputException: Input length = 1

    可能原因:

    1、字符编码问题,spoolDir目录下的日志文件必须是UTF-8

    2、使用Spooling Directory Source的时候,一定要避免同时读写一个文件的情况,conf文件增加如下配置

    a1.sources.skydataSource.ignorePattern=([^_]+)|(.*(.log)$)|(.*(.COMPLETED)$)

    2、日志导入到hadoop目录,但是hive表查询无数据。如hdfs://centos1:9000/flume/skydata_hive_log/dt=2016-12-01/下面有数据,

    hive查询 select * from skydata_hive_log 却无数据

    可能原因:

    1、建表的时候,没有建立分区。即使flume进行了配置(a1.sinks.hdfsSink.hdfs.path=hdfs://centos1:9000/flume/skydata_hive_log/dt=%Y-%m-%d),但是表的分区结构没有建立,因此文件导入到HDFS上后,HIVE并不能读取。

    解决方法:先创建分区,建立shell可执行文件,将该表的分区先建好

    for ((i=-10;i<=365;i++))
    do
    
            dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
    
            echo date=$dt
    
            hive -e "ALTER TABLE skydata_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_skydata_hive_log.out 2>>logs/init_skydata_hive_log.err
    
    done

    2、也可能是文件在hdfs上还是.tmp文件,仍然被hdfs在写入。.tmp文件hive暂时无法读取,只能读取非.tmp文件。

    解决方法:等待hdfs配置的roll间隔时间,或者达到一定大小后tmp文件重命名为hdfs上的日志文件后,再查询hive,即可查到。

    秀才坤坤出品

    转载请注明

    原文地址:http://www.cnblogs.com/xckk/p/6125838.html

  • 相关阅读:
    cmd 窗口中运行 Java 程序
    局部变量保证线程安全
    AQS源码详细解读
    理解 Java 内存模型的因果性约束
    高性能Java序列化框架Fse发布
    心跳与超时:高并发高性能的时间轮超时器
    支持内部晋升的无锁并发优先级线程池
    最终一致性:BASE论文笔记
    Activiti架构分析及源码详解
    理解OAuth2
  • 原文地址:https://www.cnblogs.com/xckk/p/6125838.html
Copyright © 2011-2022 走看看