1、pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the FlinkUdf project: Flink Table/SQL user-defined functions plus
  the Kafka, HBase, JDBC, Redis, Hadoop/Hive and Spark client libraries used by the examples.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>FlinkUdf</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>test</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Kept in sync with the maven-compiler-plugin configuration below (source/target 8). -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.11.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.0</scala.version>
    <hadoop.version>3.0.0</hadoop.version>
    <hive.version>3.0.0</hive.version>
    <hbase.version>2.3.0</hbase.version>
    <spark.version>3.0.0</spark.version>
    <jedis.version>3.0.0</jedis.version>
  </properties>

  <dependencies>
    <!-- 0. Base language -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- 1. Flink modules -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <!-- Scala suffix comes from the shared property instead of a hard-coded "_2.11". -->
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- 2. CLI dependencies -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- 3. Alibaba JSON library -->
    <!-- NOTE(review): fastjson 1.2.58 is an old release; check for security advisories before reuse. -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.58</version>
      <exclusions>
        <exclusion>
          <groupId>org.javassist</groupId>
          <artifactId>javassist</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.scala-lang.modules</groupId>
          <artifactId>scala-parser-combinators_2.11</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.xerial.snappy</groupId>
          <artifactId>snappy-java</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- 4. Kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.3</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <exclusions>
        <!-- The explicit kafka-clients dependency above is used instead. -->
        <exclusion>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- 5. Formats and database connectors -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <!-- Was pinned to 1.10.0; aligned with the other Flink modules. -->
      <version>${flink.version}</version>
    </dependency>
    <!--
      NOTE(review): the next two connectors are Scala 2.12 / Flink 1.10.x builds, while the rest
      of the Flink modules use Scala ${scala.binary.version} and Flink ${flink.version}.
      Left unchanged here; confirm the intended artifacts before aligning them.
    -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hbase_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-jdbc_2.12</artifactId>
      <version>1.10.2</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.37</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.1.5</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.flink</groupId>
          <artifactId>force-shading</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.9.5</version>
    </dependency>
    <dependency>
      <groupId>io.lettuce</groupId>
      <artifactId>lettuce-core</artifactId>
      <version>5.0.5.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.4.Final</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>${jedis.version}</version>
    </dependency>

    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example: <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency> -->

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <!-- 5. Logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>net.sf.json-lib</groupId>
      <artifactId>json-lib</artifactId>
      <version>2.4</version>
      <classifier>jdk15</classifier>
    </dependency>
    <!-- NOTE(review): junit has no test scope here; confirm whether main sources use it. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>

    <!-- 6. Offline data warehouse (Hive) -->
    <!-- 6.1 Hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!-- 6.2 Hive -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
    </dependency>

    <!-- 6.3 HBase -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <!-- 7. Spark -->
    <!-- NOTE(review): Spark artifacts use the Scala 2.12 suffix while scala.binary.version is 2.11; confirm. -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.12</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

  <repositories>
    <!-- <repository><id>nexus-aliyun</id><name>Nexus aliyun</name><layout>default</layout><url>http://maven.aliyun.com/nexus/content/groups/public</url><snapshots><enabled>false</enabled></snapshots><releases><enabled>true</enabled></releases></repository>-->
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <pluginManagement>
      <!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
2、数组转字符串
1、方法①
package Udf; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.ScalarFunction; //方法1:数组按照指定的分隔符转成字符串 public class ArrToString extends ScalarFunction { private static final long serialVersionUID=1L; public ArrToString() { } @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public String eval(String[] bodyRow, String split) { try { if(bodyRow !=null && bodyRow.length>0) { StringBuilder stringBuilder=new StringBuilder(); for(int i=0;i<bodyRow.length-1;i++) { stringBuilder.append(bodyRow[i]).append(split); } stringBuilder.append(bodyRow[bodyRow.length-1]); return stringBuilder.toString(); } else { return null; } } catch (Exception ex) { return ex.getMessage(); } } }
2、方法②
package Udf; import org.apache.commons.lang3.ArrayUtils; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.ScalarFunction; //方法3:数组按照指定的分隔符转成字符串 public class ArrToString2 extends ScalarFunction { private static final long serialVersionUID=1L; public ArrToString2() { } @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public String eval(String[] arr, String split) { try { if(arr !=null && arr.length>0) { return ArrayUtils.toString(arr,split); } else { return null; } } catch(Exception ex) { return ex.getMessage(); } } }
3、方法③
package Udf;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Arrays;

/**
 * Method 3: scalar function that renders a string array in the {@link Arrays#toString(Object[])}
 * format.
 */
public class ArrToString3 extends ScalarFunction {

    private static final long serialVersionUID = 1L;

    public ArrToString3() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Renders the array as text.
     *
     * @param arr array to render
     * @return {@code Arrays.toString(arr)}; {@code null} when {@code arr} is null or empty; the
     *         exception message if an exception is raised
     */
    public String eval(String[] arr) {
        try {
            boolean hasElements = arr != null && arr.length > 0;
            return hasElements ? Arrays.toString(arr) : null;
        } catch (Exception ex) {
            return ex.getMessage();
        }
    }
}
3、字符串转数组
package Udf; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.ScalarFunction; //方法1:字符串按照指定的分隔符转成数组 public class StringToArr extends ScalarFunction { private static final long serialVersionUID=1L; public StringToArr() { } @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public String[] eval(String arr, String split) { try { if(arr !=null) { return arr.split(split); } else { return null; } } catch(Exception ex) { return null; } } }
4、字符串转map
①
package Udf;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.HashMap;
import java.util.Map;

/**
 * Method 1: scalar function that parses delimited "key/value" text into a map.
 */
public class StringToMap extends ScalarFunction {

    private static final long serialVersionUID = 1L;

