zoukankan      html  css  js  c++  java
  • 日志收集-Flume-ng-mongodb-sink

      本文主要介绍使用Flume传输数据到MongoDB的过程,内容涉及环境部署和注意事项。

    一、环境搭建

    1、flune-ng下载地址:http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
    2、mongodb java driver jar包下载地址:https://oss.sonatype.org/content/repositories/releases/org/mongodb/mongo-java-driver/2.13.0/mongo-java-driver-2.13.0.jar
    3、flume-ng-mongodb-sink 源码下载地址:https://github.com/leonlee/flume-ng-mongodb-sink
         flume-ng-mongodb-sink 需要自己编译jar包,从github上下载代码,解压之后执行mvn package,即可生成。需要先安装maven用于编译jar包

    二、Flume配置

    1、env配置

      将mongo-java-driver和flume-ng-mongodb-sink两个jar包放到flumelib目录下,并将路径加入到flume-env.sh文件的FLUME_CLASSPATH变量中;
      JAVA_OPTS变量: 加上-Dflume.monitoring.type=http -Dflume.monitoring.port=xxxx,可以在[hostname:xxxx]/metrics 上看到监控信息; -Xms指定JVM初始内存,-Xmx指定JVM最大内存
      FLUME_HOME变量: 设定FLUME根目录
      JAVA_HOME变量: 设定JAVA根目录

    2、 log配置

    在调试时,将日志设置为debug并打到文件:flume.root.logger=DEBUG,LOGFILE

    3、 传输配置

    采用 Exec Source、file-channel、flume-ng-mongodb-sink

    my_agent.sources.my_source_1.channels = my_channel_1
    my_agent.sources.my_source_1.type = exec
    my_agent.sources.my_source_1.command = python  xxx.py
    my_agent.sources.my_source_1.shell = /bin/bash -c
    my_agent.sources.my_source_1.restartThrottle = 10000
    my_agent.sources.my_source_1.restart = true
    my_agent.sources.my_source_1.logStdErr = true
    my_agent.sources.my_source_1.batchSize = 1000
    my_agent.sources.my_source_1.interceptors = i1 i2 i3
    my_agent.sources.my_source_1.interceptors.i1.type = static
    my_agent.sources.my_source_1.interceptors.i1.key = db
    my_agent.sources.my_source_1.interceptors.i1.value = cswuyg_test
    my_agent.sources.my_source_1.interceptors.i2.type = static
    my_agent.sources.my_source_1.interceptors.i2.key = collection
    my_agent.sources.my_source_1.interceptors.i2.value = cswuyg_test
    my_agent.sources.my_source_1.interceptors.i3.type = static
    my_agent.sources.my_source_1.interceptors.i3.key = op
    my_agent.sources.my_source_1.interceptors.i3.value = upsert
    

    字段说明:采用exec source,指定执行命令行为python xxx.py,在xxx.py代码中处理日志,并按照跟flume-ng-mongodb-sink的约定,print出json格式的数据,如果update类操作必须带着_id字段,print出来的日志被当作Event的Body,我再通过interceptors给它加上自定义Event Header;

    static interceptors用于为Event Header添加信息,这里我为它加上了db=cswuyg_test、collection=cswuyg_test、op=upsert,这三个key是跟flume-ng-mongodb-sink 约定的,用于指定mongodb中的db、collection名以及操作类型为update。

    my_agent.channels.my_channel_1.type = file
    my_agent.channels.my_channel_1.checkpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint
    my_agent.channels.my_channel_1.useDualCheckpoints = true
    my_agent.channels.my_channel_1.backupCheckpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint2
    my_agent.channels.my_channel_1.dataDirs = /home/work/flume/file-channel/my_channel_1/data
    my_agent.channels.my_channel_1.transactionCapacity = 10000
    my_agent.channels.my_channel_1.checkpointInterval = 30000
    my_agent.channels.my_channel_1.maxFileSize = 4292870142
    my_agent.channels.my_channel_1.minimumRequiredSpace = 524288000
    my_agent.channels.my_channel_1.capacity = 100000
    

    sink配置:

    my_agent.sinks.my_mongo_1.type = org.riderzen.flume.sink.MongoSink
    my_agent.sinks.my_mongo_1.host = xxxhost
    my_agent.sinks.my_mongo_1.port = yyyport
    my_agent.sinks.my_mongo_1.model = DYNAMIC/SINGLE ---查看源码仅支持此二种方式,并且必须大小
    my_agent.sinks.my_mongo_1.db = XXX --mongo表名,默认名称为events
    my_agent.sinks.my_mongo_1.username = XXX --mongo用户名
    my_agent.sinks.my_mongo_1.password = YYY --mongo密码
    my_agent.sinks.my_mongo_1.collecion = log my_agent.sinks.my_mongo_1.batch = 10 my_agent.sinks.my_mongo_1.channel = my_channel_1 my_agent.sinks.my_mongo_1.timestampField = _S

    参见:http://www.cnblogs.com/cswuyg/p/4498804.html

  • 相关阅读:
    吸烟的女人有着一种让人心动的美
    怎么样的女人让男人不变心
    姐妹们!一起来做狐狸精!
    清华图书馆机器人
    大地实习程序
    《非2》里的两首诗
    GIS拓扑生成
    一点小发现
    directX下的三维坐标系
    Let's start from here
  • 原文地址:https://www.cnblogs.com/moonandstar08/p/6517061.html
Copyright © 2011-2022 走看看