1、组成source channel sink 事务(put/take)
1)taildir source
(1)断点续传、多目录
(2)哪个flume版本产生的?Apache1.7 cdh1.6
(3)没有断点续传功能时怎么做的?
自定义
(4)taildir挂了怎么办?
不会丢数:断点续传 ,但可能重复数据:
(5)怎么处理重复数据?
不处理:生产环境通常不处理
处理
自身:在taildirsource里面增加自定义事务
找兄弟:下一级处理(hive dwd sparkstreaming flink布隆)
去重手段(groupby、开窗取窗口第一条、redis)
(6)taildir source 是否支持递归遍历文件夹读取文件?
不支持。 自定义 递归遍历文件夹 +读取文件
2)file channel /memory channel/kafka channel
(1)file channel
数据存储于磁盘,优势:可靠性高;劣势:传输速度低
默认容量:100万event
(2)memory channel
数据存储于内存,优势:传输速度快;劣势:可靠性差
默认容量:100个event
(3)kafka channel
数据存储于Kafka,基于磁盘;
优势:可靠性高;
传输速度快 kafka channel 》memory channel+kafka sink 原因省去了sink阶段
(4)kafka channel哪个版本产生的?
flume1.6 版本产生=》并没有火;因为有bug
topic-start 数据内容
topic-event 数据内容 ture 和false 很遗憾,都不起作用。
增加了额外清洗的工作量。
flume1.7解决了这个问题,开始火了。
(5)生产环境如何选择
如果下一级是kafka,优先选择kafka channel
如果下一级不是:
如果是金融、对钱要求准确的公司,选择file channel
如果就是普通的日志,通常可以选择memory channel
每天丢几百万数据 pb级 亿万富翁,掉1块钱会捡?
3)HDFS sink(优化配置)
小文件:
文件大小 128m
文件时间(1小时-2小时)
event个数(0禁止)
2、三个器(拦截器、选择器、监控器)
1)拦截器
(1)ETL 拦截器 判断json的完整性 { }
数据清洗:轻度清洗{} =》保证传输效率
服务器时间:13位 必须全部是数字
(2)时间处理:
(3) 事件拦截器 event start
start 启动
event (商品点击、商品列表、商品详情、、
广告;
故障
点赞、评论、收藏
后台活跃、通知
)
一个表一个topic,一定能满足下一级所有消费者;
可以做适当的轻度聚合
(4)自定义拦截器的步骤
定义一类,实现interceptor接口,重写里面4个方法
(初始化、关闭、单event、多event,创建一个静态内部类Builder)
打包=》上传到flume的lib包下=》在配置文件中管理拦截器
(5)拦截器可以不用吗?
可以不用;需要在下一级hive的dwd层和sparksteaming里面处理
优势:只处理一次,轻度处理,
劣势:影响性能,不适合做实时推荐这种对实时要求比较高的场景
2)选择器
rep(默认,发往下一级所有通道)
m(选择性发往对应通道)
ReplicatingChannelSelector
MultiPlexingChannelSelector
本次项目用m,根据flume头发往对应的topic
start-topic event-topic
3)监控器
Ganglia
尝试提交的次数远远大于最终成功的次数。
自身:增加内存flume-env.sh 4-6g
找朋友:先增加服务器台数,后增加flume服务器,通道打通
搞活动 618 =》增加服务器=》用完在退出
日志服务器配置(阿里云):8-16g内存、磁盘8T
3、优化
1)file channel 能够多配置磁盘就多配置磁盘, 提高吞吐量
2)HDFS sink 小文件
(1)时间(1小时-2小时) or 大小128m、event个数(0禁止)
3)监控器
调整内存
自身 提高自己内存
找朋友 增加flume台数
4、flume挂了怎么办?
1) 如果是memorychannel 有可能丢数据
2) 如果是taildirsource 不会丢数据,但是有可能重复数据
注:要懂flume运行流程
5 Flume的事务机制和可靠性
1) Flume的事务机制
所以这就不得不提Flume的事务机制(类似数据库的事务机制):Flume使用两个独立的事务分别负责从soucrce到channel,以及从channel到sink的事件传递。比如以上面一篇博客中的事例为例:spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到channel且提交成功,那么source就将该文件标记为完成。同理,事务以类似的方式处理从channel到sink的传递过程,如果因为某种 原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到channel中,等待重新传递。
2) Flume的At-least-once提交方式
Flume的事务机制,总的来说,保证了source产生的每个事件都会传送到sink中。但是值得一说的是,实际上Flume作为高容量并行采集系统采用的是At-least-once(传统的企业系统采用的是exactly-once机制)提交方式,这样就造成每个source产生的事件至少到达sink一次,换句话说就是同一事件有可能重复到达。这样虽然看上去是一个缺陷,但是相比为了保证Flume能够可靠地将事件从source,channel传递到sink,这也是一个可以接受的权衡。如上博客中spooldir的使用,Flume会对已经处理完的数据进行标记。
3) Flume的批处理机制
为了提高效率,Flume尽可能的以事务为单位来处理事件,而不是逐一基于事件进行处理。比如上篇博客提到的spooling directory source以100行文本作为一个批次来读取(BatchSize属性来配置,类似数据库的批处理模式)。批处理的设置尤其有利于提高file channle的效率,这样整个事务只需要写入一次本地磁盘,或者调用一次fsync,速度回快很多。