实时数仓(二):DWD层-数据处理
1.数据源
dwd的数据来自Kafka的ods层原始数据:业务数据(ods_base_db) 、日志数据(ods_base_log)
从Kafka的ODS层读取用户行为日志以及业务数据,并进行简单处理,写回到Kafka作为DWD层。
2.用户行为日志
2.1开发环境搭建
1)包结构
2)pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>03_gmall2021</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.12.0</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
<!-- flink Web UI -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!--如果保存检查点到hdfs上,需要引入此依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,我们这里使用log4j作为具体的日志实现-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<!--lomback插件依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--commons-beanutils是Apache开源组织提供的用于操作JAVA BEAN的工具包。
使用commons-beanutils,我们可以很方便的对bean对象的属性进行操作-->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<!--Guava工程包含了若干被Google的Java项目广泛依赖的核心库,方便开发-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.janeluo</groupId>
<artifactId>ikanalyzer</artifactId>
<version>2012_u6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
3)MykafkaUtil.java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class MyKafkaUtil {
private static Properties properties = new Properties();
private static String DEFAULT_TOPIC = "dwd_default_topic";
static {
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
}
/**
* todo kafka sink:自定义序列化,各种类型自定义传输
*
* @return
*/
public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
Properties props = new Properties();
//kafka地址
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
//生产数据超时时间
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");
return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
/**
* 获取生产者对象 ,只能传输String类型
*
* @param topic 主题
*/
public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topic) {
return new FlinkKafkaProducer<String>(topic,
new SimpleStringSchema(),
properties);
}
/**
* todo 构建消费者 -> 优化
*
* @param bootstrapServers:kafka地址
* @param topic :topic可以用逗号分隔
* @param groupId:消费者组
* @param isSecurity:是否kafka设置sasl
* @param offsetStrategy:消费策略:3种
* @return
*/
public static FlinkKafkaConsumer<String> getKafkaConsumer(String bootstrapServers, String topic, String groupId, String isSecurity, String offsetStrategy) {
SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
Properties props = new Properties();
props.setProperty("bootstrap.servers", bootstrapServers);
props.setProperty("group.id", groupId);
props.setProperty("flink.partition-discovery.interval-millis", "60000");
//kafka开启sasl认证
if ("true".equalsIgnoreCase(isSecurity)) {
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";");
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "PLAIN");
}
//消费多个topic
String[] split = topic.split(",");
List<String> topics = Arrays.asList(split);
//kafka消费者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topics, simpleStringSchema, props);
//消费方式:earliest,latest,setStartFromTimestamp
switch (offsetStrategy) {
case "earliest":
consumer.setStartFromEarliest();
return consumer;
case "latest":
consumer.setStartFromLatest();
return consumer;
default:
consumer.setStartFromTimestamp(System.currentTimeMillis() - Integer.valueOf(offsetStrategy) * 60 * 1000);
return consumer;
}
}
/**
* 获取消费者对象
*
* @param topic 主题
* @param groupId 消费者组
*/
public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {
//添加消费组属性
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new FlinkKafkaConsumer<String>(topic,
new SimpleStringSchema(),
properties);
}
//拼接Kafka相关属性到DDL
public static String getKafkaDDL(String topic, String groupId) {
return "'connector' = 'kafka', " +
" 'topic' = '" + topic + "'," +
" 'properties.bootstrap.servers' = 'hadoop102:9092', " +
" 'properties.group.id' = '" + groupId + "', " +
" 'format' = 'json', " +
" 'scan.startup.mode' = 'latest-offset'";
}
}
4)log4j.properties
log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
2.2 实现功能
我们前面采集的日志数据已经保存到Kafka中,作为日志数据的ODS层,从Kafka的ODS层读取的日志数据分为3类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回Kafka不同主题中,作为日志DWD层。
页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光侧输出流
- 1)从kafka读取ods数据
- 2)判断新老用户
- 3)分流
- 4)写回到kafka的dwd层
1)代码实现
package com.flink.realtime.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @description: todo->准备用户行为日志dwd层
* @author: HaoWu
* @create: 2021年06月22日
*/
public class BaseLogApp {
public static void main(String[] args) throws Exception {
// TODO 1.获取执行环境
// 1.1 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.2 并行度设置为Kafka的分区数
env.setParallelism(4);
/*
// 1.3 设置checkpoint
env.enableCheckpointing(5000L); //每5000ms做一次ck
env.getCheckpointConfig().setCheckpointTimeout(60000L); // ck超时时间:1min
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //ck模式,默认:exactly_once
//正常Cancel任务时,保留最后一次CK
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//状态后端:
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_log_app"));
// 访问hdfs访问权限问题
// 报错异常:Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
// 解决:/根目录没有写权限 解决方案1.hadoop fs -chown 777 / 2.System.setProperty("HADOOP_USER_NAME", "atguigu");
System.setProperty("HADOOP_USER_NAME", "atguigu");
*/
//TODO 2.获取kafka ods_base_log 主题数据
String sourceTopic = "ods_base_log";
String groupId = "base_log_app_group";
//FlinkKafkaConsumer<String> consumer = MyKafkaUtil.getFlinkKafkaConsumer(sourceTopic, groupId);
FlinkKafkaConsumer<String> consumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", sourceTopic, groupId, "false", "earliest");
DataStreamSource<String> kafkaDS = env.addSource(consumer);
//TODO 3.将每行数据转换为JSONObject
// 处理脏数据
OutputTag<String> dirty = new OutputTag<String>("DirtyData") {};
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
@Override
public void processElement(String value, Context context, Collector<JSONObject> collector) throws Exception {
try {
JSONObject jsonObject = JSON.parseObject(value);
//转JSON对象
collector.collect(jsonObject);
} catch (Exception e) {
//JSON解析异常输出脏数据
context.output(dirty, value);
}
}
});
//jsonObjDS.print("json>>>>>>>>");
//TODO 4.按照设备ID分组、使用状态编程做新老用户校验
//4.1 根据mid对日志进行分组
SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlag = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
.process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
//声明状态用于表示当前Mid是否已经访问过
private ValueState<String> firstVisitDateState;
private SimpleDateFormat simpleDateFormat;
//初始化状态
@Override
public void open(Configuration parameters) throws Exception {
firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("new-mid", String.class));
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
}
@Override
public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
//取出新用户标记 :is_new:1->新用户 ,0->老用户
String isNew = value.getJSONObject("common").getString("is_new");
//如果当前前端传输数据表示为新用户,则进行校验
if ("1".equals(isNew)) {
//取出状态数据并取出当前访问时间
String firstDate = firstVisitDateState.value();
Long ts = value.getLong("ts");
//判断状态数据是否为Null
if (firstDate != null) {
//修复
value.getJSONObject("common").put("is_new", "0");
} else {
//更新状态
firstVisitDateState.update(simpleDateFormat.format(ts));
}
}
out.collect(value);
}
});
//测试打印
//jsonObjWithNewFlag.print();
//TODO 5.使用侧输出流将 启动、曝光、页面数据分流
OutputTag<String> startOutPutTag = new OutputTag<String>("start"){}; //启动
OutputTag<String> displayOutputTag = new OutputTag<String>("display"){}; //曝光
SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlag.process(new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
//判断是否为启动数据
String start = value.getString("start");
if (start != null && start.length() > 0) {
//启动数据
ctx.output(startOutPutTag, value.toJSONString());
} else {
//不是启动数据一定是页面数据
out.collect(value.toJSONString());
//抽取公共字段、页面信息、时间戳
JSONObject common = value.getJSONObject("common");
JSONObject page = value.getJSONObject("page");
Long ts = value.getLong("ts");
//获取曝光数据
JSONArray displayArr = value.getJSONArray("displays");
if (displayArr != null && displayArr.size() > 0) {
JSONObject displayObj = new JSONObject();
displayObj.put("common", common);
displayObj.put("page", page);
displayObj.put("ts", ts);
//遍历曝光信息
for (Object display : displayArr) {
displayObj.put("display", display);
//输出曝光数据到侧输出流
ctx.output(displayOutputTag, displayObj.toJSONString());
}
}
}
}
});
//TODO 6.将三个流的数据分别写入Kafka
//打印
jsonObjDS.getSideOutput(dirty).print("Dirty>>>>>>>>>>>");
//主流:页面
pageDS.print("Page>>>>>>>>>>>");
//侧流:启动
pageDS.getSideOutput(startOutPutTag).print("Start>>>>>>>>>>>>");
//侧流:曝光
pageDS.getSideOutput(displayOutputTag).print("Display>>>>>>>>>>>>>");
//输出到kafka
String pageSinkTopic = "dwd_page_log";
String startSinkTopic = "dwd_start_log";
String displaySinkTopic = "dwd_display_log";
pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(pageSinkTopic));
pageDS.getSideOutput(startOutPutTag).addSink(MyKafkaUtil.getFlinkKafkaProducer(startSinkTopic));
pageDS.getSideOutput(displayOutputTag).addSink(MyKafkaUtil.getFlinkKafkaProducer(displaySinkTopic));
env.execute();
}
}
2)部署运行
BaseLogApp.sh
#!/bin/bash
source ~/.bashrc
cd $(dirname $0)
day=$(date +%Y%m%d%H%M)
#flink
job_name=02_dwd_BaseLogApp
clazz=com.flink.realtime.app.dwd.BaseLogApp
jar_path=/opt/module/gmall-flink/03_gmall2021-1.0-SNAPSHOT-jar-with-dependencies.jar
#-----------------------run----------------------------------------------
#yarn模式:per-job
/opt/module/flink-1.12.0/bin/flink run
-t yarn-per-job
-Dyarn.application.name=${job_name}
-Dyarn.application.queue=default
-Djobmanager.memory.process.size=1024m
-Dtaskmanager.memory.process.size=1024m
-Dtaskmanager.numberOfTaskSlots=2
-c ${clazz} ${jar_path}
3.业务数据
3.1 实现功能
业务数据的变化,我们可以通过FlinkCDC采集到,但是FlinkCDC是把全部数据统一写入一个Topic中, 这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从Kafka的业务数据ODS层读取数据,经过处理后,将维度数据保存到HBase,将事实数据写回Kafka作为业务数据的DWD层。
3.2 动态分流
由于FlinkCDC是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。
这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现
-
一种是用Zookeeper存储,通过Watch感知数据变化。
-
另一种是用mysql数据库存储,周期性的同步。
这里选择第二种方案,主要是MySQL对于配置数据初始化和维护管理,使用FlinkCDC读取配置信息表,将配置流作为广播流与主流进行连接。
1)建配置表:create.sql
--配置表
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
品牌维表
CREATE TABLE `base_trademark` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`tm_name` varchar(100) NOT NULL COMMENT '属性值',
`logo_url` varchar(200) DEFAULT NULL COMMENT '品牌logo的图片路径',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='品牌表'
--品牌表:base_trademark表 insert 操作,插入hbase的dim_base_trademark只要 `id`,`name` 字段, `id`作为主键
insert into table_process values ('base_trademark','insert','hbase','dim_base_trademark','id,name','id','');
mysql> select * from table_process;
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
| source_table | operate_type | sink_type | sink_table | sink_columns | sink_pk | sink_extend |
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
| base_trademark | insert | hbase | dim_base_trademark | id,name | id | |
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
1 row in set (0.00 sec)
2)配置类:TableProcess.java
import lombok.Data;
/**
* @description: TODO 配置表实体类
* @author: HaoWu
* @create: 2021年06月25日
*/
@Data
public class TableProcess {
//动态分流Sink常量
public static final String SINK_TYPE_HBASE = "hbase";
public static final String SINK_TYPE_KAFKA = "kafka";
public static final String SINK_TYPE_CK = "clickhouse";
//来源表
String sourceTable;
//操作类型 insert,update,delete
String operateType;
//输出类型 hbase kafka
String sinkType;
//输出表(主题)
String sinkTable;
//输出字段
String sinkColumns;
//主键字段
String sinkPk;
//建表扩展
String sinkExtend;
}
3)MysqlUtil.java
import com.flink.realtime.bean.TableProcess;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.base.CaseFormat;
import java.lang.reflect.InvocationTargetException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
/**
* @description: TODO Mysql工具类
* @author: HaoWu
* @create: 2021年07月23日
* 完成ORM,对象关系映射
* O:Object对象 Java中对象
* R:Relation关系 关系型数据库
* M:Mapping映射 将Java中的对象和关系型数据库的表中的记录建立起映射关系
*/
public class MysqlUtil {
/**
* @param sql 执行sql语句
* @param clazz 封装bean类型
* @param underScoreToCamel 是否列名转驼峰命名
* @param <T>
* @return
*/
public static <T> List<T> queryList(String sql, Class<T> clazz, Boolean underScoreToCamel) {
Connection con = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
// 注册驱动
Class.forName("com.mysql.jdbc.Driver");
// 获取连接
con = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall-realtime?characterEncoding=utf-8&useSSL=false", "root", "root");
// 获取数据库操作对象
ps = con.prepareStatement(sql);
// 执行sql
rs = ps.executeQuery();
// 处理结果集,封装list对象
ResultSetMetaData metaData = rs.getMetaData(); //获取结果集元数据
ArrayList<T> resultList = new ArrayList<>();
while (rs.next()) {
// 将单条记录封装对象
T obj = clazz.newInstance();
// 遍历所有列,转驼峰,对象属性赋值
for (int i = 1; i < metaData.getColumnCount(); i++) {
String columnName = metaData.getColumnName(i);
String propertyName = "";
if (underScoreToCamel) {
// 通过guava工具类,将表中的列转换为类属性的驼峰命名
propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName);
}
// 给属性赋值
BeanUtils.setProperty(obj,propertyName,rs.getObject(i));
}
resultList.add(obj);
}
return resultList;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("从Mysql查询数据失败");
} finally {
// 释放资源
if (rs != null) {
try {
rs.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
if (ps != null) {
try {
ps.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
if (con != null) {
try {
con.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InvocationTargetException, IllegalAccessException {
String sql="select * from table_process";
List<TableProcess> list = MysqlUtil.queryList(sql, TableProcess.class, true);
System.out.println(list);
TableProcess tableProcess = new TableProcess();
BeanUtils.setProperty(tableProcess,"sourceTable","redis");
System.out.println(tableProcess);
}
}
4)常量类:GmallConfig.java
package com.flink.realtime.common;
/**
* @description: TODO 常量配置类
* @author: HaoWu
* @create: 2021年06月25日
*/
public class GmallConfig {
//Phoenix库名
public static final String HBASE_SCHEMA = "bigdata";
//Phoenix驱动
public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
//Phoenix连接参数
public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";
public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop102:8123/default";
}
5)主程序:BaseDBApp.java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.app.func.DimSink;
import com.flink.realtime.app.func.TableProcessFunction;
import com.flink.realtime.bean.TableProcess;
import com.flink.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
/**
* @description: todo->准备业务数据dwd层
* @author: HaoWu
* @create: 2021年06月22日
*/
public class BaseDbApp {
public static void main(String[] args) throws Exception {
// TODO 1.创建执行环境
// 1.1 创建stream执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.2 设置并行度
env.setParallelism(1);
/*
// 1.3 设置checkpoint参数
env.enableCheckpointing(5000L); //每5000ms做一次ck
env.getCheckpointConfig().setCheckpointTimeout(60000L); // ck超时时间:1min
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //ck模式,默认:exactly_once
//正常Cancel任务时,保留最后一次CK
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//状态后端:
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_db_app"));
// 访问hdfs访问权限问题
// 报错异常:Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
// 解决:/根目录没有写权限 解决方案1.hadoop fs -chown 777 / 2.System.setProperty("HADOOP_USER_NAME", "atguigu");
System.setProperty("HADOOP_USER_NAME", "atguigu");
*/
// TODO 2.获取kafka的ods层业务数据:ods_basic_db
String ods_db_topic = "ods_base_db";
FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", ods_db_topic, "ods_base_db_consumer1", "false", "latest");
DataStreamSource<String> jsonStrDS = env.addSource(kafkaConsumer);
//jsonStrDS.print();
// TODO 3.对jsonStrDS结构转换
SingleOutputStreamOperator<JSONObject> jsonDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));
// TODO 4.对数据ETL
SingleOutputStreamOperator<JSONObject> filterDS = jsonDS.filter(
json -> {
boolean flag = json.getString("table") != null //表名不为null
&& json.getString("data") != null //数据不为null
&& json.getString("data").length() >= 3; //数据长度大于3
return flag;
}
);
//filterDS.print("filterDS>>>>>>>>>>");
// TODO 5. 动态分流:事实表放-主流 -> kafka dwd层 ,维度表-侧输出流 -> hbase
// 5.1 定义输出到Hbase的侧输出流标签
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>(TableProcess.SINK_TYPE_HBASE) {
};
// 5.2 主流输出到kafka
SingleOutputStreamOperator<JSONObject> kafkaDS = filterDS.process(new TableProcessFunction(hbaseTag));
// 5.3 获取侧输出流到hbase
DataStream<JSONObject> hbaseDS = kafkaDS.getSideOutput(hbaseTag);
kafkaDS.print("实时:kafkaDS>>>>>>>>");
hbaseDS.print("维度:hbaseDS>>>>>>>>");
// TODO 6.维度数据保存到Hbase中
hbaseDS.addSink(new DimSink());
// TODO 7.实时数据保存到Kafka中,自定义序列化
FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
@Override
public void open(SerializationSchema.InitializationContext context) throws Exception {
System.out.println("kafka序列化");
}
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
String sink_topic = jsonObject.getString("sink_table");
JSONObject data = jsonObject.getJSONObject("data");
return new ProducerRecord<>(sink_topic, data.toString().getBytes());
}
});
kafkaDS.addSink(kafkaSink);
// TODO 8.执行
env.execute();
}
}
6)自定义分流函数:TableProcessFunction.java
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.bean.TableProcess;
import com.flink.realtime.common.GmallConfig;
import com.flink.realtime.utils.MysqlUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.List;
/**
* @description: TODO 业务数据分流自定义Process函数
* @author: HaoWu
* @create: 2021年07月26日
*/
public class TableProcessFunction extends ProcessFunction<JSONObject, JSONObject> {
//维表侧输出标签
private OutputTag<JSONObject> outputTag;
//内存中存储表配置对象{表名,表配置信息}
private Map<String, TableProcess> tableProcessMap = new HashMap<>();
//内存中判断是否已经存在Hbase表
private Set<String> existsTables = new HashSet<>();
//定义Phoenix连接
private Connection connection;
public TableProcessFunction() {
}
public TableProcessFunction(OutputTag<JSONObject> outputTag) {
this.outputTag = outputTag;
}
@Override
public void open(Configuration parameters) throws Exception {
//初始化phoenix连接
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
//初始化配置表信息
initTableProcessMap();
//配置表的信息可能会发生表更,需要开启定时任务从现在起5000ms后,每隔5000ms更新一次
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
initTableProcessMap();
}
}, 5000, 5000);
}
@Override
public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
//表名
String tableName = jsonObj.getString("table");
//操作类型
String type = jsonObj.getString("type");
//注意:问题修复 如果使用maxwell同步历史数据,他的操作类型是bootstrap-insert
if ("bootstrap-insert".equals(type)) {
type = "insert";
jsonObj.put("type", type);
}
if (tableProcessMap != null && tableProcessMap.size() > 0) {
//根据key取出配置信息
String key = tableName + ":" + type;
TableProcess tableProcess = tableProcessMap.get(key);
//判断是否获取到配置对象
if (tableProcess != null) {
//获取sinkTable,指明数据发往何处。 维度数据->hbase , 事实数据->kafka ,给这条数据打上一个标记。
jsonObj.put("sink_table", tableProcess.getSinkTable());
//指定了sinkcolumn,对需要保留的字段进行过滤
String sinkColumns = tableProcess.getSinkColumns();
if (sinkColumns != null && sinkColumns.length() > 0) {
filterColumn(jsonObj.getJSONObject("data"), sinkColumns);
}
} else {
System.out.println("No this Key <<<< " + key + ">>>> in MySQL");
}
//根据sinkType输出到不同的流
if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)) {
//sinkType=hbase 输出到侧输出流
ctx.output(outputTag, jsonObj);
} else if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)) {
//sinkType=kafka 输出到主流
out.collect(jsonObj);
}
}
}
/**
* 从mysql查询配置信息,保存到map内存中
*/
private void initTableProcessMap() {
System.out.println("查询配置表信息");
//1.从mysql中查询配置信息
List<TableProcess> tableProcesses = MysqlUtil.queryList("select * from table_process", TableProcess.class, true);
for (TableProcess tableProcess : tableProcesses) {
String sourceTable = tableProcess.getSourceTable(); //源表
String operateType = tableProcess.getOperateType(); //操作类型
String sinkType = tableProcess.getSinkType(); //目标表类型
String sinkTable = tableProcess.getSinkTable(); //目标表名
String sinkPk = tableProcess.getSinkPk(); //目标表主键
String sinkColumns = tableProcess.getSinkColumns(); //目标表字段
String sinkExtend = tableProcess.getSinkExtend(); //扩展字段
//2.将配置信息封装成map集合
tableProcessMap.put(sourceTable + ":" + operateType, tableProcess);
//3.检查是表是否内存中存在
//如果向Hbase保存的表,那么判断内存中set是否存在过。
if ("insert".equals(operateType) && "hbase".equals(sinkType)) {
boolean isExist = existsTables.add(sinkTable);
//4.如果内存中不存在表数据信息,则创建新Hbase表
if (isExist) {
checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
}
}
}
}
/**
* 通过Phoenix创建Hbase表
*
* @param tableName 表名
* @param columns 列属性
* @param pk 主键
* @param extend 扩展字段
*/
private void checkTable(String tableName, String columns, String pk, String extend) {
//主键不存在给默认值
if (pk == null) {
pk = "id";
}
//扩展字段给默认值
if (extend == null) {
extend = "";
}
//拼接sql建表语句
StringBuilder createSql = new StringBuilder("create table if not exists " + GmallConfig.HBASE_SCHEMA + "." + tableName + "(");
//拼接列属性
String[] fieldArr = columns.split(",");
for (int i = 0; i < fieldArr.length; i++) {
String field = fieldArr[i];
//判断是否为主键
if (field.equals(pk)) {
createSql.append(field).append(" varchar primary key ");
} else {
createSql.append("info.").append(field).append(" varchar");
}
//非最后一个字段需要添加逗号
if (i < fieldArr.length - 1) {
createSql.append(",");
}
}
createSql.append(")");
createSql.append(extend);
System.out.println("建表sql:" + createSql);
//通过Phoenix创建hbase表
PreparedStatement ps = null;
try {
ps = connection.prepareStatement(createSql.toString());
ps.execute();
} catch (SQLException throwables) {
throwables.printStackTrace();
throw new RuntimeException("建表失败!!!!!" + tableName);
} finally {
if (ps != null) {
try {
ps.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
}
/**
* 筛选配置表中保留的字段
*
* @param data 每行数据记录
* @param sinkColumns 配置表保留字段
*/
private void filterColumn(JSONObject data, String sinkColumns) {
//需要保留的字段
String[] columns = sinkColumns.split(",");
//数组转集合,判断集合中是否包含某个元素
List<String> columnList = Arrays.asList(columns);
//获取json中封装的键值对,每个键值对封装为一个Entry类型
Set<Map.Entry<String, Object>> entrySet = data.entrySet();
/*for (Map.Entry<String, Object> entry : entrySet) {
if (!columnList.contains(entry.getKey())) {
entrySet.remove();
} 遍历集合删除元素使用迭代器,for循环删除会报错
}*/
Iterator<Map.Entry<String, Object>> iterator = entrySet.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Object> entry = iterator.next();
if (!columnList.contains(entry.getKey())) {
iterator.remove();
}
}
}
}
7)HbaseSink:DimSink.java
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.common.GmallConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Set;
/**
* @description: TODO Hbase sink 通过Phoenix向Hbase表中写数据
* @author: HaoWu
* @create: 2021年07月30日
*/
public class DimSink extends RichSinkFunction<JSONObject> {
//定义Phoenix连接
Connection connection = null;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
/**
* 生成语句提交hbase
*
* @param jsonObj
* @param context
* @throws Exception
*/
@Override
public void invoke(JSONObject jsonObj, Context context) {
String sinkTableName = jsonObj.getString("sink_table");
JSONObject dataObj = jsonObj.getJSONObject("data");
if (dataObj != null && dataObj.size() > 0) {
String upsertSql = genUpdateSql(sinkTableName.toUpperCase(), jsonObj.getJSONObject("data"));
System.out.println(upsertSql);
try {
PreparedStatement ps = connection.prepareStatement(upsertSql);
ps.executeUpdate();
connection.commit();
} catch (SQLException throwables) {
throwables.printStackTrace();
throw new RuntimeException("执行upsert语句失败!!!");
}
}
}
/**
* 生成upsert语句
*
* @param sinkTableName
* @param data
* @return
*/
private String genUpdateSql(String sinkTableName, JSONObject data) {
Set<String> fields = data.keySet();
String upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTableName + " (" + StringUtils.join(fields, ",") + ")";
String valuesSql = " values ('" + StringUtils.join(data.values(), "','") + "')";
return upsertSql + valuesSql;
}
}
8)自定义序列化 kafka sink
/**
* todo kafka sink 自定义序列化,各种类型自定义传输
*
* @return
*/
public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
Properties props = new Properties();
//kafka地址
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
//生产数据超时时间
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");
return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
// TODO 7.实时数据保存到Kafka中,自定义序列化
FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
@Override
public void open(SerializationSchema.InitializationContext context) throws Exception {
System.out.println("kafka序列化");
}
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
String sink_topic = jsonObject.getString("sink_table");
JSONObject data = jsonObject.getJSONObject("data");
return new ProducerRecord<>(sink_topic, data.toString().getBytes());
}
});
3.4 主程序:流程总结分析
TableProcessFunction是一个自定义算子,主要包括三条时间线任务
-
图中紫线,这个时间线与数据流入无关,只要任务启动就会执行。主要的任务方法是open()这个方法在任务启动时就会执行。他的主要工作就是初始化一些连接,开启周期调度。
-
图中绿线,这个时间线也与数据流入无关,只要周期调度启动,会自动周期性执行。主要的任务是同步配置表(tableProcessMap)。通过在open()方法中加入timer实现。同时还有个附带任务就是如果发现不存在数据表,要根据配置自动创建数据库表。
-
图中黑线,这个时间线就是随着数据的流入持续发生,这部分的任务就是根据同步到内存的tableProcessMap,来为流入的数据进行标识,同时清理掉没用的字段。
3.5 思考
1.目前的配置表只能识别新增的配置项,不支持修改原有的配置项 ?