zoukankan      html  css  js  c++  java
  • 离线电商数仓(十)之用户行为数据采集(十)组件安装(六)采集日志Flume(二)消费Kafka数据Flume

    集群规划

    服务器hadoop102

    服务器hadoop103

    服务器hadoop104

    Flume(消费Kafka)

    Flume

    1 项目经验Flume组件选型

    1FileChannelMemoryChannel区别

    MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

    FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。

    选型:

    金融类公司、对钱要求非常准确的公司通常会选择FileChannel

    传输的是普通日志信息(京东内部一天丢100-200万条,这是非常正常的),通常选择MemoryChannel。

    2FileChannel优化

    通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

    官方说明如下:

    Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

    checkpointDirbackupCheckpointDir也尽量配置在不同硬盘对应的目录中保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

    3SinkHDFS Sink

    1HDFS存入大量小文件,有什么影响?

    元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命

    计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

    2HDFS小文件处理

    官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollIntervalhdfs.rollSizehdfs.rollCount

    基于以上hdfs.rollInterval=3600hdfs.rollSize=134217728hdfs.rollCount =0几个参数综合作用,效果如下:

    1文件在达到128M时会滚动生成文件

    2文件创建超3600秒时会滚动生成文件

    2 Flume拦截器

    由于flume默认会用linux系统时间,作为输出到HDFS路径的时间。如果数据是23:59分产生的。Flume消费kafka里面的数据时,有可能已经是第二天了,那么这部门数据会被发往第二天的HDFS路径。我们希望的是根据日志里面的实际时间,发往HDFS的路径,所以下面拦截器作用是获取日志中的实际时间。

    1)在com.atguigu.flume.interceptor包下创建TimeStampInterceptor类

    package com.atguigu.interceptor;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class TimeStampInterceptor implements Interceptor {
    
        private ArrayList<Event> events = new ArrayList<>();
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            Map<String, String> headers = event.getHeaders();
            String log = new String(event.getBody(), StandardCharsets.UTF_8);
    
            JSONObject jsonObject = JSONObject.parseObject(log);
    
            String ts = jsonObject.getString("ts");
            headers.put("timestamp", ts);
    
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
            events.clear();
            for (Event event : list) {
                events.add(intercept(event));
            }
    
            return events;
        }
    
        @Override
        public void close() {
    
        }
    
        public static class Builder implements Interceptor.Builder {
            @Override
            public Interceptor build() {
                return new TimeStampInterceptor();
            }
    
            @Override
            public void configure(Context context) {
            }
        }
    }

    2)重新打包

     

    3)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

    [atguigu@hadoop102 lib]$ ls | grep interceptor

    flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

    4)分发Flumehadoop103、hadoop104

    [atguigu@hadoop102 module]$ xsync flume/

    3 日志消费Flume配置

    1)Flume配置分析

    2)Flume的具体配置如下:

    1)在hadoop104的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

    [atguigu@hadoop104 conf]$ vim kafka-flume-hdfs.conf

    文件配置如下内容

    ## 组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    ## source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sources.r1.kafka.topics=topic_log
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
    
    ## channel1
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6
    
    
    ## sink1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = log-
    a1.sinks.k1.hdfs.round = false
    
    
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
    ## 控制输出文件是原生文件。
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = lzop
    
    ## 拼装
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel= c1

    4 日志消费Flume启动停止脚本

    1)在/home/atguigu/bin目录下创建脚本f2.sh

    [atguigu@hadoop102 bin]$ vim f2.sh

    脚本中填写如下内容

    #! /bin/bash
    
    case $1 in
    "start"){
            for i in hadoop104
            do
                    echo " --------启动 $i 消费flume-------"
                    ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt   2>&1 &"
            done
    };;
    "stop"){
            for i in hadoop104
            do
                    echo " --------停止 $i 消费flume-------"
                    ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print $2}' | xargs -n1 kill"
            done
    
    };;
    esac

    2)增加脚本执行权限

    [atguigu@hadoop102 bin]$ chmod u+x f2.sh

    3)f2集群启动脚本

    [atguigu@hadoop102 module]$ f2.sh start

    4)f2集群停止脚本

    [atguigu@hadoop102 module]$ f2.sh stop

    5 项目经验Flume内存优化

    1问题描述:如果启动消费Flume抛出如下异常

    ERROR hdfs.HDFSEventSink: process failed

    java.lang.OutOfMemoryError: GC overhead limit exceeded

    2)解决方案步骤:

    1hadoop102服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

    export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

    2)同步配置到hadoop103、hadoop104服务器

    [atguigu@hadoop102 conf]$ xsync flume-env.sh

    3Flume内存参数设置优化

    JVM heap一般设置为4G或更高

    -Xmx-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

    -Xms表示JVM Heap(堆内存)最小尺寸,初始分配-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/14118038.html

  • 相关阅读:
    python修改镜像源
    nginx 记录
    linux 常用命令
    修改ssh连上默认目录
    sqlplus 导出一张表数据
    推送kafka消息失败
    Mybatis generator配置
    Oracle导库
    docker -- 安装mysql8.0.16
    安装自动集成工具jenkins
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14118038.html
Copyright © 2011-2022 走看看