本文主要介绍使用Flume传输数据到MongoDB的过程,内容涉及环境部署和注意事项。
一、环境搭建
1、flune-ng下载地址:http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
2、mongodb java driver jar包下载地址:https://oss.sonatype.org/content/repositories/releases/org/mongodb/mongo-java-driver/2.13.0/mongo-java-driver-2.13.0.jar
3、flume-ng-mongodb-sink 源码下载地址:https://github.com/leonlee/flume-ng-mongodb-sink
flume-ng-mongodb-sink 需要自己编译jar包,从github上下载代码,解压之后执行mvn package,即可生成。需要先安装maven用于编译jar包
二、Flume配置
1、env配置
将mongo-java-driver和flume-ng-mongodb-sink两个jar包放到flumelib目录下,并将路径加入到flume-env.sh文件的FLUME_CLASSPATH变量中;
JAVA_OPTS变量: 加上-Dflume.monitoring.type=http -Dflume.monitoring.port=xxxx,可以在[hostname:xxxx]/metrics 上看到监控信息; -Xms指定JVM初始内存,-Xmx指定JVM最大内存
FLUME_HOME变量: 设定FLUME根目录
JAVA_HOME变量: 设定JAVA根目录
2、 log配置
在调试时,将日志设置为debug并打到文件:flume.root.logger=DEBUG,LOGFILE
3、 传输配置
采用 Exec Source、file-channel、flume-ng-mongodb-sink
my_agent.sources.my_source_1.channels = my_channel_1 my_agent.sources.my_source_1.type = exec my_agent.sources.my_source_1.command = python xxx.py my_agent.sources.my_source_1.shell = /bin/bash -c my_agent.sources.my_source_1.restartThrottle = 10000 my_agent.sources.my_source_1.restart = true my_agent.sources.my_source_1.logStdErr = true my_agent.sources.my_source_1.batchSize = 1000 my_agent.sources.my_source_1.interceptors = i1 i2 i3 my_agent.sources.my_source_1.interceptors.i1.type = static my_agent.sources.my_source_1.interceptors.i1.key = db my_agent.sources.my_source_1.interceptors.i1.value = cswuyg_test my_agent.sources.my_source_1.interceptors.i2.type = static my_agent.sources.my_source_1.interceptors.i2.key = collection my_agent.sources.my_source_1.interceptors.i2.value = cswuyg_test my_agent.sources.my_source_1.interceptors.i3.type = static my_agent.sources.my_source_1.interceptors.i3.key = op my_agent.sources.my_source_1.interceptors.i3.value = upsert
字段说明:采用exec source,指定执行命令行为python xxx.py,在xxx.py代码中处理日志,并按照跟flume-ng-mongodb-sink的约定,print出json格式的数据,如果update类操作必须带着_id字段,print出来的日志被当作Event的Body,我再通过interceptors给它加上自定义Event Header;
static interceptors用于为Event Header添加信息,这里我为它加上了db=cswuyg_test、collection=cswuyg_test、op=upsert,这三个key是跟flume-ng-mongodb-sink 约定的,用于指定mongodb中的db、collection名以及操作类型为update。
my_agent.channels.my_channel_1.type = file my_agent.channels.my_channel_1.checkpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint my_agent.channels.my_channel_1.useDualCheckpoints = true my_agent.channels.my_channel_1.backupCheckpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint2 my_agent.channels.my_channel_1.dataDirs = /home/work/flume/file-channel/my_channel_1/data my_agent.channels.my_channel_1.transactionCapacity = 10000 my_agent.channels.my_channel_1.checkpointInterval = 30000 my_agent.channels.my_channel_1.maxFileSize = 4292870142 my_agent.channels.my_channel_1.minimumRequiredSpace = 524288000 my_agent.channels.my_channel_1.capacity = 100000
sink配置:
my_agent.sinks.my_mongo_1.type = org.riderzen.flume.sink.MongoSink my_agent.sinks.my_mongo_1.host = xxxhost my_agent.sinks.my_mongo_1.port = yyyport my_agent.sinks.my_mongo_1.model = DYNAMIC/SINGLE ---查看源码仅支持此二种方式,并且必须大小
my_agent.sinks.my_mongo_1.db = XXX --mongo表名,默认名称为events
my_agent.sinks.my_mongo_1.username = XXX --mongo用户名
my_agent.sinks.my_mongo_1.password = YYY --mongo密码
my_agent.sinks.my_mongo_1.collecion = log my_agent.sinks.my_mongo_1.batch = 10 my_agent.sinks.my_mongo_1.channel = my_channel_1 my_agent.sinks.my_mongo_1.timestampField = _S