Hadoop用于处理非常大的数据集,通常假设数据是已经在HDFS中,或者可以批量复制。 然而,有很多系统不符合这个假设,他们产生了我们想要的数据流,使用Hadoop进行收集,存储和分析,Apache Flume系统是处理这些问题理想的方案。
Flume是设计用于将大容量基于事件的数据存储到Hadoop的工具。一个典型的例子就是使用Flume来收集一系列Web服务器的日志文件,将这些分散的日志事件移动到HDFS中重新汇总用来分析处理。通常的目的地(或Flume水槽)是HDFS。 但是,Flume是灵活的足以写入其他系统,如HBase或Solr。
要使用Flume,我们需要运行一个Flume代理,它是一个长期存在的Java进程,可以通过通道连接源和数据漕。 Flume中的一个源产生事件并将其传递到通道,该事件将一直存储在通道,直到它们被转发到数据漕中。您可以将源---通道----数据漕组合视为基本的Flume构建块。
Flume安装由在分布式集群中运行的连接代理的集合组成。系统边缘的代理(例如,位于Web服务器上)收集数据并将其转发给负责聚合的代理然后将数据存储在其最终目的地。代理被配置为运行特定数据源和数据漕的集合,因此使用Flume主要是将这些片连接在一起的配置练习。在本章中,我们将介绍如何构建Flume集群用于数据摄取,您可以将其用作自己的Hadoop管道的一部分。
安装Flume
从下载页面下载Flume二进制发行版的稳定版本,并在适当的位置解压缩:
% tar xzf apache-flume-x.y.z-bin.tar.gz 。
把Flume二进制文件放在你的系统路径是很有用的:
% export FLUME_HOME=~/hmy/apache-flume-x.y.z-bin
% export PATH=$PATH:$FLUME_HOME/bin
然后可以使用flume-ng命令启动Flume代理,接下来我们将看到。
一个例子
为了展示Flume如何工作,我们先从一个设置开始:1.在本地目录中查找新的文本文件。2.在添加文件时,将每个文件的每一行发送到控制台。
我们将手动添加文件,但很容易想像一个像Web服务器创建新文件的过程,我们想与Flume用连续摄取。 而且,在一个真实的系统中,相反不仅仅是记录文件内容,我们将把内容写入HDFS以供后续使用处理 - 我们将在本章后面看到如何做。
在此示例中,Flume代理运行单个数据源---数据通道---数据漕,使用一个Java属性文件配置。 这个配置文件控制所使用的数据源,数据漕和通道的类型,以及它们如何连接在一起。 对于这个例子,我们将使用示例14-1中的配置。
Example 14-1. Flume的配置使用假脱机目录源和记录器数据漕
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/spooldir
agent1.sinks.sink1.type = logger
agent1.channels.channel1.type = file
来自层次结构的属性名称,代理名称位于顶层。在这个例子中,我们只有一个的代理,叫做agent1。代理中不同组件的名称定义在下一级,所以例如agent1.sources列出应该在agent1中运行的源的名称(这里是单个源,source1)。类似地,agent1具有sink(sink1)和channel(channel1)。每个组件的属性在层次结构的下一级定义。可用于组件的配置属性取决于组件的类型。在这种情况下,agent1.sources.source1.type设置为spooldir,这是一个后台处理目录源,用于监视新文件的假脱机目录。假脱机目录源定义一个spoolDir属性,因此对于source1,完整密钥是agent1.sources.source1.spoolDir。源的通道是通过agent1.sources.source1.channels设置的。数据漕是用于将事件记录到控制台。 它也必须连接到通道(与agent1.sinks.sink1.channel属性),通道是一个文件通道,这意味着通道中的事件会持续到磁盘以获得持久性。
该系统如图14-1所示。
Figure 14-1.具有假脱机目录源的Flume代理和通过文件通道连接的记录器数据漕
在运行示例之前,我们需要在本地文件系统上创建假脱机目录:
% mkdir /tmp/spooldir
然后我们可以使用flume-ng命令启动Flume代理:
% flume-ng agent
--conf-file spool-to-logger.properties
--name agent1
--conf $FLUME_HOME/conf
-Dflume.root.logger=INFO,console
来自示例14-1的Flume属性文件用--conf-file标志指定。代理名称也必须使用--name传入,因为Flume属性文件可以定义多个代理,我们必须说出要运行的代理。 --conf标志告诉Flume在哪里可以找到它的一般配置,比如环境设置。
在新终端中,在假脱机目录中创建一个文件。 假脱机目录源期望文件是不可变的。 防止部分写入的文件被数据源读取,我们将完整内容写入隐藏文件。 然后,我们做一个原子重命名,所以源可以读取它:
% echo "Hello Flume" > /tmp/spooldir/.file1.txt
% mv /tmp/spooldir/.file1.txt /tmp/spooldir/file1.txt
返回Flume代理终端,我们看到Flume已经检测并处理了该文件:
Preparing to move file /tmp/spooldir/file1.txt to
/tmp/spooldir/file1.txt.COMPLETED
Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 Hello Flume }
假脱机目录源通过将文件拆分成行并创建文件来获取文件每行的数据槽事件。事件具有可选标题和二进制体,这是文本行的UTF-8表示形式。 记录器数据漕以十六进制和字符串形式记录身体。我们放在假脱机目录中的文件只有一行长度,因此在这种情况下只记录了一个事件。我们还看到文件被源重命名为file1.txt.COMPLETED,这表明Flume已经完成处理,并且不会再处理它。