zoukankan      html  css  js  c++  java
  • flume

    1. Flume介绍

    1.1 Flume

     

    FlumeCloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

    1.2 系统功能

    1.2.1 日志收集

    Flume最早是Cloudera提供的日志收集系统,目前是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据。

    恒生数据接收中间件---file.txt  哪个端口进行监控 --- 数据监控接收数据----内存存储本地硬盘

    Flume—对哪个ip  哪个端口进行监控 --- 数据监控接收数据----内存存储本地硬盘

    1.2.2 数据处理

    Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。 Flume提供了从Console(控制台)、RPCThrift-RPC)、Text(文件)、TailUNIX tail)、SyslogSyslog日志系统,支持TCPUDP2种模式),exec(命令执行)等数据源上收集数据的能力。

    2. Flume原理

    2.1 版本

    2.1.1 Flume OG

     

    Flume OG架构

    Flume逻辑上分三层架构:AgentCollectorStorage

    Flume OG采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。

    FLUM OG 的特点是:

    • FLUM OG 有三种角色的节点:代理节点(agent)、收集节点(collector)、主节点(master)。
    • agent 从各个数据源收集日志数据,将收集到的数据集中到 Collector,然后由收集节点汇总存入 HDFSmaster 负责管理 agentcollector 的活动。
    • agentcollector 都称为 nodenode 的角色根据配置的不同分为 logical node(逻辑节点)、physical node(物理节点)。
    • agentcollector sourcesink 组成,代表在当前节点数据是从 source 传送到 sink

    2.1.2 Flume NG 

     

    Flume NG架构

    Flume NG最明显的改动就是取消了集中管理配置的 Master Zookeeper,变为一个纯粹的传输工具。Flume NG另一个主要的不同点是读入数据和写出数据现在由不同的工作线程处理(称为Runner)。在 Flume NG 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。

    FLUME NG 的特点是:

    • NG 只有一种角色的节点:代理节点(agent)。
    • 没有 collectormaster 节点这是核心组件最核心的变化。
    • 去除了 physical nodeslogical nodes 的概念和相关内容。
    • agent 节点的组成也发生了变化。Flume NG的 agent sourcesinkChannel 组成。

    2.2 组件

    2.2.1 Agent

    FlumeAgent为最小的独立运行单位。AgentFlume中产生数据流的地方,一个Agent就是一个JVM。单AgentSourceSinkChannel三大组件构成,如下图:

     

    • Source:完成对日志数据的收集,分成 transtion event 打入到Channel之中。
    • Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。 
    • Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。 

    对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。 

    ² Source

    flume有许多类型的Source,见官网用户手册:

    http://flume.apache.org/FlumeUserGuide.html#flume-sources

    简单的归纳如下:

    Source类型

    说明

    Avro Source

    支持Avro协议(实际上是Avro RPC),提供一个Avro的接口,需要往设置的地址和端口发送Avro消息,Source就能接收到,如:Log4j Appender通过Avro Source将消息发送到Agent

    Thrift Source

    支持Thrift协议,提供一个Thrift接口,类似Avro

    Exec Source

    Source启动的时候会运行一个设置的UNIX命令(比如 cat file),该命令会不断地往标准输出(stdout)输出数据,这些数据就会被打包成Event,进行处理

    JMS Source

    JMS系统(消息、主题)中读取数据,类似ActiveMQ

    Spooling Directory Source

    监听某个目录,该目录有新文件出现时,把文件的内容打包成Event,进行处理

    Netcat Source

    监控某个端口,将流经端口的每一个文本行数据作为Event输入

    Sequence Generator Source

    序列生成器数据源,生产序列数据

    Syslog Sources

    读取syslog数据,产生Event,支持UDPTCP两种协议

    HTTP Source

    基于HTTP POSTGET方式的数据源,支持JSONBLOB表示形式

    Legacy Sources

    兼容老的Flume OGSource0.9.x版本)

    自定义Source

    使用者通过实现Flume提供的接口来定制满足需求的Source

    对于直接读取文件Source, 主要有两种方式: 

    ü Exec source

    可通过写Unix command的方式组织数据,最常用的就是tail -F [file]

    可以实现实时传输,但在flume不运行和脚本错误时,会丢数据,也不支持断点续传功能。因为没有记录上次文件读到的位置,从而没办法知道,下次再读时,从什么地方开始读。特别是在日志文件一直在增加的时候。flumesource挂了。等flumesource再次开启的这段时间内,增加的日志内容,就没办法被source读取到了。不过flume有一个execStream的扩展,可以自己写一个监控日志增加情况,把增加的日志,通过自己写的工具把增加的内容,传送给flumenode。再传送给sinknode。要是能在tail类的source中能支持,在node挂掉这段时间的内容,等下次node开启后在继续传送,那就更完美了。

    ü Spooling Directory Source

    SpoolSource:是监测配置的目录下新增的文件,并将文件中的数据读取出来,可实现准实时。需要注意两点:

    1、拷贝到spool目录下的文件不可以再打开编辑。

    2、spool目录下不可包含相应的子目录。在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。log4j有一个TimeRolling的插件,可以把log4j分割的文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)

    注:ExecSourceSpoolSource对比

    ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法何证日志数据的完整性。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。

    ² Channel

    当前有几个 Channel 可供选择,分别是 Memory Channel, JDBC Channel , File ChannelPsuedo Transaction Channel。比较常见的是前三种 Channel

    v Memory Channel 可以实现高速的吞吐,但是无法保证数据的完整性。

    v Memory Recover Channel 在官方文档的建议上已经建义使用File Channel来替换。

    v File Channel保证数据的完整性与一致性。在具体配置File Channel时,建议File Channel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。

    File Channel 是一个持久化的隧道(Channel),它持久化所有的事件,并将其存储到磁盘中。因此,即使 Java 虚拟机当掉,或者操作系统崩溃或重启,再或者事件没有在管道中成功地传递到下一个代理(agent),这一切都不会造成数据丢失。Memory Channel 是一个不稳定的隧道,其原因是由于它在内存中存储所有事件。如果 Java 进程死掉,任何存储在内存的事件将会丢失。另外,内存的空间收到 RAM大小的限制,File Channel 这方面是它的优势,只要磁盘空间足够,它就可以将所有事件数据存储到磁盘上。

    Flume Channel 支持的类型:

    Channel类型

    说明

    Memory Channel

    Event数据存储在内存中

    JDBC Channel

    Event数据存储在持久化存储中,当前Flume Channel内置支持Derby

    File Channel

    Event数据存储在磁盘文件中

    Spillable Memory Channel

    Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用)

    Pseudo Transaction Channel

    测试用途

    Custom Channel

    自定义Channel实现  

    ² Sink

    Sink在设置存储数据时,可以向文件系统中,数据库中,Hadoop中储数据,在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。 

    Flume Sink支持的类型

    Sink类型

    说明

    HDFS Sink

    数据写入HDFS

    Logger Sink

    数据写入日志文件

    Avro Sink

    数据被转换成Avro Event,然后发送到配置的RPC端口上

    Thrift Sink

    数据被转换成Thrift Event,然后发送到配置的RPC端口上

    IRC Sink

    数据在IRC上进行回放

    File Roll Sink

    存储数据到本地文件系统

    Null Sink

    丢弃到所有数据

    HBase Sink

    数据写入HBase数据库 

    Morphline Solr Sink

    数据发送到Solr搜索服务器(集群)

    ElasticSearch Sink

    数据发送到Elastic Search搜索服务器(集群)

    Kite Dataset Sink

    写数据到Kite Dataset,试验性质的

    Custom Sink

    自定义Sink实现

    Flume提供了大量内置的SourceChannelSink类型。不同类型的Source,ChannelSink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个Agent可以协同工作,并且支持Fan-inFan-outContextual RoutingBackup Routes如下图所示:

     

    2.2.2 Collector

    Flume NG中已经没有Collector的概念了,Collector的作用是将多个Agent的数据汇总后,加载到Storage中。

    2.2.3 Storage

    Storage是存储系统,可以是一个普通File,也可以是HDFSHIVEHBase等。

    2.2.4 Master

    针对于OG版本。

    Master是管理协调AgentCollector的配置等信息,是Flume集群的控制器。

    Flume中,最重要的抽象是data flow(数据流),data flow描述了数据从产生,传输、处理并最终写入目标的一条路径。

     

    对于Agent数据流配置就是从哪得到数据,把数据发送到哪个Collector

    对于Collector是接收Agent发过来的数据,把数据发送到指定的目标机器上。

    2.3 特性

    (1) 可靠性

    当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:

    • end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。)
    • Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送)
    • Best effort(数据发送到接收方后,不会进行确认)。

    (2) 可扩展性

    Flume采用了三层架构,分别为agentcollectorstorage,每一层均可以水平扩展。其中,所有agentcollectormaster统一管理,这使得系统容易监控和维护,且master允许有多个(使用ZooKeeper进行管理和负载均衡),这就避免了单点故障问题。

    (3) 可管理性

    所有agentcolletormaster统一管理,这使得系统便于维护。多master情况,Flume利用ZooKeepergossip,保证动态配置数据的一致性。用户可以在master上查看各个数据源或者数据流执行情况,且可以对各个数据源配置和动态加载。Flume提供了web shell script command两种形式对数据流进行管理。

    (4) 功能可扩展性

    用户可以根据需要添加自己的agentcollector或者storage。此外,Flume自带了很多组件,包括各种agentfilesyslog等),collectorstoragefileHDFS等)。

    小结:

    Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFSHbase)的能力。

    Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据字节数组形式并且携带有头信息,这些EventAgent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source

    当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:

    • end-to-end:收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送
    • Store on failure:这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送
    • Best effort:数据发送到接收方后,不会进行确认
    • Flume的官方网站

    3. Flume安装

    3.1 基础知识

    http://flume.apache.org/

    • 下载Flume 1.6.0版本的tar

    http://www.apache.org/dyn/closer.cgi/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz

    • 集群每台机器上安装Flume

    操作系统版本:CentOS 6.5

    Hadoop版本:2.7.1

    JDK版本:1.7.0_45 (设置环境变量)

    安装Flume版本:apache-flume-1.6.0-bin

    3.2 安装流程

    3.2.1 下载tar

    下载Flume最新版本,现在服务器上安装的是apache-flume-1.6.0-bin.tar.gz的版本,下载地址是http://www.apache.org/dyn/closer.cgi/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz

    Flume的安装包放Linux系统中的/home/bigdata目录下。

    3.2.2 解压tar

    • 解压Flume安装包到/hadoop文件夹下

    # > tar -xvf apache-flume-1.6.0-bin.tar.gz

    • 将文件夹apache-flume-1.6.0-bin重命名为flume

    # > mv apache-flume-1.6.0-bin flume

    3.2.3 修改系统环境变量

    • 修改/etc/profile文件,加入如下内容

    export FLUME_HOME=/home/bigdata/flume

    export PATH=$PATH:$FLUME_HOME/bin:

    # > vim /etc/profile

     

    # > source /etc/profile

    • 验证安装及其他

    安装完毕后,在控制台运行如下命令

    # > flume-ng version

    会看到以下输出:

     

    3.2.4 修改Flume配置文件

    修改 flume-env.sh 配置文件,主要是JAVA_HOME变量设置

    Flume的配置文件位置:$FLUME_HOME/conf

    # > /home/bigdata/flume/conf

    # > cp flume-env.sh.template flume-env.sh

    # > vim flume-env.sh

     

    3.2.5 案例

    1) Avro

    Avro可以发送一个给定的文件给FlumeAvro 源使用AVRO RPC机制

    • 创建agent配置文件

    # > vi /home/bigdata/flume/conf/avro.conf

    添加以下内容:

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe configure the source

    a1.sources.r1.type = avro

    a1.sources.r1.bind = 0.0.0.0

    a1.sources.r1.port = 4141

    # Describe the sink

    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

     

    对以上内容解释:

    指定名称:a1是我们要启动的Agent名字

    a1.sources = r1 命名Agentsourcesr1

    a1.sinks = k1 命名Agentsinksk1

    a1.channels = c1 命名Agentchannels c1

    # Describe configure the source

    a1.sources.r1.type = avro 指定r1的类型为AVRO

    a1.sources.r1.bind = 0.0.0.0  SourceIP地址绑定(这里指本机)

    a1.sources.r1.port = 4141 指定通讯端口为4141

    # Describe the sink

    a1.sinks.k1.type = logger 指定k1的类型为Logger(不产生实体文件,只在控制台显示)

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    指定Channel的类型为Memory

    设置Channel的最大存储event数量为1000

    每次最大可以source中拿到或者送到sink中的event数量也是100

    这里还可以设置Channel的其他属性:

    a1.channels.c1.keep-alive=1000 event添加到通道中或者移出的允许时间(秒)

    a1.channels.c1.byteCapacity = 800000 event的字节量的限制,只包括eventbody

    a1.channels.c1.byteCapacityBufferPercentage = 20

    event的缓存比例为20%80000020%),即event的最大字节量为800000*120%

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    sourcesink分别与Channel c1绑定

    • 启动flume agent a1

    # > flume-ng agent -c . -f /home/bigdata/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

    -c:使用配置文件所在目录(这里指默认路径,即$FLUME_HOME/conf

    -fflume定义组件的配置文件 

    -n:启动Agent的名称,该名称在组件配置文件中定义 

    -Dflume.root.loggerflume自身运行状态的日志,按需配置,详细信息,控制台打印

     

    • 创建指定文件

    # > echo "hello world" > /home/data/log.00

    • 使用avro-client发送文件

    # > flume-ng avro-client -c . -H hadoop01 -p 4141 -F /home/data/log.00

    -H:指定主机

    -p:指定端口

    -F:制定要发送的文件

    • a1的控制台,可以看到以下信息,注意最后一行:

    注:Flume框架对Hadoopzookeeper的依赖只是在jar包上,并不要求flume启动时必须将Hadoopzookeeper服务也启动。

    2) Exec

    • 创建agent配置文件

    # > vi /home/bigdata/flume/conf/exec_tail.conf

    添加以下内容:

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = exec

    a1.sources.r1.command = tail -F /home/data/log_exec_tail

    # Describe the sink

    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    • 启动flume agent a1

    # > flume-ng agent -c . -f /home/bigdata/flume/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console

     

    • 制作log_exec_tail文件

    # > echo "exec tail 1" >> /home/data/log_exec_tail

    • master的控制台,可以看到以下信息:
    • 向log_exec_tail文件中追加数据

    # > echo "exec tail 2" >> /hadoop/flume/log_exec_tail

    • master的控制台,可以看到以下信息:

    # for i in {1..100}

    > do echo "flume +" $i >> /home/data/log_exec_tail

    > done

    3) Spool

    Spool监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:

    ² 拷贝到spool目录下的文件不可以再打开编辑。

    ² spool目录下不可包含相应的子目录

    • 创建agent配置文件
    • 缺点:指定文件下不能有重名(kk.log  kk.log. COMPLETED),会报错 是的flume死掉

    # > vi /home/bigdata/flume/conf/spool.conf

    添加以下内容:

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe configure the source

    a1.sources.r1.type = spooldir

    a1.sources.r1.spoolDir = /home/data/logs

    a1.sources.r1.fileHeader = true

    # Describe the sink

    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    • 启动flume agent a1

    创建/home/data/logs文件夹

    # > mkdir / home/data /logs

    # > flume-ng agent -c . -f /home/bigdata/flume/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console

     

    • 追加文件到/hadoop/flume/logs目录

    # > echo "spool test1" > /home/data/logs/spool_text.log

    • a1的控制台,可以看到以下相关信息:

     

    Spool2—自定义后缀

    #a1.sources.r1.fileHeaderKey = QQ.com

    a1.sources.r1.fileSuffix = .QQ.com

    4)Syslogtcp

    Syslogtcp监听TCP的端口做为数据源

    • 创建agent配置文件

    # > vi /home/bigdata/flume/conf/syslog_tcp.conf

    添加以下内容:

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = syslogtcp

    a1.sources.r1.port = 5140

    a1.sources.r1.host = localhost

    # Describe the sink

    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    • 启动flume agent a1

    # > flume-ng agent -c . -f /home/bigdata/flume/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console

    • 测试产生syslog
    • 需要安装nc

    Rpm –ivh nc-1.84-22.el6.x86_64

    # > echo "hello idoall.org syslog" | nc localhost 5140

    • master的控制台,可以看到以下信息:

     

     

    5)JSONHandler

    • 创建agent配置文件

    # > vi /home/bigdata/flume/conf/post_json.conf

    添加如下内容:

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = org.apache.flume.source.http.HTTPSource

    a1.sources.r1.port = 8888

    # Describe the sink

    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    • 启动flume agent a1

    # > flume-ng agent -c . -f /home/bigdata/flume/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console

    • 生成JSON 格式的POST request

    # > curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:8888

    • master的控制台,可以看到以下信息:

     

     

    6)HDFS sink

    • 创建agent配置文件

    # > vi /home/bigdata/flume/conf/hdfs_sink.conf

    添加以下内容:

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = syslogtcp

    a1.sources.r1.port = 5140

    a1.sources.r1.host = localhost

    # Describe the sink

    a1.sinks.k1.type = hdfs

    a1.sinks.k1.hdfs.path = hdfs:// zookeepertest01:8020/user/flume/syslogtcp

    a1.sinks.k1.hdfs.filePrefix = Syslog

    a1.sinks.k1.hdfs.round = true

    a1.sinks.k1.hdfs.roundValue = 1

    a1.sinks.k1.hdfs.roundUnit = minute

    a1.sinks.k1.hdfs.fileType=DataStream

    a1.sinks.k1.hdfs.writeFormat=Text

    a1.sinks.k1.hdfs.rollInterval=0

    a1.sinks.k1.hdfs.rollSize=10240

    a1.sinks.k1.hdfs.rollCount=0

    a1.sinks.k1.hdfs.idleTimeout=60

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    补充:

    • 启动flume agent a1

    # > flume-ng agent -c . -f /home/bigdata/flume/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console

     

    • 测试产生syslog

    # > echo "hello idoall flume -> hadoop testing one" | nc localhost 5140

    • master的控制台,可以看到以下信息:

     

    • master上再打开一个窗口,去Hadoop上检查文件是否生成

    # > hadoop fs -ls /user/flume/syslogtcp

     

    # > hadoop fs -cat /user/flume/syslogtcp/Syslog.1407644509504

     

    #for i in {1..30}; do echo “Flume  +”$i |nc localhost 5140;done

    7)hdfs sink 按照日期创建

    vi conf/hdfsDate.conf

    #定义agent名, sourcechannelsink的名称

    a5.sources = source1

    a5.channels = channel1

    a5.sinks = sink1

    #配置source

    a5.sources.source1.type = spooldir

    a5.sources.source1.spoolDir = /home/data/beicai

    a5.sources.source1.channels = channel1

    a5.sources.source1.fileHeader = false

    a5.sources.source1.interceptors = i1

    a5.sources.source1.interceptors.i1.type = timestamp

    #配置sink

    a5.sinks.sink1.type = hdfs

    a5.sinks.sink1.hdfs.path = hdfs://192.168.10.11:9000/usr/beicai

    a5.sinks.sink1.hdfs.fileType = DataStream

    a5.sinks.sink1.hdfs.writeFormat = TEXT

    a5.sinks.sink1.hdfs.rollInterval = 1

    a5.sinks.sink1.channel = channel1

    a5.sinks.sink1.hdfs.filePrefix = %Y-%m-%d

    #配置channel

    a5.channels.channel1.type = memory

    #flume-ng agent -n a5 -c conf -f conf/hdfsDate.conf -Dflume.root.logger=DEBUG,console

    8)File Roll Sink

    • 创建agent配置文件

    # > vi /home/bigdata/flume/conf/file_roll.conf

    添加以下内容:

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = syslogtcp

    a1.sources.r1.port = 5555

    a1.sources.r1.host = localhost

    # Describe the sink

    a1.sinks.k1.type = file_roll

    a1.sinks.k1.sink.directory = /home/data/logs2

    a1.sinks.k1.sink.serializer = TEXT

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    • 启动flume agent a1

    # > flume-ng agent -c . -f /home/bigdata/flume/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console

     

    • 测试产生logcat

    # > echo "hello idoall.org syslog" | nc localhost 5555

    # > echo "hello idoall.org syslog 2" | nc localhost 5555

     

    • 查看/home/data/logs2下是否生成文件,默认每30秒生成一个新文件

    # > ll /home/data/logs2

     

     

    9)channels通道类型为文件形式

    vi conf/channelsFile.conf

    a1.sources = s1

    a1.channels = c1

    a1.sinks = k1

    # For each one of the sources, the type is defined

    a1.sources.s1.type = syslogtcp

    a1.sources.s1.host = localhost

    a1.sources.s1.port = 5180

    # Each sink's type must be defined

    a1.sinks.k1.type = logger

    # Each channel's type is defined.

    a1.channels.c1.type = file

    a1.channels.c1.checkpointDir = /home/bigdata/flume/logs/checkpoint

    a1.channels.c1.dataDir = /home/bigdata/flume/logs/data

    #Bind the source and sinks to channels

    a1.sources.s1.channels = c1

    a1.sinks.k1.channel = c1

    #flume-ng agent -n a1 -c conf -f conf/ channelsFile.conf -Dflume.root.logger=DEBUG,console

  • 相关阅读:
    Delphi中动态创建的Panel无法改变颜色的解决办法(要把Panel的ParentBackground设为False)
    判断当前进程是否以管理员权限启动的
    在一个exe文件中查找指定内容,找到则返回起始位置, 否则返回0
    [置顶] 使用严苛模式打破Android4.0以上平台应用中UI主线程的“独断专行”
    删除IE缓存中指定的文件
    将窗体显示在 PageControl 上
    判断操作系统多久没有任何操作
    UrlDownloadFile, 线程下载文件, 带进度条
    用JSON 和 Google 实现全文翻译
    成为高效程序员的7个重要习惯
  • 原文地址:https://www.cnblogs.com/whywy/p/12321987.html
Copyright © 2011-2022 走看看