zoukankan      html  css  js  c++  java
  • Flink 1.13 Code Reading

    Mark list
    [工具]: 可以业务借鉴的
    [Java]: Java基本知识, 自行了解
    
    
    FileSystem init
    基本类
      Configuration [工具]管理所有的配置信息
        Optional<T> getOptional(ConfigOption<T> option) : 调用getRawValueFromOption获取配置Optional, 如果目标结果为List会转换为列表Optional返回
        Optional<Object> getRawValueFromOption(ConfigOption<?> configOption) : 根据configOption的key和fallbackKeys拿到配置Optional
      ConfigOption<T> [工具]管理单个配置基本信息
        String key, FallbackKey[] fallbackKeys 后备key, 当key配置不存在则使用后备key
        T defaultValue, Description description配置描述
        Class<?> clazz配置值类型, boolean isList配置值是否为列表
      FallbackKey [工具]单个后备key管理
        String key, boolean isDeprecated是否已弃用
      CoreOptions: 以env为前缀的核心flink配置
      Description: 用于ConfigOption的描述, 核心字段 List<BlockElement> blocks 存储字符串module
      ConfigurationUtils [工具]提供常用配置管理方法
        <T> T convertValue(Object rawValue, Class<?> clazz) : 根据配置Optional描述的clazz转换值类型.
      interface Plugin: 插件SPI需要继承此接口
        ClassLoader getClassLoader(): 默认使用加载当前插件的类加载器
        void configure(Configuration config): 插件配置接口
      interface FileSystemFactory extends Plugin: 用于创建FileSystem, 必须先configure再create
        String getScheme(): 返回文件系统schema, 如: ftp://, hdfs://
        FileSystem create(URI fsUri): 基于已配置的config和解析到的schema创建FileSystem 
      class PluginFileSystemFactory implements FileSystemFactory: 插件文件系统工厂实现
      class UnsupportedSchemeFactory implements FileSystemFactory: 内部用于处理 HadoopFsFactory 加载失败
      ConnectionLimitingFactory implements FileSystemFactory: 带连接限制的文件系统工厂实现
        FileSystemFactory decorateIfLimited(FileSystemFactory factory, String scheme, Configuration config): 基于配置创建ConnectionLimitingFactory
        FileSystem create(URI fsUri): 基于原始FileSystem创建 LimitedConnectionsFileSystem
      ConnectionLimitingSetting 连接限制配置
        ConnectionLimitingSettings fromConfig(Configuration config, String fsScheme): 根据全局配置创建ConnectionLimitingSettings
      flink-shaded-guava-18.0-13.0.jar [工具]
        Iterators 提供迭代器操作方法(flink-shaded-guava-18.0-13.0.jar)
          <F, T> Iterator<T> transform(Iterator<F> fromIterator, final Function<? super F, ? extends T> function): 返回经function转换后的迭代器
        Splitter 提供字符串分割处理时的各种方法(链式编码)
          Splitter on(char separator): 指定分隔符separator
          Splitter omitEmptyStrings(): 省略空串
          Splitter trimResults(): 裁剪结果串的前后空格
          Iterable<String> split(final CharSequence sequence): 分割字符串后返回迭代器
    基本字段
      HashMap<FSKey, FileSystem> CACHE: FileSystem的缓存
      HashMap<String, FileSystemFactory> FS_FACTORIES: FileSystemFactory的缓存
      FileSystemFactory FALLBACK_FACTORY: 无schema匹配时的默认工厂
      Set<String> ALLOWED_FALLBACK_FILESYSTEMS: 允许的后备工厂
    涉及方法
      loadHadoopFsFactory(): 加载 HadoopFsFactory 和相关hadoop类, 但不初始化. 
      loadFileSystemFactories(Collection<Supplier<Iterator<FileSystemFactory>>> factoryIteratorsSuppliers):
        添加LocalFileSystemFactory后, 配合addAllFactoriesToList将工厂迭代器的Supplier集合扁平化为工厂集合
      addAllFactoriesToList(Iterator<FileSystemFactory> iter, List<FileSystemFactory> list)
    主要流程
      获取所有工厂 -> 配置工厂(有连接限制的工厂则装饰限制) -> 配置默认工厂 -> 配置默认schema ->配置允许的后备文件系统
      factorySuppliers -> config(decorateIfLimited) -> FALLBACK_FACTORY.configure -> defaultScheme -> ALLOWED_FALLBACK_FILESYSTEMS
    
    DescriptionElement与Formatter
    DescriptionElement: 描述元素接口提供用格式化方法抽象 void format(Formatter formatter), 两个继承此接口的接口未添加新方法抽象, 用于归类
      interface BlockElement: 块描述, 如text, linebreak, list
      interface InlineElement: 块中的一个描述元素, 如link
      class LineBreakElement implements InlineElement, BlockElement: 换行描述
      class TextElement implements BlockElement, InlineElement: 文本描述
      class ListElement implements BlockElement: 列表描述
      class LinkElement implements InlineElement: 连接描述
    Formatter: 抽象类提供所有DescriptionElement相互解析的基本方法
      HtmlFormatter 作为Formatter唯一实现, 提供常用格式化为Html格式的方法, 目测Flink Web会经常使用 
    
    
    ClusterDescriptor
      interface ClusterDescriptor<T> extends AutoCloseable: 集群部署描述接口, 部署完成返回用于与集群通信的ClusterClientProvider<T>
        getClusterDescription(): 返回集群描述详情, 3种集群描述实现:
          KubernetesClusterDescriptor: "Kubernetes cluster"
          StandaloneClusterDescriptor: "Standalone cluster at " + host + ":" + port
          YarnClusterDescriptor: 资源描述和节点分布
        retrieve(T clusterId): 自已有集群重获ClusterClientProvider<T>, 3种实现中, Provider核心逻辑
          KubernetesClusterDescriptor: 验证已有client能获取集群Endpoint, 通过则新建RestClusterClient<String>返回
          StandaloneClusterDescriptor: 直接用Configuration, StandaloneClusterId创建新的RestClusterClient<StandaloneClusterId>
          YarnClusterDescriptor: 检查Hadoop环境变量, 根据ApplicationId获取ApplicationReport, 检查报告中的最终状态, 
            不为UNDEFINED(其他状态: SUCCEEDED, FAILED, KILLED, 代表应用是否正常结束)就是Flink集群已停止, 未停止则根据报告设置终端配置, 最后返回RestClusterClient<ApplicationId>
        deploySessionCluster(): 部署会话模式的集群
          KubernetesClusterDescriptor:
            涉及配置检查和默认值: [//源码固定, 不可外部配置]
              //internal.cluster.execution-mode = ExecutionMode.NORMAL, 等待结果
              //kubernetes.internal.jobmanager.entrypoint.class = KubernetesApplicationClusterEntrypoint.class.getName()
              blob.server.port = 6124
              taskmanager.rpc.port = 6122
              rest.bind-port = 8081
              high-availability = NONE  (其他两种则支持高可用模式: ZOOKEEPER, FACTORY_CLASS)
              high-availability.cluster-id = ${kubernetes.cluster-id}, 后者必须配置
              high-availability.jobmanager.port = ${jobmanager.rpc.port}
            根据kubernetes.pod-template-file.jobmanager模板文件创建FlinkPod
    

    Java Base

    ServiceLoader
      ServiceLoader.load(FileSystemFactory.class).iterator()
        <S> ServiceLoader<S> load(Class<S> service): 使用线程上下文类加载器创建基于指定服务的ServiceLoader
        Iterator<S> iterator(): 返回用于懒加载providers的迭代器
    
  • 相关阅读:
    java rmi 入门实例
    flex“深拷贝”
    Cygwin 下部署Hadoop
    Hadoop学习原地
    Scribe+HDFS日志收集系统安装方法
    使用HDFS来进行线上应用的文件存储
    转:C++初始化成员列表
    转:为什么数据库选B-tree或B+tree而不是二叉树作为索引结构
    B树、B+树、B*树三者的对比详解
    转载:构造函数不能声明为虚函数,而构造函数可以。为什么?
  • 原文地址:https://www.cnblogs.com/tyxuanCX/p/15618160.html
Copyright © 2011-2022 走看看