zoukankan      html  css  js  c++  java
  • 最新Flume1.7 自定义 MongodbSink 结合TAILDIR Sources的使用

    Flume MongodbSink

    此mongodb支持3.0

    github地址

    MongodbSink

    flume-ng-mongodbsink
    An Apache Flume Sink that send JSON to MongoDB collection

    配置文件 configuration properties

    Property Name Default Description
    hostNames - host1:port1,host2,port2,...the mongodb host and port
    database - the mongodb database
    collection - the collection of database
    user - the username of databse
    password - the password of database
    batchSize 100 the batchSize of sources
    authentication_enabled False Whether u need a password and a user

    如果没有密码和用户名 就不需要user password authentication_enabled T

    如果有密码,设置authentication_enabled =True

    Example

    # 定义数据出口 
    a1.sinks.s.type = com.kenshuchong.MongodbSink.MongoSinkSelf
    a1.sinks.s.hostNames=127.0.0.1:27017
    a1.sinks.s.authentication_enabled=True
    a1.sinks.s.database = database
    a1.sinks.s.password = password
    a1.sinks.s.user     = user
    a1.sinks.s.collection = collection
    a1.sinks.s.batchSize = 100 
    a1.sinks.s.channel = c 
    

    自定义修改Custom modify

    可以修改其中生成json部分

    line 76-82 jsonEvent is the event body

    json event是日志主体

     String cuTime = getCurrentTime();
            String jsonEvent = new String(event.getBody(), StandardCharsets.UTF_8);
            Document sentEvent = new Document("log",jsonEvent)
            		.append("Dir","/data/ngnix.log")
            		.append("Time", cuTime);
        
            documents.add(sentEvent);
    

    tips

    本mongodbsink 支持3.0版本

    线上使用需在flume/lib下添加一下几个jar包

    • mongodb-driver-3.0.2.jar
    • mongodb-driver-core-3.0.2.jar
    • bson-3.0.2.jar

    结合TAILDIR srouce实时采集日志并存入mongodb中

    需求

    • 日志存储在/opt/rec/log
    • 日志需要采集其中的ERROR级别日志
    • 日志存储在mongodb中

    处理需求

    • 采用新的TAILDIR source来对/log进行实时采集
    • 给suorce配置正则拦截器,拦截非ERROR日志
    • 采用自定义mongodbsink实时将日志插入mongodb中
    • positionFile 为存储文件读取偏移地址的josn文件,这种只从最新位置读取
    • 只有检测到了文件位置有新的偏移才会再次读取文件

    配置文件

    #定义组件名称
    a1.sources = r 
    a1.sinks = s 
    a1.channels = c  
      
    #定义数据入口
    a1.sources.r.type = TAILDIR
    a1.sources.r.channels = c
    a1.sources.r.positionFile = /home/ch/logMonitor/taildir_position.json
    a1.sources.r.filegroups = f1
    a1.sources.r.filegroups.f1 = /opt/rec/log/*.log
    
    ##定义拦截器
    a1.sources.r.interceptors=i1
    a1.sources.r.interceptors.i1.type=regex_filter
    a1.sources.r.interceptors.i1.regex= ERROR
      
    # 定义数据出口 
    a1.sinks.s.type = com.kenshuchong.MongodbSink.MongoSinkSelf
    a1.sinks.s.hostNames=127.0.0.1:27017
    a1.sinks.s.authentication_enabled=True
    a1.sinks.s.database = database
    a1.sinks.s.password = password
    a1.sinks.s.user     = user
    a1.sinks.s.collection = logsearch_info
    a1.sinks.s.batchSize = 100 
    a1.sinks.s.channel = c 
      
    # 使用内存管道  
    a1.channels.c.type = memory  
    a1.channels.c.capacity = 10000  
    a1.channels.c.transactionCapacity = 100
    
    
  • 相关阅读:
    【NodeJs】Nodejs系列安装
    【webstrom+stylus】stylus在webstrom中的识别
    python基础:数据类型一
    python基础:流程控制案例:
    python基础:if判断与流程控制案例
    计算机基础与python入门
    Robot Framework-断言函数
    RF设置全局变量
    RF自定义系统关键字
    RF第二讲--Selenium2Library库的简单实用
  • 原文地址:https://www.cnblogs.com/yueyanyu/p/7085018.html
Copyright © 2011-2022 走看看