zoukankan      html  css  js  c++  java
  • Jafka Broker代码阅读之总览

    从本文开始,笔者将尝试从源码角度解读Jafka(Kafka)的特性,探究其背后的实现原理与技术。前面讲解Jafka Broker的文章中有提到下面这段启动服务端的代码,我们就从这里开始。

    Properties props = new Properties();
    props.setProperty("port","9093");
    props.setProperty("log.dir","/home/alfred/jafkaDataDirs/data1");
    Jafka broker = new Jafka();
    broker.start(props,null,null);
    broker.awaitShutdown();

    上面代码演示了Jafka一个broker的启动,其顺序图如下所示:

    jafka-sq

    上图呈现了Jafka Broker启动过程中主要涉及的类和调用方法,可以看到主要的代码集中在Server类的startup方法,其代码如下:

    public void startup() {
    try {
    logger.info("Starting Jafka server "+serverInfo.getVersion());
    Utils.registerMBean(serverInfo);
    //检查上次服务端是否正常关闭,如果不是,要校验jafka文件,取出损坏的消息
    boolean needRecovery = true;
    File cleanShutDownFile = new File(new File(config.getLogDir()), CLEAN_SHUTDOWN_FILE);
    if (cleanShutDownFile.exists()) {
    needRecovery = false;
    cleanShutDownFile.delete();
    }
    //初始化消息数据管理类LogManager,并将所有的消息数据按照一定格式读入内存(非数据内容本身)
    this.logManager = new LogManager(config,//
    scheduler,//
    1000L * 60 * config.getLogCleanupIntervalMinutes(),//
    1000L * 60 * 60 * config.getLogRetentionHours(),//
    needRecovery);
    this.logManager.setRollingStategy(config.getRollingStrategy());
    logManager.load();

            //producer和consumer请求处理类,如写消息、抓取消息等
            RequestHandlers handlers = new RequestHandlers(logManager);
            //启动socket server端,处理producer和consumer的请求
            socketServer = new SocketServer(handlers, config);
            Utils.registerMBean(socketServer.getStats());
            socketServer.startup();
            Mx4jLoader.maybeLoad();
            //如果开启了zookeeper连接,则将该broker信息注册到zookeeper中,并开启定时flush消息数据的线程
            logManager.startup();
            logger.info("Server started.");
        } catch (Exception ex) {
            logger.fatal("Fatal error during startup.", ex);
            close();
        } finally {
            serverInfo.started();
        }
    }
    

    对于上述代码的意义,可参见注释,另外作以下几点说明:

    Jafka broker在正常关闭的时候会在数据文件夹下生成一个.jafka_cleanshutdown的文件,broker 在启动时可以通过检查该文件是否存在来判定上一次broker是否为正常关闭,如果不是正常关闭,那么jafka在打开每个topic-partition文件夹下最后一个jafka文件时会去校验其消息数据(即jafka文件),如果发现其上某条消息的实际长度不等于消息头中指定的长度,说明该消息有问题,则删除它及其以后的消息数据。具体实现后续会有源码分析。
    
    LogManager是消息数据的管理类,负责消息的读写。其中load方法是将该broker数据文件夹下的所有消息数据以一定形式读入内存,以便读写。
    
    SocketServer利用java的nio实现了一个高性能的socket服务器,来处理producer consumer的请求。
    

    Jafka Broker的代码中最主要的两个类便是LogManager和SocketServer,后续文章也会从这两方面展开源码讲解。下面还是先看下Jafka源码结构

    src
    ├── main
    │ ├── assembly #maven assembly插件配置文件
    │ ├── java
    │ │ └── com
    │ │ └── sohu
    │ │ └── jafka
    │ │ ├── admin #bin/admin-console.sh实现类
    │ │ ├── api #request reponse的构建类,封装producer consumer的请求及返回
    │ │ ├── cluster #搭建集群用到的一些类
    │ │ ├── common #异常类和注解类
    │ │ ├── console #bin/producer/consumer-console.sh的实现类
    │ │ ├── consumer #consumer实现类
    │ │ ├── log #消息数据管理的相关类
    │ │ ├── message #消息数据的相关类,如格式定义、Message和offset的封装等
    │ │ ├── mx #jmx的相关类,用于监控broker运行状况和管理broker
    │ │ ├── network #socket服务器实现类
    │ │ ├── producer #producer实现类
    │ │ ├── server #broker的实现类,主要负责各类配置文件的封装,调用network和log下的SocketServer和LogManager启动服务端
    │ │ └── utils #实用类封装
    │ └── resources
    └── test
    ├── java #测试方法,可以以此为入口学习jafka的使用和源码阅读
    │ └── com
    │ └── sohu
    │ └── jafka
    │ ├── admin
    │ ├── api
    │ ├── cluster
    │ ├── common
    │ ├── consumer
    │ ├── log
    │ ├── message
    │ ├── producer
    │ ├── server
    │ └── utils
    └── resources

    小结

    与Jafka broker相关的源码文件夹主要有server network log三个文件夹,大家在阅读源码的时候可以先从相关的单元测试文件下手,这样不至于陷于源码的茫茫森林里迷失方向。对于broker这一块,笔者后续会从Socket服务器和LogManager消息数据管理这两个方面展开来讲解。Jafka的源码实现清晰明了,非常值得学习,欢迎感兴趣的读者可以阅读并与笔者交流感悟。
    Jafka Broker代码阅读之总览

  • 相关阅读:
    CentOS上手动配置nginx.services
    Mac安装软件时,提示文件已损坏,需要移动到废纸篓的解决方法
    Jumpserver安装部署
    Linux服务器测试带宽
    Zabbix_server执行window脚本出现中文乱码如何解决
    四行shell脚本实现Zabbix_server 的高可用
    Kubernetes Pod故障归类与排查方法
    Nginx配置location与rewrite规则教程
    ipa文件信息检查工具
    申请免费SSL证书
  • 原文地址:https://www.cnblogs.com/clds/p/6038319.html
Copyright © 2011-2022 走看看