zoukankan      html  css  js  c++  java
  • flink1.12连接hive修改并行度

    flink1.12连接hive修改并行度

    在HiveTableSource 中. getDataStream 方法:

    		int parallelism = 0;
    			int splitNum = new HiveParallelismInference(tablePath, flinkConf)
    				.infer(
    					() -> HiveSourceFileEnumerator.getNumFiles(allHivePartitions, jobConf),
    					() -> HiveSourceFileEnumerator.createInputSplits(0, allHivePartitions, jobConf).size())
    				.limit(limit);
    			// sql-client-defaults.yaml  max-parallelism: 30
    			int max = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
    			// 传进来的参数
    			// sql-client-defaults.yaml  中的 parallelism: 20
    			int hiveParallelism = source.getExecutionConfig().getParallelism();
    			parallelism = Math.min(splitNum, max);
    			parallelism = Math.min(parallelism, hiveParallelism);
    			return source.setParallelism(parallelism);
    

    下载 flink1.12 版本的源码.

    用idea打开, 修改后打包flink-connector-hive_2.11 module即可. 上传jar包.

  • 相关阅读:
    redux
    ajax跨域例子
    flux
    BSON数据格式
    JS代码风格自动规整工具Prettier
    JS通用模块模式 UMD
    Promise库
    webpack打包理解
    前端自动提示功能插件-typeahead
    socket.io emit callback调用探秘
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14186853.html
Copyright © 2011-2022 走看看