  • Flume Usage (Case Studies)

    Flume official documentation

    Usage: bin/flume-ng <command> [options]...
    
    commands:
      help                      display this help text
      agent                     run a Flume agent
    
    global options:
      --conf,-c <conf>          use configs in <conf> directory
      -Dproperty=value          sets a Java system property value
    
    agent options:
      --name,-n <name>          the name of this agent (required)
      --conf-file,-f <file>     specify a config file (required if -z missing)
    
    eg:
    bin/flume-ng agent --conf conf --name agent-test --conf-file test.conf -Dflume.root.logger=DEBUG,console
    bin/flume-ng agent -c conf -n agent-test -f test.conf -Dflume.root.logger=DEBUG,console
    

    A minimal example

    1. Edit the example conf (available on the official website and in the conf directory)

    # example.conf: A single-node Flume configuration
    
    # 1. Name the three components
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 2. Configure the Source (where it listens for data)
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = cen-ubuntu
    a1.sources.r1.port = 44444
    
    # 3. Configure the Sink (here it simply logs events)
    # Describe the sink
    a1.sinks.k1.type = logger
    a1.sinks.k1.maxBytesToLog = 1024
    
    # 4. Configure the Channel (memory used as the buffer)
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # 5. Bind the three components together
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    2. Install netcat (a network tool for transferring files and messages) to send and receive data

    $ sudo apt-get install netcat
    

    3. Run the Flume agent to capture data in real time (listening on the port)

    bin/flume-ng agent --conf conf --name a1 --conf-file conf/a1.conf -Dflume.root.logger=DEBUG,console
    
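    To keep the agent running after the terminal closes, it can also be started in the background; a sketch (the logs/ directory and file name are assumptions, not from the original steps):

    nohup bin/flume-ng agent --conf conf --name a1 --conf-file conf/a1.conf > logs/flume-a1.log 2>&1 &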

    4. Check from the shell whether the port was opened successfully

    netstat -tnlp
    
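    To narrow the output to the port Flume should be listening on (44444 from the config above):

    netstat -tnlp | grep 44444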

    5. Send data to the port via telnet

    telnet cen-ubuntu 44444
    
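    If telnet is not available, nc (installed in step 2) can send a line to the same port; a quick sketch:

    echo "nihao" | nc cen-ubuntu 44444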

    6. If Flume receives the data, the setup is working

    Event: { headers:{} body: 6E 69 68 61 6F 20 08 0D                         nihao .. }
    
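    The body column shows the raw bytes in hex; they can be decoded back to text with xxd (assuming xxd is installed), for example:

    echo "6E 69 68 61 6F" | xxd -r -p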

    A variety of Sources

    Exec Source: ingests data by running a command

    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /var/log/secure
    

    Spooling Directory Source: watches a directory for new files

    Kafka Source

    Syslog Sources: collect system logs

    HTTP Source: accepts events sent to the agent over HTTP (a sketch follows this list)

    NetCat Source

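    A minimal HTTP Source fragment for comparison (the port and component names below are illustrative, not taken from this post); the default handler accepts events POSTed as a JSON array, e.g. with curl:

    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.type = http
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 5140
    a1.sources.r1.channels = c1

    curl -X POST -d '[{"headers":{},"body":"hello flume"}]' http://cen-ubuntu:5140
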
    A variety of Channels

    Memory Channel

    Kafka Channel

    File Channel: stores events in files on disk

    A variety of Sinks

    HDFS Sink

    Hive Sink

    HBase Sinks (HBaseSink; AsyncHBaseSink)

    MorphlineSolrSink: an ETL tool (Extract, Transform, Load)

    ElasticSearchSink: writes data to Elasticsearch, a Lucene-based search server



    Case 1:

    Collect Hive's run log into the HDFS file system

    Analysis: an Exec source that tails a file gives good real-time behavior but poor reliability. If the command is interrupted, data can be lost or re-read, so delivery cannot be guaranteed and this approach should not be used in production. A file-backed channel is also safer than a memory channel (a sketch of that swap follows the list below).

    • Source: Exec Source
      tail -F /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
    • Channel: Memory Channel
    • Sink: HDFS Sink
      /user/cen/flume/hive-log
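
    If durability matters more than throughput, the memory channel in this case could be replaced by a file channel; a sketch of the swap (the directories below are assumptions — Case 2 shows a full working example):

    a2.channels.c2.type = file
    a2.channels.c2.checkpointDir = /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6/data/filechannel/checkpoint
    a2.channels.c2.dataDirs = /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6/data/filechannel/data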

    1. Write the agent configuration

    # flume-tail.conf: tail Hive's log into HDFS
    
    # 1. Name the three components
    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # 2. Configure the Source (where the data is read from)
    # Describe/configure the source
    a2.sources.r2.type = exec
    a2.sources.r2.command = tail -F /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
    
    # 3. Configure the Sink (write events to HDFS)
    # Describe the sink
    a2.sinks.k2.type = hdfs
    # For a non-HA namenode, specify the host (see Note 1 and Note 2)
    a2.sinks.k2.hdfs.path = hdfs://cen-ubuntu:8020/user/cen/flume/hive-log
    # File name prefix
    a2.sinks.k2.hdfs.filePrefix = events-
    # File type (uncompressed text data)
    a2.sinks.k2.hdfs.fileType = DataStream 
    # Write format
    a2.sinks.k2.hdfs.writeFormat = Text
    # Number of events written per batch
    a2.sinks.k2.hdfs.batchSize = 100
    # File rolling settings (the three lines below work together)
    a2.sinks.k2.hdfs.rollInterval = 0
    a2.sinks.k2.hdfs.rollSize = 1024
    a2.sinks.k2.hdfs.rollCount = 0
    # See http://doc.okbase.net/chiweitree/archive/126197.html
    a2.sinks.k2.hdfs.minBlockReplicas=1
    
    # 4. Configure the Channel (memory used as the buffer)
    # Use a channel which buffers events in memory
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    # 5. Bind the three components together
    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
    

    2. Add the required jar dependencies (easily located with find /dir/dir -name 'filename')

    commons-configuration-1.6.jar
    hadoop-common-2.5.0-cdh5.3.6.jar
    hadoop-auth-2.5.0-cdh5.3.6.jar
    hadoop-hdfs-2.5.0-cdh5.3.6.jar
    
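    A sketch of locating the jars under the Hadoop installation and copying them into Flume's lib directory (both install paths below are assumptions based on the paths used elsewhere in this post):

    find /opt/cdh5.3.6/hadoop-2.5.0-cdh5.3.6 -name 'hadoop-common-*.jar'
    cp /opt/cdh5.3.6/hadoop-2.5.0-cdh5.3.6/share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6/lib/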

    3. Run

    bin/flume-ng agent --conf conf --name a2 --conf-file conf/flume-tail.conf -Dflume.root.logger=DEBUG,console
    
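    Once Hive writes to its log, the rolled files should appear under the sink path; a quick check (path taken from the configuration above, assuming the HDFS client is on the PATH):

    hdfs dfs -ls /user/cen/flume/hive-log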


    Case 2:

    • Collect files from Hive's log directory into HDFS

    • Source: Spooling Directory Source
      /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/logs/
    • Channel: File Channel
    • Sink: HDFS Sink
      /user/cen/flume/hive-log

    Analysis: the Spooling Directory Source collects log data by watching a directory for newly added files. In production it is typically used together with log4j; once a file has been fully transferred, it is renamed with a .COMPLETED suffix.

    1. Write the agent configuration

    # spooling-file-hdfs.conf: spool a directory into HDFS
    
    # Name the components on this agent
    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    
    # Describe/configure the source
    a3.sources.r3.type = spooldir
    a3.sources.r3.spoolDir = /opt/datas/flume/
    a3.sources.r3.ignorePattern = (.)*.log$
    # Suffix appended to files after they have been ingested
    a3.sources.r3.fileSuffix = .deleteable
    
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://cen-ubuntu:8020/user/cen/flume/spool-file-hdfs/%Y%m%d
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    a3.sinks.k3.hdfs.filePrefix = events-
    a3.sinks.k3.hdfs.fileType = DataStream 
    a3.sinks.k3.hdfs.writeFormat = Text
    a3.sinks.k3.hdfs.batchSize = 10
    
    # Use a channel which buffers events in file
    a3.channels.c3.type = file
    # Checkpoint and data directories for the file channel (optional)
    a3.channels.c3.checkpointDir = /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6/data/filechanel/cheakpoint
    a3.channels.c3.dataDirs = /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6/data/filechanel/data
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    

    2. Run

    bin/flume-ng agent --conf conf --name a3 --conf-file conf/spooling-file-hdfs.conf -Dflume.root.logger=DEBUG,console
    
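    To exercise the agent, drop a file into the spool directory; the name must not end in .log, because the ignorePattern above excludes those (the file name here is illustrative):

    cp /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log /opt/datas/flume/hive-$(date +%s).txt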

    3. Results

    • Files that have been read are renamed with the .deleteable suffix
    • Files ending in .log are not read
    • The ingested files appear in HDFS as expected, stored in directories partitioned by date
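
    A quick way to confirm the date-partitioned layout (path from the configuration above; the date component is whichever day the agent ran):

    hdfs dfs -ls /user/cen/flume/spool-file-hdfs/$(date +%Y%m%d)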

    Note 1: HDFS HA configuration

    1. Copy hdfs-site.xml and core-site.xml into the conf directory

    2. Change the HDFS path

    # If the namenode is HA, point the path at the nameservice instead of a host
    # a2.sinks.k2.hdfs.path = hdfs://ns1/user/cen/flume/hive-log
    

    Note 2: In particular, the target directory can be created according to a pattern (e.g. by time, %Y%m%d); see the official documentation for details

    # As the official documentation explains, time-based escape sequences require a timestamp field in the event headers; add the parameter below to use the server's local time
    hdfs.useLocalTimeStamp = true
    
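    Alternatively, a timestamp interceptor on the source writes the timestamp into each event's headers, so hdfs.useLocalTimeStamp is not needed; a sketch using the Case 2 component names:

    a3.sources.r3.interceptors = i1
    a3.sources.r3.interceptors.i1.type = timestamp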

    Note 3: Passing options via a file

    /bin/sqoop --options-file /opt/datas/filename
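
    A sketch of what such an options file might contain (one option or value per line; the connection details below are placeholders, not from this post):

    import
    --connect
    jdbc:mysql://cen-ubuntu:3306/testdb
    --username
    root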