Fixing a Kafka.Client Memory Leak with WinDbg

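        I had never written a blog post before. Come to think of it, I have been working for more than ten years and have built a great many architectures and worked with many technologies, so it would be a pity not to share them. Also, after moving from the traditional ERP industry to the internet industry, I have run into many problems I had never faced before. I knew there would be some pits, but not that there would be so many or that they would be so deep. While filling one of them, I am taking the chance to log the process.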

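        Back to the topic, starting with some background. Teld's business platform carries a large volume of sensor data from IoT terminals plus vehicle operating data, and that data holds a great deal of value. So it has to be stored. Teld's charging terminals are a big deal: there are already 20,000+ of them, each reporting every 30 seconds, though a single record is small. And this is only the beginning: under the national plan we will reach the million level by 2020. Whoa, I'm getting ahead of myself! Once you do the conversion, the TPS required just for the data reported by the charging terminals is quite high. After two months of research and technology selection, we chose Kafka as the middleware for processing this massive data volume.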
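        Here is a rough sketch of that conversion. It assumes each terminal sends exactly one message every 30 seconds, that reports are spread evenly over time, and that "million level" means about 1,000,000 terminals:

```latex
\begin{aligned}
\text{today:}\quad & \frac{20\,000\ \text{terminals}}{30\ \text{s}} \approx 667\ \text{messages/s}
  \quad (20\,000 \times 2\,880 = 57.6 \times 10^{6}\ \text{messages/day}) \\
\text{at } 10^{6} \text{ terminals:}\quad & \frac{1\,000\,000\ \text{terminals}}{30\ \text{s}} \approx 33\,333\ \text{messages/s}
  \quad (10^{6} \times 2\,880 = 2.88 \times 10^{9}\ \text{messages/day})
\end{aligned}
```

Here 2,880 = 86,400 s / 30 s is the number of reports per terminal per day.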

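        Alright! With Kafka chosen, the pit-filling began. Because we are on the .NET technology stack, the Kafka client had to be .NET as well. … (10,000 words omitted here) The Kafka environment was eventually up and running. However, the Consumer program written on top of Kafka.Client had a severe memory leak.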

          

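        The Consumer program has to run for long periods. Yet, as the screenshot above shows, after only two hours its memory usage had already reached 570 MB. Without hesitation, I captured a dump and analyzed it in WinDbg.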

        Start WinDbg, set the symbol path, and load the dump.

        Run the following commands:

            .loadby sos clr  (Note: the program targets .NET 4.0; for .NET 2.0, load SOS with .loadby sos mscorwks instead.)

            !dumpheap -stat  (Note: lists, per type, the number of objects on the heap and the memory they occupy.)

        Result:

    00007ff947e2f2e8  1215019     29160456 Kafka.Client.Common.NoOpTimer
    00007ff947e2f1a8  1215019     29160456 Kafka.Client.Metrics.KafkaTimer
    00007ff947e39600  1215018     38880576 Kafka.Client.Consumers.FetchRequestAndResponseMetrics
    00007ff947e2df70  1215018     38880576 Kafka.Client.Common.ClientIdAndBroker
    00007ff947e3a058  1215007     58320336 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
    00007ff9a5cc3d60  1267853     86313134 System.String

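        The output shows 1.2 million+ instances each of NoOpTimer, KafkaTimer, FetchRequestAndResponseMetrics, ClientIdAndBroker and the ConcurrentDictionary nodes. Together they occupy nearly 200 MB. Well, these look like the culprits, and the finger points straight at Kafka.Client. Let's pick NoOpTimer and look at its GC roots. Onward!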
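        In the !dumpheap -stat output, the columns are MT, object count, and total size in bytes. Dividing size by count gives 24, 24, 32, 32 and 48 bytes per object. Summing the five Kafka-related rows bears out the "nearly 200 MB" estimate. System.String is left out because not every string belongs to Kafka.Client:

```latex
\begin{aligned}
&\underbrace{2 \times 29\,160\,456}_{\text{NoOpTimer, KafkaTimer}}
+ \underbrace{2 \times 38\,880\,576}_{\text{FetchRequestAndResponseMetrics, ClientIdAndBroker}}
+ \underbrace{58\,320\,336}_{\text{ConcurrentDictionary Node}} \\
&\quad = 194\,402\,400\ \text{bytes} \approx 194\ \text{MB} \approx 185\ \text{MiB}
\end{aligned}
```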

        Run the command. There are so many objects that you should let it run for a moment and then break with Ctrl+Break:

    !dumpheap -mt 00007ff947e2f3b0 

        Result:

    0000021a972f8538 00007ff947e2f3b0       24    
    0000021a972f86e0 00007ff947e2f3b0       24    
    0000021a972f8828 00007ff947e2f3b0       24    
    0000021a972f89b8 00007ff947e2f3b0       24    
    0000021a972f8b10 00007ff947e2f3b0       24    

        The first column of the output is the address of each NoOpTimer object. Next, check the GC roots.

        Run:

    !gcroot 0000021a972f8538

        Result:

    0000021ae58965a8 Teld.Core.Log.Processor.ProcessService
               ->  0000021ae58966a8 System.Collections.Generic.List`1[[Teld.Core.Log.Processor.LogListener, Teld.Core.Log.Processor]]
               ->  0000021ae5898068 Teld.Core.Log.Processor.LogListener[]
               ->  0000021ae5896b78 Teld.Core.Log.Processor.LogListener
               ->  0000021ae5896bb8 Teld.Core.Log.Processor.KafkaConsumer
               ->  0000021a8ae4a2f8 Kafka.Client.Consumers.ZookeeperConsumerConnector
               ->  0000021a94f6c0e8 Kafka.Client.Consumers.ConsumerFetcherManager
               ->  0000021a94f6c1f0 System.Collections.Generic.Dictionary`2[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]]
               ->  0000021a958fc328 System.Collections.Generic.Dictionary`2+Entry[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]][]
               ->  0000021a962decf0 Kafka.Client.Consumers.ConsumerFetcherThread
               ->  0000021a962df050 Kafka.Client.Consumers.SimpleConsumer
               ->  0000021ae58f6348 Kafka.Client.Consumers.FetchRequestAndResponseStats
               ->  0000021ae58f6378 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
               ->  0000021a8d531598 System.Collections.Concurrent.ConcurrentDictionary`2+Tables[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
               ->  0000021af58130c8 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]][]
               ->  0000021a972f8550 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
               ->  0000021a972f84e8 Kafka.Client.Consumers.FetchRequestAndResponseMetrics

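        The output shows that the NoOpTimer object is held by a FetchRequestAndResponseMetrics, which in turn seems to be cached in a ConcurrentDictionary. Why does that ConcurrentDictionary chain look so familiar? Damn! It appeared in the !dumpheap -stat output a moment ago! So let's analyze the ConcurrentDictionary type next. Onward!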

        Run the command. Here 00007ff947e3a058 is the first-column value (the MT) of the ConcurrentDictionary node type in the first !dumpheap -stat output:

    !dumpheap -mt 00007ff947e3a058

        Result (a randomly chosen excerpt):

    0000021aefcd5a90 00007ff947e3a058       48    
    0000021aefcd5c20 00007ff947e3a058       48    
    0000021aefcd5d60 00007ff947e3a058       48    
    0000021aefcd5ef0 00007ff947e3a058       48    
    0000021aefcd6030 00007ff947e3a058       48    
    0000021aefcd65e8 00007ff947e3a058       48    
    0000021aefcd6790 00007ff947e3a058       48    
    0000021aefcd68d8 00007ff947e3a058       48    
    0000021aefcd6a68 00007ff947e3a058       48  

        Pick one at random and check its GC roots.

        Run:

    !gcroot 0000021aefcd6a68

        Result:

    0000021ae58965a8 Teld.Core.Log.Processor.ProcessService
                ->  0000021ae58966a8 System.Collections.Generic.List`1[[Teld.Core.Log.Processor.LogListener, Teld.Core.Log.Processor]]
                ->  0000021ae5898068 Teld.Core.Log.Processor.LogListener[]
                ->  0000021ae58970a8 Teld.Core.Log.Processor.LogListener
                ->  0000021ae58970e8 Teld.Core.Log.Processor.KafkaConsumer
                ->  0000021a8cedba08 Kafka.Client.Consumers.ZookeeperConsumerConnector
                ->  0000021a94f56710 Kafka.Client.Consumers.ConsumerFetcherManager
                ->  0000021a94f56818 System.Collections.Generic.Dictionary`2[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]]
                ->  0000021a94f5bd20 System.Collections.Generic.Dictionary`2+Entry[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]][]
                ->  0000021a962e5e80 Kafka.Client.Consumers.ConsumerFetcherThread
                ->  0000021a962e61e0 Kafka.Client.Consumers.SimpleConsumer
                ->  0000021ae58f60e8 Kafka.Client.Consumers.FetchRequestAndResponseStats
                ->  0000021ae58f6118 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
                ->  0000021a89deda70 System.Collections.Concurrent.ConcurrentDictionary`2+Tables[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
                ->  0000021af5a43128 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]][]
                ->  0000021aefcd6a68 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]

        The output shows that the ConcurrentDictionary is referenced by a Pool, which in turn is referenced by a FetchRequestAndResponseStats. This is very similar to the reference chain for NoOpTimer!

        Searching the first !dumpheap -stat output shows only 11 objects each of FetchRequestAndResponseStats and Pool:

    00007ff947e387f8       11          528 Kafka.Client.Consumers.FetchRequestAndResponseStats

    00007ff947e397d8       11          792 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]

        So the million-plus objects all hang off these 11 Pools, about 110,000 entries per Pool on average. Time to open the Kafka.Client source code.

    internal class FetchRequestAndResponseStats
    {
        private string clientId;

        private Func<ClientIdAndBroker, FetchRequestAndResponseMetrics> valueFactory;
        // Per-broker metrics cache; Pool<,> derives from ConcurrentDictionary<,>.
        private Pool<ClientIdAndBroker, FetchRequestAndResponseMetrics> stats;

        private FetchRequestAndResponseMetrics allBrokerStats;

        public FetchRequestAndResponseStats(string clientId)
        {
            this.clientId = clientId;
            this.valueFactory = k => new FetchRequestAndResponseMetrics(k);
            this.stats = new Pool<ClientIdAndBroker, FetchRequestAndResponseMetrics>(this.valueFactory);
            this.allBrokerStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"));
        }

        public FetchRequestAndResponseMetrics GetFetchRequestAndResponseAllBrokersStats()
        {
            return this.allBrokerStats;
        }

        public FetchRequestAndResponseMetrics GetFetchRequestAndResponseStats(string brokerInfo)
        {
            // A new ClientIdAndBroker is allocated on every call and used as the pool key.
            return this.stats.GetAndMaybePut(new ClientIdAndBroker(this.clientId, brokerInfo + "-"));
        }
    }

        The Pool object is a field of FetchRequestAndResponseStats. Pool inherits from ConcurrentDictionary, and its key type is ClientIdAndBroker. Pool is defined as follows:

    public class Pool<TKey, TValue> : ConcurrentDictionary<TKey, TValue>
    {
        public Func<TKey, TValue> ValueFactory { get; set; }

        public Pool(Func<TKey, TValue> valueFactory = null)
        {
            this.ValueFactory = valueFactory;
        }

        // Returns the value cached for key; if no equal key is found,
        // creates a value with ValueFactory and adds it.
        public TValue GetAndMaybePut(TKey key)
        {
            if (this.ValueFactory == null)
            {
                throw new KafkaException("Empty value factory in pool");
            }
            return this.GetOrAdd(key, this.ValueFactory);
        }
    }

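        Here is the problem. Every time FetchRequestAndResponseStats.GetFetchRequestAndResponseStats is called, it creates a new ClientIdAndBroker object and then calls Pool.GetAndMaybePut. Argh!!! Every access uses a brand-new object, and that object is the key stored in the ConcurrentDictionary via ConcurrentDictionary.GetOrAdd(). Judging by the 1.2 million entries in just 11 Pools, ClientIdAndBroker does not override Equals and GetHashCode. The dictionary therefore compares keys by reference, so a newly created key can only ever be added and can never be found. Finding such a basic mistake in Kafka.Client instantly changed my view of open-source components: their pits run deep, and you don't know until you fall in.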
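        The effect can be reproduced without Kafka. Below is a minimal sketch in which RefKey and KeyEqualityDemo are hypothetical names. RefKey assumes, as the dump suggests, that ClientIdAndBroker relies on the default reference-based Equals/GetHashCode. The sketch calls GetOrAdd repeatedly with keys whose field values are identical: once with a fresh object per call, and once with a string key.

```csharp
using System.Collections.Concurrent;

// Hypothetical stand-in for ClientIdAndBroker: a plain class that does not
// override Equals/GetHashCode, so key equality falls back to reference identity.
internal sealed class RefKey
{
    public string ClientId { get; }
    public string BrokerInfo { get; }

    public RefKey(string clientId, string brokerInfo)
    {
        ClientId = clientId;
        BrokerInfo = brokerInfo;
    }
}

internal static class KeyEqualityDemo
{
    // Mirrors GetFetchRequestAndResponseStats: each call allocates a new key
    // with identical field values and passes it to GetOrAdd.
    public static int CountWithFreshObjectKeys(int calls, string clientId, string brokerInfo)
    {
        var pool = new ConcurrentDictionary<RefKey, object>();
        for (int i = 0; i < calls; i++)
        {
            // Reference equality: the new key never matches an existing one,
            // so every call adds another entry.
            pool.GetOrAdd(new RefKey(clientId, brokerInfo + "-"), _ => new object());
        }
        return pool.Count;
    }

    // Same loop with a string key. brokerInfo + "-" still builds a new string
    // instance on each call, but strings compare by value, so one entry remains.
    public static int CountWithStringKeys(int calls, string brokerInfo)
    {
        var pool = new ConcurrentDictionary<string, object>();
        for (int i = 0; i < calls; i++)
        {
            pool.GetOrAdd(brokerInfo + "-", _ => new object());
        }
        return pool.Count;
    }
}
```

With 1,000 calls, the first method returns 1000 and the second returns 1. Overriding Equals and GetHashCode on the key class would be another way to get the same single-entry behavior.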

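        So I promptly patched the open-source component, changing the Pool's key type from ClientIdAndBroker to string. After debugging it, I let the patched Consumer program run for two days, during which it processed 600,000 log entries. Its memory usage at that point is shown in the screenshot.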
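        The post does not show the patched code. Below is a sketch of what a string-keyed version might look like, and several parts of it are assumptions for illustration. The pool key is brokerInfo + "-". The types are simplified stand-ins: ClientIdAndBroker; FetchRequestAndResponseMetrics with a hypothetical MetricId property; and a Pool that throws InvalidOperationException in place of KafkaException. BrokerEntryCount is a hypothetical helper.

```csharp
using System;
using System.Collections.Concurrent;

// Simplified, hypothetical stand-ins for the Kafka.Client types.
internal class ClientIdAndBroker
{
    public string ClientId { get; }
    public string BrokerInfo { get; }

    public ClientIdAndBroker(string clientId, string brokerInfo)
    {
        ClientId = clientId;
        BrokerInfo = brokerInfo;
    }
}

internal class FetchRequestAndResponseMetrics
{
    public ClientIdAndBroker MetricId { get; }

    public FetchRequestAndResponseMetrics(ClientIdAndBroker metricId)
    {
        MetricId = metricId;
    }
}

// Same shape as the Pool shown above, with InvalidOperationException
// standing in for KafkaException.
internal class Pool<TKey, TValue> : ConcurrentDictionary<TKey, TValue>
{
    public Func<TKey, TValue> ValueFactory { get; set; }

    public Pool(Func<TKey, TValue> valueFactory = null)
    {
        ValueFactory = valueFactory;
    }

    public TValue GetAndMaybePut(TKey key)
    {
        if (ValueFactory == null)
        {
            throw new InvalidOperationException("Empty value factory in pool");
        }
        return GetOrAdd(key, ValueFactory);
    }
}

// Patched stats class: the pool is keyed by a string, which has value equality.
internal class FetchRequestAndResponseStats
{
    private readonly string clientId;
    private readonly Pool<string, FetchRequestAndResponseMetrics> stats;
    private readonly FetchRequestAndResponseMetrics allBrokerStats;

    public FetchRequestAndResponseStats(string clientId)
    {
        this.clientId = clientId;
        // The ClientIdAndBroker is normally built only when a broker key is first
        // seen (GetOrAdd may run the factory more than once under contention).
        this.stats = new Pool<string, FetchRequestAndResponseMetrics>(
            brokerKey => new FetchRequestAndResponseMetrics(new ClientIdAndBroker(this.clientId, brokerKey)));
        this.allBrokerStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"));
    }

    public FetchRequestAndResponseMetrics GetFetchRequestAndResponseAllBrokersStats()
    {
        return this.allBrokerStats;
    }

    public FetchRequestAndResponseMetrics GetFetchRequestAndResponseStats(string brokerInfo)
    {
        // Equal strings find the existing entry, so repeated calls add nothing.
        return this.stats.GetAndMaybePut(brokerInfo + "-");
    }

    // Hypothetical helper, only so the number of pooled entries can be observed.
    public int BrokerEntryCount => this.stats.Count;
}
```

Because the key is now a string, repeated calls for the same broker return the same cached metrics object. The pool then grows with the number of distinct brokers rather than with the number of fetch requests.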

        

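        The problem was finally solved for good! Last, as is customary, thanks to JuQiang for his guidance. I'm a newcomer to the internet field, so this post inevitably contains some opinions that are not fully objective or mature. Please bear with me!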

        vveiliang 2015-12-3

                                                                         

Original post: https://www.cnblogs.com/teld/p/5016891.html