  • Corrections to the Flume Multi-Sink Scheme

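       When the scheme from http://www.cnblogs.com/moonandstar08/p/6091384.html was deployed in a real project, two problems appeared because the system handles a large volume of data. First, as overseas sites keep being added, the number of Channels at the SZ site keeps growing. Second, when partitions are created in the Kafka cluster, there is no guarantee that the Channels are distributed evenly across the cluster. In production this means the SZ Kafka data is stored N times in the SZ environment (N being the number of overseas sites), so the disks easily end up holding N redundant copies. The Flume model at this point is shown in the figure below: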

       The scheme therefore needs to be corrected. There are two main approaches:

    1. Use Flume load balance mode

       The model prototype is shown below:

      With this scheme, the local Flume configuration at SZ is as follows:

    # list the sources, sinks and channels in the agent
    agent.sources = kafkaSrc
    agent.channels = kafkaChannel_sz
    agent.sinks = kafkaSink_sz
    
    # source configuration
    agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource
    agent.sources.kafkaSrc.zookeeperConnect = XXX:2181,YYY:2182
    agent.sources.kafkaSrc.topic = test_produce
    agent.sources.kafkaSrc.groupId = test
    agent.sources.kafkaSrc.kafka.consumer.timeout.ms = 100
    
    # use a channel which buffers events in memory
    agent.channels.kafkaChannel_sz.type = memory
    agent.channels.kafkaChannel_sz.capacity = 1000000
    agent.channels.kafkaChannel_sz.transactionCapacity = 100
    
    # sink configuration (these keys belong under agent.sinks, not agent.sources)
    agent.sinks.kafkaSink_sz.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.kafkaSink_sz.topic = test_consume
    agent.sinks.kafkaSink_sz.brokerList = XXX:9092
    agent.sinks.kafkaSink_sz.batchSize = 5
    
    # bind the source and sink to the channel
    agent.sources.kafkaSrc.channels = kafkaChannel_sz
    agent.sinks.kafkaSink_sz.channel = kafkaChannel_sz
    

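     In essence, this scheme shares data through a memory channel, so a large data volume can easily cause an out-of-memory error. In addition, any data lost from memory cannot be recovered.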
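
To make the durability trade-off concrete, here is a minimal sketch of swapping the memory channel for Flume's file channel, which buffers events on disk. This is not the fix the article goes on to adopt; the directory paths are hypothetical placeholders and the capacity values are simply carried over from the memory channel settings.

```properties
# Hypothetical alternative: buffer events on disk instead of in memory.
# The two directory paths are placeholders, not taken from the original setup.
agent.channels.kafkaChannel_sz.type = file
agent.channels.kafkaChannel_sz.checkpointDir = /data/flume/checkpoint
agent.channels.kafkaChannel_sz.dataDirs = /data/flume/data
agent.channels.kafkaChannel_sz.capacity = 1000000
agent.channels.kafkaChannel_sz.transactionCapacity = 100
```

A file channel typically gives lower throughput than a memory channel, but events that were committed to it survive an agent restart.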

     The load balance model is used as shown below:

    See: http://www.cnblogs.com/lishouguang/p/4558790.html
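
The SZ configuration listed for this approach declares only one sink. In Flume, load balancing across sinks is switched on through a sink group with the `load_balance` processor. The fragment below is a minimal sketch of that, assuming a hypothetical second sink `kafkaSink_sz2`, a hypothetical group name `sz_group`, and a placeholder broker address `YYY:9092`; none of these appear in the original setup.

```properties
# Declare both sinks; kafkaSink_sz2 is a hypothetical second sink.
agent.sinks = kafkaSink_sz kafkaSink_sz2

# Group the sinks and let Flume spread events across them.
agent.sinkgroups = sz_group
agent.sinkgroups.sz_group.sinks = kafkaSink_sz kafkaSink_sz2
agent.sinkgroups.sz_group.processor.type = load_balance
agent.sinkgroups.sz_group.processor.selector = round_robin
# Temporarily skip a sink that has just failed.
agent.sinkgroups.sz_group.processor.backoff = true

# The second sink drains the same channel as the first one.
agent.sinks.kafkaSink_sz2.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink_sz2.topic = test_consume
agent.sinks.kafkaSink_sz2.brokerList = YYY:9092
agent.sinks.kafkaSink_sz2.channel = kafkaChannel_sz
```

Both sinks must be attached to the same channel for the group to share its load.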

    2. Use Kafka + Flume Source groupID

      The model for the Kafka + Flume Source groupID scheme is shown in the figure below:

     

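      The relevant Flume configuration is as follows:

      A. Set up a Kafka cluster locally at SZ, with no Flume configuration;

      B. The local Flume configuration at UK is as follows: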

    # Source, channel and sink names
    agent.sources = kafkaSrc
    agent.channels = kafkaChannel_sz
    agent.sinks = kafkaSink_uk
    
    # Source configuration
    agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource
    agent.sources.kafkaSrc.channels = kafkaChannel_sz
    # ZooKeeper of the SZ Kafka cluster
    agent.sources.kafkaSrc.zookeeperConnect = XXX:2181,YYY:2182
    agent.sources.kafkaSrc.topic = k_produce
    agent.sources.kafkaSrc.groupId = k_uk
    
    # Channel
    agent.channels.kafkaChannel_sz.type = org.apache.flume.channel.kafka.KafkaChannel
    # UK broker list
    agent.channels.kafkaChannel_sz.brokerList = XXX:9092,YYY:9093
    agent.channels.kafkaChannel_sz.topic = k_uk
    # UK ZooKeeper
    agent.channels.kafkaChannel_sz.zookeeperConnect = XXX:2181,YYY:2182
    agent.channels.kafkaChannel_sz.capacity = 10000
    agent.channels.kafkaChannel_sz.transactionCapacity = 1000
    
    # Sink
    agent.sinks.kafkaSink_uk.channel = kafkaChannel_sz
    agent.sinks.kafkaSink_uk.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.kafkaSink_uk.topic = t_uk_consume
    # UK broker list
    agent.sinks.kafkaSink_uk.brokerList = XXX:9092,YYY:9093
    agent.sinks.kafkaSink_uk.batchSize = 20
    

     C. The local Flume configuration at BR is as follows:

    # Source, channel and sink names
    agent.sources = kafkaSrc
    agent.channels = kafkaChannel_sz
    agent.sinks = kafkaSink_br
    
    # Source configuration
    agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource
    agent.sources.kafkaSrc.channels = kafkaChannel_sz
    # ZooKeeper of the SZ Kafka cluster
    agent.sources.kafkaSrc.zookeeperConnect = XXX:2181,YYY:2182
    agent.sources.kafkaSrc.topic = k_produce
    agent.sources.kafkaSrc.groupId = k_br
    
    # Channel
    agent.channels.kafkaChannel_sz.type = org.apache.flume.channel.kafka.KafkaChannel
    # BR broker list
    agent.channels.kafkaChannel_sz.brokerList = XXX:9092,YYY:9093
    agent.channels.kafkaChannel_sz.topic = k_br
    # BR ZooKeeper
    agent.channels.kafkaChannel_sz.zookeeperConnect = XXX:2181,YYY:2182
    agent.channels.kafkaChannel_sz.capacity = 10000
    agent.channels.kafkaChannel_sz.transactionCapacity = 1000
    
    # Sink (named kafkaSink_br to match the agent.sinks declaration above)
    agent.sinks.kafkaSink_br.channel = kafkaChannel_sz
    agent.sinks.kafkaSink_br.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.kafkaSink_br.topic = t_br_consume
    # BR broker list
    agent.sinks.kafkaSink_br.brokerList = XXX:9092,YYY:9093
    agent.sinks.kafkaSink_br.batchSize = 20
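
The UK and BR agents follow one template: each overseas site reads `k_produce` from the SZ cluster under its own `groupId`. Kafka tracks offsets separately for each consumer group, so every site receives the full stream while SZ stores the data only once. As a sketch of what a further site would change, assume a hypothetical site code `xx`; every `xx` name below is illustrative and does not come from the original deployment.

```properties
# Hypothetical third site "xx": only these values differ from the UK/BR agents.
agent.sinks = kafkaSink_xx

# A distinct consumer group gives this site its own offsets on k_produce.
agent.sources.kafkaSrc.groupId = k_xx

# Site-local channel topic and output topic.
agent.channels.kafkaChannel_sz.topic = k_xx
agent.sinks.kafkaSink_xx.topic = t_xx_consume
```

The channel and sink `brokerList` values and the channel `zookeeperConnect` value would also point at that site's own Kafka and ZooKeeper hosts. Reusing an existing `groupId` would instead split the stream between two sites, since consumers in one group share the partitions.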
    

     Reference: http://shiyanjun.cn/archives/915.html

  • Original article: https://www.cnblogs.com/moonandstar08/p/6216329.html