
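    Real-time data warehouse (Part 2): DWD layer data processing

    1. Data sources

    The DWD layer's data comes from the raw ODS-layer data in Kafka: business data (ods_base_db) and log data (ods_base_log).

    We read the user behavior logs and business data from Kafka's ODS layer, apply light processing, and write the results back to Kafka as the DWD layer.

    2. User behavior logs

    2.1 Development environment setup

    1) Package structure (the code below uses com.flink.realtime.app.dwd, com.flink.realtime.app.func, com.flink.realtime.bean, com.flink.realtime.common and com.flink.realtime.utils)

    2) pom.xml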
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>03_gmall2021</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <java.version>1.8</java.version>
            <maven.compiler.source>${java.version}</maven.compiler.source>
            <maven.compiler.target>${java.version}</maven.compiler.target>
            <flink.version>1.12.0</flink.version>
            <scala.version>2.12</scala.version>
            <hadoop.version>3.1.3</hadoop.version>
        </properties>
    
        <dependencies>
            <!--  flink Web UI -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web_${scala.version}</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-json</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.68</version>
            </dependency>
    
        <!--Required if checkpoints are stored on HDFS-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
        <!--Flink logs through SLF4J, which is only a logging facade; log4j is used here as the concrete implementation-->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-to-slf4j</artifactId>
                <version>2.14.0</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>flink-connector-mysql-cdc</artifactId>
                <version>1.2.0</version>
            </dependency>
    
        <!--Lombok dependency-->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.12</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.47</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.phoenix</groupId>
                <artifactId>phoenix-spark</artifactId>
                <version>5.0.0-HBase-2.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.glassfish</groupId>
                        <artifactId>javax.el</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
        <!--commons-beanutils is an Apache toolkit for working with JavaBeans;
    it makes reading and setting bean properties easy-->
            <dependency>
                <groupId>commons-beanutils</groupId>
                <artifactId>commons-beanutils</artifactId>
                <version>1.9.3</version>
            </dependency>
    
        <!--Guava contains several core libraries that Google's Java projects rely on heavily; used here for convenience-->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>29.0-jre</version>
            </dependency>
    
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.3.0</version>
            </dependency>
    
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
                <exclusions>
                    <exclusion>
                        <groupId>com.fasterxml.jackson.core</groupId>
                        <artifactId>jackson-databind</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>com.fasterxml.jackson.core</groupId>
                        <artifactId>jackson-core</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>com.janeluo</groupId>
                <artifactId>ikanalyzer</artifactId>
                <version>2012_u6</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.0.0</version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    
    3) MyKafkaUtil.java
    package com.flink.realtime.utils;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    public class MyKafkaUtil {
    
        private static Properties properties = new Properties();
    
        private static String DEFAULT_TOPIC = "dwd_default_topic";
    
        static {
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        }
    
        /**
         * todo Kafka sink with a custom serialization schema, so any record type can be written
         *
         * @return
         */
        public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
            Properties props = new Properties();
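            //Kafka bootstrap servers
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
            //Transaction timeout for EXACTLY_ONCE; must not exceed the broker's transaction.max.timeout.ms (15 min by default)
            props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");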
            return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        }
    
        /**
         * Get a producer; it can only write String records
         *
         * @param topic topic name
         */
        public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topic) {
            return new FlinkKafkaProducer<String>(topic,
                    new SimpleStringSchema(),
                    properties);
        }
    
    
        /**
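         * todo Build a consumer (refined version)
         *
         * @param bootstrapServers Kafka bootstrap servers
         * @param topic            topic(s); multiple topics are comma-separated
         * @param groupId          consumer group
         * @param isSecurity       "true" if Kafka uses SASL authentication
         * @param offsetStrategy   start position: "earliest", "latest", or a number N meaning N minutes before now
         * @return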
         */
        public static FlinkKafkaConsumer<String> getKafkaConsumer(String bootstrapServers, String topic, String groupId, String isSecurity, String offsetStrategy) {
            SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", bootstrapServers);
            props.setProperty("group.id", groupId);
            props.setProperty("flink.partition-discovery.interval-millis", "60000");
            //Enable SASL authentication for Kafka
            if ("true".equalsIgnoreCase(isSecurity)) {
                props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";");
                props.setProperty("security.protocol", "SASL_PLAINTEXT");
                props.setProperty("sasl.mechanism", "PLAIN");
            }
            //Subscribe to several topics
            String[] split = topic.split(",");
            List<String> topics = Arrays.asList(split);
            //Kafka consumer
            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topics, simpleStringSchema, props);
    
            //Start position: earliest, latest, or (default) a timestamp offsetStrategy minutes before now
            switch (offsetStrategy) {
                case "earliest":
                    consumer.setStartFromEarliest();
                    return consumer;
                case "latest":
                    consumer.setStartFromLatest();
                    return consumer;
                default:
                    consumer.setStartFromTimestamp(System.currentTimeMillis() - Long.parseLong(offsetStrategy) * 60 * 1000);
                    return consumer;
            }
        }
    
        /**
         * Get a consumer
         *
         * @param topic   topic name
         * @param groupId consumer group
         */
        public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {
    
            //Set the consumer group (this mutates the shared static properties)
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    
            return new FlinkKafkaConsumer<String>(topic,
                    new SimpleStringSchema(),
                    properties);
        }
    
        //Build the Kafka connector options for the WITH clause of a Flink SQL DDL
        public static String getKafkaDDL(String topic, String groupId) {
            return "'connector' = 'kafka', " +
                    " 'topic' = '" + topic + "'," +
                    " 'properties.bootstrap.servers' = 'hadoop102:9092', " +
                    " 'properties.group.id' = '" + groupId + "', " +
                    "  'format' = 'json', " +
                    "  'scan.startup.mode' = 'latest-offset'";
        }
    }
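The default branch of getKafkaConsumer treats offsetStrategy as a number of minutes to rewind. The sketch below isolates that arithmetic in plain Java (StartOffsetSketch and its methods are illustrative names, not part of the project) and shows why the product is worth computing as a long: Integer.valueOf(...) * 60 * 1000 is evaluated in int arithmetic and wraps to a negative number once the value exceeds 35791 minutes (roughly 24.8 days).

```java
import java.util.concurrent.TimeUnit;

/**
 * Illustrative sketch (hypothetical class): the "N minutes ago" start position
 * used by the default branch of getKafkaConsumer, computed in long arithmetic.
 */
final class StartOffsetSketch {

    /** Epoch millis that lies `offsetStrategy` minutes before `nowMillis`. */
    static long startTimestamp(long nowMillis, String offsetStrategy) {
        long minutesBack = Long.parseLong(offsetStrategy.trim());
        if (minutesBack < 0) {
            throw new IllegalArgumentException("minutes must be >= 0: " + offsetStrategy);
        }
        return nowMillis - TimeUnit.MINUTES.toMillis(minutesBack);
    }

    /** The int arithmetic of Integer.valueOf(x) * 60 * 1000, kept only to show the overflow. */
    static int intMillis(String offsetStrategy) {
        return Integer.valueOf(offsetStrategy) * 60 * 1000;
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_000L;
        System.out.println(startTimestamp(now, "30")); // prints 1699998200000
        System.out.println(intMillis("40000"));        // negative: the int product wrapped around
    }
}
```

For example, passing "30" as offsetStrategy starts the consumer from half an hour before the job starts.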
    
    4) log4j.properties
    log4j.rootLogger=info,stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.target=System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    

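    2.2 Functionality

    The log data collected earlier is already stored in Kafka as the ODS layer for logs. The logs read from that layer fall into three types: page logs, start logs, and display (exposure) logs. All three are user behavior data, but their structures are completely different, so they have to be split and processed separately. Each type is then written back to its own Kafka topic, forming the DWD layer for logs.

    Page logs go to the main stream, start logs to the "start" side output, and display logs to the "display" side output.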
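The split rule can be sketched without Flink, assuming plain Maps in place of fastjson's JSONObject and three lists in place of the main stream and the two side outputs (LogSplitSketch is a hypothetical name). It follows the same decision as the ProcessFunction in BaseLogApp: a non-empty start field means a start log; anything else is a page log, and each element of its displays array becomes its own display record carrying common, page and ts.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of the start / page / display split (hypothetical class). */
final class LogSplitSketch {
    final List<Map<String, Object>> start = new ArrayList<>();   // stands in for the "start" side output
    final List<Map<String, Object>> page = new ArrayList<>();    // stands in for the main stream
    final List<Map<String, Object>> display = new ArrayList<>(); // stands in for the "display" side output

    void route(Map<String, Object> log) {
        Object startField = log.get("start");
        if (startField != null && !startField.toString().isEmpty()) {
            start.add(log);                    // start log
            return;
        }
        page.add(log);                         // not a start log, so it is a page log
        Object displays = log.get("displays");
        if (displays instanceof List) {
            for (Object d : (List<?>) displays) {
                Map<String, Object> out = new LinkedHashMap<>(); // fresh map per display item
                out.put("common", log.get("common"));
                out.put("page", log.get("page"));
                out.put("ts", log.get("ts"));
                out.put("display", d);
                display.add(out);              // one display record per array element
            }
        }
    }
}
```

BaseLogApp can reuse a single JSONObject across the loop because it calls toJSONString() before each emit; this sketch stores the maps themselves, so it creates a new map for every display item.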

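    • 1) Read the ODS data from Kafka
    • 2) Determine whether each user is new or returning
    • 3) Split the stream
    • 4) Write the results back to the Kafka DWD layer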
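Step 2 relies on keyed state: after keyBy(mid), each device has its own first-visit date. The sketch below imitates that with a HashMap keyed by mid, an assumption standing in for Flink's ValueState (the class name NewUserCheckSketch is hypothetical). A record flagged is_new=1 is corrected to 0 if its mid has been seen before; otherwise its visit date is recorded.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of the is_new correction (hypothetical class). */
final class NewUserCheckSketch {
    // mid -> first-visit date; stands in for ValueState<String> after keyBy(mid)
    private final Map<String, String> firstVisitDateByMid = new HashMap<>();
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");

    /** Returns the (possibly corrected) is_new flag for one log record. */
    String check(String mid, String isNew, long ts) {
        if (!"1".equals(isNew)) {
            return isNew;                      // only records claiming "new" are validated
        }
        if (firstVisitDateByMid.get(mid) != null) {
            return "0";                        // mid seen before: correct to returning user
        }
        firstVisitDateByMid.put(mid, dateFormat.format(new Date(ts))); // first visit: remember the date
        return "1";
    }
}
```

Like the Flink version, this marks a second visit on the same day as returning, because the stored date is only checked for null and never compared with the current date.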
    1) Implementation
    package com.flink.realtime.app.dwd;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.flink.realtime.utils.MyKafkaUtil;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * @description: todo -> build the DWD layer for user behavior logs
     * @author: HaoWu
     * @create: 2021-06-22
     */
    public class BaseLogApp {
        public static void main(String[] args) throws Exception {
            // TODO 1. Get the execution environment
            // 1.1 Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.2 Set parallelism to the number of Kafka partitions
            env.setParallelism(4);
    
            /*
            // 1.3 Checkpoint settings
            env.enableCheckpointing(5000L); //take a checkpoint every 5000 ms
            env.getCheckpointConfig().setCheckpointTimeout(60000L); // checkpoint timeout: 1 min
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //checkpointing mode (default: EXACTLY_ONCE)
            //Keep the last checkpoint when the job is cancelled normally
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            //Restart strategy
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
            //State backend
            env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_log_app"));
            // HDFS permission issue
            // Error: Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
            // Cause: no write permission on /. Fix 1: hadoop fs -chmod 777 /   Fix 2: System.setProperty("HADOOP_USER_NAME", "atguigu");
            System.setProperty("HADOOP_USER_NAME", "atguigu");
            */
    
            //TODO 2. Read the ods_base_log topic from Kafka
            String sourceTopic = "ods_base_log";
            String groupId = "base_log_app_group";
            //FlinkKafkaConsumer<String> consumer = MyKafkaUtil.getFlinkKafkaConsumer(sourceTopic, groupId);
            FlinkKafkaConsumer<String> consumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", sourceTopic, groupId, "false", "earliest");
            DataStreamSource<String> kafkaDS = env.addSource(consumer);
    
    
            //TODO 3. Convert each line to a JSONObject
            // Dirty records (lines that are not valid JSON) go to a side output
            OutputTag<String> dirty = new OutputTag<String>("DirtyData") {};
            SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
                @Override
                public void processElement(String value, Context context, Collector<JSONObject> collector) throws Exception {
                    try {
                        JSONObject jsonObject = JSON.parseObject(value);
                        //Emit the parsed JSON object
                        collector.collect(jsonObject);
                    } catch (Exception e) {
                        //JSON parsing failed: send the raw line to the dirty-data side output
                        context.output(dirty, value);
                    }
                }
            });
    
            //jsonObjDS.print("json>>>>>>>>");
    
    
            //TODO 4. Key by device ID (mid) and use keyed state to validate new vs. returning users
            //4.1 Group the logs by mid
            SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlag = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
                    .process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
                        //State holding the first-visit date of the current mid (null if never seen)
                        private ValueState<String> firstVisitDateState;
                        private SimpleDateFormat simpleDateFormat;
    
                        //Initialize the state and the date formatter
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("new-mid", String.class));
                            simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                        }
    
                        @Override
                        public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
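                            //Read the new-user flag: is_new 1 -> new user, 0 -> returning user
                            String isNew = value.getJSONObject("common").getString("is_new");
                            //Only validate records the client reports as new users
                            if ("1".equals(isNew)) {
                                //Read the stored first-visit date and the current event time
                                String firstDate = firstVisitDateState.value();
                                Long ts = value.getLong("ts");
                                //Has this mid been seen before?
                                if (firstDate != null) {
                                    //Seen before: correct the flag to returning user
                                    value.getJSONObject("common").put("is_new", "0");
                                } else {
                                    //First visit: store the visit date in state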
                                    firstVisitDateState.update(simpleDateFormat.format(ts));
                                }
                            }
                            out.collect(value);
                        }
    
                    });
            //Debug print
            //jsonObjWithNewFlag.print();
    
            //TODO 5. Use side outputs to split start, display, and page data
            OutputTag<String> startOutPutTag = new OutputTag<String>("start"){}; //start logs
            OutputTag<String> displayOutputTag = new OutputTag<String>("display"){}; //display (exposure) logs
            SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlag.process(new ProcessFunction<JSONObject, String>() {
                @Override
                public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
                    //Is this a start log?
                    String start = value.getString("start");
                    if (start != null && start.length() > 0) {
                        //Start log: send to the start side output
                        ctx.output(startOutPutTag, value.toJSONString());
                    } else {
                        //Anything that is not a start log is a page log
                        out.collect(value.toJSONString());
    
                        //Extract the common fields, page info, and timestamp
                        JSONObject common = value.getJSONObject("common");
                        JSONObject page = value.getJSONObject("page");
                        Long ts = value.getLong("ts");
    
                        //Get the display (exposure) array
                        JSONArray displayArr = value.getJSONArray("displays");
    
                        if (displayArr != null && displayArr.size() > 0) {
                            JSONObject displayObj = new JSONObject();
                            displayObj.put("common", common);
                            displayObj.put("page", page);
                            displayObj.put("ts", ts);
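                            //One output record per display item; displayObj can be reused because it is serialized before each emit
                            for (Object display : displayArr) {
                                displayObj.put("display", display);
                                //Emit the display record to the display side output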
                                ctx.output(displayOutputTag, displayObj.toJSONString());
                            }
    
                        }
    
                    }
                }
            });
    
            //TODO 6. Write the three streams to Kafka
            //Debug prints
            jsonObjDS.getSideOutput(dirty).print("Dirty>>>>>>>>>>>");
            //Main stream: page logs
            pageDS.print("Page>>>>>>>>>>>");
            //Side output: start logs
            pageDS.getSideOutput(startOutPutTag).print("Start>>>>>>>>>>>>");
            //Side output: display logs
            pageDS.getSideOutput(displayOutputTag).print("Display>>>>>>>>>>>>>");
    
            //Write to Kafka
            String pageSinkTopic = "dwd_page_log";
            String startSinkTopic = "dwd_start_log";
            String displaySinkTopic = "dwd_display_log";
            pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(pageSinkTopic));
            pageDS.getSideOutput(startOutPutTag).addSink(MyKafkaUtil.getFlinkKafkaProducer(startSinkTopic));
            pageDS.getSideOutput(displayOutputTag).addSink(MyKafkaUtil.getFlinkKafkaProducer(displaySinkTopic));
    
            env.execute();
        }
    }
    
    2) Deployment

    BaseLogApp.sh

    #!/bin/bash
    
    source  ~/.bashrc
    
    cd $(dirname $0)
    day=$(date +%Y%m%d%H%M)
    
    #flink
    job_name=02_dwd_BaseLogApp
    clazz=com.flink.realtime.app.dwd.BaseLogApp
    jar_path=/opt/module/gmall-flink/03_gmall2021-1.0-SNAPSHOT-jar-with-dependencies.jar
    
    #-----------------------run----------------------------------------------
    #YARN mode: per-job
    /opt/module/flink-1.12.0/bin/flink run \
    -t yarn-per-job \
    -Dyarn.application.name=${job_name} \
    -Dyarn.application.queue=default \
    -Djobmanager.memory.process.size=1024m \
    -Dtaskmanager.memory.process.size=1024m \
    -Dtaskmanager.numberOfTaskSlots=2 \
    -c ${clazz} ${jar_path}
    

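    3. Business data

    3.1 Functionality

    Changes to business data can be captured with Flink CDC, but Flink CDC writes everything into a single topic that mixes fact data with dimension data, which clearly makes later processing harder. This step therefore reads the business data from the Kafka ODS layer, saves dimension data to HBase, and writes fact data back to Kafka as the DWD layer for business data.

    3.2 Dynamic splitting

    Because Flink CDC writes all data into one topic, the tables need to be separated before further processing. Each table plays a different role, however: some are dimension tables and some are fact tables.

    In real-time computation, dimension data is usually written to a store that supports fast lookups by primary key, such as HBase, Redis, or MySQL. Fact data is usually kept in a stream for further processing and eventually joined into wide tables.

    This routing information should not live in a configuration file: each time the business side adds a table, the file would have to be edited and the job restarted. Instead we need a dynamic configuration scheme that stores the configuration persistently and lets the real-time job detect changes automatically.

    There are two ways to implement this:

    • Store the configuration in ZooKeeper and detect changes with a Watch.

    • Store the configuration in a MySQL table and synchronize it periodically.

      We choose the second option, mainly because MySQL makes the configuration data easy to initialize and maintain. Flink CDC reads the configuration table, and the resulting configuration stream is broadcast and connected with the main stream.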
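The broadcast idea can be sketched in plain Java, assuming the broadcast state is a map keyed by source table plus operation type, mirroring the primary key (source_table, operate_type) of the configuration table defined below. DynamicRouteSketch, Rule and the "table:type" key format are illustrative names, not the project's code, and what the real job does with records that have no rule is not covered here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java sketch of config-driven routing (hypothetical class). */
final class DynamicRouteSketch {
    /** Minimal stand-in for one table_process row. */
    static final class Rule {
        final String sinkType;
        final String sinkTable;
        Rule(String sinkType, String sinkTable) {
            this.sinkType = sinkType;
            this.sinkTable = sinkTable;
        }
    }

    // Stands in for Flink broadcast state filled from the config stream
    private final Map<String, Rule> rules = new HashMap<>();

    private static String key(String sourceTable, String operateType) {
        return sourceTable + ":" + operateType;
    }

    /** Config side: called for each table_process change (e.g. as read by Flink CDC). */
    void onConfig(String sourceTable, String operateType, String sinkType, String sinkTable) {
        rules.put(key(sourceTable, operateType), new Rule(sinkType, sinkTable));
    }

    /** Data side: looks up where a change on this table should go; empty if no rule exists. */
    Optional<Rule> route(String table, String type) {
        return Optional.ofNullable(rules.get(key(table, type)));
    }
}
```

Because the rules live in state rather than in a file, adding a table only needs a new row in the configuration table; the job picks it up from the config stream without a restart.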

    1) Create the configuration table: create.sql
    -- Configuration table
    CREATE TABLE `table_process` (
      `source_table` varchar(200) NOT NULL COMMENT 'source table',
      `operate_type` varchar(200) NOT NULL COMMENT 'operation type: insert, update, delete',
      `sink_type` varchar(200) DEFAULT NULL COMMENT 'sink type: hbase, kafka',
      `sink_table` varchar(200) DEFAULT NULL COMMENT 'sink table (topic)',
      `sink_columns` varchar(2000) DEFAULT NULL COMMENT 'sink columns',
      `sink_pk` varchar(200) DEFAULT NULL COMMENT 'primary key column',
      `sink_extend` varchar(200) DEFAULT NULL COMMENT 'extra CREATE TABLE clause',
      PRIMARY KEY (`source_table`,`operate_type`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    

    Brand dimension table

    CREATE TABLE `base_trademark` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
      `tm_name` varchar(100) NOT NULL COMMENT 'attribute value',
      `logo_url` varchar(200) DEFAULT NULL COMMENT 'image path of the brand logo',
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='brand table';
    
    -- Brand table: for insert operations on base_trademark, write only the `id` and `name` fields to dim_base_trademark in HBase, with `id` as the primary key
    insert into table_process values ('base_trademark','insert','hbase','dim_base_trademark','id,name','id','');
    mysql> select * from table_process;
    +----------------+--------------+-----------+--------------------+--------------+---------+-------------+
    | source_table   | operate_type | sink_type | sink_table         | sink_columns | sink_pk | sink_extend |
    +----------------+--------------+-----------+--------------------+--------------+---------+-------------+
    | base_trademark | insert       | hbase     | dim_base_trademark | id,name      | id      |             |
    +----------------+--------------+-----------+--------------------+--------------+---------+-------------+
    1 row in set (0.00 sec)
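The sink_columns value decides which fields of a change record are written to the sink. A minimal sketch of that filtering, assuming a record arrives as a Map from column name to value (SinkColumnFilterSketch is a hypothetical helper, not from the project):

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Keeps only the columns listed in sink_columns (hypothetical helper). */
final class SinkColumnFilterSketch {
    /** Returns a copy of `row` containing only the columns named in `sinkColumns`. */
    static Map<String, Object> filter(Map<String, Object> row, String sinkColumns) {
        Set<String> keep = new HashSet<>();
        for (String column : sinkColumns.split(",")) {
            keep.add(column.trim());            // tolerate "id, name" as well as "id,name"
        }
        Map<String, Object> out = new LinkedHashMap<>(row); // copy so the input row is untouched
        out.keySet().removeIf(column -> !keep.contains(column));
        return out;
    }
}
```

Matching is by exact column name. base_trademark's columns are id, tm_name and logo_url, so with sink_columns = 'id,name' as inserted above, only id would survive this filter; 'id,tm_name' would keep the brand name as well.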
    
    2) Configuration entity: TableProcess.java
    package com.flink.realtime.bean;
    
    import lombok.Data;
    
    /**
     * @description: TODO entity class for the configuration table
     * @author: HaoWu
     * @create: 2021-06-25
     */
    
    @Data
    public class TableProcess {
        //Sink type constants for dynamic splitting
        public static final String SINK_TYPE_HBASE = "hbase";
        public static final String SINK_TYPE_KAFKA = "kafka";
        public static final String SINK_TYPE_CK = "clickhouse";
        //source table
        String sourceTable;
        //operation type: insert, update, delete
        String operateType;
        //sink type: hbase, kafka
        String sinkType;
        //sink table (HBase table or Kafka topic)
        String sinkTable;
        //columns to write
        String sinkColumns;
        //primary key column
        String sinkPk;
        //extra clause for CREATE TABLE
        String sinkExtend;
    }
    
    
    3) MysqlUtil.java
    import com.flink.realtime.bean.TableProcess;
    import org.apache.commons.beanutils.BeanUtils;
    import com.google.common.base.CaseFormat;
    import java.lang.reflect.InvocationTargetException;
    import java.sql.*;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @description: TODO MySQL utility class
     * @author: HaoWu
     * @create: 2021-07-23
     * Implements a simple ORM (object-relational mapping):
     * O: Object      objects in Java
     * R: Relation    a relational database
     * M: Mapping     maps Java objects to rows in database tables
     */
    public class MysqlUtil {
    
        /**
         * @param sql               SQL statement to run
         * @param clazz             bean type to populate
         * @param underScoreToCamel whether to convert column names to camelCase property names
         * @param <T>
         * @return
         */
        public static <T> List<T> queryList(String sql, Class<T> clazz, Boolean underScoreToCamel) {
            Connection con = null;
            PreparedStatement ps = null;
            ResultSet rs = null;
            try {
                // Register the driver
                Class.forName("com.mysql.jdbc.Driver");
                // Get a connection
                con = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall-realtime?characterEncoding=utf-8&useSSL=false", "root", "root");
                // Create the statement
                ps = con.prepareStatement(sql);
                // Run the query
                rs = ps.executeQuery();
                // Map the result set to a list of objects
                ResultSetMetaData metaData = rs.getMetaData(); //result-set metadata
                ArrayList<T> resultList = new ArrayList<>();
                while (rs.next()) {
                    // One object per row
                    T obj = clazz.newInstance();
                    // Walk every column (JDBC columns are 1-based, so use <= getColumnCount()) and set the matching property
                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
                        String columnName = metaData.getColumnName(i);
                        String propertyName = columnName;
                        if (underScoreToCamel) {
                            // Use Guava to turn the column name into a camelCase property name
                            propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName);
                        }
                        // Set the property
                        BeanUtils.setProperty(obj, propertyName, rs.getObject(i));
                    }
                    resultList.add(obj);
                }
                return resultList;
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException("Failed to query data from MySQL", e);
            } finally {
                // Release resources
                if (rs != null) {
                    try {
                        rs.close();
                    } catch (SQLException throwables) {
                        throwables.printStackTrace();
                    }
                }
                if (ps != null) {
                    try {
                        ps.close();
                    } catch (SQLException throwables) {
                        throwables.printStackTrace();
                    }
                }
                if (con != null) {
                    try {
                        con.close();
                    } catch (SQLException throwables) {
                        throwables.printStackTrace();
                    }
                }
            }
        }
    
    
        public static void main(String[] args) throws InvocationTargetException, IllegalAccessException {
            String sql="select * from table_process";
            List<TableProcess> list = MysqlUtil.queryList(sql, TableProcess.class, true);
            System.out.println(list);
            TableProcess tableProcess = new TableProcess();
            BeanUtils.setProperty(tableProcess,"sourceTable","redis");
            System.out.println(tableProcess);
        }
    }
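queryList relies on Guava's CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, ...) to turn column names such as source_table into bean properties such as sourceTable, which BeanUtils.setProperty then looks up on TableProcess. To show what that conversion does, here is a JDK-only approximation (CamelCaseSketch is a hypothetical helper; it matches Guava for ordinary snake_case names but is not guaranteed identical on edge cases such as leading or doubled underscores).

```java
/** JDK-only approximation of LOWER_UNDERSCORE -> LOWER_CAMEL (hypothetical helper). */
final class CamelCaseSketch {
    static String lowerUnderscoreToLowerCamel(String column) {
        StringBuilder sb = new StringBuilder(column.length());
        boolean upperNext = false;
        for (char ch : column.toCharArray()) {
            if (ch == '_') {
                upperNext = sb.length() > 0;        // ignore leading underscores
            } else if (upperNext) {
                sb.append(Character.toUpperCase(ch)); // first letter of a later word
                upperNext = false;
            } else {
                sb.append(Character.toLowerCase(ch)); // everything else is lower case
            }
        }
        return sb.toString();
    }
}
```

For example, sink_pk becomes sinkPk, which is why the column names in table_process line up with the fields of TableProcess.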
    
    4) Constants class: GmallConfig.java
    package com.flink.realtime.common;
    
    /**
     * @description: TODO constants
     * @author: HaoWu
     * @create: 2021-06-25
     */
    public class GmallConfig {
        //Phoenix schema name
        public static final String HBASE_SCHEMA = "bigdata";
    
        //Phoenix driver
        public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    
        //Phoenix JDBC URL (ZooKeeper quorum)
        public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";
    
        public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
    
        public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop102:8123/default";
    }
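GmallConfig holds the Phoenix schema (HBASE_SCHEMA) for the dimension tables, and table_process supplies sink_table, sink_columns, sink_pk and sink_extend. One plausible way to combine them, sketched here as a hypothetical helper, is to build a Phoenix CREATE TABLE IF NOT EXISTS statement that types every column as VARCHAR and defaults the primary key to id. The sketch assumes the bigdata schema already exists in Phoenix; the project's own TableProcessFunction (imported by BaseDbApp but not shown here) may do this differently.

```java
/** Hypothetical helper: Phoenix DDL for one table_process row. */
final class PhoenixDdlSketch {
    static final String HBASE_SCHEMA = "bigdata"; // same value as GmallConfig.HBASE_SCHEMA

    static String createTableSql(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        String pk = (sinkPk == null || sinkPk.isEmpty()) ? "id" : sinkPk;   // assumed default key
        String extend = sinkExtend == null ? "" : sinkExtend;               // appended verbatim
        StringBuilder sql = new StringBuilder("create table if not exists ")
                .append(HBASE_SCHEMA).append('.').append(sinkTable).append('(');
        String[] cols = sinkColumns.split(",");
        for (int i = 0; i < cols.length; i++) {
            String col = cols[i].trim();
            sql.append(col).append(" varchar");      // every column stored as VARCHAR
            if (col.equals(pk)) {
                sql.append(" primary key");
            }
            if (i < cols.length - 1) {
                sql.append(',');
            }
        }
        return sql.append(')').append(extend).toString();
    }
}
```

With the base_trademark row above, this yields create table if not exists bigdata.dim_base_trademark(id varchar primary key,name varchar).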
    
    5) Main program: BaseDbApp.java
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.flink.realtime.app.func.DimSink;
    import com.flink.realtime.app.func.TableProcessFunction;
    import com.flink.realtime.bean.TableProcess;
    import com.flink.realtime.utils.MyKafkaUtil;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.flink.util.OutputTag;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import javax.annotation.Nullable;
    
    /**
     * @description: todo -> prepare the DWD layer for business data
     * @author: HaoWu
     * @create: 2021-06-22
     */
    public class BaseDbApp {
        public static void main(String[] args) throws Exception {
            // TODO 1. Create the execution environment
            // 1.1 Create the stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.2 Set the parallelism
            env.setParallelism(1);
            /*
            // 1.3 Checkpoint settings (when uncommented, these lines also need imports for
            //     CheckpointingMode, CheckpointConfig, RestartStrategies and FsStateBackend)
            env.enableCheckpointing(5000L); // take a checkpoint every 5000 ms
            env.getCheckpointConfig().setCheckpointTimeout(60000L); // checkpoint timeout: 1 min
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpointing mode, default: EXACTLY_ONCE
            // Keep the last checkpoint when the job is cancelled normally
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // Restart strategy: at most 3 attempts, 5000 ms apart
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
            // State backend
            env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_db_app"));
            // HDFS permission problem
            // Error: Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
            // Cause: the current user has no write permission on the root directory "/".
            // Fix 1: hadoop fs -chmod 777 /    Fix 2: System.setProperty("HADOOP_USER_NAME", "atguigu");
            System.setProperty("HADOOP_USER_NAME", "atguigu");
            */
    
            // TODO 2. Read the ODS-layer business data from Kafka: topic ods_base_db
            String ods_db_topic = "ods_base_db";
            FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", ods_db_topic, "ods_base_db_consumer1", "false", "latest");
            DataStreamSource<String> jsonStrDS = env.addSource(kafkaConsumer);
            //jsonStrDS.print();
            // TODO 3. Convert each JSON string in jsonStrDS into a JSONObject
            SingleOutputStreamOperator<JSONObject> jsonDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));
    
            // TODO 4. ETL: drop malformed records
            SingleOutputStreamOperator<JSONObject> filterDS = jsonDS.filter(
                    json -> {
                        boolean flag = json.getString("table") != null // table name must be present
                                && json.getString("data") != null  // data must be present
                                && json.getString("data").length() >= 3; // data must not be an empty object ("{}" has length 2)
                        return flag;
                    }
            );
            //filterDS.print("filterDS>>>>>>>>>>");
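            // TODO 5. Dynamic splitting: fact tables -> main stream -> Kafka DWD layer; dimension tables -> side output -> HBase
            // 5.1 Define the side-output tag for records bound for HBase (the anonymous subclass keeps the generic type information)
            OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>(TableProcess.SINK_TYPE_HBASE) {
            };
            // 5.2 Split the stream; the main stream carries the records bound for Kafka
            SingleOutputStreamOperator<JSONObject> kafkaDS = filterDS.process(new TableProcessFunction(hbaseTag));
            // 5.3 Get the side output bound for HBase
            DataStream<JSONObject> hbaseDS = kafkaDS.getSideOutput(hbaseTag);
    
            kafkaDS.print("fact: kafkaDS>>>>>>>>");
            hbaseDS.print("dim: hbaseDS>>>>>>>>");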
    
            // TODO 6. Save dimension data to HBase
            hbaseDS.addSink(new DimSink());
            // TODO 7. Save fact data to Kafka with a custom serialization schema
            FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
                @Override
                public void open(SerializationSchema.InitializationContext context) throws Exception {
                    System.out.println("kafka serialization schema opened");
                }
    
                @Override
                public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
                    // The target topic is the sink_table set by TableProcessFunction
                    String sink_topic = jsonObject.getString("sink_table");
                    JSONObject data = jsonObject.getJSONObject("data");
                    // Encode explicitly as UTF-8 rather than with the platform default charset
                    return new ProducerRecord<>(sink_topic, data.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            kafkaDS.addSink(kafkaSink);
            // TODO 8. Execute the job
            env.execute();
        }
    }
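    Steps 4 and 5 of BaseDbApp (drop malformed records, then route each record by its `table:type` key) can be mimicked without Flink to see which records end up where. This is a plain-Java sketch under simplifying assumptions: the hypothetical `SplitSketch` class uses strings instead of fastjson objects, a `Map` of `"table:type" -> sinkType` stands in for the MySQL config table, and two lists stand in for the main stream and the side output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of BaseDbApp steps 4 and 5 (hypothetical class, no Flink involved).
final class SplitSketch {
    private SplitSketch() {
    }

    // Mirrors the filter in step 4: table present, data present and not an empty object "{}"
    static boolean isValid(String table, String dataJson) {
        return table != null && dataJson != null && dataJson.length() >= 3;
    }

    // Mirrors the routing in step 5: returns "kafka", "hbase", or null (no config entry, record dropped)
    static String route(Map<String, String> sinkTypeByKey, String table, String type) {
        if ("bootstrap-insert".equals(type)) {
            type = "insert"; // same normalization as TableProcessFunction
        }
        return sinkTypeByKey.get(table + ":" + type);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("order_info:insert", "kafka");
        config.put("base_province:insert", "hbase");

        List<String> mainStream = new ArrayList<>();  // stands in for the Kafka main stream
        List<String> sideOutput = new ArrayList<>();  // stands in for the HBase side output
        String[][] records = {
            {"order_info", "insert", "{\"id\":1}"},
            {"base_province", "bootstrap-insert", "{\"id\":2}"},
            {"order_info", "insert", "{}"},            // dropped by the filter
            {"user_info", "delete", "{\"id\":3}"}      // dropped: no config entry
        };
        for (String[] r : records) {
            if (!isValid(r[0], r[2])) {
                continue;
            }
            String sink = route(config, r[0], r[1]);
            if ("kafka".equals(sink)) {
                mainStream.add(r[0]);
            } else if ("hbase".equals(sink)) {
                sideOutput.add(r[0]);
            }
        }
        System.out.println("main (kafka): " + mainStream);
        System.out.println("side (hbase): " + sideOutput);
    }
}
```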
    
    6) Custom splitting function: TableProcessFunction.java
    import com.alibaba.fastjson.JSONObject;
    import com.flink.realtime.bean.TableProcess;
    import com.flink.realtime.common.GmallConfig;
    import com.flink.realtime.utils.MysqlUtil;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.*;
    import java.util.List;
    
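    /**
     * @description: TODO Custom ProcessFunction that splits the business data
     * @author: HaoWu
     * @create: 2021-07-26
     */
    public class TableProcessFunction extends ProcessFunction<JSONObject, JSONObject> {
    
        // Side-output tag for dimension records
        private OutputTag<JSONObject> outputTag;
    
        // In-memory config: {"sourceTable:operateType" -> TableProcess}.
        // Written by the Timer thread and read by processElement, so a thread-safe map is used.
        private Map<String, TableProcess> tableProcessMap = new java.util.concurrent.ConcurrentHashMap<>();
    
        // Names of the HBase tables that have already been checked/created
        private Set<String> existsTables = new HashSet<>();
    
        // Phoenix connection
        private Connection connection;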
    
        public TableProcessFunction() {
        }
    
        public TableProcessFunction(OutputTag<JSONObject> outputTag) {
            this.outputTag = outputTag;
        }
    
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialize the Phoenix connection
            Class.forName(GmallConfig.PHOENIX_DRIVER);
            connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    
            // Load the config table into memory
            initTableProcessMap();
            // The config table may change, so schedule a Timer that reloads it every 5000 ms, starting 5000 ms from now
            Timer timer = new Timer();
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    initTableProcessMap();
                }
            }, 5000, 5000);
        }
    
    
        @Override
        public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
            // Source table name
            String tableName = jsonObj.getString("table");
            // Operation type
            String type = jsonObj.getString("type");
            // Fix: when Maxwell bootstraps historical data, the operation type is "bootstrap-insert"; treat it as "insert"
            if ("bootstrap-insert".equals(type)) {
                type = "insert";
                jsonObj.put("type", type);
            }
    
            if (tableProcessMap != null && tableProcessMap.size() > 0) {
                // Look up the config entry by "table:type"
                String key = tableName + ":" + type;
                TableProcess tableProcess = tableProcessMap.get(key);
                // Check whether a config entry was found
                if (tableProcess != null) {
                    // Tag the record with its sink table: dimension data -> HBase, fact data -> Kafka
                    jsonObj.put("sink_table", tableProcess.getSinkTable());
                    // If sink columns are configured, keep only those fields
                    String sinkColumns = tableProcess.getSinkColumns();
                    if (sinkColumns != null && sinkColumns.length() > 0) {
                        filterColumn(jsonObj.getJSONObject("data"), sinkColumns);
                    }
                } else {
                    System.out.println("No config entry for key <<<< " + key + " >>>> in MySQL");
                }
    
                // Route the record by sinkType (constant first, so a null sinkType cannot throw)
                if (tableProcess != null && TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())) {
                    // sinkType = hbase: emit to the side output
                    ctx.output(outputTag, jsonObj);
                } else if (tableProcess != null && TableProcess.SINK_TYPE_KAFKA.equals(tableProcess.getSinkType())) {
                    // sinkType = kafka: emit to the main stream
                    out.collect(jsonObj);
                }
    
            }
        }
    
    
        /**
         * Query the config table in MySQL and cache it in memory
         */
        private void initTableProcessMap() {
            System.out.println("Querying the config table");
            // 1. Query the config from MySQL
            List<TableProcess> tableProcesses = MysqlUtil.queryList("select * from table_process", TableProcess.class, true);
            for (TableProcess tableProcess : tableProcesses) {
                String sourceTable = tableProcess.getSourceTable(); // source table
                String operateType = tableProcess.getOperateType(); // operation type
                String sinkType = tableProcess.getSinkType(); // sink type (hbase or kafka)
                String sinkTable = tableProcess.getSinkTable(); // sink table name
                String sinkPk = tableProcess.getSinkPk(); // sink table primary key
                String sinkColumns = tableProcess.getSinkColumns(); // sink table columns
                String sinkExtend = tableProcess.getSinkExtend(); // extra DDL clause
                // 2. Cache the entry under the key "sourceTable:operateType"
                tableProcessMap.put(sourceTable + ":" + operateType, tableProcess);
                // 3. For tables written to HBase, check whether the table has been seen before
                if ("insert".equals(operateType) && "hbase".equals(sinkType)) {
                    // Set.add returns true only if sinkTable was not already in the set
                    boolean isNew = existsTables.add(sinkTable);
                    // 4. A table not seen before is created in HBase
                    if (isNew) {
                        checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
                    }
                }
            }
        }
    
        /**
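         * Create an HBase table through Phoenix
         *
         * @param tableName table name
         * @param columns   comma-separated column names
         * @param pk        primary-key column
         * @param extend    extra DDL clause appended after the column list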
         */
        private void checkTable(String tableName, String columns, String pk, String extend) {
            // Default the primary key to "id"
            if (pk == null) {
                pk = "id";
            }
            // Default the extra clause to an empty string
            if (extend == null) {
                extend = "";
            }
            // Build the CREATE TABLE statement
            StringBuilder createSql = new StringBuilder("create table if not exists " + GmallConfig.HBASE_SCHEMA + "." + tableName + "(");
            // Append the columns
            String[] fieldArr = columns.split(",");
            for (int i = 0; i < fieldArr.length; i++) {
                String field = fieldArr[i];
                // The primary key becomes the row key; other columns go into column family "info"
                if (field.equals(pk)) {
                    createSql.append(field).append(" varchar primary key ");
                } else {
                    createSql.append("info.").append(field).append(" varchar");
                }
                // Add a comma after every column except the last
                if (i < fieldArr.length - 1) {
                    createSql.append(",");
                }
            }
            createSql.append(")");
            createSql.append(extend);
            System.out.println("CREATE TABLE SQL: " + createSql);
            // Create the HBase table through Phoenix
            PreparedStatement ps = null;
            try {
                ps = connection.prepareStatement(createSql.toString());
                ps.execute();
            } catch (SQLException throwables) {
                // Keep the original exception as the cause
                throw new RuntimeException("Failed to create table " + tableName, throwables);
            } finally {
                if (ps != null) {
                    try {
                        ps.close();
                    } catch (SQLException throwables) {
                        throwables.printStackTrace();
                    }
                }
            }
        }
    
        /**
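         * Keep only the fields listed in the config table
         *
         * @param data        the record's data object
         * @param sinkColumns comma-separated fields to keep
         */
        private void filterColumn(JSONObject data, String sinkColumns) {
            // Fields to keep
            String[] columns = sinkColumns.split(",");
            // Convert the array to a List so contains() can be used
            List<String> columnList = Arrays.asList(columns);
            // The JSON object's key-value pairs, each wrapped in a Map.Entry
            Set<Map.Entry<String, Object>> entrySet = data.entrySet();
            /* Removing elements inside a for-each loop typically throws ConcurrentModificationException:
            for (Map.Entry<String, Object> entry : entrySet) {
                if (!columnList.contains(entry.getKey())) {
                    entrySet.remove(entry);
                }
            }
            so remove them through the Iterator instead: */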
            Iterator<Map.Entry<String, Object>> iterator = entrySet.iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Object> entry = iterator.next();
                if (!columnList.contains(entry.getKey())) {
                    iterator.remove();
                }
            }
        }
    }
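    The two helper methods at the bottom of TableProcessFunction are plain string and collection work, so they can be checked in isolation. This is a JDK-only sketch in a hypothetical `TableProcessHelpers` class: `buildCreateSql` follows the same rules as `checkTable` (default primary key `id`, other columns in column family `info`, all columns `varchar`, extra clause appended at the end, minus the trailing space after `primary key`), and `keepColumns` does what `filterColumn` does but with `Collection.removeIf`, which removes through the collection's own iterator. The table name `dim_base_province` is only an example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helpers mirroring checkTable and filterColumn from TableProcessFunction.
final class TableProcessHelpers {
    private TableProcessHelpers() {
    }

    // Builds a Phoenix CREATE TABLE statement following the same rules as checkTable
    static String buildCreateSql(String schema, String table, String columns, String pk, String extend) {
        if (pk == null) {
            pk = "id";
        }
        if (extend == null) {
            extend = "";
        }
        StringBuilder sql = new StringBuilder("create table if not exists ")
                .append(schema).append('.').append(table).append('(');
        String[] fields = columns.split(",");
        for (int i = 0; i < fields.length; i++) {
            if (fields[i].equals(pk)) {
                sql.append(fields[i]).append(" varchar primary key");
            } else {
                sql.append("info.").append(fields[i]).append(" varchar");
            }
            if (i < fields.length - 1) {
                sql.append(',');
            }
        }
        return sql.append(')').append(extend).toString();
    }

    // Keeps only the configured keys; removeIf avoids ConcurrentModificationException
    static void keepColumns(Map<String, Object> data, String sinkColumns) {
        Set<String> keep = new HashSet<>(Arrays.asList(sinkColumns.split(",")));
        data.keySet().removeIf(key -> !keep.contains(key));
    }

    public static void main(String[] args) {
        System.out.println(buildCreateSql("bigdata", "dim_base_province", "id,name,region_id", null, null));
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("id", 1);
        data.put("name", "Beijing");
        data.put("create_time", "2021-07-26");
        keepColumns(data, "id,name");
        System.out.println(data);
    }
}
```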
    
    7) HBase sink: DimSink.java
    import com.alibaba.fastjson.JSONObject;
    import com.flink.realtime.common.GmallConfig;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Set;
    
    /**
     * @description: TODO HBase sink that writes rows into HBase tables through Phoenix
     * @author: HaoWu
     * @create: 2021-07-30
     */
    public class DimSink extends RichSinkFunction<JSONObject> {
    
        // Phoenix connection
        Connection connection = null;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            Class.forName(GmallConfig.PHOENIX_DRIVER);
            connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    
        }
    
        /**
         * Build an UPSERT statement for the record and commit it to HBase
         *
         * @param jsonObj record tagged with sink_table, carrying the row in data
         * @param context sink context
         */
        @Override
        public void invoke(JSONObject jsonObj, Context context) {
            String sinkTableName = jsonObj.getString("sink_table");
            JSONObject dataObj = jsonObj.getJSONObject("data");
            if (dataObj != null && dataObj.size() > 0) {
                String upsertSql = genUpdateSql(sinkTableName.toUpperCase(), dataObj);
                System.out.println(upsertSql);
                // try-with-resources closes the statement after every write
                try (PreparedStatement ps = connection.prepareStatement(upsertSql)) {
                    ps.executeUpdate();
                    // Phoenix connections do not auto-commit by default, so commit explicitly
                    connection.commit();
                } catch (SQLException throwables) {
                    throw new RuntimeException("Failed to execute UPSERT statement: " + upsertSql, throwables);
                }
            }
        }
    
        /**
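         * Build the UPSERT statement. Values are spliced in as quoted string literals,
         * so a value that contains a single quote produces invalid SQL.
         *
         * @param sinkTableName target table name
         * @param data          column-value pairs of the row
         * @return the UPSERT statement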
         */
        private String genUpdateSql(String sinkTableName, JSONObject data) {
            Set<String> fields = data.keySet();
            String upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTableName + " (" + StringUtils.join(fields, ",") + ")";
            String valuesSql = " values ('" + StringUtils.join(data.values(), "','") + "')";
            return upsertSql + valuesSql;
        }
    }
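    Because `genUpdateSql` quotes every value into the SQL text, a value such as `O'Hare` breaks the statement. One alternative, sketched here under the assumption that all dimension columns are `varchar` (as `checkTable` creates them), is to generate `?` placeholders and keep the values in a separate list for a `PreparedStatement`. The `UpsertSql` class is hypothetical and only builds the statement; it does not talk to Phoenix.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical builder: Phoenix UPSERT with "?" placeholders plus the matching parameter list.
final class UpsertSql {
    final String sql;
    final List<String> params;

    private UpsertSql(String sql, List<String> params) {
        this.sql = sql;
        this.params = Collections.unmodifiableList(params);
    }

    static UpsertSql build(String schema, String table, Map<String, Object> data) {
        StringJoiner columns = new StringJoiner(",");
        StringJoiner marks = new StringJoiner(",");
        List<String> params = new ArrayList<>();
        // One pass over the entries keeps column order and value order aligned
        for (Map.Entry<String, Object> e : data.entrySet()) {
            columns.add(e.getKey());
            marks.add("?");
            // Columns are assumed to be varchar, so values are bound as strings (null stays null)
            params.add(e.getValue() == null ? null : e.getValue().toString());
        }
        String sql = "upsert into " + schema + "." + table + " (" + columns + ") values (" + marks + ")";
        return new UpsertSql(sql, params);
    }

    public static void main(String[] args) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("id", 1);
        data.put("name", "O'Hare"); // would break the string-concatenated version
        UpsertSql upsert = build("bigdata", "DIM_BASE_PROVINCE", data);
        System.out.println(upsert.sql);
        System.out.println(upsert.params);
    }
}
```

    In `invoke`, the statement would then be prepared from `upsert.sql`, and each value bound with `ps.setString(i + 1, upsert.params.get(i))` before `executeUpdate()`.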
    
    8) Kafka sink with custom serialization (method in MyKafkaUtil)
        /**
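         * todo Kafka sink with a custom serialization schema, so any record type can be sent to any topic
         *
         * @return a FlinkKafkaProducer with EXACTLY_ONCE semantics
         */
        public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
            Properties props = new Properties();
            // Kafka broker addresses
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
            // Transaction timeout: must not exceed the broker's transaction.max.timeout.ms (15 minutes by default),
            // whereas FlinkKafkaProducer defaults to 1 hour, so it is set explicitly here
            props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");
            // The ProducerRecord returned by the schema decides the actual topic of each record
            return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);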
        }
    
    
            // TODO 7. Save fact data to Kafka with a custom serialization schema
            FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
                @Override
                public void open(SerializationSchema.InitializationContext context) throws Exception {
                    System.out.println("kafka serialization schema opened");
                }
    
                @Override
                public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
                    // The target topic is the sink_table set by TableProcessFunction
                    String sink_topic = jsonObject.getString("sink_table");
                    JSONObject data = jsonObject.getJSONObject("data");
                    // Encode explicitly as UTF-8 rather than with the platform default charset
                    return new ProducerRecord<>(sink_topic, data.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
            });
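    The transaction-timeout setting above matters because an EXACTLY_ONCE producer is transactional, and the broker rejects a producer whose `transaction.timeout.ms` is larger than the broker's `transaction.max.timeout.ms`. The sketch below builds the same two producer properties with plain `java.util.Properties` (the keys are the string values of `ProducerConfig.BOOTSTRAP_SERVERS_CONFIG` and `ProducerConfig.TRANSACTION_TIMEOUT_CONFIG`) and fails fast on a too-large timeout. The `KafkaTxnConfig` class is hypothetical, and the 15-minute cap assumes the broker still uses its default.

```java
import java.util.Properties;

// Hypothetical helper: producer properties for a transactional (EXACTLY_ONCE) Kafka sink.
final class KafkaTxnConfig {
    // Default of the broker setting transaction.max.timeout.ms (15 minutes); assumes it was not changed
    static final long BROKER_MAX_TXN_TIMEOUT_MS = 15L * 60 * 1000;

    private KafkaTxnConfig() {
    }

    static Properties producerProps(String bootstrapServers, long txnTimeoutMs) {
        // A timeout above the broker maximum would be rejected when the producer initializes transactions
        if (txnTimeoutMs > BROKER_MAX_TXN_TIMEOUT_MS) {
            throw new IllegalArgumentException("transaction.timeout.ms " + txnTimeoutMs
                    + " exceeds broker maximum " + BROKER_MAX_TXN_TIMEOUT_MS);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("transaction.timeout.ms", Long.toString(txnTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("hadoop102:9092,hadoop103:9092,hadoop104:9092", 15 * 60 * 1000);
        System.out.println(props);
    }
}
```

    Consumers of the DWD topics only get the exactly-once benefit if they set `isolation.level=read_committed`; the Kafka default, `read_uncommitted`, also returns records from transactions that were later aborted.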
    

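    3.4 Main program: flow summary

    TableProcessFunction is a custom operator whose work runs on three timelines:

    • The purple line in the figure: this timeline does not depend on incoming data and runs as soon as the task starts. Its work happens in open(), which is called at task startup: it initializes the connections and starts the periodic schedule.

    • The green line in the figure: this timeline also does not depend on incoming data; once the schedule has started it runs periodically. Its job is to synchronize the config table into memory (tableProcessMap), implemented with the Timer started in open(). As a side task, if a configured HBase table does not exist yet, it creates that table from the configuration.

    • The black line in the figure: this timeline advances with every incoming record. Using the tableProcessMap synchronized into memory, it tags each record with its sink and strips the fields that are not needed.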
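    The three timelines can be reproduced outside Flink to see how they share state. This is a sketch, not the original code: the hypothetical `ConfigCache` class swaps `java.util.Timer` for a `ScheduledExecutorService` (which can be shut down cleanly), keeps the config in a `ConcurrentHashMap` because the refresh thread writes while the processing thread reads, and uses a `Supplier` in place of the MySQL query. Like the original, `refresh()` only adds or overwrites entries.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of the purple, green and black timelines of TableProcessFunction.
final class ConfigCache implements AutoCloseable {
    private final Map<String, String> config = new ConcurrentHashMap<>();
    private final Supplier<Map<String, String>> loader; // stands in for "select * from table_process"
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "config-refresh");
        t.setDaemon(true);
        return t;
    });

    ConfigCache(Supplier<Map<String, String>> loader) {
        this.loader = loader;
    }

    // Purple line: runs once at startup (like open()): initial load, then start the schedule
    void start(long periodMs) {
        refresh();
        scheduler.scheduleAtFixedRate(this::refresh, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // Green line: runs periodically on the scheduler thread; adds or overwrites entries
    void refresh() {
        config.putAll(loader.get());
    }

    // Black line: runs for every incoming record (like processElement)
    String sinkTypeFor(String table, String type) {
        return config.get(table + ":" + type);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        Map<String, String> mysqlRows = new ConcurrentHashMap<>();
        mysqlRows.put("order_info:insert", "kafka");
        try (ConfigCache cache = new ConfigCache(() -> mysqlRows)) {
            cache.start(5000);
            System.out.println(cache.sinkTypeFor("order_info", "insert"));
        }
    }
}
```

    In a Flink function the equivalent of `close()` would go into the operator's own `close()` method, so that a restarted task does not leave an old refresh thread running.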

    3.5 Open questions

    1. At present the config table only recognizes newly added entries, not changes to existing ones?

    4. Overall flow diagram
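    One possible direction for this question, sketched under assumptions rather than taken from the original: instead of merging rows into a long-lived map with `put`, rebuild the whole config on every refresh and publish it with a single `volatile` write. A modified entry then replaces the old one and a deleted entry disappears, while each record still sees one complete snapshot. The `SnapshotConfig` class is hypothetical. It does not address the HBase side: `create table if not exists` leaves an existing table unchanged, so a changed `sink_columns` would still need a separate schema change (for example a Phoenix `ALTER TABLE ... ADD`).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: full-snapshot reload so modified and deleted config entries take effect.
final class SnapshotConfig {
    // Replaced as a whole on every reload; volatile makes the new snapshot visible to other threads
    private volatile Map<String, String> current = Collections.emptyMap();

    // Called by the refresh thread with the complete result of the config query
    void reload(Map<String, String> freshRows) {
        current = Collections.unmodifiableMap(new HashMap<>(freshRows));
    }

    // Called for every record; always reads one consistent snapshot
    String lookup(String key) {
        return current.get(key);
    }

    public static void main(String[] args) {
        SnapshotConfig config = new SnapshotConfig();
        Map<String, String> first = new HashMap<>();
        first.put("order_info:insert", "kafka");
        first.put("base_province:insert", "hbase");
        config.reload(first);

        Map<String, String> second = new HashMap<>();
        second.put("order_info:insert", "hbase"); // modified entry; base_province was deleted
        config.reload(second);

        System.out.println(config.lookup("order_info:insert"));
        System.out.println(config.lookup("base_province:insert"));
    }
}
```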

  • Original post: https://www.cnblogs.com/wh984763176/p/15094516.html