  • [Flume Study Notes, Part 1] Introduction to Flume

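    Environment

      apache-flume-1.6.0

    Flume is a distributed log collection system. It can deliver data produced by applications to any centralized store, such as HDFS or HBase. Comparable tools include Facebook Scribe, Apache Chukwa, and Taobao TimeTunnel.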

    [Figure: application scenario diagram]

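    I. Flume core components
    1. Event: a unit of data made up of headers and a body. (An Event can be a log record, an Avro object, and so on.)
    Events run through the whole Flume data flow. An Event is Flume's basic unit of data: it carries the log payload as a byte array, together with header information. Data arriving from outside the Agent is captured by a Source, which formats it into Events and pushes them into one or more Channels. A Channel can be thought of as a buffer that holds each Event until a Sink has finished processing it. The Sink either persists the data or forwards the Event to the Source of another Agent.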
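
    The header-plus-body shape of an Event can be sketched in a few lines of Python. This is only an illustration of the data model described above, not Flume's Java API; the names Event and make_event and the sample header are assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Event:
    """Illustrative stand-in for a Flume event: string headers plus a byte-array body."""
    body: bytes
    headers: Dict[str, str] = field(default_factory=dict)


def make_event(line: str, **headers: str) -> Event:
    """Wrap one log line in an Event, encoding the text as UTF-8 bytes."""
    return Event(body=line.encode("utf-8"), headers=dict(headers))


# The header name "host" is a hypothetical example, not a required Flume header.
event = make_event("GET /index.html 200", host="node101")
print(event.headers, event.body)
```

    Keeping the body as raw bytes mirrors the point made above: Flume does not interpret the payload, it only transports it.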

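    2. Agent
    The Agent is the core of a running Flume deployment and its smallest independently running unit. Each Agent is a single JVM process and a complete data collection tool built from three core components: Source, Channel, and Sink. Through these components, Events flow from one place to another.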

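    3. Source
    The Source is the collecting end. It captures data, formats it, wraps it in Events, and pushes those Events into a Channel. Flume ships with many built-in Sources, including support for Avro, log4j, syslog, and HTTP POST (with a JSON body), so applications can talk directly to an existing Source such as AvroSource or SyslogTcpSource. If no built-in Source fits, Flume also supports custom Sources.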

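    4. Channel
    The Channel connects the Source to the Sink. Think of it as a buffer (a queue of Events): it holds Events in memory or persists them to local disk until a Sink has processed them. Two commonly used Channels are MemoryChannel and FileChannel.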

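    5. Sink
    The Sink takes Events from the Channel and sends the data elsewhere: to a file system, a database, Hadoop, or HBase, or to the Source of another Agent. When log volume is small, the data can be stored on the file system and saved at a configured time interval.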
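
    The division of labor between the three components can be sketched with a plain in-process queue. This is a simplified model under stated assumptions, not how Flume is implemented: run_source, run_sink, and the stored list are hypothetical names, and the list stands in for a real destination such as HDFS.

```python
import queue
from typing import Iterable, List


def run_source(lines: Iterable[str], channel: "queue.Queue[bytes]") -> None:
    """Source role: capture raw lines, wrap them as byte bodies, push them to the channel."""
    for line in lines:
        channel.put(line.encode("utf-8"))


def run_sink(channel: "queue.Queue[bytes]", destination: List[str]) -> None:
    """Sink role: drain the channel and deliver each event to its destination."""
    while not channel.empty():
        destination.append(channel.get().decode("utf-8"))


channel = queue.Queue(maxsize=1000)  # Channel role: a bounded buffer between the two
stored: List[str] = []               # hypothetical destination standing in for HDFS or a file

run_source(["hello", "wjy", "haha"], channel)
run_sink(channel, stored)
print(stored)
```

    The source and sink never call each other; they only share the channel, which is what lets Flume swap any of the three independently.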


    II. Installing Flume

    1. Unpack apache-flume-1.6.0-bin.tar.gz

    [root@node101 src]# tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /usr/local/

    2. Configure the JDK path

    [root@node101 conf]# cd /usr/local/apache-flume-1.6.0-bin/conf && mv flume-env.sh.template flume-env.sh
    [root@node101 conf]# vi flume-env.sh
    export JAVA_HOME=/usr/local/jdk1.8.0_65

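    Note: if transferring large files causes out-of-memory errors, adjust the JAVA_OPTS setting in this file.

    3. Configure environment variables (so that flume-ng is on the PATH)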

    [root@node101 apache-flume-1.6.0-bin]# vi /etc/profile

    [root@node101 bin]# source /etc/profile

    4. Verify the installation

    [root@node101 bin]# flume-ng version
    Flume 1.6.0
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
    Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
    From source with checksum b29e416802ce9ece3269d34233baf43f

    III. A simple test example

    1. Flume agent configuration file (saved here as /usr/local/apache-flume-1.6.0-bin/conf/option1)

    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
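
    A common mistake in a file like this is a source or sink bound to a channel that was never declared. The check below is a minimal sketch of how such a file could be sanity-checked before starting the agent; parse_properties and unbound_components are hypothetical helpers written for this example, not part of Flume, and the parser handles only simple key = value lines.

```python
CONFIG = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""


def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def unbound_components(props, agent):
    """Return the sources and sinks whose channel is missing or undeclared."""
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for source in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not used or not set(used) <= declared:
            problems.append(source)
    for sink in props.get(f"{agent}.sinks", "").split():
        if props.get(f"{agent}.sinks.{sink}.channel", "") not in declared:
            problems.append(sink)
    return problems


props = parse_properties(CONFIG)
print(unbound_components(props, "a1"))  # an empty list means every binding resolves
```

    Note the asymmetry the check reflects: a source takes a list under channels, while a sink takes exactly one name under channel.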
    
     

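    Memory Channel settings:
    capacity: the maximum number of events the channel can hold; the default is 100 (the example above sets 1000).
    transactionCapacity: the maximum number of events taken from a source or delivered to a sink in one transaction; the default is also 100.
    keep-alive: the timeout, in seconds, for adding an event to the channel or removing one from it.
    byteCapacity, byteCapacityBufferPercentage: limits on the total size in bytes of the events in the channel; only event bodies are counted, and the buffer percentage leaves headroom for headers.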
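
    How capacity and transactionCapacity interact can be sketched as follows. This is a simplified model, assuming all-or-nothing batches; it is not Flume's MemoryChannel code. In particular, the real channel waits up to keep-alive seconds for space before failing, whereas this sketch fails immediately. The class and method names are hypothetical.

```python
from collections import deque


class ChannelFull(Exception):
    """Raised when a batch does not fit into the remaining capacity."""


class MemoryChannelSketch:
    def __init__(self, capacity=100, transaction_capacity=100):
        self.capacity = capacity
        self.transaction_capacity = transaction_capacity
        self._events = deque()

    def put_batch(self, events):
        """Commit one source transaction, all or nothing."""
        if len(events) > self.transaction_capacity:
            raise ValueError("batch exceeds transactionCapacity")
        if len(self._events) + len(events) > self.capacity:
            raise ChannelFull("channel would exceed capacity")
        self._events.extend(events)

    def take_batch(self, max_events):
        """Hand a sink at most transactionCapacity events in one transaction."""
        count = min(max_events, self.transaction_capacity, len(self._events))
        return [self._events.popleft() for _ in range(count)]

    def __len__(self):
        return len(self._events)


channel = MemoryChannelSketch(capacity=1000, transaction_capacity=100)
channel.put_batch([b"event"] * 100)
batch = channel.take_batch(500)  # the sink asks for 500 but a transaction caps it at 100
print(len(batch), len(channel))
```

    The sketch also shows why transactionCapacity should not exceed capacity: a batch larger than the channel could never be committed.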

    2. Start Flume
    flume-ng agent --conf /usr/local/apache-flume-1.6.0-bin/conf --conf-file /usr/local/apache-flume-1.6.0-bin/conf/option1 --name a1 -Dflume.root.logger=INFO,console

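    Note: keep the arguments in the order shown; changing the order can make startup fail or hang.
    --name or -n: the name of the agent, which must match the prefix used in the configuration file (a1 here)
    --conf or -c: the configuration directory
    --conf-file or -f: the configuration file
    -Dflume.root.logger: the Flume log level and output target, here INFO to the console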
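
    When the agent is launched from a script, building the argument list in one place keeps the order fixed. The sketch below only assembles and prints the command; flume_agent_command is a hypothetical helper, and FLUME_HOME is assumed to be the install directory used earlier in this post.

```python
import shlex

FLUME_HOME = "/usr/local/apache-flume-1.6.0-bin"  # install directory from the steps above


def flume_agent_command(agent_name, conf_file, conf_dir, log_level="INFO"):
    """Build the flume-ng argument list in the same order as the command above."""
    return [
        "flume-ng", "agent",
        "--conf", conf_dir,
        "--conf-file", conf_file,
        "--name", agent_name,
        f"-Dflume.root.logger={log_level},console",
    ]


argv = flume_agent_command("a1", f"{FLUME_HOME}/conf/option1", f"{FLUME_HOME}/conf")
print(shlex.join(argv))
```

    Passing argv to subprocess.run would start the agent, provided flume-ng is on the PATH.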

    [root@node101 conf]# flume-ng agent --conf /usr/local/apache-flume-1.6.0-bin/conf --conf-file /usr/local/apache-flume-1.6.0-bin/conf/option1 --name a1 -Dflume.root.logger=INFO,console
    Info: Sourcing environment configuration script /usr/local/apache-flume-1.6.0-bin/conf/flume-env.sh
    Info: Including Hive libraries found via () for Hive access
    + exec /usr/local/jdk1.7.0_80/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/usr/local/apache-flume-1.6.0-bin/conf:/usr/local/apache-flume-1.6.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file /usr/local/apache-flume-1.6.0-bin/conf/option1 --name a1
    2019-06-28 22:58:07,389 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
    2019-06-28 22:58:07,393 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:/usr/local/apache-flume-1.6.0-bin/conf/option1
    2019-06-28 22:58:07,409 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: k1 Agent: a1
    2019-06-28 22:58:07,409 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
    2019-06-28 22:58:07,409 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
    2019-06-28 22:58:07,429 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [a1]
    2019-06-28 22:58:07,429 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
    2019-06-28 22:58:07,442 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
    2019-06-28 22:58:07,450 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel c1
    2019-06-28 22:58:07,452 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type netcat
    2019-06-28 22:58:07,482 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
    2019-06-28 22:58:07,493 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel c1 connected to [r1, k1]
    2019-06-28 22:58:07,511 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@71ae13c0 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
    2019-06-28 22:58:07,528 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel c1
    2019-06-28 22:58:07,573 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
    2019-06-28 22:58:07,573 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: c1 started
    2019-06-28 22:58:07,576 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1
    2019-06-28 22:58:07,580 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
    2019-06-28 22:58:07,585 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting
    2019-06-28 22:58:07,616 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

    3. Test with telnet

    [root@node101 ~]# telnet localhost 44444
    Trying ::1...
    telnet: connect to address ::1: Connection refused
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    hello
    OK
    wjy
    OK
    haha
    OK
    ^]
    telnet> quit
    Connection closed.
    [root@node101 ~]# 
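
    The same exchange can be scripted instead of typed. The sketch below sends lines over TCP and reads back one reply per line, as in the session above. So that it runs without a Flume agent, it starts a small stand-in server that answers OK to every line; AckHandler and send_lines are hypothetical names, and the stand-in only imitates the acknowledgement seen above, not the netcat source itself.

```python
import socket
import socketserver
import threading


class AckHandler(socketserver.StreamRequestHandler):
    """Hypothetical stand-in for the netcat source: reply OK to every line."""

    def handle(self):
        for _line in self.rfile:
            self.wfile.write(b"OK\n")


def send_lines(host, port, lines):
    """Send each line over TCP and collect the one-line reply that follows it."""
    replies = []
    with socket.create_connection((host, port), timeout=5) as conn:
        with conn.makefile("rb") as reader:
            for line in lines:
                conn.sendall(line.encode("utf-8") + b"\n")
                replies.append(reader.readline().decode("utf-8").strip())
    return replies


# Port 0 asks the OS for any free port; the agent in this post listens on 44444.
with socketserver.TCPServer(("127.0.0.1", 0), AckHandler) as server:
    threading.Thread(target=server.serve_forever, daemon=True).start()
    replies = send_lines("127.0.0.1", server.server_address[1], ["hello", "wjy", "haha"])
    server.shutdown()

print(replies)
```

    Against a running agent, only the call send_lines("localhost", 44444, [...]) would be needed.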

    Flume console:

    2019-06-28 23:37:11,154 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }
    2019-06-28 23:37:15,561 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 77 6A 79 0D                                     wjy. }
    2019-06-28 23:37:17,930 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 61 68 61 0D                                  haha. }
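
    Each body above ends in 0D, shown as a dot in the text column. That byte is a carriage return: telnet ends lines with CR LF, and the event body appears to keep the CR after the line is split on LF. Decoding the hex dump makes this visible; decode_logger_body is a hypothetical helper for this example.

```python
def decode_logger_body(hex_dump: str) -> bytes:
    """Turn the space-separated hex bytes printed by the logger sink back into bytes."""
    return bytes.fromhex(hex_dump)


body = decode_logger_body("68 65 6C 6C 6F 0D")
print(body)                # b'hello\r'
print(body.rstrip(b"\r"))  # b'hello'
```

    A downstream consumer that compares bodies as strings would need to strip this trailing byte first.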

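    Note: start Flume (listening on port 44444) before running telnet; otherwise the connection is refused. The refusal on ::1 in the session above is expected, because the source is bound to 127.0.0.1 and only the IPv4 attempt succeeds.

    To exit telnet, press Ctrl+]. When the telnet> prompt appears, type quit.

    See also: installing the telnet command on CentOS 6.5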


  • Original post: https://www.cnblogs.com/cac2020/p/11101006.html