  • Using Flume Taildir and rocketmq-flume together with RocketMQ

    1. Flume Taildir

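    Flume 1.7.0 added Taildir Source as an agent source. It is essentially a combination of Spooling Directory Source and Exec Source: it can monitor multiple directories, match file names in those directories with regular expressions and collect new data in real time, and record the latest read position of each file, so restarting the Agent process does not cause data to be consumed twice.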

    Official documentation: https://flume.apache.org/FlumeUserGuide.html#taildir-source

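    Exec Source: suited to monitoring a single file that is appended to in real time, but it cannot guarantee that no data is lost

    Spooldir Source: guarantees no data loss and can resume from where it left off, but has high latency and cannot monitor files in real time

    Taildir Source: can resume from where it left off, guarantees no data loss, and can also monitor files in real time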

    Taildir Source maintains a positionFile in JSON format and periodically writes the latest read position of each file into it, which is how it is able to resume from where it left off.

    [
      {"inode":1717446,"pos":8,"file":"/tmp/upload/456.txt"},
      {"inode":1717449,"pos":8,"file":"/tmp/upload/789.txt"},
      {"inode":1717442,"pos":12,"file":"/tmp/upload/123.txt"}
    ]
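
To make the role of the position file concrete, here is a minimal sketch that parses the sample above and works out where reading would resume. It assumes `pos` is a byte offset into the file; `load_positions` and `unread_bytes` are hypothetical helper names for illustration and are not part of Flume.

```python
import json

# Sample contents copied from the position file shown above.
POSITION_JSON = """
[
  {"inode":1717446,"pos":8,"file":"/tmp/upload/456.txt"},
  {"inode":1717449,"pos":8,"file":"/tmp/upload/789.txt"},
  {"inode":1717442,"pos":12,"file":"/tmp/upload/123.txt"}
]
"""


def load_positions(text):
    """Return a dict mapping inode -> (file path, offset already read)."""
    return {e["inode"]: (e["file"], e["pos"]) for e in json.loads(text)}


def unread_bytes(positions, inode, current_size):
    """Bytes that still need to be read for a file of the given size.

    Assumption for this sketch: if the file is now shorter than the
    recorded offset, it was truncated or replaced, so all of it is unread.
    """
    _, pos = positions[inode]
    if current_size < pos:
        return current_size
    return current_size - pos


positions = load_positions(POSITION_JSON)
for inode, (path, pos) in sorted(positions.items()):
    print(f"inode={inode} file={path} resume_at={pos}")
```

Keying the entries by inode rather than by path mirrors the fact that the position file records both, which lets a reader recognize a file even after it has been renamed.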

    Example configuration file:

    agent1.sources = source1
    agent1.channels = channel1
    agent1.sinks = avroSink

    # describe/configure source1
    agent1.sources.source1.type = TAILDIR
    agent1.sources.source1.channels = channel1
    agent1.sources.source1.skipToEnd = true
    # JSON file that records the inode, the absolute path and the last
    # position of each tailed file, so that tailing can continue after a restart
    agent1.sources.source1.positionFile = ./taildir_position.json
    # Space-separated list of file groups to tail
    agent1.sources.source1.filegroups = f1 f2
    # define f1 info
    agent1.sources.source1.filegroups.f1 = /usr/local/tomcat/logs/ac/ac.log.*
    agent1.sources.source1.headers.f1.headerKey1 = value1
    agent1.sources.source1.filegroups.f2 = /usr/local/tomcat/logs/gi/gi.log.*
    agent1.sources.source1.headers.f2.headerKey1 = value2
    agent1.sources.source1.headers.f2.headerKey2 = value2-2
    agent1.sources.source1.fileHeader = true

    # use a channel which buffers events in memory
    # type: memory or file; the channel temporarily holds the data the sink will consume
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 1000
    agent1.channels.channel1.transactionCapacity = 1000

    agent1.sinks.avroSink.type = avro
    agent1.sinks.avroSink.channel = channel1
    agent1.sinks.avroSink.hostname = 192.168.216.201
    agent1.sinks.avroSink.port = 4545
    agent1.sinks.avroSink.batch-size = 5
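
The filegroup values in the configuration above combine a literal directory with a regular expression for the file name. The sketch below illustrates that split in Python. It assumes the regex must match the whole file name, and `split_filegroup` and `matches` are hypothetical helpers, not Flume APIs.

```python
import posixpath
import re


def split_filegroup(pattern):
    """Split a filegroup value into (literal directory, filename regex)."""
    directory, filename_regex = posixpath.split(pattern)
    return directory, re.compile(filename_regex)


def matches(pattern, path):
    """True if path lies directly in the directory and its name matches."""
    directory, regex = split_filegroup(pattern)
    parent, name = posixpath.split(path)
    return parent == directory and regex.fullmatch(name) is not None


# Value of filegroups.f1 from the configuration above.
F1 = "/usr/local/tomcat/logs/ac/ac.log.*"

for candidate in (
    "/usr/local/tomcat/logs/ac/ac.log.2019-11-18",
    "/usr/local/tomcat/logs/ac/acXlog",
    "/usr/local/tomcat/logs/gi/ac.log.1",
):
    print(candidate, matches(F1, candidate))
```

Because an unescaped `.` matches any character, `ac.log.*` also accepts a name such as `acXlog`. Writing the pattern as `ac\.log.*` would be stricter if that matters for the directory being tailed.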

    2. Receiving and delivering messages between Flume-ng and RocketMQ

    rocketmq-flume: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flume

    agent1.sources = source1
    agent1.channels = channel1
    agent1.sinks = sink1
    
    agent1.sources.source1.type = TAILDIR
    agent1.sources.source1.positionFile = /usr/local/apache-flume-1.9.0-bin/db/taildir_position.json
    agent1.sources.source1.filegroups = seqGenSrc
    agent1.sources.source1.filegroups.seqGenSrc = /tmp/oss-yiruike-logs/raw/chaopai_push/.*txt
    agent1.sources.source1.fileHeader = false
    agent1.sources.source1.batchSize = 1
    
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 100
    agent1.channels.channel1.transactionCapacity = 100
    agent1.channels.channel1.keep-alive = 3
    
    agent1.sinks.sink1.type = org.apache.rocketmq.flume.ng.sink.RocketMQSink
    agent1.sinks.sink1.nameserver = 172.17.213.74:9876;172.17.213.75:9876;172.17.213.75:9876
    agent1.sinks.sink1.producerGroup = MyProducerGroup_1
    agent1.sinks.sink1.topic = ts-push-delineation
    
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
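
A configuration like the one above only works if every source and sink is wired to a declared channel. The following sketch is one way to check that before starting the agent. It is a hypothetical helper written for this article, not a Flume tool, and it only understands simple `key = value` lines. The `transactionCapacity` check reflects the memory channel constraint that a transaction cannot be larger than the channel itself.

```python
def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_agent(props, agent):
    """Return a list of wiring problems found for one agent."""
    problems = []
    channels = props.get(f"{agent}.channels", "").split()
    for src in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{src}.channels", "").split()
        if not used:
            problems.append(f"source {src} has no channel")
        for ch in used:
            if ch not in channels:
                problems.append(f"source {src} uses undeclared channel {ch}")
    for sink in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel", "")
        if ch not in channels:
            problems.append(f"sink {sink} uses undeclared channel '{ch}'")
    for ch in channels:
        cap = props.get(f"{agent}.channels.{ch}.capacity")
        tx = props.get(f"{agent}.channels.{ch}.transactionCapacity")
        # Only compare when both values are set explicitly.
        if cap is not None and tx is not None and int(tx) > int(cap):
            problems.append(f"channel {ch}: transactionCapacity > capacity")
    return problems


# Trimmed copy of the configuration above.
CONFIG = """
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 100
agent1.channels.channel1.transactionCapacity = 100
agent1.sinks.sink1.type = org.apache.rocketmq.flume.ng.sink.RocketMQSink
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
"""

print(check_agent(parse_properties(CONFIG), "agent1"))
```

Pointing `parse_properties` at the contents of the real `conf/flume.conf` instead of the inline string would apply the same check to the deployed file.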

    Start Flume

    # cd /usr/local/apache-flume-1.9.0-bin ; nohup bin/flume-ng agent -c conf -f conf/flume.conf -n agent1 >/dev/null 2>&1 &
  • Original article: https://www.cnblogs.com/wjoyxt/p/11883790.html