  • [Flink Series Part 8] Building a Real-Time Computing Platform: Dynamically Loading UDFs

    Preface

    This article was first published at https://www.cnblogs.com/slankka/. Please credit the source when reposting.

    This article describes how to dynamically load UDFs for Flink jobs.

    Classloader

    Loading a UDF is always the job of a class loader. During job startup, the concrete implementation of the application class loader is:

    static class AppClassLoader extends URLClassLoader
    

    As its name suggests, URLClassLoader can load classes from a list of URLs.
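
    You can see this relationship for yourself on Java 8, where the application class loader really is a URLClassLoader (the wrapper class below is just an illustrative name, not from the Flink source):

    // Illustrative check, Java 8 only: the system class loader is
    // sun.misc.Launcher$AppClassLoader, which extends URLClassLoader.
    import java.net.URL;
    import java.net.URLClassLoader;

    public class AppClassLoaderCheck {
        public static void main(String[] args) {
            // On Java 9+ this cast fails, because the app class loader is no longer a URLClassLoader.
            URLClassLoader system = (URLClassLoader) ClassLoader.getSystemClassLoader();
            System.out.println(system.getClass().getName());
            for (URL url : system.getURLs()) {
                System.out.println(url); // the classpath entries the application class loader searches
            }
        }
    }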

    What Actually Happens

    It is known that passing the UDF JAR to Flink through the -C startup option lets the job submission process complete. The option is defined as follows:

    static final Option CLASSPATH_OPTION =
                new Option(
                        "C",
                        "classpath",
                        true,
                        "Adds a URL to each user code "
                                + "classloader  on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be "
                                + "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
                                + "times for specifying more than one URL. The protocol must be supported by the "
                                + "{@link java.net.URLClassLoader}.");
    
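    For reference, a submission that ships a UDF JAR this way might look like the following. The paths and the job JAR name are illustrative; as the help text above notes, the URL must carry an explicit protocol (for example file://) and be reachable from every node:

    ./bin/flink run \
        -C file:///opt/flink/udf/slankka-udf.jar \
        /opt/flink/jobs/flink-sql-job.jar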

    However, Flink (before 1.13) fails with the following error:

    Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. Can't resolve udf class com.slankka.flink.udf.PlusTwoFunc
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:564)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
    	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:733)
    	at com.slankka.rtc.flinkplatform.sql.SqlJob.lambda$start$0(SqlJob.java:123)
    	at java.lang.Iterable.forEach(Iterable.java:75)
    	at com.slankka.rtc.flinkplatform.sql.SqlJob.start(SqlJob.java:116)
    	at com.slankka.rtc.flinkplatform.sql.SqlJobDriver.main(SqlJobDriver.java:14)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    	... 11 more
    Caused by: java.lang.RuntimeException: Can't resolve udf class com.slankka.flink.udf.PlusTwoFunc
    	at org.apache.flink.table.catalog.CatalogFunctionImpl.isGeneric(CatalogFunctionImpl.java:77)
    	at org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory.createFunctionDefinition(HiveFunctionDefinitionFactory.java:63)
    	at org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:570)
    	at org.apache.flink.table.catalog.FunctionCatalog.lambda$resolveAmbiguousFunctionReference$2(FunctionCatalog.java:614)
    	at java.util.Optional.orElseGet(Optional.java:267)
    	at org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:614)
    	at org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:361)
    	at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:97)
    	at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
    	... 26 more
    Caused by: java.lang.ClassNotFoundException: com.slankka.flink.udf.PlusTwoFunc
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    	at java.lang.Class.forName0(Native Method)
    	at java.lang.Class.forName(Class.java:264)
    	at org.apache.flink.table.catalog.CatalogFunctionImpl.isGeneric(CatalogFunctionImpl.java:72)
    	... 40 more
    

    The call chain is quite clear. The job-submission code involved is:

    private final URLClassLoader userCodeClassLoader;
    
    org.apache.flink.client.program.PackagedProgram#callMainMethod
    

    At this point execution has already entered the job's main method, and the class loader in use is the userCodeClassLoader:

    this.extractedTempLibraries =
                    this.jarFile == null
                            ? Collections.emptyList()
                            : extractContainedLibraries(this.jarFile);
    
    this.userCodeClassLoader =
                    ClientUtils.buildUserCodeClassLoader(
                            getJobJarAndDependencies(),
                            classpaths,
                            getClass().getClassLoader(),
                            configuration);
    
    

    Now look at how the user code class loader is built: it does in fact include the JARs specified with the -C option.

    public static URLClassLoader buildUserCodeClassLoader(
                List<URL> jars, List<URL> classpaths, ClassLoader parent, Configuration configuration) {
            URL[] urls = new URL[jars.size() + classpaths.size()];
            for (int i = 0; i < jars.size(); i++) {
                urls[i] = jars.get(i);
            }
            for (int i = 0; i < classpaths.size(); i++) {
                urls[i + jars.size()] = classpaths.get(i);
            }
            final String[] alwaysParentFirstLoaderPatterns =
                    CoreOptions.getParentFirstLoaderPatterns(configuration);
            final String classLoaderResolveOrder =
                    configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
            FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
                    FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
            final boolean checkClassloaderLeak =
                    configuration.getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
            return FlinkUserCodeClassLoaders.create(
                    resolveOrder,
                    urls,
                    parent,
                    alwaysParentFirstLoaderPatterns,
                    NOOP_EXCEPTION_HANDLER,
                    checkClassloaderLeak);
        }
    

    Yet the error above still occurs. The stack trace shows why: CatalogFunctionImpl.isGeneric resolves the UDF class with Class.forName, and the lookup ends up on the AppClassLoader (the loader that loaded flink-table itself), which knows nothing about the URLs passed via -C; only the user code class loader does.
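
    The same behavior can be reproduced with plain JDK code. In the sketch below the class name and JAR path are placeholders (the JAR is assumed to contain the UDF class): a one-argument Class.forName call made by a class on the application classpath cannot see a class that exists only on a child URLClassLoader, while an explicit lookup through the thread context class loader can:

    // Standalone sketch of the class-visibility problem; names and paths are illustrative.
    import java.net.URL;
    import java.net.URLClassLoader;

    public class ContextLoaderDemo {
        public static void main(String[] args) throws Exception {
            URL udfJar = new URL("file:///tmp/udf.jar"); // placeholder JAR containing the UDF
            URLClassLoader userLoader =
                    new URLClassLoader(new URL[]{udfJar}, ContextLoaderDemo.class.getClassLoader());

            // Roughly what the client does before handing control to user code:
            Thread.currentThread().setContextClassLoader(userLoader);

            try {
                // Uses ContextLoaderDemo's own defining loader (the app class loader),
                // which has never seen udf.jar.
                Class.forName("com.slankka.flink.udf.PlusTwoFunc");
            } catch (ClassNotFoundException e) {
                System.out.println("app class loader cannot see the UDF: " + e.getMessage());
            }

            // Succeeds, because the context class loader includes udf.jar.
            Class<?> udf = Class.forName(
                    "com.slankka.flink.udf.PlusTwoFunc",
                    true,
                    Thread.currentThread().getContextClassLoader());
            System.out.println("resolved via context class loader: " + udf.getName());
        }
    }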

    Temporary Workarounds

    Workaround 1: recompile CatalogFunctionImpl.java yourself so that it resolves the class through the thread context class loader. This applies to all Flink versions below 1.13:

    	@Override
    	public boolean isGeneric() {
    		if (functionLanguage == FunctionLanguage.PYTHON) {
    			return true;
    		}
    		try {
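                // Key change: resolve the class through the thread context class loader
                // (the user code class loader set up by the client before invoking main),
                // instead of the loader that loaded flink-table.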
    			ClassLoader cl = Thread.currentThread().getContextClassLoader();
    			Class c = Class.forName(className, true, cl);
    			if (UserDefinedFunction.class.isAssignableFrom(c)) {
    				return true;
    			}
    		} catch (ClassNotFoundException e) {
    			throw new RuntimeException(String.format("Can't resolve udf class %s", className), e);
    		}
    		return false;
    	}
    

    Workaround 2: load the UDF JARs yourself at the beginning of the Flink job's main method (only needed before Flink 1.13); a usage sketch follows the method below.

    // Dynamically add JARs to the system class loader. This works on Java 8, where the
    // system class loader is a URLClassLoader; the cast below fails on Java 9+.
      public static void loadJar(List<URL> jarUrl) {
        // Look up URLClassLoader's protected addURL(URL) method via reflection
        Method method;
        try {
          method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
        } catch (NoSuchMethodException | SecurityException e) {
          throw new RuntimeException("Cannot access URLClassLoader#addURL", e);
        }
        // Remember the method's original accessibility so it can be restored afterwards
        boolean accessible = method.isAccessible();
        try {
          // Make the protected method callable
          if (!accessible) {
            method.setAccessible(true);
          }
          // The system (application) class loader, a URLClassLoader on Java 8
          URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
          // Append each JAR URL to the class loader's search path
          for (URL jar : jarUrl) {
            method.invoke(classLoader, jar);
          }
        } catch (Exception e) {
          e.printStackTrace();
        } finally {
          method.setAccessible(accessible);
        }
      }
    
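    A minimal sketch of wiring this into a job: the driver class below, the helper class assumed to hold loadJar, and the JAR path are all made up for illustration; the point is simply to call loadJar before any SQL that references the UDF is parsed.

    // Illustrative driver; JarLoaderUtil is a hypothetical helper class containing loadJar above.
    import java.net.URL;
    import java.util.Collections;

    public class UdfJobDriver {
        public static void main(String[] args) throws Exception {
            // Placeholder path; the JAR must be readable by the submitting client.
            JarLoaderUtil.loadJar(
                    Collections.singletonList(new URL("file:///opt/flink/udf/slankka-udf.jar")));
            // ... create the TableEnvironment and run the SQL statements as before ...
        }
    }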

    The New Solution

    [FLINK-20606] This issue has been fixed in Flink 1.13.

  • Original article: https://www.cnblogs.com/slankka/p/14751826.html