    public StringToMap() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Parses every entry of the input into the result map.
     *
     * <p>The previous implementation returned from inside the loop, so only the first entry was
     * ever returned, and an entry without a value failed the whole call.
     *
     * @param str    text to parse
     * @param split  delimiter between entries; handed to {@code String.split}, i.e. a regular
     *               expression
     * @param split1 delimiter between key and value; also a regular expression
     * @return the parsed map (an entry without a value maps its key to {@code null}), or
     *         {@code null} when {@code str} is null, no entry could be parsed, or parsing throws
     */
    public Map eval(String str, String split, String split1) {
        if (str == null) {
            return null;
        }
        try {
            Map<String, Object> map = new HashMap<>();
            for (String entry : str.split(split)) {
                if (entry.isEmpty()) {
                    continue;
                }
                String[] pair = entry.split(split1);
                if (pair.length == 0) {
                    // The entry consisted only of key/value delimiters.
                    continue;
                }
                map.put(pair[0], pair.length > 1 ? pair[1] : null);
            }
            // Keep the earlier contract: nothing parsed yields null rather than an empty map.
            return map.isEmpty() ? null : map;
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }
}
②
import org.junit.Test; import java.util.HashMap; import java.util.Map; public class StringUtilsTest { @Test public void testDataToMap() { String data = "certificatetype=0&certificateno=220182&depositacct=622848"; Map map = new HashMap(); if (null != data) { String[] param = data.split("&"); for (int i = 0; i < param.length; i++) { int index = param[i].indexOf('='); map.put(param[i].substring(0,index), param[i].substring((index + 1))); } } System.out.println(map); System.out.println("----------------分割线---------------"); Map result = new HashMap(); String[] params = data.split("\&"); for (String entry : params) { if (entry.contains("=")) { String[] sub = entry.split("\="); if (sub.length > 1) { result.put(sub[0], sub[1]); } else { result.put(sub[0], ""); } } } System.out.println(result); } } String str1 = "d3fe1e186e41475ea965f4722f5488a8"; String str2 = "5093"; String str3 = "公共设施"; String str = str1 + "1" + str2 + "1" + str3; System.out.println(str); // 也就是下面的字符串,分隔符为 u0001 str = "d3fe1e186e41475ea965f4722f5488a8u00015093u0001公共设施"; String[] split = str.split("1"); for (String s : split) { System.out.println(s); } System.out.println(split.length);
③
package Flink.Udf;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Scalar function that parses delimited "key/value" text and returns the text form of the
 * resulting map.
 */
public class MapStrToMap extends ScalarFunction {

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToMap.class);

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Parses the input into an insertion-ordered map and returns {@code map.toString()}.
     *
     * @param data text to parse
     * @param spt1 delimiter between entries; handed to {@code String.split}, i.e. a regular
     *             expression
     * @param spt2 delimiter between key and value; also a regular expression
     * @return {@code ""} when {@code data} is null or empty; otherwise the map rendered as text.
     *         An entry without a value maps its key to {@code null}. If parsing throws, the error
     *         is logged and the entries parsed so far are returned.
     */
    public String eval(String data, String spt1, String spt2) {
        Map<String, String> map = new LinkedHashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            }
            for (String entry : data.split(spt1)) {
                String[] pair = entry.split(spt2);
                if (pair.length == 0) {
                    // The entry consisted only of key/value delimiters; skip it instead of
                    // failing on pair[0] and dropping the remaining entries.
                    continue;
                }
                map.put(pair[0], pair.length > 1 ? pair[1] : null);
            }
        } catch (Exception e) {
            // Pass the input for the {} placeholder; the trailing exception is logged as the
            // throwable. Previously only the exception was passed and the input was never logged.
            logger.error("MapStr to Map Error!,mapStr={}", data, e);
        }
        return map.toString();
    }
}
5、map转字符串
package Udf;

import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;

import java.util.Arrays;
import java.util.Map;
import java.util.Set;

/**
 * Method 1: scalar function that renders a map as delimited "key/value" text with the keys in
 * ascending order.
 */
public class MapToString extends ScalarFunction {

    private static final long serialVersionUID = 1L;

    public MapToString() {
    }

