  • Using Flume Taildir and rocketmq-flume to integrate with RocketMQ

    1. Flume Taildir

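    Flume 1.7.0 added Taildir Source as an agent source. It can be seen as a combination of Spooling Directory Source and Exec Source: it monitors multiple directories, uses regular expressions to match file names in those directories and collects them in near real time, and records the latest read position of each file. After the agent process restarts, collection resumes from the recorded positions instead of re-reading the files from the beginning.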

    Official documentation: https://flume.apache.org/FlumeUserGuide.html#taildir-source
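The file-name matching described above can be sketched in Python. This is only an illustration, not Flume's own code: it assumes the last path component of a filegroup value is a regular expression that must match the whole file name, and it uses Python's `re` module, whose syntax differs from Java regex in some details. The helper names `split_filegroup` and `matching_names` are hypothetical.

```python
import posixpath
import re


def split_filegroup(filegroup_path):
    """Split a filegroup value into (directory, compiled file-name regex)."""
    directory, name_pattern = posixpath.split(filegroup_path)
    return directory, re.compile(name_pattern)


def matching_names(filegroup_path, names):
    """Return the file names that the filegroup's regex matches in full."""
    _, pattern = split_filegroup(filegroup_path)
    return [name for name in names if pattern.fullmatch(name)]


if __name__ == "__main__":
    candidates = ["ac.log.2019-11-18", "ac.log", "gi.log.1", "acXlog"]
    print(matching_names("/usr/local/tomcat/logs/ac/ac.log.*", candidates))
```

With the pattern `ac.log.*` the unescaped dots match any character, so a name such as `acXlog` is also accepted. Writing the pattern as `ac\.log.*` avoids that if only literal dots are intended.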

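    Exec Source: suited to monitoring a single file that is appended to in real time, but it cannot guarantee that no data is lost.

    Spooldir Source: guarantees that no data is lost and can resume from where it stopped, but its latency is high and it cannot monitor files in real time.

    Taildir Source: can resume from where it stopped, guarantees that no data is lost, and also monitors files in real time.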

    Taildir Source maintains a positionFile in JSON format and periodically writes the latest read position of each file to it, which is what makes resuming possible.

    [
      {"inode":1717446,"pos":8,"file":"/tmp/upload/456.txt"},
      {"inode":1717449,"pos":8,"file":"/tmp/upload/789.txt"},
      {"inode":1717442,"pos":12,"file":"/tmp/upload/123.txt"}
    ]
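As a rough way to see how far the source has read, the position file can be compared with the files on disk. The sketch below assumes that `pos` is a byte offset into the file and that `inode` is the inode number reported by the operating system. The function names `load_positions` and `unread_bytes` are hypothetical helpers, not part of Flume.

```python
import json
import os


def load_positions(position_file):
    """Load the list of {"inode", "pos", "file"} entries from a position file."""
    with open(position_file, encoding="utf-8") as fh:
        return json.load(fh)


def unread_bytes(entry):
    """Return how many bytes lie beyond the recorded position.

    Returns None when the file is missing or its inode no longer matches,
    for example because the file was rotated or replaced.
    """
    try:
        st = os.stat(entry["file"])
    except FileNotFoundError:
        return None
    if st.st_ino != entry["inode"]:
        return None
    return max(st.st_size - entry["pos"], 0)


if __name__ == "__main__":
    # Assumed location; use the positionFile path from your own agent config.
    for entry in load_positions("./taildir_position.json"):
        print(entry["file"], unread_bytes(entry))
```

Because the position file is only updated periodically, the reported value can lag slightly behind what the agent has actually read.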

    Example configuration file:

    agent1.sources = source1  
    agent1.channels = channel1  
    agent1.sinks = avroSink  
      
    # describe/configure source1  
    agent1.sources.source1.type = TAILDIR
    agent1.sources.source1.channels = channel1
    # skip to the end of files that have no entry in the position file
    agent1.sources.source1.skipToEnd = true
    # JSON file that records the inode, the absolute path and the last read
    # position of each tailed file, so that work can continue after a restart
    agent1.sources.source1.positionFile = ./taildir_position.json
    # space-separated list of file groups to tail
    agent1.sources.source1.filegroups = f1 f2
    # define f1 info.
    agent1.sources.source1.filegroups.f1 = /usr/local/tomcat/logs/ac/ac.log.*
    agent1.sources.source1.headers.f1.headerKey1 = value1
    agent1.sources.source1.filegroups.f2 = /usr/local/tomcat/logs/gi/gi.log.*
    agent1.sources.source1.headers.f2.headerKey1 = value2
    agent1.sources.source1.headers.f2.headerKey2 = value2-2
    agent1.sources.source1.fileHeader = true

    # use a channel which buffers events in memory
    # type: memory or file; the channel temporarily buffers the data the sink consumes
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 1000
    agent1.channels.channel1.transactionCapacity = 1000

    agent1.sinks.avroSink.type = avro
    agent1.sinks.avroSink.channel = channel1
    agent1.sinks.avroSink.hostname = 192.168.216.201
    agent1.sinks.avroSink.port = 4545
    agent1.sinks.avroSink.batch-size = 5
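A configuration like this only works if every source and sink points at a channel that the agent actually declares. The following is a minimal sketch of such a check, assuming a simplified properties format (`key = value` lines and `#` comments only, which is less than Java's full properties syntax). The functions `parse_properties` and `undeclared_channels` are hypothetical and not part of Flume.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def undeclared_channels(props, agent):
    """Return (component, channel) pairs that reference an undeclared channel."""
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for source in props.get(f"{agent}.sources", "").split():
        for channel in props.get(f"{agent}.sources.{source}.channels", "").split():
            if channel not in declared:
                problems.append((source, channel))
    for sink in props.get(f"{agent}.sinks", "").split():
        channel = props.get(f"{agent}.sinks.{sink}.channel", "")
        if channel not in declared:
            problems.append((sink, channel))
    return problems


if __name__ == "__main__":
    # Assumed path; point this at your own agent configuration file.
    with open("conf/flume.conf", encoding="utf-8") as fh:
        print(undeclared_channels(parse_properties(fh.read()), "agent1"))
```

An empty list means every reference resolves. A sink with no `channel` property at all is reported with an empty channel name.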

    2. Receiving and delivering messages between Flume-ng and RocketMQ

    rocketmq-flume: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flume

    agent1.sources = source1
    agent1.channels = channel1
    agent1.sinks = sink1
    
    agent1.sources.source1.type = TAILDIR
    agent1.sources.source1.positionFile = /usr/local/apache-flume-1.9.0-bin/db/taildir_position.json
    agent1.sources.source1.filegroups = seqGenSrc
    agent1.sources.source1.filegroups.seqGenSrc = /tmp/oss-yiruike-logs/raw/chaopai_push/.*txt
    agent1.sources.source1.fileHeader = false
    agent1.sources.source1.batchSize = 1
    
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 100
    agent1.channels.channel1.transactionCapacity = 100
    agent1.channels.channel1.keep-alive = 3
    
    agent1.sinks.sink1.type = org.apache.rocketmq.flume.ng.sink.RocketMQSink
    agent1.sinks.sink1.nameserver = 172.17.213.74:9876;172.17.213.75:9876;172.17.213.75:9876
    agent1.sinks.sink1.producerGroup = MyProducerGroup_1
    agent1.sinks.sink1.topic = ts-push-delineation
    
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
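The `nameserver` value is a semicolon-separated list of `host:port` addresses. In the configuration above, `172.17.213.75:9876` appears twice. The source does not say whether the third entry was meant to be a different host. A small sketch for spotting such repeats follows. The helper names `parse_nameservers` and `duplicate_addresses` are hypothetical.

```python
from collections import Counter


def parse_nameservers(value):
    """Split a semicolon-separated name server list into trimmed addresses."""
    return [addr.strip() for addr in value.split(";") if addr.strip()]


def duplicate_addresses(value):
    """Return the addresses that occur more than once, in first-seen order."""
    counts = Counter(parse_nameservers(value))
    return [addr for addr, count in counts.items() if count > 1]


if __name__ == "__main__":
    value = "172.17.213.74:9876;172.17.213.75:9876;172.17.213.75:9876"
    print(duplicate_addresses(value))
```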

    Start Flume:

    # cd /usr/local/apache-flume-1.9.0-bin ; nohup bin/flume-ng agent -c conf -f conf/flume.conf -n agent1 >/dev/null 2>&1 &
  • Original article: https://www.cnblogs.com/wjoyxt/p/11883790.html