一、agent
第一步是定义agent(代理)及agent下的sources、channels、sinks的简称,如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
二、sources
第二步是定义sources(接收数据源),以下是常见的sources类型:
1.type = netcat =>监视一个端口,需要端口名称、端口号:
bind = localhost
port = 44444
2.type = exec =>监视一个文件,需要命令行、命令行使用的脚本
command = tail -F /opt/jars/calllog.csv
shell = /usr/bin/bash -c
3.type = spooldir =>监视一个文件夹,需要文件夹路径
可以添加进入文件夹文件的后缀名、可以添加绝对路径的文件名、通过正则表达式过滤以tmp结尾的文件
spoolDir = /root/spooldir
fileSuffix = .COMPLETED
fileHeader = true
ignorePattern = ([^]*.tmp)
4.selector.type = replicating =>将数据量复制给多个channel
5.type = avro =>通过端口接收数据,需要端口名称、端口号
bind = hd1-1
port = 4141
三、channels
第三步是设置channel(管道)的类型等
1.tpye = memory =>使用内存为管道,设置内存总容量、每次传输的容量
capacity =1000
transactionCapacity =100
2.使用磁盘作为管道
四、sink
第四步是设置sink(下沉)的类型和细节设置
1.type = logger =>输出日志文件,用于监控端口直接在端口输出接受的数据
2.type = hdfs =>输出到hdfs,
hdfs.path = hdfs://hd1-1:9000/flume/%Y%m%d/%H =>设置hdfs的路径
hdfs.filePrefix = logs- =>设置文件的前缀
hdfs.round = true =>按照时间滚动文件夹
hdfs.roundValue = 1 =>多长时间创建一个新文件夹
hdfs.roundUnit = minute =>定义时间的单位
hdfs.useLocalTimeStamp = true =>使用本地时间戳
hdfs.batchSize = 500 =>积攒到少event后flush到hdfs一次
hdfs.fileType = DataStream =>设置文件类型,可支持压缩
hdfs.rollInterval = 30 =>多久生成一个新文件
hdfs.rollSize = 134217700 =>设置每个文件的滚动大小
hdfs.rollCount = 0 =>滚动与Event无关
hdfs.minBlockReplicas = 1 =>最小冗余数(及备份数,hdfs自带无需配置)
3.type = avro =>将数据发送到端口,需要设置端口名称、端口号
hostname = hd1-1
port = 4141
4.type = file_roll =>将数据传输到本地文件,需要设置文件路径
sink.directory = /root/flume2 注意flume2文件夹需要自己创建
5.type = org.apache.flume.sink.kafka.KafkaSink =>将数据传输到kafka
需要设置集群的机器名称和端口号、主题、batchSize、Ack机制
brokerList = hd1-1:9092,hd1-2:9092,hd1-3:9092
topic = calllog
batchSize = 20
requiredAcks =1 ACK机制(1、0、-1,1是最安全的)
五、bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1