组件介绍:
代理 Flume Agent
1 Flume内部有一个或者多个Agent 2 每一个Agent是一个独立的守护进程(JVM) 3 从客户端哪儿接收收集,或者从其他的Agent哪儿接收,然后迅速的将获取的数据传到下一个目的节点Agent 4 Agent主要由source、channel、sink三个组件组成。
agent source
1 一个flume源
2 负责一个外部源(数据生成器),如一个web服务器传递给他的事件
3 该外部源将它的事件以flume可以识别的格式(event)发送到flume中
4 当一个flume源接收到一个事件时,其将通过一个或多个通道存储该事件
agent channel
1 通道: 采用被动存储的形式,即通道会缓存该事件直到该事件被sink组件处理
2 所以Channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉它在source和sink间起着一共桥梁的作用,
channel是一个完整的事务,这一点保证了数据在收发的时候的一致性.并且它可以和任意数量的source和sink链接
3 可以通过参数设置event的最大个数
4 Flume通常选择FileChannel,而不使用Memory Channel。
Memory Channel: 内存存储事务,吞吐率极高,但存在丢数据风险
File Channel: 本地磁盘的事务实现模式,保证数据不会丢失(WAL实现)
监控网络端口使用
netcat
1 # example.conf: A single-node Flume configuration 2 3 # Name the components on this agent 4 a1.sources = r1 5 a1.sinks = k1 6 a1.channels = c1 7 8 # Describe/configure the source 9 a1.sources.r1.type = netcat 10 a1.sources.r1.bind = localhost 11 a1.sources.r1.port = 44444 12 13 # Describe the sink 14 a1.sinks.k1.type = logger 15 16 # Use a channel which buffers events in memory 17 a1.channels.c1.type = memory 18 a1.channels.c1.capacity = 1000 19 a1.channels.c1.transactionCapacity = 100 20 21 # Bind the source and sink to the channel 22 a1.sources.r1.channels = c1 23 a1.sinks.k1.channel = c1
启动命令:flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/example.conf -Dflume.root.logger=INFO,console
监控具体文件使用
exec
1 # Name the components on this agent 2 a1.sources = r1 3 a1.sinks = k1 4 a1.channels = c1 5 6 # Describe/configure the source 7 a1.sources.r1.type = exec 8 a1.sources.r1.command = tail -F /home/log/data.log 9 a1.sources.r1.shell = /bin/bash -c 10 11 # Describe the sink 12 a1.sinks.k1.type = logger 13 14 # Use a channel which buffers events in memory 15 a1.channels.c1.type = memory 16 17 # Bind the source and sink to the channel 18 a1.sources.r1.channels = c1 19 a1.sinks.k1.channel = c1
启动命令:flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/exec-memory-logger.conf -Dflume.root.logger=INFO,console
flume监控日志文件并持久化到hdfs
1 # Name the components on this agent 2 a1.sources = r1 3 a1.sinks = k1 4 a1.channels = c1 5 6 # Describe/configure the source 7 a1.sources.r1.type = exec 8 a1.sources.r1.command = tail -F /home/data/data.log 9 a1.sources.r1.channels = c1 10 11 # Describe the sink 12 a1.sinks.k1.type = hdfs 13 a1.sinks.k1.channel = c1 14 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/ 15 a1.sinks.k1.hdfs.filePrefix = events- 16 a1.sinks.k1.hdfs.fileType=DataStream 17 a1.sinks.k1.hdfs.useLocalTimeStamp = true #必须得写 应为14行用到时间数据 18 19 # Use a channel which buffers events in memory 20 a1.channels.c1.type = memory 21 22 # Bind the source and sink to the channel 23 a1.sources.r1.channels = c1 24 a1.sinks.k1.channel = c1
别人写的
1 # Name the components on this agent 2 a1.sources = r1 3 a1.sinks = k1 4 a1.channels = c1 5 6 # Describe/configure the source 7 ## exec表示flume回去调用给的命令,然后从给的命令的结果中去拿数据 8 a1.sources.r1.type = exec 9 ## 使用tail这个命令来读数据 10 a1.sources.r1.command = tail -F /home/tuzq/software/flumedata/test.log 11 a1.sources.r1.channels = c1 12 13 # Describe the sink 14 ## 表示下沉到hdfs,类型决定了下面的参数 15 a1.sinks.k1.type = hdfs 16 ## sinks.k1只能连接一个channel,source可以配置多个 17 a1.sinks.k1.channel = c1 18 ## 下面的配置告诉用hdfs去写文件的时候写到什么位置,下面的表示不是写死的,而是可以动态的变化的。表示输出的目录名称是可变的 19 a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/ 20 ##表示最后的文件的前缀 21 a1.sinks.k1.hdfs.filePrefix = events- 22 ## 表示到了需要触发的时间时,是否要更新文件夹,true:表示要 23 a1.sinks.k1.hdfs.round = true 24 ## 表示每隔1分钟改变一次 25 a1.sinks.k1.hdfs.roundValue = 1 26 ## 切换文件的时候的时间单位是分钟 27 a1.sinks.k1.hdfs.roundUnit = minute 28 ## 表示只要过了3秒钟,就切换生成一个新的文件 29 a1.sinks.k1.hdfs.rollInterval = 3 30 ## 如果记录的文件大于20字节时切换一次 31 a1.sinks.k1.hdfs.rollSize = 20 32 ## 当写了5个事件时触发 33 a1.sinks.k1.hdfs.rollCount = 5 34 ## 收到了多少条消息往dfs中追加内容 35 a1.sinks.k1.hdfs.batchSize = 10 36 ## 使用本地时间戳 37 a1.sinks.k1.hdfs.useLocalTimeStamp = true 38 #生成的文件类型,默认是Sequencefile,可用DataStream:为普通文本 39 a1.sinks.k1.hdfs.fileType = DataStream 40 41 # Use a channel which buffers events in memory 42 ##使用内存的方式 43 a1.channels.c1.type = memory 44 a1.channels.c1.capacity = 1000 45 a1.channels.c1.transactionCapacity = 100 46 47 # Bind the source and sink to the channel 48 a1.sources.r1.channels = c1 49 a1.sinks.k1.channel = c1