部分项目从kafka迁移至pulsar,近期使用中碰到了一些问题,勉强把大的坑踩完了,topic永驻,性能相关
pulsar概念类的东西官方文档和基本介绍的博客很多,也就不重复说明了,更深入的东西也不涉及
只说下近期的使用体验
设计理念上,虽然pulsar也支持持久化队列,但和kafka对持久化的理解是不一样的
kafka的持久化多少有一些数据仓储的概念在里面,数据长期保存,通常是指定数据的保存日期,kafka后台按时清理过期数据
而pulsar的持久化,是更纯粹的mq的持久化,只是为了保证数据会被消费到,没有数据仓储的概念,只是纯粹的消息队列,这种设计目标使得pulsar对硬盘空间的回收非常积极
纯粹的消息队列个人也用过些,例如nsq,因为pulsar处处是和kafka对比,因此一直觉得pulsar的默认配置,可以完全替代kafka,但出了不少问题
回收级别
topic
kafka里的topic无论是否有消费,写入后的数据,都会稳定的一段时间,官方默认是7天,根据应用需要会调整
pulsar里的topic,写入完成后,即使数据从来没有被消费过,也会在活跃检测机制下,被自动清除
pulsar默认会有以下场景:producer 写入了100w条数据,还没来得及consume,topic就被清除了
这可以通过几个配置来调整,提高活跃间隔,但治村不治本。
可以禁用非活跃topic 自动回收,相关配置为
brokerDeleteInactiveTopicsEnabled=true
brokerDeleteInactiveTopicsFrequencySeconds=60
brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=
本以为能解决问题,但是历史数据仍然消失了,这是因为topic下还有另一个维度的回收,lenger的回收,topic还在,不删topic,但是topic下的数据,还是会被清除
lenger
kafka的topic 数据以partition区分存储多分区数据
pulsar 没有partition的概念,而是有lenger概念,lenger可以理解为一个文件块,topic下的数据以lenger的格式分块存储,lenger有序,lenger id全局自增,和消费的offset相关,低位的lenger消费完再消费高位的lenger
例如topic 下有以下3个lenger,按顺序消费每个lenger的数据
lenger_id | offset_start | offset_end |
---|---|---|
1 | 0 | 50000 |
2 | 50001 | 100000 |
3 | 100001 | 150000 |
在topic级别的活跃检测清除外,pulsar对topic下的lenger回收也较积极
如果所有已存在的consume的offset都漫过了第50000,也就漫过了lenger_id_1,则lenger_id_1 会被pulsar积极的回收掉,这也就导致了,即使关闭了非活跃topic的自动回收,topic是还在,但是topic下的lenger仍然会'消失'的情况
kafka的一个经典使用场景是producer先写入完成,数据默认保持数周,然后多个consumer再分别去消费
pulsar的默认行为会导致一些初从kafka迁移来未预料到的问题(实际是pulsar的机制)
-
1 producer 写kafka写入topic完成,producer任务完成关闭连接,但是这时还没有消费者来消费,下一个活跃topic检测周期到了,该topic被完全清除,producer白写了
-
2 producer 写kafka写入topic完成,consume-1从头消费数据,已经消费了一半的数据,pulsar检测到该topic低位的一半lenger都被所有consume(只有consume-1)消费过了,把低位的lenger清除,过了段时间consume-2从头消费,低位lenger的数据已经被删除,只能处理高位lenger下的数据
解决办法
非活跃回收可以关闭,topic会保留
brokerDeleteInactiveTopicsEnabled=false
lenger会回收已被消费过的lenger块,如果当前lenger有consumer,并且offset处在低位,则相比该lenger高位的都不会被清除
因此初期想到的办法是在topic时,自动新建一个consumer,将offset锁定在低位0(这是在测试时发现的,测试topic消费,临时起了个conumser_test,该conumser_test一直将offset锁定在低位,上位的数据并没有被回收)
这种方法只是使用初期工程上的办法,非常的不优雅,只是过渡使用,更好的办法是调整pulsar的配置,以个人多年各种服务和组件的使用优化经验,肯定是有这个配置的
目标是解决pulsar对topic下已被consumer消费过的lenger回收问题,使lenger保留更久的时间
以下是lenger回收的相关日志,这是查看源码的主要线索
13:41:07.520 [bookkeeper-ml-workers-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-2] Removing ledger 30295 - size: 528330607
13:43:07.520 [bookkeeper-ml-workers-OrderedExecutor-6-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-1] Removing ledger 30300 - size: 486083111
13:43:07.525 [bookkeeper-ml-workers-OrderedExecutor-5-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-0] Removing ledger 30253 - size: 504748015
分享下排查和调整过程(实际是走了一些弯路)
-
调高managedLedgerOffloadDeletionLagMs
首先是在pulsar默认的配置文件里找到可能相关的参数项,关键词delete/remove之类
cat /pulsar/conf/bookkeeper.conf
找到了managedLedgerOffloadDeletionLagMs,把这个时间调大,以为能生效,但是上线后还是有ledger的删除日志,问题未解决
观查Removing ledger 的间隔,大概是4小时一个批次
managedLedgerOffloadDeletionLagMs=14400000
把 managedLedgerOffloadDeletionLagMs: "864000000"
调高以为能解决问题,但没有生效,还有会有ledger的删除日志
-
添加 managedLedgerOffloadDeletionLagMs
查到相关 issue https://github.com/apache/pulsar/issues/8220
google managedLedgerOffloadDeletionLagMs 查到的这个issues,但也不确定是不是问题
看了下源码,又找到两个可能的相关项,调整同样不生效
按issues添加配置
ServiceConfiguration (broker.conf / standalone.conf)
managedLedgerOffloadDeletionLagMs
managedLedgerOffloadAutoTriggerSizeThresholdBytes
OffloadPolicies
managedLedgerOffloadDeletionLagInMillis
managedLedgerOffloadThresholdInBytes
-
查看源码寻找线索
只好从源码层面想办法了-其实通常这种存储服务会给一个的config文件,里面有所有的项,按关键词+经验找相关项调整即可,通常用不着看代码解决
目标是先找到执行lenger删除的代码
13:41:07.520 [bookkeeper-ml-workers-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-2] Removing ledger 30295 - size: 528330607
13:43:07.520 [bookkeeper-ml-workers-OrderedExecutor-6-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-1] Removing ledger 30300 - size: 486083111
13:43:07.525 [bookkeeper-ml-workers-OrderedExecutor-5-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-0] Removing ledger 30253 - size: 504748015
日志里已经有目标类的信息,源码里全文检索'Removing ledger'可精确定位
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
ledgersStat = stat;
metadataMutex.unlock();
trimmerMutex.unlock();
for (LedgerInfo ls : ledgersToDelete) {
log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
asyncDeleteLedger(ls.getLedgerId(), ls);
}
for (LedgerInfo ls : offloadedLedgersToDelete) {
log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
ls.getSize());
asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
}
promise.complete(null);
}
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
metadataMutex.unlock();
trimmerMutex.unlock();
promise.completeExceptionally(e);
}
});
}
核心部分是
for (LedgerInfo ls : ledgersToDelete) {
log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
asyncDeleteLedger(ls.getLedgerId(), ls);
}
其实只要注释掉这段代码重新编译pulsar便能完全禁用pulsar的lenger清除,但这个太粗暴了,重新编译自定义的jar包,虽然也很简单,但这只是最后的办法,最好还是先从源码里的相关配置里解决
ledgersToDelete 相关部分
List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();
if (log.isDebugEnabled()) {
log.debug(
"[{}] Checking ledger {} -- time-old: {} sec -- "
+ "expired: {} -- over-quota: {} -- current-ledger: {}",
name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired,
overRetentionQuota, currentLedger.getId());
}
if (ls.getLedgerId() == currentLedger.getId()) {
log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
ls.getLedgerId());
break;
} else if (expired) {
log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());
ledgersToDelete.add(ls);
} else if (overRetentionQuota) {
log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId());
ledgersToDelete.add(ls);
} else {
log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
break;
}
}
其实可以简单看到ledgersToDelete.add 有两种来源,一是expired,二是overRetentionQuota
只要使得ledger 不满足这两个条件即可
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();
分别进入这两个方法
private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
if (config.getRetentionTimeMillis() < 0) {
// Negative retention time equates to infinite retention
return false;
}
long elapsedMs = clock.millis() - ledgerTimestamp;
return elapsedMs > config.getRetentionTimeMillis();
}
private boolean isLedgerRetentionOverSizeQuota() {
// Handle the -1 size limit as "infinite" size quota
return config.getRetentionSizeInMB() >= 0
&& TOTAL_SIZE_UPDATER.get(this) > config.getRetentionSizeInMB() * 1024 * 1024;
}
看到了config关键词,目标就在眼前了
public ManagedLedgerConfig setRetentionSizeInMB(long retentionSizeInMB) {
this.retentionSizeInMB = retentionSizeInMB;
return this;
}
public long getRetentionSizeInMB() {
return retentionSizeInMB;
}
public ManagedLedgerConfig setRetentionTime(int retentionTime, TimeUnit unit) {
this.retentionTimeMs = unit.toMillis(retentionTime);
return this;
}
public long getRetentionTimeMillis() {
return retentionTimeMs;
}
接下来找set的位置
managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
还需要看retentionPolicies类
实际这个时候已经可以结合配置了,查看pulsar的配置文件,发现两项配置,按经验,可以确定就是这两项,更新集群配置后,重启,lenger被清除的问题便已解决
defaultRetentionTimeInMinutes=0
defaultRetentionSizeInMB=0
总结
实际检索代码retentionSizeInMB的时候,检索到了一个faq.md文件,里面明确有提到类似的问题
之前只看了官方的文档,官方文档更是使用和概含说明,并没有看github上项目的一些md文件,没有注意到这项配置,上文说的弯路就是指这个faq.md文件
这种场景常见,个人肯定不是第一个注意到并想解决办法的
How can I prevent an inactive topic to be deleted under any circumstance? I want to set no time or space limit for a certain namespace.
There’s not currently an option for “infinite” (though it sounds a good idea! maybe we could use -1 for that). The only option now is to use INT_MAX for retentionTimeInMinutes and LONG_MAX for retentionSizeInMB. It’s not “infinite” but 4085 years of retention should probably be enough!
另外提一个小坑,同硬件环境裸机搭了3个节点也比单实例的慢,也排除了zk gfs 因素
3点的分布式环境比单点测试要慢
https://github.com/apache/pulsar/issues/8570
咨询了下官方,确认了是个参数配置问题
默认单点 journalSyncData=false
默认集群 journalSyncData=true
而kakfa的默认行为类似journalSyncData=false