zoukankan      html  css  js  c++  java
  • 【Flume学习之二】Flume 使用场景

    环境
      apache-flume-1.6.0


    一、多agent连接

    1、node101配置 option2

        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
    
        # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = node101
        a1.sources.r1.port = 44444
    
        # Describe the sink
        # a1.sinks.k1.type = logger
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = node102
        a1.sinks.k1.port = 60000
    
        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100
    
        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    2、node102配置 option1

    ############################################################
        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
    
        # Describe/configure the source
        a1.sources.r1.type = avro
        a1.sources.r1.bind = node102
        a1.sources.r1.port = 60000
    
        # Describe the sink
        a1.sinks.k1.type = logger
    
        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100
    
        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
        ############################################################

    3、启动顺序
    先启动node102-flume,后启动node101-flume。看一下flume启动顺序就知道:要先创建sink,然后创建channel,最后创建source;然后将channel与source、sink连接起来;最后依次启动channel、sink、source。

    [root@node102 conf]# flume-ng agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/option1 -n a1 -Dflume.root.logger=INFO,console
    [root@node101 conf]# flume-ng agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/option2 -n a1 -Dflume.root.logger=INFO,console

    4、测试:在node101 telnet测试,在node102查看输出日志
    node101 telnet:

    [root@node101 ~]# telnet node101 44444
    Trying 192.168.118.101...
    Connected to node101.
    Escape character is '^]'.
    hello world
    OK
    haha wjy
    OK
    hi xiaoming
    OK
    ^]
    telnet> quit
    Connection closed.
    [root@node101 ~]#

    node102 flume日志:

    2019-06-29 00:43:46,022 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 0D hello world. }
    2019-06-29 00:45:04,365 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 61 68 61 20 77 6A 79 0D haha wjy. }
    2019-06-29 00:45:13,713 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 69 20 78 69 61 6F 6D 69 6E 67 0D hi xiaoming. }

    二、Exec Source
    Source类型选择Exec
    1、配置 option3

    ############################################################
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
    
        # Describe/configure the source
        a1.sources.r1.type = exec
        a1.sources.r1.command = tail -F /home/flume.exec.log
    
        # Describe the sink
        a1.sinks.k1.type = logger
        
        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100
    
        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
        ############################################################

    2、启动

    [root@node101 conf]# flume-ng agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/option3 -n a1 -Dflume.root.logger=INFO,console

    3、测试

    [root@node101 home]# echo "wjy" >> flume.exec.log
    [root@node101 home]# echo "hi" >> flume.exec.log
    [root@node101 home]# echo "hello wjy" >> flume.exec.log

    flume输出:

    2019-06-29 01:23:28,237 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 77 6A 79 wjy }
    2019-06-29 01:23:43,333 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 69 hi }
    2019-06-29 01:23:58,652 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6A 79 hello wjy }

    三、Spooling Directory Source

    监测配置的目录下新增的文件,并将文件中的数据读取出来:
    1)拷贝到spool目录下的文件不可以再打开编辑;
    2) spool目录下不可包含相应的子目录;

    1、配置

        ############################################################
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
    
        # Describe/configure the source
        a1.sources.r1.type = spooldir
        a1.sources.r1.spoolDir = /home/logs
        a1.sources.r1.fileHeader = true
    
        # Describe the sink
        a1.sinks.k1.type = logger
    
        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100
    
        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
        ############################################################

    2、启动

    [root@node101 conf]# flume-ng agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/option4 -n a1 -Dflume.root.logger=INFO,console

    3、测试

    日志目录:/home/logs

    [root@node101 home]# cat flume.exec.log
    hello
    hello
    hello
    wjy
    hi
    hello wjy
    [root@node101 home]# mkdir logs && mv flume.exec.log ./logs && cd logs && ls
    flume.exec.log.COMPLETED

    flume输出:

    2019-06-29 01:35:28,281 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /home/logs/flume.exec.log to /home/logs/flume.exec.log.COMPLETED
    2019-06-29 01:35:28,282 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/home/logs/flume.exec.log} body: 68 65 6C 6C 6F                                  hello }
    2019-06-29 01:35:28,282 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/home/logs/flume.exec.log} body: 68 65 6C 6C 6F                                  hello }
    2019-06-29 01:35:28,282 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/home/logs/flume.exec.log} body: 68 65 6C 6C 6F                                  hello }
    2019-06-29 01:35:28,283 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/home/logs/flume.exec.log} body: 77 6A 79                                        wjy }
    2019-06-29 01:35:28,283 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/home/logs/flume.exec.log} body: 68 69                                           hi }
    2019-06-29 01:35:28,283 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/home/logs/flume.exec.log} body: 68 65 6C 6C 6F 20 77 6A 79                      hello wjy }

    四、日志输出到HDFS

    1、配置

    ############################################################
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
    
        # Describe/configure the source
        a1.sources.r1.type = spooldir
        a1.sources.r1.spoolDir = /home/logs
        a1.sources.r1.fileHeader = true
    
        # Describe the sink
        # 注意:只修改上一个 spooldir 配置中 sink 部分的代码块(原来是 a1.sinks.k1.type = logger)
        a1.sinks.k1.type=hdfs
        a1.sinks.k1.hdfs.path=hdfs://node101:8020/flume/%Y-%m-%d/%H%M
        
        ##每隔60s或者文件大小超过10M的时候产生新文件
        # hdfs有多少条消息时新建文件,0不基于消息个数
        a1.sinks.k1.hdfs.rollCount=0
        # hdfs创建多长时间新建文件,0不基于时间
        a1.sinks.k1.hdfs.rollInterval=60
        # hdfs多大时新建文件,0不基于文件大小
        a1.sinks.k1.hdfs.rollSize=10240
        # 当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件
        a1.sinks.k1.hdfs.idleTimeout=3
        
        a1.sinks.k1.hdfs.fileType=DataStream
        a1.sinks.k1.hdfs.useLocalTimeStamp=true
        
        ## 每五分钟生成一个目录:
        # 是否启用时间上的“舍弃”,这里的“舍弃”类似于“四舍五入”,后面再介绍。如果启用,则会影响除了%t的其他所有时间表达式
        a1.sinks.k1.hdfs.round=true
        # 时间上进行“舍弃”的值;
        a1.sinks.k1.hdfs.roundValue=5
        # 时间上进行“舍弃”的单位,包含:second,minute,hour
        a1.sinks.k1.hdfs.roundUnit=minute
    
        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100
    
        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
        ############################################################

    创建HDFS目录

    [root@node101 conf]# hdfs dfs -mkdir /flume

    2、启动

    [root@node101 conf]# flume-ng agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/option5 -n a1 -Dflume.root.logger=INFO,console

    3、测试

    制造测试数据:

    [root@node101 home]# echo "hello wjy" >> test.log
    [root@node101 home]# echo "hello xiaoming" >> test.log
    [root@node101 home]# echo "hi xiaowang" >> test.log
    [root@node101 home]# cp test.log ./logs

    flume执行日志:

    2019-07-01 18:48:18,213 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
    2019-07-01 18:48:18,213 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /home/logs/test.log to /home/logs/test.log.COMPLETED
    2019-07-01 18:48:20,197 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:58)] Serializer = TEXT, UseRawLocalFileSystem = false
    2019-07-01 18:48:20,522 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:234)] Creating hdfs://node101:8020/flume/2019-07-01/1845/FlumeData.1561978100198.tmp
    2019-07-01 18:48:28,285 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:429)] Closing idle bucketWriter hdfs://node101:8020/flume/2019-07-01/1845/FlumeData.1561978100198.tmp at 1561978108285
    2019-07-01 18:48:28,286 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:363)] Closing hdfs://node101:8020/flume/2019-07-01/1845/FlumeData.1561978100198.tmp
    2019-07-01 18:48:28,331 (hdfs-k1-call-runner-6) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:629)] Renaming hdfs://node101:8020/flume/2019-07-01/1845/FlumeData.1561978100198.tmp to hdfs://node101:8020/flume/2019-07-01/1845/FlumeData.1561978100198
    2019-07-01 18:48:28,357 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:394)] Writer callback called.

    hdfs文件:


    五、其他:
    多路日志合并

    多路日志输出

    Apache Flume使用手册

    Flume概念与原理

    Flume(一)Flume原理解析

    Flume构建日志采集系统

  • 相关阅读:
    MySQL企业常用集群图解
    MySQL常见错误类型
    MySQL 数据库增量数据恢复案例
    异地备份同步校验脚本
    python 自动化之路 day 面向对象基础
    Docker 入门
    awk知识点总结
    Linux文件管理类命令及命令别名
    Linux重定向
    Linux进程管理
  • 原文地址:https://www.cnblogs.com/cac2020/p/11103615.html
Copyright © 2011-2022 走看看