大数据篇:Flume
Flume是什么?
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
如果没有Flume
数据的采集发送怎么处理呢?处理到哪里呢?Flume最主要的作用就是实时读取服务器本地磁盘数据,写入Hdfs或Kafka等中间件。
1 基础架构
-
Agent主要由:source、channel、sink三个组件组成.
-
Source:
- 从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channel,Flume提供多种数据接收的方式,比如Avro(Flume对接Flume),Exec(命令行如tail -f),Taildir(目录本地文件),Kafka等。
-
Channel:
- channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着桥梁的作用,channel是一个完整的事务,这一点保证了数据在收发的时候的一致性,并且它可以和任意数量的source和sink链接,支持的类型有: JDBC , File System,Memory等。
-
sink:
- sink将数据channels消费数据(events)并将其传递给目标地,目标地可能是另一个sink,Flume提供多种数据发送的方式,比如Avro,HDFS,Hive,Kafka。
-
Event
- Flume以事件的形式将数据从源头传送到最终的目的
- Event是数据传输的基本单元
- Event由Header和Body两部分组成,Header用来存放该Event的一些属性(K-V结构),Body存放数据(Byte Array结构)。
2 案例演示
2.1 netcat->Memory->Logger
- 通过netcat工具向本机44444端口发送数据
- Flume监控本机44444端口读取数据
- Flume将获取数据打印到控制台
- 安装netcat工具
yum -y install nc
#监听44444端口(服务端)
nc -lk 44444
#监听44444端口(客户端)
nc localhost 44444
#互相发送数据接收即可
- 创建Agent配置文件flume-netcat-logger.conf
vim flume-netcat-logger.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources相关配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# sinks相关配置
a1.sinks.k1.type = logger
# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100
# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
- 启动flume
#普通写法
flume-ng agent --conf /etc/flume-ng/conf --conf-file flume-netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
#简写
flume-ng agent -c /etc/flume-ng/conf -f flume-netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
2.2 .log本地文件->Memory->Hdfs
- 生成本地日志文件
- Flume获取本地数据文件
- Flume将获取的文件发送到Hdfs
- 创建Agent配置文件flume-log-hdfs.conf
vim flume-log-hdfs.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources相关配置
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/flume-test/logs/a.log
# sinks相关配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://cdh01.cm:8020/flume/events/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = events-
#文件夹滚动一分钟创建一个新文件夹
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
#文件滚动时间10S 128M 2条 生成新文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134210000
a1.sinks.k1.hdfs.rollCount = 2
#积累多少Event才刷到hdfs
a1.sinks.k1.hdfs.batchSize = 2
#开启时间滚动需要
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#DataStream不会压缩输出文件
a1.sinks.k1.hdfs.fileType = DataStream
# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100
# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
- 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-log-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
- 创建本地文件
mkdir /root/flume-test/logs
echo "1" > /root/flume-test/logs/a.log
echo "2" >> /root/flume-test/logs/a.log
echo "3" >> /root/flume-test/logs/a.log
echo "4" >> /root/flume-test/logs/a.log
#根据上面设置的间隔时间进行效果测试
echo "5" >> /root/flume-test/logs/a.log
echo "6" >> /root/flume-test/logs/a.log
echo "7" >> /root/flume-test/logs/a.log
echo "8" >> /root/flume-test/logs/a.log
2.3 本地文件夹->Memory->Hdfs
- 生成本地文件夹及文件数据
- Flume获取本地数据文件
- Flume将获取的文件发送到Hdfs
- 创建Agent配置文件flume-file-hdfs.conf
vim flume-file-hdfs.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources相关配置
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/flume-test/dirlogs
#忽略文件
a1.sources.r1.ignorePattern = ([^ ]*.txt)
# sinks相关配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://cdh01.cm:8020/flume/dirlogs/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = log-
#文件夹滚动一分钟创建一个新文件夹
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
#文件滚动时间10S 128M 2条 生成新文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134210000
a1.sinks.k1.hdfs.rollCount = 2
#积累多少Event才刷到hdfs
a1.sinks.k1.hdfs.batchSize = 2
#开启时间滚动需要
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#DataStream不会压缩输出文件
a1.sinks.k1.hdfs.fileType = DataStream
# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100
# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
- 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-file-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
- 创建本地文件
mkdir /root/flume-test/dirlogs
echo "1" > /root/flume-test/dirlogs/a.log
echo "2" >> /root/flume-test/dirlogs/a.log
echo "3" > /root/flume-test/dirlogs/a.txt
echo "4" >> /root/flume-test/dirlogs/a.txt
#根据上面设置的间隔时间进行效果测试
echo "5" > /root/flume-test/dirlogs/b.log
echo "6" >> /root/flume-test/dirlogs/b.log
#采用cp直接放入一个写好的文件测试效果
不能在监控目录中创建并持续修改文件
上传完成的文件以.COMPLETED结尾
被监控文件夹500毫秒扫描一次文件变动
2.4 本地文件夹->Memory->Logger
监控目录下的实时追加文件
- 生成本地文件夹及文件数据
- Flume获取本地数据文件
- Flume将获取数据打印到控制台
- 创建Agent配置文件flume-files-logger.conf
vim flume-files-logger.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources相关配置
a1.sources.r1.type = TAILDIR
#位置信息
a1.sources.r1.positionFile = /root/flume-test/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /root/flume-test/test1/a.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /root/flume-test/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000
# sinks相关配置
a1.sinks.k1.type = logger
# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100
# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
- 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-files-logger.conf -n a1 -Dflume.root.logger=INFO,console
- 创建本地文件
mkdir /root/flume-test/test1/
mkdir /root/flume-test/test2/
echo "1" > /root/flume-test/test1/a.log
echo "2" >> /root/flume-test/test1/a.log
echo "3" >> /root/flume-test/test1/a.log
#根据上面设置的间隔时间进行效果测试
echo "5" > /root/flume-test/test2/b.log
echo "6" >> /root/flume-test/test2/b.log
echo "7" >> /root/flume-test/test2/b.log
#停止flume,追加数据,在启动测试断点续传效果。
2.5 netcat->Memory->kafka
- 生成本地文件夹及文件数据
- Flume获取本地数据文件
- Flume将获取数据打印到控制台
- 创建Agent配置文件flume-files-kafka.conf
vim flume-file-kafka.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources相关配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# sinks相关配置
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = top-test
a1.sinks.k1.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100
# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
- 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-file-kafka.conf -n a1 -Dflume.root.logger=INFO,console
- 启动kafka消费者
kafka-console-consumer --topic top-test --bootstrap-server cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092 --from-beginning --group g1
- 使用netcat
nc localhost 44444