  • Learning Flink from 0 to 1 in a Real Project (29): Implementing UDFs

    1. pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project
        xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.example</groupId>
        <artifactId>FlinkUdf</artifactId>
        <version>1.0-SNAPSHOT</version>
        <name>test</name>
        <!-- FIXME change it to the project's website -->
        <url>http://www.example.com</url>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
            <flink.version>1.11.1</flink.version>
            <scala.binary.version>2.11</scala.binary.version>
            <scala.version>2.11.0</scala.version>
            <hadoop.version>3.0.0</hadoop.version>
            <hive.version>3.0.0</hive.version>
            <hbase.version>2.3.0</hbase.version>
            <spark.version>3.0.0</spark.version>
            <jedis.version>3.0.0</jedis.version>
        </properties>
        <dependencies>
        <!--        0. Base language-->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <!--        1、Flink modules-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>scala-library</artifactId>
                        <groupId>org.scala-lang</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 2、CLI dependencies -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
        <!--        3. Alibaba fastjson dependency-->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.58</version>
                <exclusions>
                    <exclusion>
                        <artifactId>javassist</artifactId>
                        <groupId>org.javassist</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>scala-parser-combinators_2.11</artifactId>
                        <groupId>org.scala-lang.modules</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>snappy-java</artifactId>
                        <groupId>org.xerial.snappy</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
        <!--        4. Kafka dependencies-->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.11.0.3</version>
                <exclusions>
                    <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>kafka-clients</artifactId>
                        <groupId>org.apache.kafka</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
        <!--        5. Database / connector dependencies-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_${scala.binary.version}</artifactId>
                <version>1.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
                <version>1.10.2</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.37</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.1.5</version>
                <exclusions>
                    <exclusion>
                        <artifactId>force-shading</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.9.5</version>
            </dependency>
            <dependency>
                <groupId>io.lettuce</groupId>
                <artifactId>lettuce-core</artifactId>
                <version>5.0.5.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.4.Final</version>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>${jedis.version}</version>
            </dependency>
            <!-- Add connector dependencies here. They must be in the default scope (compile). -->
            <!-- Example:
    
            <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
            -->
            <!-- Add logging framework, to produce console output when running in the IDE. -->
            <!-- These dependencies are excluded from the application JAR by default. -->
        <!--        6. Logging dependencies-->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.7</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>net.sf.json-lib</groupId>
                <artifactId>json-lib</artifactId>
                <version>2.4</version>
                <classifier>jdk15</classifier>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
        <!--        7. Offline warehouse (Hive) dependencies-->
        <!--        ① Hadoop dependencies-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
        <!--        ② Hive dependencies-->
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-jdbc</artifactId>
                <version>${hive.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>${hive.version}</version>
            </dependency>
        <!--        ③ HBase dependencies-->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
            </dependency>
        <!--        8. Spark dependencies-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.12</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.12</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
        </dependencies>
        <repositories>
            <!--    <repository><id>nexus-aliyun</id><name>Nexus aliyun</name><layout>default</layout><url>http://maven.aliyun.com/nexus/content/groups/public</url><snapshots><enabled>false</enabled></snapshots><releases><enabled>true</enabled></releases></repository>-->
            <repository>
                <id>apache.snapshots</id>
                <name>Apache Development Snapshot Repository</name>
                <url>https://repository.apache.org/content/repositories/snapshots/</url>
                <releases>
                    <enabled>false</enabled>
                </releases>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </repository>
        </repositories>
        <build>
            <pluginManagement>
                <!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
                <plugins>
                    <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                    <plugin>
                        <artifactId>maven-clean-plugin</artifactId>
                        <version>3.1.0</version>
                    </plugin>
                    <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                    <plugin>
                        <artifactId>maven-resources-plugin</artifactId>
                        <version>3.0.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.8.0</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-surefire-plugin</artifactId>
                        <version>2.22.1</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-jar-plugin</artifactId>
                        <version>3.0.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-install-plugin</artifactId>
                        <version>2.5.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-deploy-plugin</artifactId>
                        <version>2.8.2</version>
                    </plugin>
                    <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                    <plugin>
                        <artifactId>maven-site-plugin</artifactId>
                        <version>3.7.1</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-project-info-reports-plugin</artifactId>
                        <version>3.0.0</version>
                    </plugin>
                </plugins>
            </pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>8</source>
                        <target>8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
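
    With these dependencies in place, a TableEnvironment for the blink planner can be created as below; a minimal sketch (the class is illustrative, not part of the original post):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class EnvDemo {
        public static void main(String[] args) {
            //blink planner in streaming mode, matching flink-table-planner-blink above
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);
            //the UDFs in the sections below are registered on this environment, e.g.
            //tEnv.createTemporarySystemFunction("ARR_TO_STRING", Udf.ArrToString.class);
        }
    }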

    2. Array to string

    Method ①

    package Udf;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    //Method 1: join an array into a string on the given separator
    public class ArrToString extends ScalarFunction {
        private static final  long serialVersionUID=1L;
        public ArrToString() {
        }
    
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
    
        @Override
        public void close() throws Exception {
            super.close();
        }
    
        public String eval(String[] bodyRow, String split) {
            try {
                if(bodyRow !=null && bodyRow.length>0)
                {
                    StringBuilder stringBuilder=new StringBuilder();
                    for(int i=0;i<bodyRow.length-1;i++)
                    {
                        stringBuilder.append(bodyRow[i]).append(split);
                    }
                    stringBuilder.append(bodyRow[bodyRow.length-1]);
                    return stringBuilder.toString();
                }
                else
                {
                    return null;
                }
            }
            catch (Exception ex)
            {
                return ex.getMessage();
            }
        }
    }
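
    To call the UDF from SQL it must be registered on the table environment first; a hedged sketch, where tEnv is the environment from the setup above and the table/column names (events, tags) are made up:

    tEnv.createTemporarySystemFunction("ARR_TO_STRING", ArrToString.class);
    tEnv.executeSql("SELECT ARR_TO_STRING(tags, ',') AS tag_str FROM events");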

    Method ②

    package Udf;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    
    //Method 2: join an array into a string on the given separator (commons-lang3)
    public class ArrToString2 extends ScalarFunction {
        private static final  long serialVersionUID=1L;
        public ArrToString2() {
        }
    
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
    
        @Override
        public void close() throws Exception {
            super.close();
        }
        public String eval(String[] arr, String split)
        {
            try {
               if(arr !=null && arr.length>0)
               {
                    //note: ArrayUtils.toString(arr, s) treats s as a null-fallback, not a separator,
                    //so StringUtils.join is the correct call here
                    return StringUtils.join(arr, split);
               }
               else
               {
                   return null;
               }
            }
            catch(Exception ex)
            {
                return ex.getMessage();
            }
        }
    }

    Method ③

    package Udf;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import java.util.Arrays;
    //Method 3: render an array as an array-style string, e.g. [a, b, c]
    public class ArrToString3 extends ScalarFunction {
        private static final  long serialVersionUID=1L;
        public ArrToString3() {
        }
    
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
    
        @Override
        public void close() throws Exception {
            super.close();
        }
    
        public String eval(String[] arr)
        {
            try {
               if(arr !=null && arr.length>0)
               {
                  return Arrays.toString(arr);
               }
               else
               {
                   return null;
               }
            }
            catch(Exception ex)
            {
                return ex.getMessage();
            }
        }
    }
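
    The three variants can be compared with a quick local check; a sketch that is not part of the original post:

    package Udf;

    public class ArrToStringCheck {
        public static void main(String[] args) {
            String[] arr = {"a", "b", "c"};
            System.out.println(new ArrToString().eval(arr, ","));  // a,b,c
            System.out.println(new ArrToString2().eval(arr, ",")); // a,b,c
            System.out.println(new ArrToString3().eval(arr));      // [a, b, c]
        }
    }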

    3. String to array

    package Udf;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    //Split a string into an array on the given separator
    public class StringToArr extends ScalarFunction {
        private static final  long serialVersionUID=1L;
        public StringToArr() {
        }
    
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
    
        @Override
        public void close() throws Exception {
            super.close();
        }
    
        public String[] eval(String arr, String split)
        {
            try {
               if(arr !=null)
               {
                    return arr.split(split);
               }
               else
               {
                   return null;
               }
            }
            catch(Exception ex)
            {
                return null;
            }
        }
    }
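
    Note that String.split treats its argument as a regular expression, so separators such as "|" or "." must be quoted before they behave literally; a small illustration (values are made up):

    import java.util.regex.Pattern;

    String s = "a|b|c";
    String[] parts = s.split(Pattern.quote("|")); // [a, b, c]; plain s.split("|") would split between every character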

    4. String to map

    package Udf;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import java.util.HashMap;
    import java.util.Map;
    //Parse a string into a map using the given entry and key-value separators
    public class StringToMap extends ScalarFunction {
        private static final  long serialVersionUID=1L;
        public StringToMap() {
        }
    
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
    
        @Override
        public void close() throws Exception {
            super.close();
        }
    
    public Map eval(String str, String split, String split1)
    {
        try {
            if (str != null)
            {
                String[] arr = str.split(split);
                Map<String, Object> map = new HashMap<>();
                for (int i = 0; i < arr.length; i++)
                {
                    String[] arr1 = arr[i].split(split1);
                    //guard entries that have no value; the original returned
                    //inside the loop, so only the first entry was ever kept
                    map.put(arr1[0], arr1.length > 1 ? arr1[1] : null);
                }
                return map;
            }
            else
            {
                return null;
            }
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
        return null;
    }
    }
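
    A quick local check of the fixed eval() (input values are illustrative):

    Map m = new StringToMap().eval("k1=v1&k2=v2", "&", "=");
    System.out.println(m); // {k1=v1, k2=v2} (HashMap, so key order is not guaranteed)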

    import org.junit.Test;
    import java.util.HashMap;
    import java.util.Map;
     
    public class StringUtilsTest {
     
        @Test
        public void testDataToMap() {
            String data = "certificatetype=0&certificateno=220182&depositacct=622848";
            Map map = new HashMap();
     
            if (null != data) {
                String[] param = data.split("&");
                for (int i = 0; i < param.length; i++) {
                    int index = param[i].indexOf('=');
                    map.put(param[i].substring(0,index), param[i].substring((index + 1)));
                }
            }
     
            System.out.println(map);
     
            System.out.println("----------------分割线---------------");
            Map result = new HashMap();
        String[] params = data.split("\\&");
            for (String entry : params) {
                if (entry.contains("=")) {
                String[] sub = entry.split("\\=");
                    if (sub.length > 1) {
                        result.put(sub[0], sub[1]);
                    } else {
                        result.put(sub[0], "");
                    }
                }
            }
            System.out.println(result);
        }
     
    }
    
    
    
    
    A quick demo of splitting on the \u0001 separator used in raw log strings:

    String str1 = "d3fe1e186e41475ea965f4722f5488a8";
    String str2 = "5093";
    String str3 = "公共设施";

    String str = str1 + "\u0001" + str2 + "\u0001" + str3;
    System.out.println(str);
    // i.e. the string below, where the separator is \u0001
    str = "d3fe1e186e41475ea965f4722f5488a8\u00015093\u0001公共设施";
    String[] split = str.split("\u0001");
    for (String s : split) {
        System.out.println(s);
    }
    System.out.println(split.length);

     ③ Map-style string to Map (with logging)

    package Flink.Udf;
    import org.apache.commons.lang.StringUtils;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.LinkedHashMap;
    import java.util.Map;
    /**
     * Convert a map-style string to a Map
     */
    public class MapStrToMap extends ScalarFunction {
        private static final long serialVersionUID = 1L;
        private static final Logger logger = LoggerFactory.getLogger(MapStrToMap.class);
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
        @Override
        public void close() throws Exception {
            super.close();
        }
        public String eval(String data,String spt1,String spt2) {
            Map<String, String> map = new LinkedHashMap<>();
            try {
                if (StringUtils.isEmpty(data)) {
                    return "";
                }
                else {
                    String[] split = data.split(spt1);
                    for (String s : split) {
                        String[] s1 = s.split(spt2);
                        if (s1.length > 1) {
                            map.put(s1[0], s1[1]);
                        } else {
                            map.put(s1[0], null);
                        }
                    }
                }
            } catch (Exception e) {
            logger.error("MapStrToMap error! mapStr={}", data, e);
            }
            return map.toString();
        }
    }
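
    Registration follows the same pattern as the other scalar functions; a sketch where tEnv is the environment from section 1 and the table/column names are made up:

    tEnv.createTemporarySystemFunction("MAP_STR_TO_MAP", MapStrToMap.class);
    tEnv.executeSql("SELECT MAP_STR_TO_MAP(body, '\u0001', '\u0002') AS kv FROM raw_log");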

    5. Map to string

    package Udf;
    import org.apache.flink.table.functions.ScalarFunction;
    import java.util.Arrays;
    import java.util.Map;
    import java.util.Set;
    
    //Join a map into a string using the given entry and key-value separators
    public class MapToString extends ScalarFunction {
        private static final  long serialVersionUID=1L;
        public MapToString() {
        }
        public String eval(Map<String,Object> map,String split,String split1)
        {
            try {
            Set<String> keySet = map.keySet();
            //copy the key set into an array
            String[] keyArray = keySet.<String>toArray(new String[keySet.size()]);
            //sort the keys ascending
            Arrays.sort(keyArray);
            //use StringBuilder: repeated String concatenation would be slow
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < keyArray.length; i++) {
                //blank values are skipped (trim() strips the whitespace)
                if ((String.valueOf(map.get(keyArray[i]))).trim().length() > 0) {
                    sb.append(keyArray[i]).append(split1).append(String.valueOf(map.get(keyArray[i])).trim());
                }
                if(i != keyArray.length-1){
                    sb.append(split);
                }
            }
            return sb.toString();
            }
            catch(Exception ex)
            {
                ex.printStackTrace();
                return null;
            }
        }
    }
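
    An illustrative call (keys are sorted before joining, so insertion order does not matter):

    Map<String, Object> m = new HashMap<>();
    m.put("b", 2);
    m.put("a", 1);
    System.out.println(new MapToString().eval(m, "&", "=")); // a=1&b=2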

    6. Map to JSON

    package Flink.Udf;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * Convert a map-style string to JSON
     */
    public class MapStrToJson extends ScalarFunction {
        private static final long serialVersionUID = 1L;
        private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
        @Override
        public void close() throws Exception {
            super.close();
        }
        public String eval(String data,String spt1,String spt2) {
            Map<String, String> map = new LinkedHashMap<>();
            try {
                if (StringUtils.isEmpty(data)) {
                    return "";
                }
                else {
                    String[] split = data.split(spt1);
                    for (String s : split) {
                        String[] s1 = s.split(spt2);
                        if (s1.length > 1) {
                            map.put(s1[0], s1[1]);
                        } else {
                            map.put(s1[0], null);
                        }
                    }
                }
            } catch (Exception e) {
            logger.error("MapStrToJson error! mapStr={}", data, e);
            }
            return JSONObject.toJSONString(map);
        }
    }

    package com.oppo.dc.ostream.udf.common;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    
    import java.util.Map;
    
    public class MapToJSONString extends ScalarFunction {
        private static final long serialVersionUID = 1L;
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
    
        @Override
        public void close() throws Exception {
            super.close();
        }
    
        public String eval(Map map) {
            if (map == null) {
                return null;
            }
    
            String jsonStr = JSON.toJSONString(map);
            if (jsonStr.length() > 5000) {
                jsonStr = jsonStr.substring(0, 5000);
            }
    
            return jsonStr;
        }
    }
    package com.oppo.dc.ostream.udf.common;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    
    import java.util.Map;
    
    /**
     * Author: Fisher Xiang
     * Date: 2019-10-28 15:21
     *
     * @description:MapToJSONStr
     * @version: 1.0.0
     */
    
    public class MapToJSONStr extends ScalarFunction {
    
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
    
        @Override
        public void close() throws Exception {
            super.close();
        }
    
        public String eval(Map map) {
            if (map == null) {
                return null;
            }
            if (map.containsKey("data")) {
                map.put("data", ((String) map.get("data")).replace(""%2C"", "",""));
            }
    
            String jsonStr = JSON.toJSONString(map);
            /*if (jsonStr.length() > 5000) {
                jsonStr = jsonStr.substring(0, 5000);
            }*/
    
            return jsonStr;
        }
    
    }
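
    An illustrative call showing the %2C un-escaping applied to the "data" key (the input value is made up):

    Map<String, Object> m = new LinkedHashMap<>();
    m.put("data", "\"a\"%2C\"b\"");
    System.out.println(new MapToJSONStr().eval(m)); // {"data":"\"a\",\"b\""}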
    ④ Map string to JSON (keys in hash order)
    package FlinkDW.Udf;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import java.util.HashMap;
    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * mapStr转Json
     */
    public class MapStrToJson extends ScalarFunction
    {
        private static final long serialVersionUID = 1L;
        private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);

        @Override
        public void open(FunctionContext context) throws Exception
        {
            super.open(context);
        }

        @Override
        public void close() throws Exception
        {
            super.close();
        }
        public String eval(String data, String spt1, String spt2)
        {
        Map<String, String> map = new HashMap<>();
            try
            {
                if(StringUtils.isEmpty(data))
                {
                    return "";
                }
                else
                {
                    String[] split = data.split(spt1);
                    for(String s: split)
                    {
                        String[] s1 = s.split(spt2);
                        if(s1.length > 1)
                        {
                            map.put(s1[0], s1[1]);
                        }
                        else
                        {
                            map.put(s1[0], "null");
                        }
                    }
                }
            }
            catch(Exception e)
            {
                logger.error("MapStr to Json Error!,mapStr={}", data, e);
            }
            return JSONObject.toJSONString(map);
        }
    }
    ⑤ Optimized (stable insertion order)
    package Flink.Udf;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * mapStr转Json
     */
    public class MapStrToJson extends ScalarFunction
    {
        private static final long serialVersionUID = 1L;
        private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);

        @Override
        public void open(FunctionContext context) throws Exception
        {
            super.open(context);
        }

        @Override
        public void close() throws Exception
        {
            super.close();
        }
        public String eval(String data, String spt1, String spt2)
        {
        Map<String, String> map = new LinkedHashMap<>();
            try
            {
                if(StringUtils.isEmpty(data))
                {
                    return "";
                }
                else
                {
                    String[] split = data.split(spt1);
                    for(String s: split)
                    {
                        String[] s1 = s.split(spt2);
                        if(s1.length > 1)
                        {
                            map.put(s1[0], s1[1]);
                        }
                        else
                        {
                            map.put(s1[0], null);
                        }
                    }
                }
            }
            catch(Exception e)
            {
            logger.error("MapStrToJson error! mapStr={}", data, e);
            }
            return JSONObject.toJSONString(map);
        }
    }
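
    The only difference from version ④ is LinkedHashMap, which keeps keys in the order they appear in the input string instead of hash order; a tiny illustration:

    Map<String, String> hash = new HashMap<>();
    Map<String, String> linked = new LinkedHashMap<>();
    for (String k : new String[]{"z", "a", "m"}) {
        hash.put(k, "1");
        linked.put(k, "1");
    }
    System.out.println(hash);   // hash order, e.g. {a=1, z=1, m=1}
    System.out.println(linked); // insertion order: {z=1, a=1, m=1}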

    Test

    package Flink.Udf;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.*;
    public class Test {
        private static final Logger logger = LoggerFactory.getLogger(Test.class);
    /**
    * @Description: 1. Convert a map-style string to JSON
    * @Param: [data, spt1, spt2]
    * @return: java.lang.String
    * @Author: BigData
    * @Date: 2020/12/3
    */
        public static String  testDataToJson(String data,String spt1,String spt2) {
            Map<String, String> map = new LinkedHashMap<>();
            try {
                if (StringUtils.isEmpty(data)) {
                    return "";
                }
                else {
                    String[] split = data.split(spt1);
                    for (String s : split) {
                        String[] s1 = s.split(spt2);
                        if (s1.length > 1) {
                            map.put(s1[0], s1[1]);
                        } else {
                            map.put(s1[0], "null");
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("MapStr to Json Error!,mapStr={}", data, e);
            }
            return JSONObject.toJSONString(map);
        }
    /**
    * @Description: 2. Convert a map-style string to a Map-style string
    * @Param: [data, spt1, spt2]
    * @return: java.lang.String
    * @Author: BigData
    * @Date: 2020/12/3
    */
        public static String  testDataToMap(String data,String spt1,String spt2) {
            Map<String,String> map = new LinkedHashMap<>();
            try
            {
            if (data != null)
                {
                    if (StringUtils.isEmpty(data) || data.length() <= 2) {
                        return null;
                    }
                    else
                    {
                        String[] split = data.split(spt1);
                        for (String s : split) {
                            String[] s1=s.split(spt2);
                            if(s1.length>1)
                            {
                                map.put(s1[0], s1[1]);
                            }
                            else
                            {
                                map.put(s1[0],"null");
                            }
                        }
    
                    }
                }
            else
            {
                //data is null
                return null;
            }
            }
            catch (Exception e)
            {
            e.printStackTrace();
            }
            return map.toString();
        }
        public static String getType(Object o){ //return the runtime type of a variable
            return o.getClass().toString();
        }
        public static void main(String[] args) {
            String data1 = "certificatetype=0&certificateno=220182&depositacct=622848&name";
            System.out.println(testDataToJson(data1,"&", "="));
            String str="k1:,k2:V2";
            System.out.println(testDataToJson(str, ",", ":"));
            String str1="stayTime\u00020\u0001firstScreenTime\u00020\u0001apiName\u0002stringValue\u0001apiParam\u0002id\u003d44\u0001methodType\u0002GET\u0001loadingTime\u0002453\u0001websiteId\u0002stringValue\u0001pagePath\u0002/stringValue\u0001pageName\u0002/value\u0001sessionKey\u0002eyJhbX0AwqPisoE6V8A\u0001userName\u000280254861\u0001ipAddress\u0002XX.XX.XCX.XXX\u0001uploadTime\u00022020-11-23 13:07:13\u0001receiveTime\u00022020-11-26 11:08:55\u0001eventCode\u00025";
            System.out.println(testDataToMap(str1,"\u0001", "\u0002"));
            System.out.println(testDataToJson(str1,"\u0001", "\u0002"));
            String data ="client_timestamp\u00011587375750618\u0002apply_value\u00013\u0002pre_page_id\u0001DraftActivity\u0002close_value\u00010\u0002item_id\u0001no_apply\u0002start_timestamp\u00011587375592167\u0002template_id_auto\u0001-1\u0002music_name\u0001少年2\u0002play_cnt\u00010\u0002duration_value\u00015506\u0002video_id\u00011587375633000\u0002is_story\u0001false";
            System.out.println(testDataToJson(data,"\u0002", "\u0001"));
            String str2="ts=1529995388&channel_id=164292&program_id=9081951&play_duration=20&position=10&os=iOS&os_version=4.3&app_name=aliyun&app_version=1.0&device_model=Samsumg_br_FM__samsung_SCH-N719";
            System.out.println(testDataToJson(str2,"&","="));
            System.out.println(testDataToMap(str2,"&","="));
            String logMap ="websiteId\u0002ota-recruit-TEST_ENV\u0001pagePath\u0002/configuration/index\u0001pageName\u0002????\u0001sessionKey\u0002sessionKey\u0001userName\u00020\u0001ipAddress\u0002172.17.75.8\u0001uploadTime\u00022020-12-09 15:10:13\u0001receiveTime\u00022020-12-09 15:10:20\u0001eventCode\u00021";
            System.out.println(testDataToJson(logMap,"\u0001", "\u0002"));
            System.out.println(testDataToMap(logMap, "\u0001", "\u0002"));
        }
    }

    Test output

    "C:Program FilesJavajdk1.8.0_221injava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2019.3.3libidea_rt.jar=54638:C:Program FilesJetBrainsIntelliJ IDEA 2019.3.3in" -Dfile.encoding=UTF-8 -classpath C:Usersdefaultuser0AppDataLocalTempclasspath1090826265.jar Flink.Udf.Test
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/C:/Users/defaultuser0/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/C:/Users/defaultuser0/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.10.0/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    {"certificatetype":"0","certificateno":"220182","depositacct":"622848","name":"null"}
    {"k1":"null","k2":"V2"}
    {stayTime=0, firstScreenTime=0, apiName=stringValue, apiParam=id=44, methodType=GET, loadingTime=453, websiteId=stringValue, pagePath=/stringValue, pageName=/value, sessionKey=eyJhbX0AwqPisoE6V8A, userName=80254861, ipAddress=XX.XX.XCX.XXX, uploadTime=2020-11-23 13:07:13, receiveTime=2020-11-26 11:08:55, eventCode=5}
    {"stayTime":"0","firstScreenTime":"0","apiName":"stringValue","apiParam":"id=44","methodType":"GET","loadingTime":"453","websiteId":"stringValue","pagePath":"/stringValue","pageName":"/value","sessionKey":"eyJhbX0AwqPisoE6V8A","userName":"80254861","ipAddress":"XX.XX.XCX.XXX","uploadTime":"2020-11-23 13:07:13","receiveTime":"2020-11-26 11:08:55","eventCode":"5"}
    {"client_timestamp":"1587375750618","apply_value":"3","pre_page_id":"DraftActivity","close_value":"0","item_id":"no_apply","start_timestamp":"1587375592167","template_id_auto":"-1","music_name":"少年2","play_cnt":"0","duration_value":"5506","video_id":"1587375633000","is_story":"false"}
    {"ts":"1529995388","channel_id":"164292","program_id":"9081951","play_duration":"20","position":"10","os":"iOS","os_version":"4.3","app_name":"aliyun","app_version":"1.0","device_model":"Samsumg_br_FM__samsung_SCH-N719"}
    {ts=1529995388, channel_id=164292, program_id=9081951, play_duration=20, position=10, os=iOS, os_version=4.3, app_name=aliyun, app_version=1.0, device_model=Samsumg_br_FM__samsung_SCH-N719}
    {"websiteId":"ota-recruit-TEST_ENV","pagePath":"/configuration/index","pageName":"????","sessionKey":"sessionKey","userName":"0","ipAddress":"172.17.75.8","uploadTime":"2020-12-09 15:10:13","receiveTime":"2020-12-09 15:10:20","eventCode":"1"}
    {websiteId=ota-recruit-TEST_ENV, pagePath=/configuration/index, pageName=????, sessionKey=sessionKey, userName=0, ipAddress=172.17.75.8, uploadTime=2020-12-09 15:10:13, receiveTime=2020-12-09 15:10:20, eventCode=1}

    Process finished with exit code 0

     Test 2

    package FlinkDW.Udf;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import java.util.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    public class Test {
        public static String  testDataToMap(String data,String spt1,String spt2) {
            Map<String,String> map = new HashMap<String,String>();
            try
            {
            if (data != null)
                {
                    if (StringUtils.isEmpty(data) || data.length() <= 2) {
                        return data;
                    }
                    else
                    {
                        String[] split = data.split(spt1);
                        for (String s : split) {
                            String[] s1=s.split(spt2);
                            if(s1.length>1)
                            {
                                map.put(s1[0], s1[1]);
                            }
                            else
                            {
                                map.put(s1[0],"null");
                            }
                        }
    
                    }
                }
            else
            {
                //data is null
                return null;
            }
            }
            catch (Exception e)
            {
            e.printStackTrace();
            }
            return JSONObject.toJSONString(map);
        }
        public static String getType(Object o){ //return the runtime type of a variable
            return o.getClass().toString();
        }
        public static void main(String[] args) {
            String data1 = "certificatetype=0&certificateno=220182&depositacct=622848&name";
            String data ="client_timestamp\u00011587375750618\u0002apply_value\u00013\u0002pre_page_id\u0001DraftActivity\u0002close_value\u00010\u0002item_id\u0001no_apply\u0002start_timestamp\u00011587375592167\u0002template_id_auto\u0001-1\u0002music_name\u0001少年2\u0002play_cnt\u00010\u0002duration_value\u00015506\u0002video_id\u00011587375633000\u0002is_story\u0001false";
            System.out.println(testDataToMap(data,"\u0002", "\u0001"));
            System.out.println(testDataToMap(data1,"&", "="));
            String str="k1:,k2:V2";
            System.out.println(testDataToMap(str, ",", ":"));
        }
    }

    Output:

    "C:Program FilesJavajdk1.8.0_221injava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2019.3.3libidea_rt.jar=65426:C:Program FilesJetBrainsIntelliJ IDEA 2019.3.3in" -Dfile.encoding=UTF-8 -classpath C:Usersdefaultuser0AppDataLocalTempclasspath1898420598.jar FlinkDW.Udf.Test
    {"client_timestamp":"1587375750618","apply_value":"3","pre_page_id":"DraftActivity","close_value":"0","item_id":"no_apply","start_timestamp":"1587375592167","template_id_auto":"-1","music_name":"少年2","play_cnt":"0","duration_value":"5506","video_id":"1587375633000","is_story":"false"}
    {"name":"null","certificatetype":"0","certificateno":"220182","depositacct":"622848"}
    {"k1":"null","k2":"V2"}

    Process finished with exit code 0

    7. JSON to map

    package Udf;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import java.util.Map;
    //Parse a JSON string into a Map
    public class JsonToMap extends ScalarFunction {
        private static final  long serialVersionUID=1L;
        public JsonToMap() {
        }
    
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
    
        @Override
        public void close() throws Exception {
            super.close();
        }
    
        public Map eval(String str)
        {
            try {
                if(str !=null)
                {
                    JSONObject jsonObject = JSONObject.parseObject(str);
                    Map<String,Object> map = jsonObject;
                    return map;
                }
                else
                {
                    return null;
                }
            }
            catch(Exception ex)
            {
                ex.printStackTrace();
                return null;
            }
        }
    }
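
    fastjson's JSONObject itself implements Map<String, Object>, which is why the plain assignment in eval() works; an illustrative call:

    Map m = new JsonToMap().eval("{\"a\":1,\"b\":\"x\"}");
    System.out.println(m.get("b")); // x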

    Note: six ways to convert a JSON string to a Map with fastjson

    package Udf;
    import java.util.Map;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    public class Test1 {
        public static void main(String[] args){
    
        String str = "{\"0\":\"zhangsan\",\"1\":\"lisi\",\"2\":\"wangwu\",\"3\":\"maliu\"}";
        //Approach 1
        Map maps = (Map) JSON.parse(str);
        System.out.println("Parsing the JSON string with the JSON class!!!");
        for (Object map : maps.entrySet()){
            System.out.println(((Map.Entry)map).getKey()+"     " + ((Map.Entry)map).getValue());
        }
        //Approach 2
        Map mapTypes = JSON.parseObject(str);
        System.out.println("Parsing with JSON.parseObject!!!");
        for (Object obj : mapTypes.keySet()){
            System.out.println("key: "+obj+", value: "+mapTypes.get(obj));
        }
        //Approach 3
        Map mapType = JSON.parseObject(str, Map.class);
        System.out.println("Parsing with JSON.parseObject and an explicit target type!!!");
        for (Object obj : mapType.keySet()){
            System.out.println("key: "+obj+", value: "+mapType.get(obj));
        }
        //Approach 4
        /**
         * JSONObject is an implementation of the Map interface
         */
        Map json = (Map) JSONObject.parse(str);
        System.out.println("Parsing with JSONObject.parse!!!");
        for (Object map : json.entrySet()){
            System.out.println(((Map.Entry)map).getKey()+"  "+((Map.Entry)map).getValue());
        }
        //Approach 5
        /**
         * JSONObject is an implementation of the Map interface
         */
        JSONObject jsonObject = JSONObject.parseObject(str);
        System.out.println("Parsing with JSONObject.parseObject!!!");
        for (Object map : jsonObject.entrySet()){
            System.out.println(((Map.Entry)map).getKey()+"  "+((Map.Entry)map).getValue());
        }
        //Approach 6
        /**
         * JSONObject is an implementation of the Map interface
         */
        Map mapObj = JSONObject.parseObject(str, Map.class);
        System.out.println("Parsing with JSONObject.parseObject and a specified return type!!!");
        for (Object map: mapObj.entrySet()){
            System.out.println(((Map.Entry)map).getKey()+"  "+((Map.Entry)map).getValue());
        }
        //an array of objects would be a JSON array, e.g. for JSONArray.parse():
        String strArr = "[{\"0\":\"zhangsan\",\"1\":\"lisi\",\"2\":\"wangwu\",\"3\":\"maliu\"}," +
                "{\"00\":\"zhangsan\",\"11\":\"lisi\",\"22\":\"wangwu\",\"33\":\"maliu\"}]";
        // JSONArray.parse()
        System.out.println(json);
        }
    }

    Output:

    "C:Program FilesJavajdk1.8.0_221injava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2019.3.3libidea_rt.jar=55064:C:Program FilesJetBrainsIntelliJ IDEA 2019.3.3in" -Dfile.encoding=UTF-8 -classpath "C:Program FilesJavajdk1.8.0_221jrelibcharsets.jar;C:Program FilesJavajdk1.8.0_221jrelibdeploy.jar;C:Program FilesJavajdk1.8.0_221jrelibextaccess-bridge-64.jar;C:Program FilesJavajdk1.8.0_221jrelibextcldrdata.jar;C:Program FilesJavajdk1.8.0_221jrelibextdnsns.jar;C:Program FilesJavajdk1.8.0_221jrelibextjaccess.jar;C:Program FilesJavajdk1.8.0_221jrelibextjfxrt.jar;C:Program FilesJavajdk1.8.0_221jrelibextlocaledata.jar;C:Program FilesJavajdk1.8.0_221jrelibext
    ashorn.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunec.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunjce_provider.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunmscapi.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunpkcs11.jar;C:Program FilesJavajdk1.8.0_221jrelibextzipfs.jar;C:Program FilesJavajdk1.8.0_221jrelibjavaws.jar;C:Program FilesJavajdk1.8.0_221jrelibjce.jar;C:Program FilesJavajdk1.8.0_221jrelibjfr.jar;C:Program FilesJavajdk1.8.0_221jrelibjfxswt.jar;C:Program FilesJavajdk1.8.0_221jrelibjsse.jar;C:Program FilesJavajdk1.8.0_221jrelibmanagement-agent.jar;C:Program FilesJavajdk1.8.0_221jrelibplugin.jar;C:Program FilesJavajdk1.8.0_221jrelib
    esources.jar;C:Program FilesJavajdk1.8.0_221jrelib
    t.jar;C:appFlinkUdf	arget	est-classes;C:Usersdefaultuser0.m2
    epositoryjunitjunit4.11junit-4.11.jar;C:Usersdefaultuser0.m2
    epositoryorghamcresthamcrest-core1.3hamcrest-core-1.3.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-table-api-java-bridge_2.111.11.1flink-table-api-java-bridge_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-table-api-java1.11.1flink-table-api-java-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-java1.11.1flink-java-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapachecommonscommons-lang33.3.2commons-lang3-3.3.2.jar;C:Usersdefaultuser0.m2
    epositoryorgapachecommonscommons-math33.5commons-math3-3.5.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-streaming-java_2.111.11.1flink-streaming-java_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-runtime_2.111.11.1flink-runtime_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-queryable-state-client-java1.11.1flink-queryable-state-client-java-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-hadoop-fs1.11.1flink-hadoop-fs-1.11.1.jar;C:Usersdefaultuser0.m2
    epositorycommons-iocommons-io2.4commons-io-2.4.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-shaded-netty4.1.39.Final-11.0flink-shaded-netty-4.1.39.Final-11.0.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-shaded-jackson2.10.1-11.0flink-shaded-jackson-2.10.1-11.0.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-shaded-zookeeper-33.4.14-11.0flink-shaded-zookeeper-3-3.4.14-11.0.jar;C:Usersdefaultuser0.m2
    epositorycommons-clicommons-cli1.3.1commons-cli-1.3.1.jar;C:Usersdefaultuser0.m2
    epositorycom	ypesafeakkaakka-actor_2.112.5.21akka-actor_2.11-2.5.21.jar;C:Usersdefaultuser0.m2
    epositorycom	ypesafeconfig1.3.3config-1.3.3.jar;C:Usersdefaultuser0.m2
    epositoryorgscala-langmodulesscala-java8-compat_2.11.7.0scala-java8-compat_2.11-0.7.0.jar;C:Usersdefaultuser0.m2
    epositorycom	ypesafeakkaakka-stream_2.112.5.21akka-stream_2.11-2.5.21.jar;C:Usersdefaultuser0.m2
    epositoryorg
    eactivestreams
    eactive-streams1.0.2
    eactive-streams-1.0.2.jar;C:Usersdefaultuser0.m2
    epositorycom	ypesafessl-config-core_2.11.3.7ssl-config-core_2.11-0.3.7.jar;C:Usersdefaultuser0.m2
    epositorycom	ypesafeakkaakka-protobuf_2.112.5.21akka-protobuf_2.11-2.5.21.jar;C:Usersdefaultuser0.m2
    epositorycom	ypesafeakkaakka-slf4j_2.112.5.21akka-slf4j_2.11-2.5.21.jar;C:Usersdefaultuser0.m2
    epositoryorgclappergrizzled-slf4j_2.111.3.2grizzled-slf4j_2.11-1.3.2.jar;C:Usersdefaultuser0.m2
    epositorycomgithubscoptscopt_2.113.5.0scopt_2.11-3.5.0.jar;C:Usersdefaultuser0.m2
    epositoryorgxerialsnappysnappy-java1.1.4snappy-java-1.1.4.jar;C:Usersdefaultuser0.m2
    epositorycom	witterchill_2.11.7.6chill_2.11-0.7.6.jar;C:Usersdefaultuser0.m2
    epositorycom	witterchill-java.7.6chill-java-0.7.6.jar;C:Usersdefaultuser0.m2
    epositoryorglz4lz4-java1.6.0lz4-java-1.6.0.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-shaded-guava18.0-11.0flink-shaded-guava-18.0-11.0.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-streaming-java_2.111.11.1flink-streaming-java_2.11-1.11.1-tests.jar;C:Usersdefaultuser0.m2
    epositoryorgslf4jslf4j-api1.7.15slf4j-api-1.7.15.jar;C:Usersdefaultuser0.m2
    epositorycomgooglecodefindbugsjsr3051.3.9jsr305-1.3.9.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkforce-shading1.11.1force-shading-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-table-planner-blink_2.111.11.1flink-table-planner-blink_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-table-api-scala_2.111.11.1flink-table-api-scala_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-table-api-scala-bridge_2.111.11.1flink-table-api-scala-bridge_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-table-runtime-blink_2.111.11.1flink-table-runtime-blink_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgcodehausjaninojanino3.0.9janino-3.0.9.jar;C:Usersdefaultuser0.m2
    epositoryorgcodehausjaninocommons-compiler3.0.9commons-compiler-3.0.9.jar;C:Usersdefaultuser0.m2
    epositoryorgapachecalciteavaticaavatica-core1.16.0avatica-core-1.16.0.jar;C:Usersdefaultuser0.m2
    epositoryorg
    eflections
    eflections.9.10
    eflections-0.9.10.jar;C:Usersdefaultuser0.m2
    epositoryorgjavassistjavassist3.19.0-GAjavassist-3.19.0-GA.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-streaming-scala_2.111.11.1flink-streaming-scala_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-scala_2.111.11.1flink-scala_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgscala-langscala-reflect2.11.12scala-reflect-2.11.12.jar;C:Usersdefaultuser0.m2
    epositoryorgscala-langscala-library2.11.12scala-library-2.11.12.jar;C:Usersdefaultuser0.m2
    epositoryorgscala-langscala-compiler2.11.12scala-compiler-2.11.12.jar;C:Usersdefaultuser0.m2
    epositoryorgscala-langmodulesscala-xml_2.111.0.5scala-xml_2.11-1.0.5.jar;C:Usersdefaultuser0.m2
    epositoryorgscala-langmodulesscala-parser-combinators_2.111.0.4scala-parser-combinators_2.11-1.0.4.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-table-common1.11.1flink-table-common-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-core1.11.1flink-core-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-annotations1.11.1flink-annotations-1.11.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-metrics-core1.11.1flink-metrics-core-1.11.1.jar;C:Usersdefaultuser0.m2
    epositorycomesotericsoftwarekryokryo2.24.0kryo-2.24.0.jar;C:Usersdefaultuser0.m2
    epositorycomesotericsoftwareminlogminlog1.2minlog-1.2.jar;C:Usersdefaultuser0.m2
    epositoryorgobjenesisobjenesis2.1objenesis-2.1.jar;C:Usersdefaultuser0.m2
    epositoryorgapachecommonscommons-compress1.20commons-compress-1.20.jar;C:Usersdefaultuser0.m2
    epositoryorgapacheflinkflink-shaded-asm-77.1-11.0flink-shaded-asm-7-7.1-11.0.jar;C:Usersdefaultuser0.m2
    epositorycomalibabafastjson1.2.73fastjson-1.2.73.jar;C:Usersdefaultuser0.m2
    epositoryorgslf4jslf4j-log4j121.7.7slf4j-log4j12-1.7.7.jar;C:Usersdefaultuser0.m2
    epositorylog4jlog4j1.2.17log4j-1.2.17.jar;C:Usersdefaultuser0.m2
    epository
    etsfjson-libjson-lib2.4json-lib-2.4-jdk15.jar;C:Usersdefaultuser0.m2
    epositorycommons-beanutilscommons-beanutils1.8.0commons-beanutils-1.8.0.jar;C:Usersdefaultuser0.m2
    epositorycommons-collectionscommons-collections3.2.1commons-collections-3.2.1.jar;C:Usersdefaultuser0.m2
    epositorycommons-langcommons-lang2.5commons-lang-2.5.jar;C:Usersdefaultuser0.m2
    epositorycommons-loggingcommons-logging1.1.1commons-logging-1.1.1.jar;C:Usersdefaultuser0.m2
    epository
    etsfezmorphezmorph1.0.6ezmorph-1.0.6.jar" Udf.Test1
    Parsing the JSON string with the JSON class:
    0     zhangsan
    1     lisi
    2     wangwu
    3     maliu
    Parsing the JSON string with JSON.parseObject:
    key: 0, value: zhangsan
    key: 1, value: lisi
    key: 2, value: wangwu
    key: 3, value: maliu
    Parsing the JSON string with the JSON class and an explicit target type:
    key: 0, value: zhangsan
    key: 1, value: lisi
    key: 2, value: wangwu
    key: 3, value: maliu
    Parsing the JSON string with JSONObject.parse:
    0  zhangsan
    1  lisi
    2  wangwu
    3  maliu
    Parsing the JSON string with JSONObject.parseObject:
    0  zhangsan
    1  lisi
    2  wangwu
    3  maliu
    Parsing the JSON string with JSONObject.parseObject and a specified return type:
    0  zhangsan
    1  lisi
    2  wangwu
    3  maliu
    {"0":"zhangsan","1":"lisi","2":"wangwu","3":"maliu"}
    
    Process finished with exit code 0

     8. Converting an array to a string

    package Udf;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import java.util.Arrays;
    // Method 4: convert an array to its string representation
    public class ArrToString3 extends ScalarFunction {
        private static final long serialVersionUID = 1L;
        public ArrToString3() {
        }
    
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
    
        @Override
        public void close() throws Exception {
            super.close();
        }
    
        public String eval(String[] arr) {
            try {
                if (arr != null && arr.length > 0) {
                    return Arrays.toString(arr);
                } else {
                    return null;
                }
            } catch (Exception ex) {
                return ex.getMessage();
            }
        }
    }
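
    To call this scalar function from SQL it must first be registered with the table environment. A minimal sketch, assuming a StreamTableEnvironment named tableEnv already exists (the function name arrToString and the table/column names below are illustrative, not from the original post):

    // Registration sketch for Flink 1.11; tableEnv is assumed to exist.
    tableEnv.registerFunction("arrToString", new ArrToString3());                 // legacy API
    // tableEnv.createTemporarySystemFunction("arrToString", ArrToString3.class); // newer API
    //
    // Example query, assuming a table my_table with an ARRAY<STRING> column tags:
    // SELECT arrToString(tags) FROM my_table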

    9. Converting an array to JSON

    package Udf;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.json.JSONArray;
    // Method 1: convert an array to a JSON array string
    public class ArrToJson extends ScalarFunction {
        private static final long serialVersionUID = 1L;
    
        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
        }
    
        @Override
        public void close() throws Exception {
            super.close();
        }
    
        public ArrToJson() {
        }
    
        public String eval(String[] arr) {
            try {
                if (arr != null && arr.length > 0) {
                    JSONArray jsonArray = new JSONArray(arr);
                    return jsonArray.toString();
                } else {
                    return null;
                }
            } catch (Exception ex) {
                return ex.getMessage();
            }
        }
    }
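
    The practical difference between method 4 and method 1: Arrays.toString yields a readable but non-JSON form, while JSONArray yields valid JSON. A quick local check (values invented for illustration):

    // Sanity check of the two output formats (run locally):
    public static void main(String[] args) {
        String[] arr = {"zhangsan", "lisi"};
        System.out.println(new ArrToString3().eval(arr)); // prints: [zhangsan, lisi]    -- not JSON
        System.out.println(new ArrToJson().eval(arr));    // prints: ["zhangsan","lisi"] -- valid JSON
    }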

     10. LogMapInfo data handling (a UDF that parses the logmap field)

    1. Model classes

    1.CellInfo
    package Test.Model;
    
    /**
     * @program: FlinkUdf
     * @description: CellInfo
     * @author: BigData
     * @create: 2020-11-16 14:50
     **/
    public class CellInfo {
        private String mcc;
        private String mnc;
        private String ci;
        private String pci;
        private String tac;
        private String type;
        public String getMcc() {
            return mcc;
        }
    
        public void setMcc(String mcc) {
            this.mcc = mcc;
        }
    
        public String getMnc() {
            return mnc;
        }
    
        public void setMnc(String mnc) {
            this.mnc = mnc;
        }
    
        public String getCi() {
            return ci;
        }
    
        public void setCi(String ci) {
            this.ci = ci;
        }
    
        public String getPci() {
            return pci;
        }
    
        public void setPci(String pci) {
            this.pci = pci;
        }
    
        public String getTac() {
            return tac;
        }
    
        public void setTac(String tac) {
            this.tac = tac;
        }
    
        public String getType() {
            return type;
        }
    
        public void setType(String type) {
            this.type = type;
        }
    }
    2.GPS
    package Test.Model;
    
    /**
     * @program: FlinkUdf
     * @description: Gps
     * @author: BigData
     * @create: 2020-11-16 14:51
     **/
    public class Gps {
        private String longitude;
        private String latitude;
    
        public String getLongitude() {
            return longitude;
        }
    
        public void setLongitude(String longitude) {
            this.longitude = longitude;
        }
    
        public String getLatitude() {
            return latitude;
        }
    
        public void setLatitude(String latitude) {
            this.latitude = latitude;
        }
    }
    3.StationInfo
    package Test.Model;
    
    import java.util.List;
    
    /**
     * @program: FlinkUdf
     * @description: subway station model
     * @author: BigData
     * @create: 2020-11-16 14:49
     **/
    public final class StationInfo {
        private String station;
        private CellInfo cell_info;
        private List<String> wifi_infos;
        private String timestamp;
        private String reason;
        private String imei;
        private String guid;
        private Gps gps;
        private String city;
        private String group;
        private String version_name;
        private String version_code;
    
        public String getStation() {
            return station;
        }
    
        public void setStation(String station) {
            this.station = station;
        }
    
        public CellInfo getCell_info() {
            return cell_info;
        }
    
        public void setCell_info(CellInfo cell_info) {
            this.cell_info = cell_info;
        }
    
        public List<String> getWifi_infos() {
            return wifi_infos;
        }
    
        public void setWifi_infos(List<String> wifi_infos) {
            this.wifi_infos = wifi_infos;
        }
    
        public String getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }
    
        public String getReason() {
            return reason;
        }
    
        public void setReason(String reason) {
            this.reason = reason;
        }
    
        public String getImei() {
            return imei;
        }
    
        public void setImei(String imei) {
            this.imei = imei;
        }
    
        public String getGuid() {
            return guid;
        }
    
        public void setGuid(String guid) {
            this.guid = guid;
        }
    
        public Gps getGps() {
            return gps;
        }
    
        public void setGps(Gps gps) {
            this.gps = gps;
        }
    
        public String getCity() {
            return city;
        }
    
        public void setCity(String city) {
            this.city = city;
        }
    
        public String getGroup() {
            return group;
        }
    
        public void setGroup(String group) {
            this.group = group;
        }
    
        public String getVersion_name() {
            return version_name;
        }
    
        public void setVersion_name(String version_name) {
            this.version_name = version_name;
        }
    
        public String getVersion_code() {
            return version_code;
        }
    
        public void setVersion_code(String version_code) {
            this.version_code = version_code;
        }
    }
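
    For reference, this is the shape of payload that Gson binds to StationInfo in the UDF below; every field value here is invented for illustration:

    {
      "station": "xidan",
      "cell_info": {"mcc": "460", "mnc": "00", "ci": "123456", "pci": "101", "tac": "22001", "type": "LTE"},
      "wifi_infos": ["aa:bb:cc:dd:ee:ff,-60", "11:22:33:44:55:66,-72"],
      "timestamp": "1605509400000",
      "reason": "enter",
      "imei": "860000000000001",
      "guid": "guid-0001",
      "gps": {"longitude": "116.374", "latitude": "39.913"},
      "city": "beijing",
      "group": "A",
      "version_name": "1.0.0",
      "version_code": "100"
    }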

    2. Utility class

    1. LogMapInfoSplit (parses the logmap data format)
    package Test.Util;
    import java.lang.reflect.Field;
    import java.nio.charset.StandardCharsets;
    import java.sql.Timestamp;
    import java.util.Base64;
    import java.util.HashMap;
    import java.util.Map;
    import Test.Model.StationInfo;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.commons.lang3.time.DateFormatUtils;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import com.google.gson.Gson;
    /**
     * @program: FlinkUdf
     * @description: logMap field parsing
     * @author: BigData
     * @create: 2020-11-16 14:53
     **/
    public class LogMapInfoSplit extends TableFunction<Row> {
        private static final long serialVersionUID = 5485869053798534732L;
        private static Gson gson = new Gson();
        private static Logger logger= LoggerFactory.getLogger(LogMapInfoSplit.class);
        /**
         * Decode the base64url-encoded logmap value, parse it as JSON, and emit one row.
         * @param param the raw logmap payload
         */
        public void eval(String param) {
            if(StringUtils.isBlank(param)) {
                logger.debug("param is empty");
                return;
            }
            try {
                final Base64.Decoder decoder = Base64.getUrlDecoder();
                // Restore characters mangled in transport (' ' -> '+', '$' -> '='), then decode.
                // Note: '+' is not part of the URL-safe base64 alphabet, so if decoding fails
                // here, the standard Base64.getDecoder() may be the intended decoder.
                String ocaUploadJson = new String(
                        decoder.decode(param.replace(" ", "+").replace("$", "=").getBytes(StandardCharsets.UTF_8)),
                        StandardCharsets.UTF_8);
                logger.info("sourceParam :" + ocaUploadJson);
                if (StringUtils.isBlank(ocaUploadJson)) {
                    return;
                }
                StationInfo stationInfo = gson.fromJson(ocaUploadJson, StationInfo.class);
                if (stationInfo == null)
                    return;
                logger.info("received wifiInfo:" + stationInfo.toString());
                String[] wifiInfos = null;
                if (stationInfo.getWifi_infos() != null && stationInfo.getWifi_infos().size() != 0) {
                    wifiInfos = stationInfo.getWifi_infos().toArray(new String[stationInfo.getWifi_infos().size()]);
                }
                Row row = new Row(13);
                row.setField(0, stationInfo.getStation());
                row.setField(1, stationInfo.getImei());
                row.setField(2, objectToMap(stationInfo.getCell_info()));
                row.setField(3, wifiInfos);
                row.setField(4, stationInfo.getCity());
                row.setField(5, stationInfo.getReason());
                logger.info("station is:" + stationInfo.getStation());
                Map<String, String> objMap = objectToMap(stationInfo.getGps());
                logger.info("obj map is:" + objMap);
                row.setField(6, objMap);
                row.setField(7, new Timestamp(Long.parseLong(stationInfo.getTimestamp())));
                row.setField(8, stationInfo.getGroup());
                row.setField(9, DateFormatUtils.ISO_DATE_FORMAT.format(Long.parseLong(stationInfo.getTimestamp())));
                row.setField(10,stationInfo.getGuid());
                row.setField(11,stationInfo.getVersion_name());
                row.setField(12,stationInfo.getVersion_code());
                collect(row);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    
        /**
         * Declare the result row type (13 fields, matching the Row built in eval()).
         */
        @Override
        public TypeInformation<Row> getResultType() {
            return Types.ROW(Types.STRING, Types.STRING, Types.MAP(Types.STRING, Types.STRING)
                    , Types.OBJECT_ARRAY(Types.STRING), Types.STRING, Types.STRING, Types.MAP(Types.STRING, Types.STRING), Types.SQL_TIMESTAMP, Types.STRING, Types.STRING,Types.STRING,Types.STRING,Types.STRING);
        }
        /**
         * Convert an object's declared fields to a Map via reflection.
         * @param obj
         * @return
         */
        public static Map<String, String> objectToMap(Object obj) {
            if(obj==null)
            {
                return null;
            }
            else {
                Map<String, String> map = new HashMap<String, String>();
                Class<?> clazz = obj.getClass();
                for (Field field : clazz.getDeclaredFields()) {
                    field.setAccessible(true);
                    String fieldName = field.getName();
                    try {
                        map.put(fieldName, String.valueOf(field.get(obj)));
                    } catch (Exception e) {
                        logger.error(e.getMessage());
                    }
                }
                return map;
            }
        }
    }
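
    Before the SQL below can call logMapInfoSplit, the table function has to be registered under that name. A minimal sketch, again assuming a StreamTableEnvironment named tableEnv (illustrative):

    // Register the UDTF so SQL can reference it as logMapInfoSplit (Flink 1.11 legacy API).
    tableEnv.registerFunction("logMapInfoSplit", new LogMapInfoSplit());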

     Flink SQL:

    INSERT INTO
      software_research.subwayaware_info
    SELECT
      station,
      logMap_imei as `imei`,
      cell_info,
      wifi_infos,
      city,
      reason,
      gps,
      CAST(createtime as VARCHAR) as `timestamp`,
      logMap_group as `group`,
      datestr,
      model,
      event_id,
      guid,
      version_name,
      version_code
    FROM
      software_research.subwayaware_dcs
      LEFT JOIN LATERAL TABLE(logMapInfoSplit(log_map ['oca_upload_json'])) AS T(
        station,
        logMap_imei,
        cell_info,
        wifi_infos,
        city,
        reason,
        gps,
        createtime,
        logMap_group,
        datestr,
        guid,
        version_name,
        version_code
      ) ON TRUE
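
    Note on the join: LEFT JOIN LATERAL TABLE(...) ON TRUE keeps every row from subwayaware_dcs even when the UDTF emits nothing for it (the lateral columns are then NULL). With a plain comma (inner) lateral join, source rows for which eval() collects no output would be dropped. ON TRUE is the join condition Flink SQL requires for lateral table-function joins.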
    Original post: https://www.cnblogs.com/huanghanyu/p/13924746.html