I've never written a blog before. Thinking about it, that's a shame: I've been working for more than ten years and have built plenty of architectures and technologies, and it would be a pity not to share any of it. On top of that, moving from the traditional ERP world to the Internet world, I've run into all kinds of problems I'd never seen before. I knew there would be pits along the way; I just didn't know they would be this many and this deep. So, while filling them, I'll log the process here.
Back to the topic, and first some background. Teld's business platform carries a huge amount of IoT terminal sensor data and vehicle operation data, and there is a lot of value buried in it. So it has to be stored. Teld's charging terminals are no joke: there are already 20,000+ of them, each reporting data every 30 seconds, though a single message is not large. And that's only the beginning; per the national plan, we'll be at the million-terminal level by 2020. OK, getting ahead of myself! A quick conversion: 20,000 terminals reporting every 30 seconds is already roughly 670 messages per second, and a million terminals would push that past 30,000 per second, so the TPS requirement for terminal telemetry alone is substantial. After two months of research and technology selection, we chose Kafka as the middleware for handling this volume of data.
Alright, Kafka it is; let the pit-filling begin. Since our stack is .NET, the Kafka client had to be .NET as well. ... (10,000 words omitted here) ... The Kafka environment came up and passed its tests, but the Consumer program written on top of Kafka.Client showed a severe memory leak.
The Consumer is supposed to run continuously, yet the screenshot above was taken after only two hours, with memory already at 570 MB. Time to capture a dump and analyze it with WinDbg.
Start WinDbg, set the symbol path, and load the dump.
Run the following commands:
.loadby sos clr (note: the program targets .NET 4.0; for 2.0 the SOS extension lives in the older runtime, e.g. .loadby sos mscorwks)
!dumpheap -stat (note: lists the objects on the managed heap grouped by type, with per-type counts and total memory usage)
Output (columns are MT, Count, TotalSize, Class Name):
00007ff947e2f2e8 1215019 29160456 Kafka.Client.Common.NoOpTimer
00007ff947e2f1a8 1215019 29160456 Kafka.Client.Metrics.KafkaTimer
00007ff947e39600 1215018 38880576 Kafka.Client.Consumers.FetchRequestAndResponseMetrics
00007ff947e2df70 1215018 38880576 Kafka.Client.Common.ClientIdAndBroker
00007ff947e3a058 1215007 58320336 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
00007ff9a5cc3d60 1267853 86313134 System.String
From this output you can see 1.2 million+ instances each of NoOpTimer, KafkaTimer, FetchRequestAndResponseMetrics, and the ConcurrentDictionary node type, together taking close to 200 MB. So these guys look like the culprits, and the finger points straight at Kafka.Client. Let's pick NoOpTimer and look at its GC roots first. Onward!
Run this command (there are so many objects that it keeps scrolling for a while; let it run a bit and then break):
!dumpheap -mt 00007ff947e2f3b0
Output (excerpt):
0000021a972f8538 00007ff947e2f3b0 24
0000021a972f86e0 00007ff947e2f3b0 24
0000021a972f8828 00007ff947e2f3b0 24
0000021a972f89b8 00007ff947e2f3b0 24
0000021a972f8b10 00007ff947e2f3b0 24
The first column of the output is the address of each NoOpTimer object. Let's pick one and look at its GC roots.
Run:
!gcroot 0000021a972f8538
Output:
0000021ae58965a8 Teld.Core.Log.Processor.ProcessService
-> 0000021ae58966a8 System.Collections.Generic.List`1[[Teld.Core.Log.Processor.LogListener, Teld.Core.Log.Processor]]
-> 0000021ae5898068 Teld.Core.Log.Processor.LogListener[]
-> 0000021ae5896b78 Teld.Core.Log.Processor.LogListener
-> 0000021ae5896bb8 Teld.Core.Log.Processor.KafkaConsumer
-> 0000021a8ae4a2f8 Kafka.Client.Consumers.ZookeeperConsumerConnector
-> 0000021a94f6c0e8 Kafka.Client.Consumers.ConsumerFetcherManager
-> 0000021a94f6c1f0 System.Collections.Generic.Dictionary`2[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]]
-> 0000021a958fc328 System.Collections.Generic.Dictionary`2+Entry[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]][]
-> 0000021a962decf0 Kafka.Client.Consumers.ConsumerFetcherThread
-> 0000021a962df050 Kafka.Client.Consumers.SimpleConsumer
-> 0000021ae58f6348 Kafka.Client.Consumers.FetchRequestAndResponseStats
-> 0000021ae58f6378 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
-> 0000021a8d531598 System.Collections.Concurrent.ConcurrentDictionary`2+Tables[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
-> 0000021af58130c8 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]][]
-> 0000021a972f8550 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
-> 0000021a972f84e8 Kafka.Client.Consumers.FetchRequestAndResponseMetrics
From this output you can see that the NoOpTimer is held by a FetchRequestAndResponseMetrics, and that the FetchRequestAndResponseMetrics objects appear to be cached in a ConcurrentDictionary. Wait, that ConcurrentDictionary looks awfully familiar... fuck! It was right there in the earlier !dumpheap -stat output! So let's analyze that ConcurrentDictionary node type next. Onward!
Run the following (00007ff947e3a058 is the first-column value, the MT, of the ConcurrentDictionary node row in the first !dumpheap -stat output):
!dumpheap -mt 00007ff947e3a058
Output (a randomly chosen excerpt):
0000021aefcd5a90 00007ff947e3a058 48
0000021aefcd5c20 00007ff947e3a058 48
0000021aefcd5d60 00007ff947e3a058 48
0000021aefcd5ef0 00007ff947e3a058 48
0000021aefcd6030 00007ff947e3a058 48
0000021aefcd65e8 00007ff947e3a058 48
0000021aefcd6790 00007ff947e3a058 48
0000021aefcd68d8 00007ff947e3a058 48
0000021aefcd6a68 00007ff947e3a058 48
Pick one at random and check its GC roots again.
Run:
!gcroot 0000021aefcd6a68
Output:
0000021ae58965a8 Teld.Core.Log.Processor.ProcessService
-> 0000021ae58966a8 System.Collections.Generic.List`1[[Teld.Core.Log.Processor.LogListener, Teld.Core.Log.Processor]]
-> 0000021ae5898068 Teld.Core.Log.Processor.LogListener[]
-> 0000021ae58970a8 Teld.Core.Log.Processor.LogListener
-> 0000021ae58970e8 Teld.Core.Log.Processor.KafkaConsumer
-> 0000021a8cedba08 Kafka.Client.Consumers.ZookeeperConsumerConnector
-> 0000021a94f56710 Kafka.Client.Consumers.ConsumerFetcherManager
-> 0000021a94f56818 System.Collections.Generic.Dictionary`2[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]]
-> 0000021a94f5bd20 System.Collections.Generic.Dictionary`2+Entry[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]][]
-> 0000021a962e5e80 Kafka.Client.Consumers.ConsumerFetcherThread
-> 0000021a962e61e0 Kafka.Client.Consumers.SimpleConsumer
-> 0000021ae58f60e8 Kafka.Client.Consumers.FetchRequestAndResponseStats
-> 0000021ae58f6118 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
-> 0000021a89deda70 System.Collections.Concurrent.ConcurrentDictionary`2+Tables[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
-> 0000021af5a43128 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]][]
-> 0000021aefcd6a68 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
From this output you can see that the ConcurrentDictionary is referenced by a Pool, which is in turn referenced by a FetchRequestAndResponseStats. That is exactly the same reference chain we saw for NoOpTimer!
Searching the first !dumpheap -stat output shows there are only 11 FetchRequestAndResponseStats objects and 11 of the Pool objects:
00007ff947e387f8 11 528 Kafka.Client.Consumers.FetchRequestAndResponseStats
7ff947e397d8 11 792 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
So it looks like the million-plus objects are all hanging off those Pools. Time to crack open the Kafka.Client source code.
internal class FetchRequestAndResponseStats
{
    private string clientId;
    private Func<ClientIdAndBroker, FetchRequestAndResponseMetrics> valueFactory;
    private Pool<ClientIdAndBroker, FetchRequestAndResponseMetrics> stats;
    private FetchRequestAndResponseMetrics allBrokerStats;

    public FetchRequestAndResponseStats(string clientId)
    {
        this.clientId = clientId;
        this.valueFactory = k => new FetchRequestAndResponseMetrics(k);
        this.stats = new Pool<ClientIdAndBroker, FetchRequestAndResponseMetrics>(this.valueFactory);
        this.allBrokerStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"));
    }

    public FetchRequestAndResponseMetrics GetFetchRequestAndResponseAllBrokersStats()
    {
        return this.allBrokerStats;
    }

    public FetchRequestAndResponseMetrics GetFetchRequestAndResponseStats(string brokerInfo)
    {
        return this.stats.GetAndMaybePut(new ClientIdAndBroker(this.clientId, brokerInfo + "-"));
    }
}
The Pool object is a member of FetchRequestAndResponseStats, and Pool itself inherits from ConcurrentDictionary, with ClientIdAndBroker as the key type. Pool is defined as follows:
public class Pool<TKey, TValue> : ConcurrentDictionary<TKey, TValue>
{
    public Func<TKey, TValue> ValueFactory { get; set; }

    public Pool(Func<TKey, TValue> valueFactory = null)
    {
        this.ValueFactory = valueFactory;
    }

    public TValue GetAndMaybePut(TKey key)
    {
        if (this.ValueFactory == null)
        {
            throw new KafkaException("Empty value factory in pool");
        }
        return this.GetOrAdd(key, this.ValueFactory);
    }
}
Here is the problem: every call to FetchRequestAndResponseStats.GetFetchRequestAndResponseStats news up a fresh ClientIdAndBroker and hands it to Pool.GetAndMaybePut, which stores it as a ConcurrentDictionary key via GetOrAdd. Since ClientIdAndBroker evidently does not provide value equality (no Equals/GetHashCode override), every one of those fresh instances counts as a brand-new key: each call can only ever Add, it has no chance of Getting an existing entry, so the dictionary grows by one node per fetch, forever. I was amazed that such a basic mistake could sit in Kafka.Client; it instantly gave me a new appreciation for open-source components: the pits run deeper than you think, and you don't find out until you start filling them.
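To make the failure mode concrete, here is a minimal stand-alone sketch (my own illustration, not Kafka.Client code; the ClientIdAndBroker stand-in and the loop are hypothetical) of how GetOrAdd grows without bound when the key type falls back to reference equality:

using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for ClientIdAndBroker: no Equals/GetHashCode override,
// so two instances with identical fields are still treated as different keys.
class ClientIdAndBroker
{
    public string ClientId;
    public string BrokerInfo;
    public ClientIdAndBroker(string clientId, string brokerInfo)
    {
        ClientId = clientId;
        BrokerInfo = brokerInfo;
    }
}

class Program
{
    static void Main()
    {
        var stats = new ConcurrentDictionary<ClientIdAndBroker, object>();
        for (int i = 0; i < 1000; i++)
        {
            // Logically the same key every time, but always a brand-new instance,
            // so GetOrAdd never finds an existing entry and always adds one.
            stats.GetOrAdd(new ClientIdAndBroker("client1", "broker1-"), k => new object());
        }
        Console.WriteLine(stats.Count); // prints 1000, not 1
    }
}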
Time to patch the open-source component right away. The change: switch the Pool's key type from ClientIdAndBroker to string, so that GetOrAdd can actually find existing entries; a rough sketch of the change is shown below. With the patched client rebuilt, the screenshot shows the Consumer's memory usage after two days of running, during which it processed 600,000 log messages.
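For reference, here is the kind of change involved (my reconstruction under the assumptions above, not necessarily the exact patch): key the pool by a plain string, and let the value factory build the metrics object from that string.

// Sketch only: a reconstruction of the fix, not the exact Kafka.Client patch.
internal class FetchRequestAndResponseStats
{
    private string clientId;
    private Func<string, FetchRequestAndResponseMetrics> valueFactory;
    private Pool<string, FetchRequestAndResponseMetrics> stats;

    public FetchRequestAndResponseStats(string clientId)
    {
        this.clientId = clientId;
        // The string key carries the broker info; the metrics object is built from it on first use.
        this.valueFactory = brokerInfo =>
            new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, brokerInfo));
        this.stats = new Pool<string, FetchRequestAndResponseMetrics>(this.valueFactory);
    }

    public FetchRequestAndResponseMetrics GetFetchRequestAndResponseStats(string brokerInfo)
    {
        // Identical strings compare equal, so repeated calls for the same broker reuse one entry.
        return this.stats.GetAndMaybePut(brokerInfo + "-");
    }
}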
And with that, the problem is completely solved! Finally, as is customary, thanks to JuQiang for the guidance. I'm a newcomer to the Internet field, so this blog is bound to contain some views that are not entirely objective or mature; please bear with me!
vveiliang 2015-12-3