  • Quickly building a Flink project with quickstart

    The official documentation recommends two ways to build a project. The first is the Maven archetype:

    mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java  -DarchetypeVersion=1.12.0

    Writing a Flink UDF requires the flink-table jar, because ScalarFunction lives in the table dependency.

    After generating the project with quickstart, add this dependency to pom.xml:

            <dependency>
                <groupId>org.apache.flink</groupId>
                <!--<artifactId>flink-table_${scala.binary.version}</artifactId>-->
                <artifactId>flink-table_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>

    Create the Java classes:

    package department.jr;
    
    import org.apache.flink.table.functions.ScalarFunction;
    
    public class HashCode extends ScalarFunction {
    
        /**
         * The eval method of a UDF must be public.
         *
         * @param s the input string
         * @return the hash code of the input string
         */
        public int eval(String s){
            return s.hashCode();
        }
        
    //    public static void main(String[] args) {
    //        HashCode hashCode = new HashCode();
    //        System.out.println(hashCode.eval("abc"));
    //    }
    }

    package department.jr;
    
    import org.apache.flink.table.functions.ScalarFunction;
    
    /**
     * Multiplies a value by a factor.
     */
    public class Multiply extends ScalarFunction {
        public Integer factor;
    
        public Multiply() {
        }
        public Multiply(int factor) {
            this.factor = factor;
        }
        public int eval(Integer s){
            return s * factor;
        }
    }
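
The logic of both eval methods can be sanity-checked without Flink on the classpath. The sketch below is not the author's code: `EvalLogicCheck`, `hashCodeEval` and `multiplyEval` are hypothetical stand-ins that only mirror the bodies of `HashCode.eval` and `Multiply.eval`. It also shows one thing to watch for: if `Multiply` is created through its no-arg constructor, `factor` stays null and the multiplication fails when it is unboxed.

```java
public class EvalLogicCheck {

    // Mirrors the body of HashCode.eval, without extending ScalarFunction.
    public static int hashCodeEval(String s) {
        return s.hashCode();
    }

    // Mirrors the body of Multiply.eval; factor is passed in explicitly here.
    public static int multiplyEval(Integer s, Integer factor) {
        // Unboxing throws NullPointerException if either argument is null.
        return s * factor;
    }

    public static void main(String[] args) {
        System.out.println(hashCodeEval("abc")); // prints 96354
        System.out.println(multiplyEval(7, 3));  // prints 21
        try {
            // Simulates a Multiply built with the no-arg constructor (factor == null).
            multiplyEval(7, null);
        } catch (NullPointerException e) {
            System.out.println("null factor -> NullPointerException");
        }
    }
}
```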

     

    Try running it. It fails with an error:

    java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/InvalidTypesException
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
        at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
        at java.lang.Class.getMethod0(Class.java:3018)
        at java.lang.Class.getMethod(Class.java:1784)
        at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.functions.InvalidTypesException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 7 more
    Error: A JNI error has occurred, please check your installation and try again
    Exception in thread "main" 
    Process finished with exit code 1

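    Cause: the core Flink dependencies in pom.xml are all declared with <scope>provided</scope>, so they are missing from the classpath when the program is run directly. While debugging, comment out the provided scope so that the default scope (compile) applies.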

    Success!

    Comment out the main method and package the UDF classes:

    jr-flinkudf-0.1.jar

    Upload jr-flinkudf-0.1.jar to Flink's lib directory.

    Edit the sql-client-defaults.yaml file in Flink's conf directory to register the UDFs.
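
The post does not show the edited file. As a rough sketch only, a `functions` section in the format described by the Flink 1.12 SQL Client documentation might look like the following. The SQL function names `hash_code` and `multiply` and the factor `3` are assumptions made for illustration; only the class names come from the post.

```yaml
functions:
  - name: hash_code                  # hypothetical SQL function name
    from: class
    class: department.jr.HashCode
  - name: multiply                   # hypothetical SQL function name
    from: class
    class: department.jr.Multiply
    constructor:
      - 3                            # hypothetical factor, passed to the constructor
```

An entry without a `constructor` list should be instantiated through the no-arg constructor, while the listed literals are handed to a matching public constructor.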

    Start the Flink SQL client:

    ./sql-client.sh embedded

    It fails with an error:

    Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
            at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
    Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
            at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
            at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
            at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
            at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
    Caused by: org.apache.flink.table.api.ValidationException: Cannot find a public constructor with parameter types 'java.lang.Integer' for 'department.jr.Multiply'.
            at org.apache.flink.table.functions.FunctionService.generateInstance(FunctionService.java:205)
            at org.apache.flink.table.functions.FunctionService.createFunction(FunctionService.java:115)
            at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$registerFunctions$10(ExecutionContext.java:704)
            at java.util.HashMap.forEach(HashMap.java:1288)
            at org.apache.flink.table.client.gateway.local.ExecutionContext.registerFunctions(ExecutionContext.java:703)
            at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
            at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)
            at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)
            at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
            ... 3 more

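    Cause: the Multiply constructor takes an int parameter, but the Flink SQL Client only accepts Integer here (as the error message shows, it looks for a public constructor taking java.lang.Integer). See the official documentation: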

    ref: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sqlClient.html

    Change the parameter of the Multiply constructor to Integer, then repackage and upload the jar.
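
Why `int` versus `Integer` matters can be reproduced with plain Java reflection. This is a minimal sketch, assuming the SQL Client resolves the constructor by exact parameter type, as the error message suggests; it does not reproduce Flink's internal code. `ConstructorLookupDemo`, `PrimitiveArg` and `BoxedArg` are hypothetical stand-ins for the original and the fixed `Multiply`.

```java
public class ConstructorLookupDemo {

    // Stand-in for the original Multiply: the constructor takes a primitive int.
    public static class PrimitiveArg {
        public final Integer factor;

        public PrimitiveArg(int factor) {
            this.factor = factor;
        }
    }

    // Stand-in for the fixed Multiply: the constructor takes the boxed Integer.
    public static class BoxedArg {
        public final Integer factor;

        public BoxedArg(Integer factor) {
            this.factor = factor;
        }
    }

    // True if clazz has a public constructor whose only parameter type is exactly java.lang.Integer.
    public static boolean hasIntegerConstructor(Class<?> clazz) {
        try {
            clazz.getConstructor(Integer.class);
            return true;
        } catch (NoSuchMethodException e) {
            // Reflection does not treat int.class and Integer.class as interchangeable.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasIntegerConstructor(PrimitiveArg.class)); // prints false
        System.out.println(hasIntegerConstructor(BoxedArg.class));     // prints true
    }
}
```

Autoboxing is applied by the compiler at call sites, not by reflective lookup, so a constructor declared with `int` is invisible to a search for `Integer`.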

    The SQL client now starts successfully, and both custom UDFs work without problems!

  • Original article: https://www.cnblogs.com/yoyowin/p/14667145.html