zoukankan      html  css  js  c++  java
  • Flink的序列化与flink-hadoop-compatibility

    最近 用户提交了一个问题 说他的jar包里明明包含相关的类型 但是在提交Flink作业的时候 却报出classnotfound的错误

    查看之后发现 这里是flink的一个没有说的太明白的地方

    用户的代码之所以报错 是因为在代码中引用了mapreduce相关的东西 

    我们知道 flink会在生成jobGraph的时候就解析所有需要序列化的类型 这里就涉及需要解析mapReduce的类型 比如Text

    但是用户明明打进去了呀 怎么还是找不到

    这就涉及flink的类加载机制 flink对于自己的代码 采用默认的java的类加载机制 但是对于用户的代码 使用了自定义的FlinkClassLoader

    好吧 这就是问题所在 因为在解析序列化类型的时候 flink会传入默认的类加载器 这个类加载器不包含用户代码 所以在寻找的时候 显然是找不到

    知道了问题的症结所在 解决起来却不完美

    方案1:将相关的依赖放入lib目录,即加入flink的类加载器

    方案2:用户提交作业的时候,动态的将用户的类加入默认的类加载器

    以上两个方法都可以解决问题,但缺点也是明显的:

    方案1的缺点在于需要频繁的更新flink的lib目录,方案2的缺点在于打破了Flink的类加载机制,使得用户不能独立的使用不同版本的依赖。

    至于最终的选择的方案,就需要根据平台具体的情况判断了,目前我们选择的是放入lib包,避免classpath热加载导致不可预知的问题。

    不过也简单实验了下热加载的方案。

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.File;
    import java.lang.reflect.Method;
    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.List;
    
    /**
     * to add the jar to this jvm classpath dynamically, but no need to unload the class because jvm will decide when
     * to unload the class
     */
    public class ClassloaderUtil {
    
        private static final Logger LOG = LoggerFactory.getLogger(ClassloaderUtil.class);
        private static Method addURL;
        private static URLClassLoader system;
    
        static {
            try {
                addURL = URLClassLoader.class.getDeclaredMethod("addURL",
                        new Class[]{URL.class});
                addURL.setAccessible(true);
    
                system = (URLClassLoader) ClassLoader.getSystemClassLoader();
            } catch (Exception ex) {
                LOG.error("Fail to load classloader staff.", ex);
            }
        }
    
        public static void addToClasspath(File file, List<URL> classpath) {
            addToClasspath(file);
            for (URL url : classpath) {
                addToClasspath(url);
            }
        }
    
        public static void addToClasspath(String file) {
            addToClasspath(new File(file));
        }
    
        public static void addToClasspath(File file) {
            try {
                addToClasspath(file.toURL());
            } catch (Exception ex) {
                LOG.error("Fail to dynamically add classpath.", ex);
            }
        }
    
        public static void addToClasspath(URL url) {
            try {
                addURL.invoke(system, new Object[]{url});
                LOG.info("Dynamically add classpath [{}]", url);
            } catch (Exception ex) {
                LOG.error("Fail to dynamically add classpath.", ex);
            }
        }
    }

    参考了https://blog.csdn.net/treeroot/article/details/631490

  • 相关阅读:
    5、面试题-测试用例篇
    4、面试题-技术篇
    3、面试题-测试流程
    2、面试题-接口测试用例
    全屏圆角弹出框
    Jquery的each退出循环
    拖动DIV
    head里面的其他标记
    更新字段
    Python---序列化
  • 原文地址:https://www.cnblogs.com/029zz010buct/p/10076060.html
Copyright © 2011-2022 走看看