zoukankan      html  css  js  c++  java
  • Flume 实战,将多台机器日志直接收集到 Kafka

    目前我们使用的一个 b 端软件的报错日志分散在集群各处,现在想把它收集到一个地方然后统一丢进 Kafka 提供给下游业务进行消费。

    我想到了 flume,之前让同事搭建的这次自己想多了解一些细节于是就开搞了。

    首先还是下载 flume 的客户端,这里我使用最新版本 1.9.0

    curl -O http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
    tar -zvf apache-flume-1.9.0-bin.tar.gz

    设置需要的 java 环境,注意路径自定义一下,没有 java 自己下个 java8

    export JAVA_HOME=/opt/java8
    PATH=$PATH:$JAVA_HOME/bin

    在 apache-flume-1.9.0-bin/conf 我们可以找到对应的配置文件模版,1.9.0 的模版大概长这样

    # The configuration file needs to define the sources,
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent,
    # in this case called 'agent'
    
    agent.sources = seqGenSrc
    agent.channels = memoryChannel
    agent.sinks = loggerSink
    
    # For each one of the sources, the type is defined
    agent.sources.seqGenSrc.type = seq
    
    # The channel can be defined as follows.
    agent.sources.seqGenSrc.channels = memoryChannel
    
    # Each sink's type must be defined
    agent.sinks.loggerSink.type = logger
    
    #Specify the channel the sink should use
    agent.sinks.loggerSink.channel = memoryChannel
    
    # Each channel's type is defined.
    agent.channels.memoryChannel.type = memory
    
    # Other config values specific to each type of channel(sink or source)
    # can be defined as well
    # In this case, it specifies the capacity of the memory channel
    agent.channels.memoryChannel.capacity = 100

    我们复制一份当作操作的 conf

    mv flume-conf.properties.template flume-conf.properties

    从上面的配置文件中我们不难发现

    source channelsink 都是单独定义的项,他们都需要配置一个这个配置文件里面生效的名字,以及其他的基于这个名字的配置。

    比如这里我的需求是将某文件里面的新增信息读出来包装为事件,先发到 channel 等待处理,我可以配置一个 Taildir Source 来处理这个任务。

    flume 为我们准备了非常多的现成的 sources channelsink ,他们都具有不同的功能可以直接提供给我们使用,具体可以参考一下对应版本的官方文档。

    这里我们只谈一下这次用到的 Taildir Source

    agent.sources = sensorsInvalidRecordsFile
    agent.channels = file
    agent.sinks = kafkaSink
    
    # For each one of the sources, the type is defined
    agent.sources.sensorsInvalidRecordsFile.type = TAILDIR
    agent.sources.sensorsInvalidRecordsFile.filegroups = f1
    agent.sources.sensorsInvalidRecordsFile.filegroups.f1 = /sa_cluster/logs/sp/extractor/invalid_records
    agent.sources.sensorsInvalidRecordsFile.headers.f1.fileName = invalid_records
    agent.sources.sensorsInvalidRecordsFile.headers.f1.logType = sensorsInvalidRecords
    agent.sources.sensorsInvalidRecordsFile.channels = file
    agent.sources.positionFile = ~/.flume/taildir_position.json

    头三行先申明一下这里配置的 sources channels sinks 各为什么名字。这里我们可以留意到,所有的组件都被命名为复数,这就意味着我们可以同时申明多个 sources ,只需要将其配置行用空格依次分割即可

    agent.sources = s1 s2 s3

    这样即可同时生成三个 source。

    这里的配置我们指定了一个实例,并且对这个实例上的属性就行初始化。

    然后我们继续配置一个 channel 。这里配置一个 file channel,将从 source 里面抽出来的 event 都落盘防止数据丢失。

    # Each channel's type is defined.
    agent.channels.fileC.type = file
    agent.channels.fileC.dataDirs = ~/.flume/file-channel/data
    agent.channels.fileC.useDualCheckpoints = true
    agent.channels.fileC.backupCheckpointDir = ~/.flume/file-channel/backup_checkpoint

    最后我需要定义一个可以将 channel 里面的数据读出来,并且放到 kafka 里面去的 sink。找了一下正好有一个叫 kafka sink 的 sink 可以满足我

    可以看到和 apache hadoop 生态结合得比较好的 flume 为什么成为抽取日志的首选,或者优先考虑的对象,就是其对生态的友好和提供足够多的开箱即用的功能。

    agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.kafkaSink.channel = fileC
    agent.sinks.kafkaSink.kafka.bootstrap.servers = 10.171.97.1:9092, 10.163.13.219:9092, 10.170.249.122:9092
    agent.sinks.kafkaSink.topic = flume-topic-sensors-invalid-records
    agent.sinks.kafkaSink.producer.acks = -1
    agent.sinks.kafkaSink.producer.compression.type = snappy

    将 kafka 集群信息配置上去。

    最后一步我们来启动 flume-ng 

    /bin/flume-ng agent -n agent -c conf -f /home/flume_self/apache-flume-1.9.0-bin/conf/flume-conf.properties -Dflume.root.logger=INFO,console

    -n 是名称

    -c 是配置

    -f 是配置地址

    最好用 nohup 或者 supervisor 对任务进行管理。

    再去目标 kafka-manager 之类的工具上去看下是否发送成功即可!

    到此为止我们的目标就达成了。感觉还是蛮简单的,就是随便配置一下配置就可以完成工作,需要定制化的工作 flume 也支持利用一些勾子读取到数据然后进行 etl 或者修改之后再发送。还是比较灵活。希望早点遇到类似需求再玩一下。

    Reference:

    https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html    Flume user_guide

    https://juejin.im/post/5be4e549f265da61441f8dbe    Apache Flume 入门教程

    https://www.mtyun.com/library/how-to-install-flume-on-centos7    在 CentOS7 上安装 Flume

  • 相关阅读:
    分布式版本控制系统Git的安装与使用
    利用GitLab自动同步软件仓库
    MakerDAO 代币解释:DAI, WETH, PETH, SIN, MKR(一)
    数组 Major^
    String 类 Major^
    深度优先排序(数字全排列) Major^
    喊数字小游戏 Major^
    java数据类型 Major^
    ArrayList类的使用方法 Major^
    深度优先搜索(迷宫救人最短路径) Major^
  • 原文地址:https://www.cnblogs.com/piperck/p/11868452.html
Copyright © 2011-2022 走看看