    /**
     * Renders the map as {@code key + split1 + value} pairs joined by {@code split}.
     *
     * <p>Pairs whose trimmed value is empty are left out. The previous implementation still
     * appended the entry separator for such pairs, which produced leading, doubled or trailing
     * separators; the separator is now written only between pairs that are actually emitted.
     *
     * @param map    entries to render; a null value is rendered as the text "null" (result of
     *               {@code String.valueOf}) and is therefore kept
     * @param split  text placed between two pairs
     * @param split1 text placed between a key and its value
     * @return the rendered text, or {@code null} when {@code map} is null or rendering throws
     */
    public String eval(Map<String, Object> map, String split, String split1) {
        if (map == null) {
            return null;
        }
        try {
            String[] keys = map.keySet().toArray(new String[0]);
            // Ascending key order makes the output independent of the map's iteration order.
            Arrays.sort(keys);
            StringBuilder sb = new StringBuilder();
            for (String key : keys) {
                String value = String.valueOf(map.get(key)).trim();
                if (value.isEmpty()) {
                    continue;
                }
                // Every emitted pair ends with a non-empty value, so a non-empty builder means
                // at least one pair has already been written.
                if (sb.length() > 0) {
                    sb.append(split);
                }
                sb.append(key).append(split1).append(value);
            }
            return sb.toString();
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }
}
6、map转Json
①
package Flink.Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.LinkedHashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scalar function that parses delimited "key/value" text and serializes the result as JSON.
 */
public class MapStrToJson extends ScalarFunction {

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Parses the input into an insertion-ordered map and serializes it with
     * {@code JSONObject.toJSONString}.
     *
     * @param data text to parse
     * @param spt1 delimiter between entries; handed to {@code String.split}, i.e. a regular
     *             expression
     * @param spt2 delimiter between key and value; also a regular expression
     * @return {@code ""} when {@code data} is null or empty; otherwise the JSON text. An entry
     *         without a value maps its key to {@code null}. If parsing throws, the error is
     *         logged and the entries parsed so far are serialized.
     */
    public String eval(String data, String spt1, String spt2) {
        Map<String, String> map = new LinkedHashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            }
            for (String entry : data.split(spt1)) {
                String[] pair = entry.split(spt2);
                if (pair.length == 0) {
                    // The entry consisted only of key/value delimiters; skip it instead of
                    // failing on pair[0] and dropping the remaining entries.
                    continue;
                }
                map.put(pair[0], pair.length > 1 ? pair[1] : null);
            }
        } catch (Exception e) {
            // Pass the input for the {} placeholder; the trailing exception is logged as the
            // throwable. Previously only the exception was passed and the input was never logged.
            logger.error("MapStr to Json Error!,mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }
}
②
package com.oppo.dc.ostream.udf.common;

import com.alibaba.fastjson.JSON;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

/**
 * Scalar function that serializes a map as JSON text capped at 5000 characters.
 */
public class MapToJSONString extends ScalarFunction {

    private static final long serialVersionUID = 1L;

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Serializes the map with {@code JSON.toJSONString}.
     *
     * @param map map to serialize
     * @return {@code null} for a null map; otherwise the JSON text, cut to its first 5000
     *         characters when longer (the cut text is no longer guaranteed to be complete JSON)
     */
    public String eval(Map map) {
        if (map != null) {
            String serialized = JSON.toJSONString(map);
            return serialized.length() > 5000 ? serialized.substring(0, 5000) : serialized;
        }
        return null;
    }
}
③
package com.oppo.dc.ostream.udf.common;

import com.alibaba.fastjson.JSON;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

/**
 * Scalar function that serializes a map as JSON text after repairing the "data" entry.
 *
 * Author: Fisher Xiang
 * Date: 2019-10-28 15:21
 *
 * @description: MapToJSONStr
 * @version: 1.0.0
 */
public class MapToJSONStr extends ScalarFunction {

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Serializes the map with {@code JSON.toJSONString}. When the map holds a String under the
     * key "data", every occurrence of {@code "%2C"} (including the quotes) in that value is first
     * replaced by {@code ","} and the value is written back into the given map.
     *
     * <p>The replace call had lost its escaped quotes and did not compile; the unchecked cast of
     * the "data" value threw for null or non-String values, which are now left untouched.
     *
     * @param map map to serialize; its "data" entry may be modified in place
     * @return the JSON text, or {@code null} for a null map
     */
    public String eval(Map map) {
        if (map == null) {
            return null;
        }
        Object data = map.get("data");
        if (data instanceof String) {
            map.put("data", ((String) data).replace("\"%2C\"", "\",\""));
        }
        String jsonStr = JSON.toJSONString(map);
        /*if (jsonStr.length() > 5000) {
            jsonStr = jsonStr.substring(0, 5000);
        }*/
        return jsonStr;
    }
}
④map字符串转json(按照hash进行排序)
package FlinkDW.Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scalar function that parses delimited "key/value" text into a {@link HashMap} and serializes
 * it as JSON, so the key order follows the HashMap iteration order rather than the input order.
 */
public class MapStrToJson extends ScalarFunction {

    // Was written as "1 L", which is not a valid long literal.
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Parses the input and serializes the resulting map with {@code JSONObject.toJSONString}.
     *
     * @param data text to parse
     * @param spt1 delimiter between entries; handed to {@code String.split}, i.e. a regular
     *             expression
     * @param spt2 delimiter between key and value; also a regular expression
     * @return {@code ""} when {@code data} is null or empty; otherwise the JSON text. An entry
     *         without a value maps its key to the text "null". If parsing throws, the error is
     *         logged and the entries parsed so far are serialized.
     */
    public String eval(String data, String spt1, String spt2) {
        Map<String, String> map = new HashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            }
            for (String entry : data.split(spt1)) {
                String[] pair = entry.split(spt2);
                if (pair.length == 0) {
                    // The entry consisted only of key/value delimiters; skip it instead of
                    // failing on pair[0] and dropping the remaining entries.
                    continue;
                }
                map.put(pair[0], pair.length > 1 ? pair[1] : "null");
            }
        } catch (Exception e) {
            logger.error("MapStr to Json Error!,mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }
}
⑤优化(固定顺序)
package Flink.Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.LinkedHashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scalar function that parses delimited "key/value" text into a {@link LinkedHashMap} and
 * serializes it as JSON, so the keys keep the order in which they appear in the input.
 */
public class MapStrToJson extends ScalarFunction {

    // Was written as "1 L", which is not a valid long literal.
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Parses the input and serializes the resulting map with {@code JSONObject.toJSONString}.
     *
     * @param data text to parse
     * @param spt1 delimiter between entries; handed to {@code String.split}, i.e. a regular
     *             expression
     * @param spt2 delimiter between key and value; also a regular expression
     * @return {@code ""} when {@code data} is null or empty; otherwise the JSON text. An entry
     *         without a value maps its key to {@code null}. If parsing throws, the error is
     *         logged and the entries parsed so far are serialized.
     */
    public String eval(String data, String spt1, String spt2) {
        Map<String, String> map = new LinkedHashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            }
            for (String entry : data.split(spt1)) {
                String[] pair = entry.split(spt2);
                if (pair.length == 0) {
                    // The entry consisted only of key/value delimiters; skip it instead of
                    // failing on pair[0] and dropping the remaining entries.
                    continue;
                }
                map.put(pair[0], pair.length > 1 ? pair[1] : null);
            }
        } catch (Exception e) {
            // Pass the input for the {} placeholder; the trailing exception is logged as the
            // throwable. Previously only the exception was passed and the input was never logged.
            logger.error("MapStr to Json Error!,mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }
}
测试
package Flink.Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * Scratch driver that exercises the "map text to JSON" and "map text to map" conversions.
 */
public class Test {

    private static final Logger logger = LoggerFactory.getLogger(Test.class);

    /**
     * 1. Converts map text to JSON.
     *
     * @param data text to parse
     * @param spt1 delimiter between entries (regular expression, used with {@code String.split})
     * @param spt2 delimiter between key and value (regular expression)
     * @return {@code ""} when {@code data} is null or empty; otherwise the JSON text, where an
     *         entry without a value maps its key to the text "null"
     * @author BigData
     * @date 2020/12/3
     */
    public static String testDataToJson(String data, String spt1, String spt2) {
        Map<String, String> map = new LinkedHashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            }
            for (String s : data.split(spt1)) {
                String[] s1 = s.split(spt2);
                if (s1.length > 1) {
                    map.put(s1[0], s1[1]);
                } else {
                    map.put(s1[0], "null");
                }
            }
        } catch (Exception e) {
            logger.error("MapStr to Json Error!,mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }

    /**
     * 2. Converts map text to the text form of a map.
     *
     * <p>The previous version guarded the parsing with {@code data instanceof String} and handled
     * the other case with {@code data != null || data.length() > 2}, which dereferences null; the
     * resulting exception was swallowed and "{}" was returned. Null input is now covered by the
     * empty check and yields {@code null}, like other too-short input.
     *
     * @param data text to parse
     * @param spt1 delimiter between entries (regular expression, used with {@code String.split})
     * @param spt2 delimiter between key and value (regular expression)
     * @return {@code null} when {@code data} is null, empty or at most two characters long;
     *         otherwise {@code map.toString()}, where an entry without a value maps its key to
     *         the text "null"
     * @author BigData
     * @date 2020/12/3
     */
    public static String testDataToMap(String data, String spt1, String spt2) {
        if (StringUtils.isEmpty(data) || data.length() <= 2) {
            return null;
        }
        Map<String, String> map = new LinkedHashMap<>();
        try {
            for (String s : data.split(spt1)) {
                String[] s1 = s.split(spt2);
                if (s1.length > 1) {
                    map.put(s1[0], s1[1]);
                } else {
                    map.put(s1[0], "null");
                }
            }
        } catch (Exception e) {
            // Report the failure instead of discarding it; entries parsed so far are returned.
            logger.error("MapStr to Map Error!,mapStr={}", data, e);
        }
        return map.toString();
    }

    /** Returns the text form of the runtime class of the given object. */
    public static String getType(Object o) {
        return o.getClass().toString();
    }

    public static void main(String[] args) {
        String data1 = "certificatetype=0&certificateno=220182&depositacct=622848&name";
        System.out.println(testDataToJson(data1, "&", "="));

        String str = "k1:,k2:V2";
        System.out.println(testDataToJson(str, ",", ":"));

        String str1 = "stayTimeu00020u0001firstScreenTimeu00020u0001apiNameu0002stringValueu0001apiParamu0002idu003d44u0001methodTypeu0002GETu0001loadingTimeu0002453u0001websiteIdu0002stringValueu0001pagePathu0002/stringValueu0001pageNameu0002/valueu0001sessionKeyu0002eyJhbX0AwqPisoE6V8Au0001userNameu000280254861u0001ipAddressu0002XX.XX.XCX.XXXu0001uploadTimeu00022020-11-23 13:07:13u0001receiveTimeu00022020-11-26 11:08:55u0001eventCodeu00025";
        System.out.println(testDataToMap(str1, "u0001", "u0002"));
        System.out.println(testDataToJson(str1, "u0001", "u0002"));

        String data = "client_timestampu00011587375750618u0002apply_valueu00013u0002pre_page_idu0001DraftActivityu0002close_valueu00010u0002item_idu0001no_applyu0002start_timestampu00011587375592167u0002template_id_autou0001-1u0002music_nameu0001少年2u0002play_cntu00010u0002duration_valueu00015506u0002video_idu00011587375633000u0002is_storyu0001false";
        System.out.println(testDataToJson(data, "u0002", "u0001"));

        String str2 = "ts=1529995388&channel_id=164292&program_id=9081951&play_duration=20&position=10&os=iOS&os_version=4.3&app_name=aliyun&app_version=1.0&device_model=Samsumg_br_FM__samsung_SCH-N719";
        System.out.println(testDataToJson(str2, "&", "="));
        System.out.println(testDataToMap(str2, "&", "="));

        String logMap = "websiteIdu0002ota-recruit-TEST_ENVu0001pagePathu0002/configuration/indexu0001pageNameu0002????u0001sessionKeyu0002sessionKeyu0001userNameu00020u0001ipAddressu0002172.17.75.8u0001uploadTimeu00022020-12-09 15:10:13u0001receiveTimeu00022020-12-09 15:10:20u0001eventCodeu00021";
        System.out.println(testDataToJson(logMap, "u0001", "u0002"));
        System.out.println(testDataToMap(logMap, "u0001", "u0002"));
    }
}
测试结果
"C:\Program Files\Java\jdk1.8.0_221\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=54638:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\defaultuser0\AppData\Local\Temp\classpath1090826265.jar Flink.Udf.Test
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/defaultuser0/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/defaultuser0/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.10.0/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{"certificatetype":"0","certificateno":"220182","depositacct":"622848","name":"null"}
{"k1":"null","k2":"V2"}
{stayTime=0, firstScreenTime=0, apiName=stringValue, apiParam=id=44, methodType=GET, loadingTime=453, websiteId=stringValue, pagePath=/stringValue, pageName=/value, sessionKey=eyJhbX0AwqPisoE6V8A, userName=80254861, ipAddress=XX.XX.XCX.XXX, uploadTime=2020-11-23 13:07:13, receiveTime=2020-11-26 11:08:55, eventCode=5}
{"stayTime":"0","firstScreenTime":"0","apiName":"stringValue","apiParam":"id=44","methodType":"GET","loadingTime":"453","websiteId":"stringValue","pagePath":"/stringValue","pageName":"/value","sessionKey":"eyJhbX0AwqPisoE6V8A","userName":"80254861","ipAddress":"XX.XX.XCX.XXX","uploadTime":"2020-11-23 13:07:13","receiveTime":"2020-11-26 11:08:55","eventCode":"5"}
{"client_timestamp":"1587375750618","apply_value":"3","pre_page_id":"DraftActivity","close_value":"0","item_id":"no_apply","start_timestamp":"1587375592167","template_id_auto":"-1","music_name":"少年2","play_cnt":"0","duration_value":"5506","video_id":"1587375633000","is_story":"false"}
{"ts":"1529995388","channel_id":"164292","program_id":"9081951","play_duration":"20","position":"10","os":"iOS","os_version":"4.3","app_name":"aliyun","app_version":"1.0","device_model":"Samsumg_br_FM__samsung_SCH-N719"}
{ts=1529995388, channel_id=164292, program_id=9081951, play_duration=20, position=10, os=iOS, os_version=4.3, app_name=aliyun, app_version=1.0, device_model=Samsumg_br_FM__samsung_SCH-N719}
{"websiteId":"ota-recruit-TEST_ENV","pagePath":"/configuration/index","pageName":"????","sessionKey":"sessionKey","userName":"0","ipAddress":"172.17.75.8","uploadTime":"2020-12-09 15:10:13","receiveTime":"2020-12-09 15:10:20","eventCode":"1"}
{websiteId=ota-recruit-TEST_ENV, pagePath=/configuration/index, pageName=????, sessionKey=sessionKey, userName=0, ipAddress=172.17.75.8, uploadTime=2020-12-09 15:10:13, receiveTime=2020-12-09 15:10:20, eventCode=1}
Process finished with exit code 0
测试2
package FlinkDW.Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;

import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scratch driver for the HashMap-based "map text to JSON" conversion.
 */
public class Test {

    private static final Logger logger = LoggerFactory.getLogger(Test.class);

    /**
     * Parses delimited "key/value" text into a {@link HashMap} and serializes it as JSON.
     *
     * <p>The previous version guarded the parsing with {@code data instanceof String} and handled
     * the other case with {@code data != null || data.length() > 2}, which dereferences null; the
     * resulting exception was swallowed and "{}" was returned, and the branch's own message could
     * never be returned. Null input is now covered by the empty check and returned unchanged,
     * like other too-short input.
     *
     * @param data text to parse
     * @param spt1 delimiter between entries (regular expression, used with {@code String.split})
     * @param spt2 delimiter between key and value (regular expression)
     * @return {@code data} itself when it is null, empty or at most two characters long;
     *         otherwise the JSON text, where an entry without a value maps its key to the text
     *         "null"
     */
    public static String testDataToMap(String data, String spt1, String spt2) {
        if (StringUtils.isEmpty(data) || data.length() <= 2) {
            return data;
        }
        Map<String, String> map = new HashMap<String, String>();
        try {
            for (String s : data.split(spt1)) {
                String[] s1 = s.split(spt2);
                if (s1.length > 1) {
                    map.put(s1[0], s1[1]);
                } else {
                    map.put(s1[0], "null");
                }
            }
        } catch (Exception e) {
            // Report the failure instead of discarding it; entries parsed so far are serialized.
            logger.error("MapStr to Json Error!,mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }

    /** Returns the text form of the runtime class of the given object. */
    public static String getType(Object o) {
        return o.getClass().toString();
    }

    public static void main(String[] args) {
        String data1 = "certificatetype=0&certificateno=220182&depositacct=622848&name";
        String data = "client_timestampu00011587375750618u0002apply_valueu00013u0002pre_page_idu0001DraftActivityu0002close_valueu00010u0002item_idu0001no_applyu0002start_timestampu00011587375592167u0002template_id_autou0001-1u0002music_nameu0001少年2u0002play_cntu00010u0002duration_valueu00015506u0002video_idu00011587375633000u0002is_storyu0001false";
        System.out.println(testDataToMap(data, "u0002", "u0001"));
        System.out.println(testDataToMap(data1, "&", "="));
        String str = "k1:,k2:V2";
        System.out.println(testDataToMap(str, ",", ":"));
    }
}
结果:
"C:\Program Files\Java\jdk1.8.0_221\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=65426:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\defaultuser0\AppData\Local\Temp\classpath1898420598.jar FlinkDW.Udf.Test
{"client_timestamp":"1587375750618","apply_value":"3","pre_page_id":"DraftActivity","close_value":"0","item_id":"no_apply","start_timestamp":"1587375592167","template_id_auto":"-1","music_name":"少年2","play_cnt":"0","duration_value":"5506","video_id":"1587375633000","is_story":"false"}
{"name":"null","certificatetype":"0","certificateno":"220182","depositacct":"622848"}
{"k1":"null","k2":"V2"}
Process finished with exit code 0
7、json转map
package Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Map;

/**
 * Method 1: scalar function that parses a JSON object string into a map.
 */
public class JsonToMap extends ScalarFunction {

    private static final long serialVersionUID = 1L;

    public JsonToMap() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Parses {@code str} with fastjson.
     *
     * @param str JSON text, may be {@code null}
     * @return the parsed object viewed as a map, or {@code null} when the input is
     *         {@code null} or parsing throws (the stack trace is printed in that case)
     */
    public Map eval(String str) {
        if (str == null) {
            return null;
        }
        try {
            // JSONObject implements Map<String, Object>, so it can be returned directly.
            Map<String, Object> parsed = JSONObject.parseObject(str);
            return parsed;
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }
}
注意:FastJson的json转map的6种方式
package Udf;

import java.util.Map;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

/**
 * Demonstrates six fastjson calls that turn a JSON object string into a {@link Map}.
 * Each section parses the same input and prints the entries of the map it produced.
 */
public class Test1 {

    public static void main(String[] args) {
        // Inner quotes must be escaped for the literal to compile.
        String str = "{\"0\":\"zhangsan\",\"1\":\"lisi\",\"2\":\"wangwu\",\"3\":\"maliu\"}";

        // Way 1: JSON.parse, cast to Map
        Map maps = (Map) JSON.parse(str);
        System.out.println("这个是用JSON类来解析JSON字符串!!!");
        for (Object entry : maps.entrySet()) {
            System.out.println(((Map.Entry) entry).getKey() + " " + ((Map.Entry) entry).getValue());
        }

        // Way 2: JSON.parseObject
        Map mapTypes = JSON.parseObject(str);
        System.out.println("这个是用JSON类的parseObject来解析JSON字符串!!!");
        for (Object key : mapTypes.keySet()) {
            System.out.println("key为:" + key + "值为:" + mapTypes.get(key));
        }

        // Way 3: JSON.parseObject with an explicit target type
        Map mapType = JSON.parseObject(str, Map.class);
        System.out.println("这个是用JSON类,指定解析类型,来解析JSON字符串!!!");
        for (Object key : mapType.keySet()) {
            System.out.println("key为:" + key + "值为:" + mapType.get(key));
        }

        // Way 4: JSONObject.parse (JSONObject is an implementation of the Map interface)
        Map json = (Map) JSONObject.parse(str);
        System.out.println("这个是用JSONObject类的parse方法来解析JSON字符串!!!");
        for (Object entry : json.entrySet()) {
            System.out.println(((Map.Entry) entry).getKey() + " " + ((Map.Entry) entry).getValue());
        }

        // Way 5: JSONObject.parseObject; iterate the object parsed here, not the one from way 4
        JSONObject jsonObject = JSONObject.parseObject(str);
        System.out.println("这个是用JSONObject的parseObject方法来解析JSON字符串!!!");
        for (Object entry : jsonObject.entrySet()) {
            System.out.println(((Map.Entry) entry).getKey() + " " + ((Map.Entry) entry).getValue());
        }

        // Way 6: JSONObject.parseObject with an explicit target type; iterate its own result
        Map mapObj = JSONObject.parseObject(str, Map.class);
        System.out.println("这个是用JSONObject的parseObject方法并执行返回类型来解析JSON字符串!!!");
        for (Object entry : mapObj.entrySet()) {
            System.out.println(((Map.Entry) entry).getKey() + " " + ((Map.Entry) entry).getValue());
        }

        // Sample input for array parsing (JSONArray.parse()); a JSON array needs [ ] around
        // its elements. It is declared for reference only and is not parsed here.
        String strArr = "[{\"0\":\"zhangsan\",\"1\":\"lisi\",\"2\":\"wangwu\",\"3\":\"maliu\"},"
                + "{\"00\":\"zhangsan\",\"11\":\"lisi\",\"22\":\"wangwu\",\"33\":\"maliu\"}]";

        System.out.println(json);
    }
}
结果:
"C:Program FilesJavajdk1.8.0_221injava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2019.3.3libidea_rt.jar=55064:C:Program FilesJetBrainsIntelliJ IDEA 2019.3.3in" -Dfile.encoding=UTF-8 -classpath "C:Program FilesJavajdk1.8.0_221jrelibcharsets.jar;C:Program FilesJavajdk1.8.0_221jrelibdeploy.jar;C:Program FilesJavajdk1.8.0_221jrelibextaccess-bridge-64.jar;C:Program FilesJavajdk1.8.0_221jrelibextcldrdata.jar;C:Program FilesJavajdk1.8.0_221jrelibextdnsns.jar;C:Program FilesJavajdk1.8.0_221jrelibextjaccess.jar;C:Program FilesJavajdk1.8.0_221jrelibextjfxrt.jar;C:Program FilesJavajdk1.8.0_221jrelibextlocaledata.jar;C:Program FilesJavajdk1.8.0_221jrelibext ashorn.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunec.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunjce_provider.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunmscapi.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunpkcs11.jar;C:Program FilesJavajdk1.8.0_221jrelibextzipfs.jar;C:Program FilesJavajdk1.8.0_221jrelibjavaws.jar;C:Program FilesJavajdk1.8.0_221jrelibjce.jar;C:Program FilesJavajdk1.8.0_221jrelibjfr.jar;C:Program FilesJavajdk1.8.0_221jrelibjfxswt.jar;C:Program FilesJavajdk1.8.0_221jrelibjsse.jar;C:Program FilesJavajdk1.8.0_221jrelibmanagement-agent.jar;C:Program FilesJavajdk1.8.0_221jrelibplugin.jar;C:Program FilesJavajdk1.8.0_221jrelib esources.jar;C:Program FilesJavajdk1.8.0_221jrelib t.jar;C:appFlinkUdf arget est-classes;C:Usersdefaultuser0.m2 epositoryjunitjunit4.11junit-4.11.jar;C:Usersdefaultuser0.m2 epositoryorghamcresthamcrest-core1.3hamcrest-core-1.3.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-table-api-java-bridge_2.111.11.1flink-table-api-java-bridge_2.11-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-table-api-java1.11.1flink-table-api-java-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-java1.11.1flink-java-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapachecommonscommons-lang33.3.2commons-lang3-3.3.2.jar;C:Usersdefaultuser0.m2 
epositoryorgapachecommonscommons-math33.5commons-math3-3.5.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-streaming-java_2.111.11.1flink-streaming-java_2.11-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-runtime_2.111.11.1flink-runtime_2.11-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-queryable-state-client-java1.11.1flink-queryable-state-client-java-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-hadoop-fs1.11.1flink-hadoop-fs-1.11.1.jar;C:Usersdefaultuser0.m2 epositorycommons-iocommons-io2.4commons-io-2.4.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-shaded-netty4.1.39.Final-11.0flink-shaded-netty-4.1.39.Final-11.0.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-shaded-jackson2.10.1-11.0flink-shaded-jackson-2.10.1-11.0.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-shaded-zookeeper-33.4.14-11.0flink-shaded-zookeeper-3-3.4.14-11.0.jar;C:Usersdefaultuser0.m2 epositorycommons-clicommons-cli1.3.1commons-cli-1.3.1.jar;C:Usersdefaultuser0.m2 epositorycom ypesafeakkaakka-actor_2.112.5.21akka-actor_2.11-2.5.21.jar;C:Usersdefaultuser0.m2 epositorycom ypesafeconfig1.3.3config-1.3.3.jar;C:Usersdefaultuser0.m2 epositoryorgscala-langmodulesscala-java8-compat_2.11 .7.0scala-java8-compat_2.11-0.7.0.jar;C:Usersdefaultuser0.m2 epositorycom ypesafeakkaakka-stream_2.112.5.21akka-stream_2.11-2.5.21.jar;C:Usersdefaultuser0.m2 epositoryorg eactivestreams eactive-streams1.0.2 eactive-streams-1.0.2.jar;C:Usersdefaultuser0.m2 epositorycom ypesafessl-config-core_2.11 .3.7ssl-config-core_2.11-0.3.7.jar;C:Usersdefaultuser0.m2 epositorycom ypesafeakkaakka-protobuf_2.112.5.21akka-protobuf_2.11-2.5.21.jar;C:Usersdefaultuser0.m2 epositorycom ypesafeakkaakka-slf4j_2.112.5.21akka-slf4j_2.11-2.5.21.jar;C:Usersdefaultuser0.m2 epositoryorgclappergrizzled-slf4j_2.111.3.2grizzled-slf4j_2.11-1.3.2.jar;C:Usersdefaultuser0.m2 epositorycomgithubscoptscopt_2.113.5.0scopt_2.11-3.5.0.jar;C:Usersdefaultuser0.m2 
epositoryorgxerialsnappysnappy-java1.1.4snappy-java-1.1.4.jar;C:Usersdefaultuser0.m2 epositorycom witterchill_2.11 .7.6chill_2.11-0.7.6.jar;C:Usersdefaultuser0.m2 epositorycom witterchill-java .7.6chill-java-0.7.6.jar;C:Usersdefaultuser0.m2 epositoryorglz4lz4-java1.6.0lz4-java-1.6.0.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-shaded-guava18.0-11.0flink-shaded-guava-18.0-11.0.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-streaming-java_2.111.11.1flink-streaming-java_2.11-1.11.1-tests.jar;C:Usersdefaultuser0.m2 epositoryorgslf4jslf4j-api1.7.15slf4j-api-1.7.15.jar;C:Usersdefaultuser0.m2 epositorycomgooglecodefindbugsjsr3051.3.9jsr305-1.3.9.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkforce-shading1.11.1force-shading-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-table-planner-blink_2.111.11.1flink-table-planner-blink_2.11-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-table-api-scala_2.111.11.1flink-table-api-scala_2.11-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-table-api-scala-bridge_2.111.11.1flink-table-api-scala-bridge_2.11-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-table-runtime-blink_2.111.11.1flink-table-runtime-blink_2.11-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgcodehausjaninojanino3.0.9janino-3.0.9.jar;C:Usersdefaultuser0.m2 epositoryorgcodehausjaninocommons-compiler3.0.9commons-compiler-3.0.9.jar;C:Usersdefaultuser0.m2 epositoryorgapachecalciteavaticaavatica-core1.16.0avatica-core-1.16.0.jar;C:Usersdefaultuser0.m2 epositoryorg eflections eflections .9.10 eflections-0.9.10.jar;C:Usersdefaultuser0.m2 epositoryorgjavassistjavassist3.19.0-GAjavassist-3.19.0-GA.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-streaming-scala_2.111.11.1flink-streaming-scala_2.11-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-scala_2.111.11.1flink-scala_2.11-1.11.1.jar;C:Usersdefaultuser0.m2 
epositoryorgscala-langscala-reflect2.11.12scala-reflect-2.11.12.jar;C:Usersdefaultuser0.m2 epositoryorgscala-langscala-library2.11.12scala-library-2.11.12.jar;C:Usersdefaultuser0.m2 epositoryorgscala-langscala-compiler2.11.12scala-compiler-2.11.12.jar;C:Usersdefaultuser0.m2 epositoryorgscala-langmodulesscala-xml_2.111.0.5scala-xml_2.11-1.0.5.jar;C:Usersdefaultuser0.m2 epositoryorgscala-langmodulesscala-parser-combinators_2.111.0.4scala-parser-combinators_2.11-1.0.4.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-table-common1.11.1flink-table-common-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-core1.11.1flink-core-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-annotations1.11.1flink-annotations-1.11.1.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-metrics-core1.11.1flink-metrics-core-1.11.1.jar;C:Usersdefaultuser0.m2 epositorycomesotericsoftwarekryokryo2.24.0kryo-2.24.0.jar;C:Usersdefaultuser0.m2 epositorycomesotericsoftwareminlogminlog1.2minlog-1.2.jar;C:Usersdefaultuser0.m2 epositoryorgobjenesisobjenesis2.1objenesis-2.1.jar;C:Usersdefaultuser0.m2 epositoryorgapachecommonscommons-compress1.20commons-compress-1.20.jar;C:Usersdefaultuser0.m2 epositoryorgapacheflinkflink-shaded-asm-77.1-11.0flink-shaded-asm-7-7.1-11.0.jar;C:Usersdefaultuser0.m2 epositorycomalibabafastjson1.2.73fastjson-1.2.73.jar;C:Usersdefaultuser0.m2 epositoryorgslf4jslf4j-log4j121.7.7slf4j-log4j12-1.7.7.jar;C:Usersdefaultuser0.m2 epositorylog4jlog4j1.2.17log4j-1.2.17.jar;C:Usersdefaultuser0.m2 epository etsfjson-libjson-lib2.4json-lib-2.4-jdk15.jar;C:Usersdefaultuser0.m2 epositorycommons-beanutilscommons-beanutils1.8.0commons-beanutils-1.8.0.jar;C:Usersdefaultuser0.m2 epositorycommons-collectionscommons-collections3.2.1commons-collections-3.2.1.jar;C:Usersdefaultuser0.m2 epositorycommons-langcommons-lang2.5commons-lang-2.5.jar;C:Usersdefaultuser0.m2 epositorycommons-loggingcommons-logging1.1.1commons-logging-1.1.1.jar;C:Usersdefaultuser0.m2 
epository etsfezmorphezmorph1.0.6ezmorph-1.0.6.jar" Udf.Test1 这个是用JSON类来解析JSON字符串!!! 0 zhangsan 1 lisi 2 wangwu 3 maliu 这个是用JSON类的parseObject来解析JSON字符串!!! key为:0值为:zhangsan key为:1值为:lisi key为:2值为:wangwu key为:3值为:maliu 这个是用JSON类,指定解析类型,来解析JSON字符串!!! key为:0值为:zhangsan key为:1值为:lisi key为:2值为:wangwu key为:3值为:maliu 这个是用JSONObject类的parse方法来解析JSON字符串!!! 0 zhangsan 1 lisi 2 wangwu 3 maliu 这个是用JSONObject的parseObject方法来解析JSON字符串!!! 0 zhangsan 1 lisi 2 wangwu 3 maliu 这个是用JSONObject的parseObject方法并执行返回类型来解析JSON字符串!!! 0 zhangsan 1 lisi 2 wangwu 3 maliu {"0":"zhangsan","1":"lisi","2":"wangwu","3":"maliu"} Process finished with exit code 0
8、数组转字符串数组
package Udf;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Arrays;

/**
 * Method 4: scalar function that renders a string array as its array-style text,
 * i.e. the output of {@link Arrays#toString(Object[])}.
 */
public class ArrToString3 extends ScalarFunction {

    private static final long serialVersionUID = 1L;

    public ArrToString3() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * @param arr the array to render, may be {@code null}
     * @return {@code Arrays.toString(arr)}; {@code null} for a {@code null} or empty
     *         array; the exception message if rendering throws
     */
    public String eval(String[] arr) {
        try {
            boolean nothingToRender = (arr == null) || (arr.length == 0);
            if (nothingToRender) {
                return null;
            }
            return Arrays.toString(arr);
        } catch (Exception ex) {
            return ex.getMessage();
        }
    }
}
9、数组转json
package Udf; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.ScalarFunction; import org.json.JSONArray; //方法1:数组转成json数组 public class ArrToJson extends ScalarFunction { private static final long serialVersionUID = 1L; @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public ArrToJson() { } public String eval(String[] arr) { try { if (arr != null && arr.length > 0) { JSONArray jsonArray=new JSONArray(arr); return jsonArray.toString(); } else { return null; } } catch (Exception ex) { return ex.getMessage(); } } }
10、LogMapInfo数据格式处理(logmap字段解析udf)
1、模型包
1.CellInfo
package Test.Model; /** * @program: FlinkUdf * @description: CellInfo * @author: BigData * @create: 2020-11-16 14:50 **/ public class CellInfo { private String mcc; private String mnc; private String ci; private String pci; private String tac; private String type; public String getMcc() { return mcc; } public void setMcc(String mcc) { this.mcc = mcc; } public String getMnc() { return mnc; } public void setMnc(String mnc) { this.mnc = mnc; } public String getCi() { return ci; } public void setCi(String ci) { this.ci = ci; } public String getPci() { return pci; } public void setPci(String pci) { this.pci = pci; } public String getTac() { return tac; } public void setTac(String tac) { this.tac = tac; } public String getType() { return type; } public void setType(String type) { this.type = type; } }
2.GPS
package Test.Model; /** * @program: FlinkUdf * @description: Gps * @author: BigData * @create: 2020-11-16 14:51 **/ public class Gps { private String longitude; private String latitude; public String getLongitude() { return longitude; } public void setLongitude(String longitude) { this.longitude = longitude; } public String getLatitude() { return latitude; } public void setLatitude(String latitude) { this.latitude = latitude; } }
package Test.Model; /** * @program: FlinkUdf * @description: Gps * @author: BigData * @create: 2020-11-16 14:51 **/ public class Gps { private String longitude; private String latitude; public String getLongitude() { return longitude; } public void setLongitude(String longitude) { this.longitude = longitude; } public String getLatitude() { return latitude; } public void setLatitude(String latitude) { this.latitude = latitude; } }
3.StationInfo
package Test.Model;

import java.util.List;

/**
 * Subway (metro) station model (program: FlinkUdf, author: BigData,
 * created 2020-11-16 14:49). A mutable bean: every property has a getter and a setter.
 * Property names use snake_case where the original field names do.
 */
public final class StationInfo {

    private String station;
    private CellInfo cell_info;
    private List<String> wifi_infos;
    private String timestamp;
    private String reason;
    private String imei;
    private String guid;
    private Gps gps;
    private String city;
    private String group;
    private String version_name;
    private String version_code;

    // --- station ---
    public String getStation() {
        return this.station;
    }

    public void setStation(String station) {
        this.station = station;
    }

    // --- cell_info ---
    public CellInfo getCell_info() {
        return this.cell_info;
    }

    public void setCell_info(CellInfo cell_info) {
        this.cell_info = cell_info;
    }

    // --- wifi_infos ---
    public List<String> getWifi_infos() {
        return this.wifi_infos;
    }

    public void setWifi_infos(List<String> wifi_infos) {
        this.wifi_infos = wifi_infos;
    }

    // --- timestamp ---
    public String getTimestamp() {
        return this.timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    // --- reason ---
    public String getReason() {
        return this.reason;
    }

    public void setReason(String reason) {
        this.reason = reason;
    }

    // --- imei ---
    public String getImei() {
        return this.imei;
    }

    public void setImei(String imei) {
        this.imei = imei;
    }

    // --- guid ---
    public String getGuid() {
        return this.guid;
    }

    public void setGuid(String guid) {
        this.guid = guid;
    }

    // --- gps ---
    public Gps getGps() {
        return this.gps;
    }

    public void setGps(Gps gps) {
        this.gps = gps;
    }

    // --- city ---
    public String getCity() {
        return this.city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    // --- group ---
    public String getGroup() {
        return this.group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    // --- version_name ---
    public String getVersion_name() {
        return this.version_name;
    }

    public void setVersion_name(String version_name) {
        this.version_name = version_name;
    }

    // --- version_code ---
    public String getVersion_code() {
        return this.version_code;
    }

    public void setVersion_code(String version_code) {
        this.version_code = version_code;
    }
}
2、工具包
1、LogMapInfoSplit(logmap数据格式解析)
package Test.Util;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import Test.Model.StationInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;

/**
 * Table function that parses the LogMap field (program: FlinkUdf, author: BigData,
 * created 2020-11-16 14:53).
 *
 * <p>The argument is Base64 text wrapping a JSON document. The JSON is bound to
 * {@link StationInfo} and emitted as one 13-field {@link Row} whose layout is declared by
 * {@link #getResultType()}. Blank input, an empty decoded payload, or any failure while
 * decoding/parsing results in no row being emitted.
 */
public class LogMapInfoSplit extends TableFunction<Row> {

    private static final long serialVersionUID = 5485869053798534732L;

    private static final Gson gson = new Gson();

    private static final Logger logger = LoggerFactory.getLogger(LogMapInfoSplit.class);

    /**
     * Decodes and converts the incoming parameter, collecting at most one row.
     *
     * @param param Base64 text of the uploaded JSON; blank values are ignored
     */
    public void eval(String param) {
        if (StringUtils.isBlank(param)) {
            logger.debug("param is empty");
            return;
        }
        try {
            String ocaUploadJson = decodePayload(param);
            logger.info("sourceParam :{}", ocaUploadJson);
            if (StringUtils.isBlank(ocaUploadJson)) {
                return;
            }
            StationInfo stationInfo = gson.fromJson(ocaUploadJson, StationInfo.class);
            if (stationInfo == null) {
                return;
            }
            // StationInfo has no toString(), so log the wifi list itself rather than an
            // identity hash.
            logger.info("received wifiInfo:{}", stationInfo.getWifi_infos());
            collect(toRow(stationInfo));
        } catch (Exception e) {
            // Best effort: a malformed record is logged and skipped, never fails the job.
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Decodes the Base64 parameter into UTF-8 text.
     *
     * <p>The text is first repaired (space becomes '+', '$' becomes '=') and then mapped
     * onto the URL-safe alphabet ('+' becomes '-', '/' becomes '_'), because the URL-safe
     * decoder rejects '+' and '/'. Input that already uses the URL-safe alphabet is
     * decoded exactly as before.
     */
    private static String decodePayload(String param) {
        String urlSafe = param
                .replace(' ', '+')
                .replace('$', '=')
                .replace('+', '-')
                .replace('/', '_');
        byte[] raw = Base64.getUrlDecoder().decode(urlSafe);
        return new String(raw, StandardCharsets.UTF_8);
    }

    /**
     * Builds the output row; field positions must match {@link #getResultType()}.
     *
     * @throws NumberFormatException if the timestamp is missing or not a long
     */
    private static Row toRow(StationInfo stationInfo) {
        List<String> wifiList = stationInfo.getWifi_infos();
        // A missing or empty wifi list is emitted as null rather than an empty array.
        String[] wifiInfos = (wifiList == null || wifiList.isEmpty())
                ? null
                : wifiList.toArray(new String[0]);

        Row row = new Row(13);
        row.setField(0, stationInfo.getStation());
        row.setField(1, stationInfo.getImei());
        row.setField(2, objectToMap(stationInfo.getCell_info()));
        row.setField(3, wifiInfos);
        row.setField(4, stationInfo.getCity());
        row.setField(5, stationInfo.getReason());
        logger.info("station is:{}", stationInfo.getStation());
        Map<String, String> objMap = objectToMap(stationInfo.getGps());
        logger.info("obj map is:{}", objMap);
        row.setField(6, objMap);
        // Parse once; the same value feeds both the timestamp and the date-string column.
        long timestampValue = Long.parseLong(stationInfo.getTimestamp());
        row.setField(7, new Timestamp(timestampValue));
        row.setField(8, stationInfo.getGroup());
        row.setField(9, DateFormatUtils.ISO_DATE_FORMAT.format(timestampValue));
        row.setField(10, stationInfo.getGuid());
        row.setField(11, stationInfo.getVersion_name());
        row.setField(12, stationInfo.getVersion_code());
        return row;
    }

    /**
     * Declares the row type returned by this function.
     */
    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(
                Types.STRING,
                Types.STRING,
                Types.MAP(Types.STRING, Types.STRING),
                Types.OBJECT_ARRAY(Types.STRING),
                Types.STRING,
                Types.STRING,
                Types.MAP(Types.STRING, Types.STRING),
                Types.SQL_TIMESTAMP,
                Types.STRING,
                Types.STRING,
                Types.STRING,
                Types.STRING,
                Types.STRING);
    }

    /**
     * Converts an object into a map of its declared instance fields.
     *
     * @param obj the object to convert, may be {@code null}
     * @return {@code null} for a {@code null} object; otherwise field name mapped to
     *         {@code String.valueOf(fieldValue)}, so a {@code null} field value becomes
     *         the string {@code "null"}. Fields that cannot be read are logged and left out.
     */
    public static Map<String, String> objectToMap(Object obj) {
        if (obj == null) {
            return null;
        }
        Map<String, String> map = new HashMap<String, String>();
        for (Field field : obj.getClass().getDeclaredFields()) {
            // Only instance state belongs in the map; skip constants and compiler- or
            // tool-generated fields.
            if (field.isSynthetic() || Modifier.isStatic(field.getModifiers())) {
                continue;
            }
            try {
                field.setAccessible(true);
                map.put(field.getName(), String.valueOf(field.get(obj)));
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
        return map;
    }
}
flinksql代码:
-- Expands log_map['oca_upload_json'] through the logMapInfoSplit table function and
-- writes the parsed columns, together with model and event_id from the source table,
-- into subwayaware_info. LEFT JOIN ... ON TRUE keeps source rows for which the
-- function emits no row.
INSERT INTO software_research.subwayaware_info
SELECT
    station,
    logMap_imei AS `imei`,
    cell_info,
    wifi_infos,
    city,
    reason,
    gps,
    CAST(createtime AS VARCHAR) AS `timestamp`,
    logMap_group AS `group`,
    datestr,
    model,
    event_id,
    guid,
    version_name,
    version_code
FROM software_research.subwayaware_dcs
LEFT JOIN LATERAL TABLE(logMapInfoSplit(log_map['oca_upload_json'])) AS T(
    station,
    logMap_imei,
    cell_info,
    wifi_infos,
    city,
    reason,
    gps,
    createtime,
    logMap_group,
    datestr,
    guid,
    version_name,
    version_code
) ON TRUE