zoukankan      html  css  js  c++  java
  • Flume 案例演示

    一 NetCat Source

    NetCat Source:采集端口数据

    创建配置文件netcat-flume-logger.conf

    # 给三大组件取名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 配置 NetCat Source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # 配置 Logger Sink
    a1.sinks.k1.type = logger
    
    # 配置 Memory Channel
    a1.channels.c1.type = memory
    # channel最大容量为1000个event
    a1.channels.c1.capacity = 1000
    # 每次事务传输100个event
    a1.channels.c1.transactionCapacity = 100
    
    # 配置三大组件的绑定关系
    # 一个source可以通过多个channel传输
    a1.sources.r1.channels = c1
    # 一个sink只能接受唯一的channel传输的数据
    a1.sinks.k1.channel = c1

    启动flume,此时flume充当netcat服务器

    bin/flume-ng agent --conf conf/ --conf-file job/netcat-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console

    启动命令也可以简写如下

    bin/flume-ng agent -c conf -f job/netcat-flume-logger.conf --n a1 -Dflume.root.logger=INFO,console

    发送端口消息

    [atguigu@hadoop102 ~]$ nc localhost 44444
    hello
    OK
    flume
    OK

    flume采集并打印消息到控制台

    2020-06-05 17:57:40,069 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
    2020-06-05 17:57:41,813 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 66 6C 75 6D 65                                  flume }

    二  Exec Source

    Exec Source:通过tail -F 命令,监控单个文件里的内容的变化。

    编写配置文件file-flume-hdfs.conf

    # 给三大组件取名
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # 配置 Exec Source
    a2.sources.r2.type = exec
    a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
    
    # 配置 HDFS Sink
    a2.sinks.k2.type = hdfs
    #路径配合文件夹滚动使用,这里配的一小时一个文件夹。生产环境一般一天一个文件夹。
    a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
    #上传文件的前缀
    a2.sinks.k2.hdfs.filePrefix = logs-
    #是否按照时间滚动文件夹
    a2.sinks.k2.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a2.sinks.k2.hdfs.roundValue = 1
    #重新定义时间单位
    a2.sinks.k2.hdfs.roundUnit = hour
    #是否使用本地时间戳,各种按时间滚动的时间戳
    a2.sinks.k2.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a2.sinks.k2.hdfs.batchSize = 1000
    #设置文件类型,可支持压缩
    a2.sinks.k2.hdfs.fileType = DataStream
    #多久生成一个新的文件,单位秒
    a2.sinks.k2.hdfs.rollInterval = 30
    #设置每个文件的滚动大小,比块略小一点,单位byte
    a2.sinks.k2.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a2.sinks.k2.hdfs.rollCount = 0
    
    # 配置 Memory Channel
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    # 配置三大组件的绑定关系
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2

    启动flume

    bin/flume-ng agent -c conf/ -f job/file-flume-hdfs.conf -n a2

    随便执行几条hive查询产生日志,查看hdfs上的文件

    三 Spooling Directory Source

    Spooling Directory Source:监控文件夹里的文件的变动,不能监控到文件内容的修改变化。文件夹到hdfs。

    编写配置文件dir-flume-hdfs.conf

    # 给三大组件取名
    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    
    # 配置 Spooling Directory Source
    a3.sources.r3.type = spooldir
    a3.sources.r3.spoolDir = /opt/module/flume/upload
    #忽略所有以.tmp 结尾的文件,不上传
    a3.sources.r3.ignorePattern = ([^ ]*.tmp)
    
    # 配置 HDFS Sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
    #上传文件的前缀
    a3.sinks.k3.hdfs.filePrefix = upload-
    #是否按照时间滚动文件夹
    a3.sinks.k3.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a3.sinks.k3.hdfs.roundValue = 1
    #重新定义时间单位
    a3.sinks.k3.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a3.sinks.k3.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a3.sinks.k3.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a3.sinks.k3.hdfs.rollInterval = 60
    #设置每个文件的滚动大小大概是 128M
    a3.sinks.k3.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a3.sinks.k3.hdfs.rollCount = 0
    
    # 配置 Memory Channel
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    
    # 配置三大组件的绑定关系
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3

    启动flume

     bin/flume-ng agent -c conf/ -f job/dir-flume-hdfs.conf -n a3

    将事先准备的几个文件复制到upload文件夹,flume采集完成后,默认添加.COMPLETED后缀

    [atguigu@hadoop102 flume]$ cp 1.txt upload/
    [atguigu@hadoop102 flume]$ cp 2.txt upload/
    [atguigu@hadoop102 flume]$ ls -l upload/
    总用量 8
    -rw-rw-r-- 1 atguigu atguigu 26 6月   5 20:36 1.txt.COMPLETED
    -rw-rw-r-- 1 atguigu atguigu 26 6月   5 20:36 2.txt.COMPLETED

    查看上传到hdfs的文坚,略。

    四 Taildir Source

    实时监控目录下的多个追加文件
    创建配置文件files-flume-logger.conf
    # 给三大组件取名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 配置 Taildir Source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1 f2
    a1.sources.r1.filegroups.f1 = /opt/module/flume/files/file1.txt
    a1.sources.r1.filegroups.f2 = /opt/module/flume/files/file2.txt
    a1.sources.r1.positionFile = /opt/module/flume/position/position.json
    
    # 配置 Logger Sink
    a1.sinks.k1.type = logger
    
    # 配置 Memory Channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # 配置三大组件的绑定关系
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    启动flume

    bin/flume-ng agent -c conf/ -f job/files-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

    分别往file1和file2里追加数据

    [atguigu@hadoop102 flume]$ echo atguigu >> files/file1.txt 
    [atguigu@hadoop102 flume]$ echo hello >> files/file2.txt

    可以看到flume都可以采集到

    2020-06-07 18:57:31,082 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 61 74 67 75 69 67 75                            atguigu }
    2020-06-07 18:57:46,320 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:283)] Opening file: /opt/module/flume/files/file2.txt, inode: 298173, pos: 10
    2020-06-07 18:57:46,322 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
    Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下:
    [{"inode":298172,"pos":60,"file":"/opt/module/flume/files/file1.txt"},{"inode":298173,"pos":16,"file":"/opt/module/flume/files/file2.txt"}]
    注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。
     
     
     
  • 相关阅读:
    Java 流(Stream)、文件(File)和IO
    归并排序(Java)
    TreeMap和TreeSet在排序时如何比较元素?Collections工具类中的sort()方法如何比较元素?
    动态规划—矩阵链乘法
    SQL Server 行列转换
    dropdownlist无刷新传值
    ASP.NET在主题中添加CSS文件
    asp.net网站后台退出后,点后退按钮仍能进,如何安全退出
    下拉框数据绑定两种方式
    win7安装IIS及将网站发布到IIS上
  • 原文地址:https://www.cnblogs.com/noyouth/p/13050397.html
Copyright © 2011-2022 走看看