zoukankan      html  css  js  c++  java
  • 部分项目从kafka迁移至pulsar,近期使用中碰到了一些问题,勉强把大的坑踩完了,topic永驻,性能相关

    部分项目从kafka迁移至pulsar,近期使用中碰到了一些问题,勉强把大的坑踩完了,topic永驻,性能相关

    pulsar概念类的东西官方文档和基本介绍的博客很多,也就不重复说明了,更深入的东西也不涉及

    只说下近期的使用体验

    设计理念上,虽然pulsar也支持持久化队列,但和kafka对持久化的理解是不一样的

    kafka的持久化多少有一些数据仓储的概念在里面,数据长期保存,通常是指定数据的保存日期,kafka后台按时清理过期数据

    而pulsar的持久化,是更纯粹的mq的持久化,只是为了保证数据会被消费到,没有数据仓储的概念,只是纯粹的消息队列,这种设计目标使得pulsar对硬盘空间的回收非常积极

    纯粹的消息队列个人也用过些,例如nsq,因为pulsar处处是和kafka对比,因此一直觉得pulsar的默认配置,可以完全替代kafka,但出了不少问题

    回收级别

    topic

    kafka里的topic无论是否有消费,写入后的数据,都会稳定的一段时间,官方默认是7天,根据应用需要会调整

    pulsar里的topic,写入完成后,即使数据从来没有被消费过,也会在活跃检测机制下,被自动清除

    pulsar默认会有以下场景:producer 写入了100w条数据,还没来得及consume,topic就被清除了

    这可以通过几个配置来调整,提高活跃间隔,但治村不治本。

    可以禁用非活跃topic 自动回收,相关配置为

    brokerDeleteInactiveTopicsEnabled=true
    brokerDeleteInactiveTopicsFrequencySeconds=60
    brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
    brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=
    

    本以为能解决问题,但是历史数据仍然消失了,这是因为topic下还有另一个维度的回收,lenger的回收,topic还在,不删topic,但是topic下的数据,还是会被清除

    lenger

    kafka的topic 数据以partition区分存储多分区数据

    pulsar 没有partition的概念,而是有lenger概念,lenger可以理解为一个文件块,topic下的数据以lenger的格式分块存储,lenger有序,lenger id全局自增,和消费的offset相关,低位的lenger消费完再消费高位的lenger

    例如topic 下有以下3个lenger,按顺序消费每个lenger的数据

    lenger_id offset_start offset_end
    1 0 50000
    2 50001 100000
    3 100001 150000

    在topic级别的活跃检测清除外,pulsar对topic下的lenger回收也较积极

    如果所有已存在的consume的offset都漫过了第50000,也就漫过了lenger_id_1,则lenger_id_1 会被pulsar积极的回收掉,这也就导致了,即使关闭了非活跃topic的自动回收,topic是还在,但是topic下的lenger仍然会'消失'的情况


    kafka的一个经典使用场景是producer先写入完成,数据默认保持数周,然后多个consumer再分别去消费

    pulsar的默认行为会导致一些初从kafka迁移来未预料到的问题(实际是pulsar的机制)

    • 1 producer 写kafka写入topic完成,producer任务完成关闭连接,但是这时还没有消费者来消费,下一个活跃topic检测周期到了,该topic被完全清除,producer白写了

    • 2 producer 写kafka写入topic完成,consume-1从头消费数据,已经消费了一半的数据,pulsar检测到该topic低位的一半lenger都被所有consume(只有consume-1)消费过了,把低位的lenger清除,过了段时间consume-2从头消费,低位lenger的数据已经被删除,只能处理高位lenger下的数据

    解决办法

    非活跃回收可以关闭,topic会保留

    brokerDeleteInactiveTopicsEnabled=false

    lenger会回收已被消费过的lenger块,如果当前lenger有consumer,并且offset处在低位,则相比该lenger高位的都不会被清除

    因此初期想到的办法是在topic时,自动新建一个consumer,将offset锁定在低位0(这是在测试时发现的,测试topic消费,临时起了个conumser_test,该conumser_test一直将offset锁定在低位,上位的数据并没有被回收)

    这种方法只是使用初期工程上的办法,非常的不优雅,只是过渡使用,更好的办法是调整pulsar的配置,以个人多年各种服务和组件的使用优化经验,肯定是有这个配置的


    目标是解决pulsar对topic下已被consumer消费过的lenger回收问题,使lenger保留更久的时间

    以下是lenger回收的相关日志,这是查看源码的主要线索

    13:41:07.520 [bookkeeper-ml-workers-OrderedExecutor-7-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-2] Removing ledger 30295 - size: 528330607
    13:43:07.520 [bookkeeper-ml-workers-OrderedExecutor-6-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-1] Removing ledger 30300 - size: 486083111
    13:43:07.525 [bookkeeper-ml-workers-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-0] Removing ledger 30253 - size: 504748015
    

    分享下排查和调整过程(实际是走了一些弯路)

    • 调高managedLedgerOffloadDeletionLagMs

    首先是在pulsar默认的配置文件里找到可能相关的参数项,关键词delete/remove之类

    cat /pulsar/conf/bookkeeper.conf

    找到了managedLedgerOffloadDeletionLagMs,把这个时间调大,以为能生效,但是上线后还是有ledger的删除日志,问题未解决

    观查Removing ledger 的间隔,大概是4小时一个批次

    managedLedgerOffloadDeletionLagMs=14400000
    

    managedLedgerOffloadDeletionLagMs: "864000000"调高以为能解决问题,但没有生效,还有会有ledger的删除日志

    • 添加 managedLedgerOffloadDeletionLagMs

    https://www.google.com/search?q=managedLedgerOffloadDeletionLagMs&oq=managedLedgerOffloadDeletionLagMs

    查到相关 issue https://github.com/apache/pulsar/issues/8220

    google managedLedgerOffloadDeletionLagMs 查到的这个issues,但也不确定是不是问题

    看了下源码,又找到两个可能的相关项,调整同样不生效

    按issues添加配置

    ServiceConfiguration (broker.conf / standalone.conf)
    managedLedgerOffloadDeletionLagMs
    managedLedgerOffloadAutoTriggerSizeThresholdBytes
    
    OffloadPolicies
    managedLedgerOffloadDeletionLagInMillis
    managedLedgerOffloadThresholdInBytes
    

    • 查看源码寻找线索

    只好从源码层面想办法了-其实通常这种存储服务会给一个的config文件,里面有所有的项,按关键词+经验找相关项调整即可,通常用不着看代码解决

    目标是先找到执行lenger删除的代码

    13:41:07.520 [bookkeeper-ml-workers-OrderedExecutor-7-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-2] Removing ledger 30295 - size: 528330607
    13:43:07.520 [bookkeeper-ml-workers-OrderedExecutor-6-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-1] Removing ledger 30300 - size: 486083111
    13:43:07.525 [bookkeeper-ml-workers-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-0] Removing ledger 30253 - size: 504748015
    

    日志里已经有目标类的信息,源码里全文检索'Removing ledger'可精确定位

    https://github.com/apache/pulsar/blob/v2.6.1/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

                store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
                    @Override
                    public void operationComplete(Void result, Stat stat) {
                        log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
                                TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
                        ledgersStat = stat;
                        metadataMutex.unlock();
                        trimmerMutex.unlock();
    
                        for (LedgerInfo ls : ledgersToDelete) {
                            log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
                            asyncDeleteLedger(ls.getLedgerId(), ls);
                        }
                        for (LedgerInfo ls : offloadedLedgersToDelete) {
                            log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
                                    ls.getSize());
                            asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
                        }
                        promise.complete(null);
                    }
    
                    @Override
                    public void operationFailed(MetaStoreException e) {
                        log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
                        metadataMutex.unlock();
                        trimmerMutex.unlock();
    
                        promise.completeExceptionally(e);
                    }
                });
            }
    

    核心部分是

                        for (LedgerInfo ls : ledgersToDelete) {
                            log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
                            asyncDeleteLedger(ls.getLedgerId(), ls);
                        }
    

    其实只要注释掉这段代码重新编译pulsar便能完全禁用pulsar的lenger清除,但这个太粗暴了,重新编译自定义的jar包,虽然也很简单,但这只是最后的办法,最好还是先从源码里的相关配置里解决

    ledgersToDelete 相关部分

            List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
            for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
                    boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
                    boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();
    
                    if (log.isDebugEnabled()) {
                        log.debug(
                                "[{}] Checking ledger {} -- time-old: {} sec -- "
                                        + "expired: {} -- over-quota: {} -- current-ledger: {}",
                                name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired,
                                overRetentionQuota, currentLedger.getId());
                    }
                    if (ls.getLedgerId() == currentLedger.getId()) {
                        log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
                                ls.getLedgerId());
                        break;
                    } else if (expired) {
                        log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());
                        ledgersToDelete.add(ls);
                    } else if (overRetentionQuota) {
                        log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId());
                        ledgersToDelete.add(ls);
                    } else {
                        log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
                        break;
                    }
                }
    

    其实可以简单看到ledgersToDelete.add 有两种来源,一是expired,二是overRetentionQuota

    只要使得ledger 不满足这两个条件即可

                    boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
                    boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();
    

    分别进入这两个方法

        private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
            if (config.getRetentionTimeMillis() < 0) {
                // Negative retention time equates to infinite retention
                return false;
            }
    
            long elapsedMs = clock.millis() - ledgerTimestamp;
            return elapsedMs > config.getRetentionTimeMillis();
        }
        private boolean isLedgerRetentionOverSizeQuota() {
            // Handle the -1 size limit as "infinite" size quota
            return config.getRetentionSizeInMB() >= 0
                    && TOTAL_SIZE_UPDATER.get(this) > config.getRetentionSizeInMB() * 1024 * 1024;
        }    
    

    看到了config关键词,目标就在眼前了

    https://github.com/apache/pulsar/blob/v2.6.1/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java

        public ManagedLedgerConfig setRetentionSizeInMB(long retentionSizeInMB) {
            this.retentionSizeInMB = retentionSizeInMB;
            return this;
        }
        public long getRetentionSizeInMB() {
            return retentionSizeInMB;
        }
        
        public ManagedLedgerConfig setRetentionTime(int retentionTime, TimeUnit unit) {
            this.retentionTimeMs = unit.toMillis(retentionTime);
            return this;
        }
        public long getRetentionTimeMillis() {
            return retentionTimeMs;
        }
    

    接下来找set的位置

    https://github.com/apache/pulsar/blob/v2.6.1/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

    managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
    managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
    

    还需要看retentionPolicies类

    实际这个时候已经可以结合配置了,查看pulsar的配置文件,发现两项配置,按经验,可以确定就是这两项,更新集群配置后,重启,lenger被清除的问题便已解决

    defaultRetentionTimeInMinutes=0
    defaultRetentionSizeInMB=0
    

    总结

    实际检索代码retentionSizeInMB的时候,检索到了一个faq.md文件,里面明确有提到类似的问题

    之前只看了官方的文档,官方文档更是使用和概含说明,并没有看github上项目的一些md文件,没有注意到这项配置,上文说的弯路就是指这个faq.md文件

    这种场景常见,个人肯定不是第一个注意到并想解决办法的

    https://github.com/apache/pulsar/blob/master/faq.md#how-can-i-prevent-an-inactive-topic-to-be-deleted-under-any-circumstance-i-want-to-set-no-time-or-space-limit-for-a-certain-namespace

    How can I prevent an inactive topic to be deleted under any circumstance? I want to set no time or space limit for a certain namespace.
    There’s not currently an option for “infinite” (though it sounds a good idea! maybe we could use -1 for that). The only option now is to use INT_MAX for retentionTimeInMinutes and LONG_MAX for retentionSizeInMB. It’s not “infinite” but 4085 years of retention should probably be enough!
    

    另外提一个小坑,同硬件环境裸机搭了3个节点也比单实例的慢,也排除了zk gfs 因素

    3点的分布式环境比单点测试要慢

    https://github.com/apache/pulsar/issues/8570
    咨询了下官方,确认了是个参数配置问题
    默认单点 journalSyncData=false
    默认集群 journalSyncData=true

    而kakfa的默认行为类似journalSyncData=false

  • 相关阅读:
    动静分离和前后端分离相关
    Nginx搭建动态静态服务器
    动态资源与静态资源
    LVS与Keepalived
    Tomcat实现多主多备
    Keepalived实现心跳检测实现自动重启
    nginx+keepalived简单双机主从热备
    keepalived安装
    php与java通用AES加密解密算法
    PHP修改memory_limit的三种办法
  • 原文地址:https://www.cnblogs.com/zihunqingxin/p/14460099.html
Copyright © 2011-2022 走看看