zoukankan      html  css  js  c++  java
  • kafka AdminClient 闲时关闭连接

    AdminClient 类提供了创建、删除 topic 的 api。

    在项目中创建了一个 AdminClient 对象,每次创建 topic 时,调用

    org.apache.kafka.clients.admin.AdminClient#createTopics

    如果长时间不使用这个对象,客户端与 broker 之间的连接会被关掉,相关的参数:

    connections.max.idle.ms

    这个最大空闲参数在 broker 和 客户端都可以配置,即 broker 和客户端都会关闭空闲太久的连接。

    org.apache.kafka.common.network.Selector#maybeCloseOldestConnection

        private void maybeCloseOldestConnection(long currentTimeNanos) {
            if (idleExpiryManager == null)
                return;
    
            Map.Entry<String, Long> expiredConnection = idleExpiryManager.pollExpiredConnection(currentTimeNanos);
            if (expiredConnection != null) {
                String connectionId = expiredConnection.getKey();
                KafkaChannel channel = this.channels.get(connectionId);
                if (channel != null) {
                    if (log.isTraceEnabled())
                        log.trace("About to close the idle connection from {} due to being idle for {} millis",
                                connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
                    channel.state(ChannelState.EXPIRED);
                    close(channel, CloseMode.GRACEFUL);
                }
            }
        }

    org.apache.kafka.common.network.Selector.IdleExpiryManager#pollExpiredConnection

    lruConnections 是 LinkedHashMap 类型,可以按照插入和访问顺序进行排序,这里是按访问顺序进行排序,访问过的顺序放到双向链表的结尾。

            public Map.Entry<String, Long> pollExpiredConnection(long currentTimeNanos) {
                if (currentTimeNanos <= nextIdleCloseCheckTime)
                    return null;
    
                if (lruConnections.isEmpty()) {
                    nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
                    return null;
                }
    
                Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
                Long connectionLastActiveTime = oldestConnectionEntry.getValue();
                nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
    
                if (currentTimeNanos > nextIdleCloseCheckTime)
                    return oldestConnectionEntry;
                else
                    return null;
            }
  • 相关阅读:
    超简单tensorflow入门优化程序&&tensorboard可视化
    tf.random_normal()函数
    tensorflow中创建多个计算图(Graph)
    tensorflow中有向图(计算图、Graph)、上下文环境(Session)和执行流程
    配置错误 在唯一密钥属性“fileExtension”设置为“.log”时,无法添加类型为“mimeMap”的重复集合项
    取奇偶数
    DNS添加/修改/查询/删除A记录
    IE自动化
    Get-ChildItem参数之 -Exclude,Filter,Recurse应用
    自动下载
  • 原文地址:https://www.cnblogs.com/allenwas3/p/10289039.html
Copyright © 2011-2022 走看看