zoukankan      html  css  js  c++  java
  • Jafka源码分析——LogManager



    Kafka中,LogManager负责管理broker上全部的Log(每个topic-partition为一个Log)。

    通过阅读源码可知其详细完毕的功能例如以下:

    1. 依照预设规则对消息队列进行清理。

    2. 依照预设规则对消息队列进行持久化(flush操作)。

    3. 连接ZooKeeper进行brokertopicpartition相关的ZooKeeper操作。

    4. 管理broker上全部的Log

    以下一一对这些功能的实现进行具体的解析。

    一、对于Log的管理

    LogManager包括成员变量logslogskeytopicvaluePool<Integer,Log>(该value又是一个Map。主键是partitionvalue是该partition所相应的Log)。因此LogManager通过logs保存该broker上全部的消息队列。

    private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, 	Pool<Integer, Log>>();

    LogManager在初始化之后。须要依据配置文件配置的消息队列根文件夹进行遍历。

    通过遍历,查找并生成Log。该遍历的详细实如今方法load中:

    ① 获取消息队列根文件夹下的全部文件

    ② 对于根文件夹下的每个文件进行例如以下操作

     1.假设是文件夹。则有可能是一个Log,否则不是并忽略

     2.对于通过1的文件夹分析其文件名称,文件夹的文件名称由两部分组成:topic-partition

     3.对于通过2的文件夹。用文件夹、解析出的topic、解析出的partition生成Log

     4.3生成的Log放入logs日志池

     5.最后,推断文件夹解析的partition与配置文件里配置的partition的大小,假设配置文件较小。则更新配置


    二、消息队列清理

    消息队列的清理由Scheduler周期性的调用,详细的调用在load函数中,基本的删除实如今cleanLogs函数中。

    消息队列的清理分为两种情况:一种是超过预设的时间则删除。二是超过预设的大小则删除。分别相应两个函数cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一种情况比較简单,由于每个segment相应一个文件,通过对照文件的lastModifiedTime和系统的如今时间来确定其是否超时,假设超时则删除。对于另外一种情况,首先比較Log的大小与配置的大小。假设小于配置的大小则不删除。假设大于了配置的大小,则计算超过配置大小的长度(定为差值);然后将小于该差值的segment删除(这地方有点疑惑。这样删除会不会把一些最新的消息队列给删除了)。

     if (this.scheduler != null) {
     	this.scheduler.scheduleWithRate(new Runnable() {
                    public void run() {
                        try {
                            cleanupLogs();
                        } catch (IOException e) {
                            logger.error("cleanup log failed.", e);
                        }
                    }
                }, 60 * 1000, logCleanupIntervalMs);
    }


    三、对于消息队列的持久化

    对消息队列的flush操作相同由单独的线程来完毕。该线程通过比較Log上一次的flush时间和当前的系统时间来确定是否须要flush。假设须要则持久化到文件。

    注意,消息的队列的持久化在新增消息的时候也会推断,假设一个Log保存的新增消息的条数超过了预设值则进行flush操作。


    Kafka中,LogManager负责管理broker上全部的Log(每个topic-partition为一个Log)。通过阅读源码可知其详细完毕的功能例如以下:

    1. 依照预设规则对消息队列进行清理。

    2. 依照预设规则对消息队列进行持久化(flush操作)。

    3. 连接ZooKeeper进行brokertopicpartition相关的ZooKeeper操作。

    4. 管理broker上全部的Log

    以下一一对这些功能的实现进行具体的解析。

    一、对于Log的管理

    LogManager包括成员变量logslogskeytopicvaluePool<Integer,Log>(该value又是一个Map,主键是partitionvalue是该partition所相应的Log)。

    因此LogManager通过logs保存该broker上全部的消息队列。

    private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, 	Pool<Integer, Log>>();

    LogManager在初始化之后,须要依据配置文件配置的消息队列根文件夹进行遍历。通过遍历,查找并生成Log。该遍历的详细实如今方法load中:

    ① 获取消息队列根文件夹下的全部文件

    ② 对于根文件夹下的每个文件进行例如以下操作

     1.假设是文件夹。则有可能是一个Log。否则不是并忽略

     2.对于通过1的文件夹分析其文件名称,文件夹的文件名称由两部分组成:topic-partition

     3.对于通过2的文件夹。用文件夹、解析出的topic、解析出的partition生成Log

     4.3生成的Log放入logs日志池

     5.最后。推断文件夹解析的partition与配置文件里配置的partition的大小,假设配置文件较小,则更新配置


    二、消息队列清理

    消息队列的清理由Scheduler周期性的调用,详细的调用在load函数中。基本的删除实如今cleanLogs函数中。消息队列的清理分为两种情况:一种是超过预设的时间则删除,二是超过预设的大小则删除,分别相应两个函数cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一种情况比較简单,由于每个segment相应一个文件,通过对照文件的lastModifiedTime和系统的如今时间来确定其是否超时。假设超时则删除。对于另外一种情况。首先比較Log的大小与配置的大小。假设小于配置的大小则不删除;假设大于了配置的大小,则计算超过配置大小的长度(定为差值)。然后将小于该差值的segment删除(这地方有点疑惑。这样删除会不会把一些最新的消息队列给删除了)。

     if (this.scheduler != null) {
     	this.scheduler.scheduleWithRate(new Runnable() {
                    public void run() {
                        try {
                            cleanupLogs();
                        } catch (IOException e) {
                            logger.error("cleanup log failed.", e);
                        }
                    }
                }, 60 * 1000, logCleanupIntervalMs);
    }


    三、对于消息队列的持久化

    对消息队列的flush操作相同由单独的线程来完毕。该线程通过比較Log上一次的flush时间和当前的系统时间来确定是否须要flush,假设须要则持久化到文件。注意,消息的队列的持久化在新增消息的时候也会推断,假设一个Log保存的新增消息的条数超过了预设值则进行flush操作。


  • 相关阅读:
    c语言中的增量与减量········不要太聪明
    存储器··············RAM,SRAM,EEPROM 等等
    对于 sizeof(char)的一些零碎······
    C语言中float,double等类型,在内存中的结构
    同步异步存储器
    Linux的帧缓冲设备(Framebuffer)简介
    嵌入式 c 中结构体经常碰到_I、 __O 、__IO是什么意思?
    新型的按键扫描程序
    数据结构
    jQuery(八)选择器与选择方法
  • 原文地址:https://www.cnblogs.com/mfmdaoyou/p/6818740.html
Copyright © 2011-2022 走看看