  Flink Real-Time Data Warehouse (DWS Layer)

    Chapter 1 DWM and DWS Layer Design

    1.1 Design Approach

      DWM (Data Warehouse Middle) is generally called the data middle layer. Built on top of the DWD layer, it performs light aggregation over the data to produce a set of intermediate tables, which improves the reusability of common metrics and reduces duplicated processing. Intuitively, it aggregates along the common core dimensions and computes the corresponding statistics.

      Earlier, by splitting the stream, we separated the data into independent Kafka topics. To decide how to process that data next, we first have to think about exactly which metrics we want to compute in real time.

      Real-time computing differs from offline processing: both its development and its operations costs are very high, so we have to weigh, given the actual situation, whether it is necessary to build a large, all-encompassing middle layer the way an offline warehouse would. If it is not, we should roughly plan out the metrics that need to be computed in real time. Outputting those metrics as topic-oriented wide tables is our DWS layer.

    1.2 Requirements

    | Topic   | Metric                   | Output                    | Computed from                              | Layer |
    |---------|--------------------------|---------------------------|--------------------------------------------|-------|
    | Visitor | PV                       | dashboard                 | directly from page_log                     | dwd   |
    | Visitor | UV                       | dashboard                 | page_log, filtered and deduplicated        | dwm   |
    | Visitor | Bounce rate              | dashboard                 | derived from page_log behavior             | dwd   |
    | Visitor | Pages per visit          | dashboard                 | needs the session-entry mark               | dwd   |
    | Visitor | Visit duration           | dashboard                 | needs the session-entry mark               | dwd   |
    | Product | Clicks                   | multidimensional analysis | directly from page_log                     | dwd   |
    | Product | Displays                 | multidimensional analysis | directly from page_log                     | dwd   |
    | Product | Favors                   | multidimensional analysis | favor table                                | dwd   |
    | Product | Add to cart              | multidimensional analysis | cart table                                 | dwd   |
    | Product | Orders                   | dashboard                 | order wide table                           | dwm   |
    | Product | Payments                 | multidimensional analysis | payment wide table                         | dwm   |
    | Product | Refunds                  | multidimensional analysis | refund table                               | dwd   |
    | Product | Comments                 | multidimensional analysis | comment table                              | dwd   |
    | Region  | PV                       | multidimensional analysis | directly from page_log                     | dwd   |
    | Region  | UV                       | multidimensional analysis | page_log, filtered and deduplicated        | dwm   |
    | Region  | Orders                   | dashboard                 | order wide table                           | dwd   |
    | Keyword | Search keywords          | dashboard                 | directly from the page access log          | dwd   |
    | Keyword | Clicked-product keywords | dashboard                 | re-aggregated from the product-topic stats | dws   |
    | Keyword | Ordered-product keywords | dashboard                 | re-aggregated from the product-topic stats | dws   |

    1.3 What Is the Role of the DWS Layer?

      1) Light aggregation: the ADS layer has to serve many real-time queries, and if it queried pure detail data the load would be enormous.

      2) Organizing more of the real-time data by topic makes it easier to manage and also reduces the number of dimension lookups.

    Chapter 2 DWS Layer: Visitor Topic Wide Table

    | Topic   | Metric          | Output    | Computed from                                             | Layer   |
    |---------|-----------------|-----------|-----------------------------------------------------------|---------|
    | Visitor | PV              | dashboard | directly from page_log                                    | dwd     |
    | Visitor | UV              | dashboard | page_log, filtered and deduplicated                       | dwm     |
    | Visitor | Bounce rate     | dashboard | derived from the jump detail stream and page_log behavior | dwd/dwm |
    | Visitor | Pages per visit | dashboard | needs the session-entry mark                              | dwd     |
    | Visitor | Visit duration  | dashboard | needs the session-entry mark                              | dwd     |

      Designing a DWS-layer table really comes down to two things: dimensions and measures (fact data)

        1) Measures: PV, UV, bounce count, pages per visit, and visit duration

        2) Dimensions: the fields most important for analysis, namely channel, region, version, and the new/returning-user flag, which together form the aggregation key

    2.1 Requirements Analysis and Approach

      1) Consume each detail topic into a data stream

      2) Merge the streams into a single stream of objects sharing one format

      3) Aggregate the merged stream; the window size determines how fresh the data is

      4) Write the aggregated results to the database (a skeletal sketch of the whole pipeline follows below)
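
      Before diving into the full classes below, here is a minimal, illustrative skeleton of these four steps in one job. The broker address, topic names, and the parse/merge placeholders are assumptions for the sketch; the real project wraps this plumbing in BaseAppV2 and FlinkSinkUtil, as shown in 2.2.2 through 2.2.4.

    // Skeleton only: parsePage/parseUv stubs stand in for the real parsing in 2.2.2,
    // and print() stands in for the ClickHouse sink built in 2.2.4.
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    
    import java.time.Duration;
    import java.util.Properties;
    
    public class VisitorStatsSkeleton {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "hadoop162:9092"); // assumed broker address
            props.setProperty("group.id", "VisitorStatsSkeleton");
    
            // 1. one stream per detail topic, each parsed into the common VisitorStats type
            DataStream<VisitorStats> pageStats = env
                    .addSource(new FlinkKafkaConsumer<>("dwd_page", new SimpleStringSchema(), props))
                    .map(VisitorStatsSkeleton::parsePage);
            DataStream<VisitorStats> uvStats = env
                    .addSource(new FlinkKafkaConsumer<>("dwm_uv", new SimpleStringSchema(), props))
                    .map(VisitorStatsSkeleton::parseUv);
    
            // 2. union into a single stream
            DataStream<VisitorStats> all = pageStats.union(uvStats);
    
            // 3. event-time tumbling window per dimension combination
            all.assignTimestampsAndWatermarks(
                            WatermarkStrategy
                                    .<VisitorStats>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                    .withTimestampAssigner((vs, ts) -> vs.getTs()))
                    .keyBy(vs -> vs.getVc() + "_" + vs.getCh() + "_" + vs.getAr() + "_" + vs.getIs_new())
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .reduce(VisitorStatsSkeleton::merge)
                    // 4. write the aggregate to the OLAP store (see 2.2.4); print() as a stand-in
                    .print();
    
            env.execute("VisitorStatsSkeleton");
        }
    
        private static VisitorStats parsePage(String json) { return new VisitorStats(); } // real parsing: 2.2.2
        private static VisitorStats parseUv(String json) { return new VisitorStats(); }   // real parsing: 2.2.2
    
        private static VisitorStats merge(VisitorStats a, VisitorStats b) {
            a.setPv_ct(a.getPv_ct() + b.getPv_ct());
            a.setUv_ct(a.getUv_ct() + b.getUv_ct());
            return a;
        }
    }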

    2.2 Implementation

    2.2.1 Define the POJO for the Topic Wide Table

    package com.yuange.flinkrealtime.bean;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    /**
     * @author 袁哥
     * @date 2021/8/4 20:07
     */
    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public class VisitorStats {
        // window start time
        private String stt;
        // window end time
        private String edt;
        // dimension: app version
        private String vc;
        // dimension: channel
        private String ch;
        // dimension: area (region)
        private String ar;
        // dimension: new/returning-user flag
        private String is_new;
        // measure: unique visitor count
        private Long uv_ct = 0L;
        // measure: page view count
        private Long pv_ct = 0L;
        // measure: session (entry) count
        private Long sv_ct = 0L;
        // measure: bounce count
        private Long uj_ct = 0L;
        // measure: total visit duration
        private Long dur_sum = 0L;
        // statistics timestamp
        private Long ts;
    }

    2.2.2 Consume the Kafka Data and Union Four Streams into One

      1) Consume the Kafka data, parse it into POJOs, and union the streams into one

    package com.yuange.flinkrealtime.app.dws;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.yuange.flinkrealtime.app.BaseAppV2;
    import com.yuange.flinkrealtime.bean.VisitorStats;
    import com.yuange.flinkrealtime.common.Constant;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.Map;
    
    /**
     * @author 袁哥
     * @date 2021/8/4 20:17
     */
    public class DwsVisitorStatsApp extends BaseAppV2 {
    
        public static void main(String[] args) {
            new DwsVisitorStatsApp().init(
                    4001,
                    1,
                    "DwsVisitorStatsApp",
                    "DwsVisitorStatsApp",
                    Constant.TOPIC_DWD_PAGE,Constant.TOPIC_DWM_UV,Constant.TOPIC_DWM_USER_JUMP_DETAIL
            );
        }
    
        @Override
        protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. Parse the streams and union them into one stream with a common format
            DataStream<VisitorStats> visitorStatsDataStream = parseAndUnion(streams);
            visitorStatsDataStream.print();
        }
    
        private DataStream<VisitorStats> parseAndUnion(Map<String, DataStreamSource<String>> streams) {
            DataStreamSource<String> pageStream = streams.get(Constant.TOPIC_DWD_PAGE);
            DataStreamSource<String> uvStream = streams.get(Constant.TOPIC_DWM_UV);
            DataStreamSource<String> userJumpStream = streams.get(Constant.TOPIC_DWM_USER_JUMP_DETAIL);
    
        // 1. PV and visit duration
            SingleOutputStreamOperator<VisitorStats> pvAndDuringTimeStatsStream = pageStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    // parse the record as JSON
    
            JSONObject common = jsonObject.getJSONObject("common"); // extract the common fields
                String vc = common.getString("vc");
                String ch = common.getString("ch");
                String ar = common.getString("ar");
                String is_new = common.getString("is_new");
    
            JSONObject page = jsonObject.getJSONObject("page"); // extract the page fields
                Long during_time = page.getLong("during_time");
    
            Long ts = jsonObject.getLong("ts"); // extract the ts field
    
                VisitorStats visitorStats = new VisitorStats(
                        "", "",
                        vc, ch, ar, is_new,
                        0L, 1L, 0L, 0L, during_time,
                        ts
                );
                return visitorStats;
            });
    
        // 2. UV
            SingleOutputStreamOperator<VisitorStats> uvStatsStream = uvStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    // parse the record as JSON
                JSONObject common = jsonObject.getJSONObject("common");
                String vc = common.getString("vc");
                String ch = common.getString("ch");
                String ar = common.getString("ar");
                String is_new = common.getString("is_new");
    
                Long ts = jsonObject.getLong("ts");
    
                VisitorStats visitorStats = new VisitorStats(
                        "", "",
                        vc, ch, ar, is_new,
                        1L, 0L, 0L, 0L, 0L,
                        ts
                );
                return visitorStats;
            });
    
        // 3. bounce count
            SingleOutputStreamOperator<VisitorStats> ujStatsStream = userJumpStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    // parse the record as JSON
                JSONObject common = jsonObject.getJSONObject("common");
                String vc = common.getString("vc");
                String ch = common.getString("ch");
                String ar = common.getString("ar");
                String is_new = common.getString("is_new");
    
                Long ts = jsonObject.getLong("ts");
    
                VisitorStats visitorStats = new VisitorStats(
                        "", "",
                        vc, ch, ar, is_new,
                        0L, 0L, 0L, 1L, 0L,
                        ts
                );
                return visitorStats;
            });
    
        // 4. Session (entry) count, computed from the page stream
            SingleOutputStreamOperator<VisitorStats> svStatsStream = pageStream.flatMap(new FlatMapFunction<String, VisitorStats>() {
                @Override
                public void flatMap(String value, Collector<VisitorStats> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);    // parse the record as JSON
                    String last_page_id = jsonObject.getJSONObject("page").getString("last_page_id");
    
                if (last_page_id == null || last_page_id.length() == 0) {    // no previous page: this is a session entry
                        JSONObject common = jsonObject.getJSONObject("common");
                        String vc = common.getString("vc");
                        String ch = common.getString("ch");
                        String ar = common.getString("ar");
                        String is_new = common.getString("is_new");
    
                        Long ts = jsonObject.getLong("ts");
    
                        VisitorStats visitorStats = new VisitorStats(
                                "", "",
                                vc, ch, ar, is_new,
                                0L, 0L, 1L, 0L, 0L,
                                ts
                        );
                        out.collect(visitorStats);
                    }
                }
            });
            return pvAndDuringTimeStatsStream.union(uvStatsStream,ujStatsStream,svStatsStream);
        }
    }

      2) Start Hadoop

    hadoop.sh start

      3) Start ZooKeeper

    zk start

      4) Start Kafka

    kafka.sh start

      5) Start the log server

    log-lg.sh start

      6) Start Flink in yarn-session mode

    /opt/module/flink-yarn/bin/yarn-session.sh -d

      7) Submit the DwdLogApp, DwmUvApp, and DwmJumpDetailApp_Two jobs to the yarn-session

    realtime.sh

      8) Start DwsVisitorStatsApp in IDEA

      9) Generate log data

    cd /opt/software/mock/mock_log
    java -jar gmall2020-mock-log-2020-12-18.jar

      10) Check whether anything is printed to the console

    2.2.3 Aggregate by Dimension

      1) The code is as follows

    package com.yuange.flinkrealtime.app.dws;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.yuange.flinkrealtime.app.BaseAppV2;
    import com.yuange.flinkrealtime.bean.VisitorStats;
    import com.yuange.flinkrealtime.common.Constant;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import java.text.SimpleDateFormat;
    import java.time.Duration;
    import java.util.Map;
    
    /**
     * @author 袁哥
     * @date 2021/8/4 20:17
     */
    public class DwsVisitorStatsApp extends BaseAppV2 {
    
        public static void main(String[] args) {
            new DwsVisitorStatsApp().init(
                    4001,
                    1,
                    "DwsVisitorStatsApp",
                    "DwsVisitorStatsApp",
                    Constant.TOPIC_DWD_PAGE,Constant.TOPIC_DWM_UV,Constant.TOPIC_DWM_USER_JUMP_DETAIL
            );
        }
    
        @Override
        protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. Parse the streams and union them into one stream with a common format
            DataStream<VisitorStats> visitorStatsDataStream = parseAndUnion(streams);
    //        visitorStatsDataStream.print();
        // 2. window and aggregate
            SingleOutputStreamOperator<VisitorStats> aggregatedStream = aggWithWindow(visitorStatsDataStream);
            aggregatedStream.print();
        }
    
        private SingleOutputStreamOperator<VisitorStats> aggWithWindow(DataStream<VisitorStats> visitorStatsDataStream) {
            return visitorStatsDataStream
                    .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<VisitorStats>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                    )
                .keyBy(vs -> vs.getVc() + "_" + vs.getCh() + "_" + vs.getAr() + "_" + vs.getIs_new())   // key by the dimension combination
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .sideOutputLateData(new OutputTag<VisitorStats>("late"){})
                    .reduce(
                            new ReduceFunction<VisitorStats>() {
                                @Override
                                public VisitorStats reduce(VisitorStats value1,
                                                           VisitorStats value2) throws Exception {
                                    value1.setUv_ct(value1.getUv_ct() + value2.getUv_ct());
                                    value1.setUj_ct(value1.getUj_ct() + value2.getUj_ct());
                                    value1.setPv_ct(value1.getPv_ct() + value2.getPv_ct());
                                    value1.setSv_ct(value1.getSv_ct() + value2.getSv_ct());
                                    value1.setDur_sum(value1.getDur_sum() + value2.getDur_sum());
                                    return value1;
                                }
                            },
                            new ProcessWindowFunction<VisitorStats, VisitorStats, String, TimeWindow>() {
                                SimpleDateFormat simpleDateFormat;
                                @Override
                                public void open(Configuration parameters) throws Exception {
                                    simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                }
    
                                @Override
                                public void process(String s,
                                                    Context context,
                                                    Iterable<VisitorStats> elements,
                                                    Collector<VisitorStats> out) throws Exception {
                                    TimeWindow window = context.window();
                                    String start = simpleDateFormat.format(window.getStart());
                                    String end = simpleDateFormat.format(window.getEnd());
    
                                    VisitorStats visitorStats = elements.iterator().next();
                                    visitorStats.setStt(start);
                                    visitorStats.setEdt(end);
                                    out.collect(visitorStats);
                                }
                            }
                    );
        }
    
        private DataStream<VisitorStats> parseAndUnion(Map<String, DataStreamSource<String>> streams) {
            DataStreamSource<String> pageStream = streams.get(Constant.TOPIC_DWD_PAGE);
            DataStreamSource<String> uvStream = streams.get(Constant.TOPIC_DWM_UV);
            DataStreamSource<String> userJumpStream = streams.get(Constant.TOPIC_DWM_USER_JUMP_DETAIL);
    
        // 1. PV and visit duration
            SingleOutputStreamOperator<VisitorStats> pvAndDuringTimeStatsStream = pageStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    // parse the record as JSON
    
            JSONObject common = jsonObject.getJSONObject("common"); // extract the common fields
                String vc = common.getString("vc");
                String ch = common.getString("ch");
                String ar = common.getString("ar");
                String is_new = common.getString("is_new");
    
            JSONObject page = jsonObject.getJSONObject("page"); // extract the page fields
                Long during_time = page.getLong("during_time");
    
            Long ts = jsonObject.getLong("ts"); // extract the ts field
    
                VisitorStats visitorStats = new VisitorStats(
                        "", "",
                        vc, ch, ar, is_new,
                        0L, 1L, 0L, 0L, during_time,
                        ts
                );
                return visitorStats;
            });
    
        // 2. UV
            SingleOutputStreamOperator<VisitorStats> uvStatsStream = uvStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    // parse the record as JSON
                JSONObject common = jsonObject.getJSONObject("common");
                String vc = common.getString("vc");
                String ch = common.getString("ch");
                String ar = common.getString("ar");
                String is_new = common.getString("is_new");
    
                Long ts = jsonObject.getLong("ts");
    
                VisitorStats visitorStats = new VisitorStats(
                        "", "",
                        vc, ch, ar, is_new,
                        1L, 0L, 0L, 0L, 0L,
                        ts
                );
                return visitorStats;
            });
    
        // 3. bounce count
            SingleOutputStreamOperator<VisitorStats> ujStatsStream = userJumpStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    // parse the record as JSON
                JSONObject common = jsonObject.getJSONObject("common");
                String vc = common.getString("vc");
                String ch = common.getString("ch");
                String ar = common.getString("ar");
                String is_new = common.getString("is_new");
    
                Long ts = jsonObject.getLong("ts");
    
                VisitorStats visitorStats = new VisitorStats(
                        "", "",
                        vc, ch, ar, is_new,
                        0L, 0L, 0L, 1L, 0L,
                        ts
                );
                return visitorStats;
            });
    
        // 4. Session (entry) count, computed from the page stream
            SingleOutputStreamOperator<VisitorStats> svStatsStream = pageStream.flatMap(new FlatMapFunction<String, VisitorStats>() {
                @Override
                public void flatMap(String value, Collector<VisitorStats> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);    // parse the record as JSON
                    String last_page_id = jsonObject.getJSONObject("page").getString("last_page_id");
    
                if (last_page_id == null || last_page_id.length() == 0) {    // no previous page: this is a session entry
                        JSONObject common = jsonObject.getJSONObject("common");
                        String vc = common.getString("vc");
                        String ch = common.getString("ch");
                        String ar = common.getString("ar");
                        String is_new = common.getString("is_new");
    
                        Long ts = jsonObject.getLong("ts");
    
                        VisitorStats visitorStats = new VisitorStats(
                                "", "",
                                vc, ch, ar, is_new,
                                0L, 0L, 1L, 0L, 0L,
                                ts
                        );
                        out.collect(visitorStats);
                    }
                }
            });
    
            return pvAndDuringTimeStatsStream.union(uvStatsStream,ujStatsStream,svStatsStream);
        }
    }

      2) Start DwsVisitorStatsApp in IDEA

      3) Generate log data

    cd /opt/software/mock/mock_log
    java -jar gmall2020-mock-log-2020-12-18.jar

      4) Check the console

    2.2.4 Write to the OLAP Database

      1) Why write to ClickHouse?

        ClickHouse is a database purpose-built for statistical analysis over large volumes of data: it provides massive storage capacity while keeping response times fast, and it supports standard SQL, which makes it both flexible and easy to pick up.

      2) Create the table in ClickHouse

        (1) Start ClickHouse

    sudo systemctl start clickhouse-server

        (2) Open the ClickHouse client

    clickhouse-client -m

        (3) Create the database and switch to it

    create database flinkdb;
    use flinkdb;

        (4) Create the table

    create table visitor_stats_2021 (
        stt DateTime,
        edt DateTime,
        vc String,
        ch String,
        ar String,
        is_new String,
        uv_ct UInt64,
        pv_ct UInt64,
        sv_ct UInt64,
        uj_ct UInt64,
        dur_sum UInt64,
        ts UInt64
    ) engine = ReplacingMergeTree(ts)
    partition by toYYYYMMDD(stt)
    order by (stt, edt, is_new, vc, ch, ar);
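
      Note that ReplacingMergeTree deduplicates rows sharing the same ORDER BY key only during background merges, keeping the row with the largest ts (the version column). Until a merge runs, duplicates written by a restarted job may still be visible, so exact queries should use FINAL or aggregate over the duplicates.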

      3) Add the JDBC connector and ClickHouse dependencies

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--ClickHouse 依赖开始-->
    <!-- https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.11.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5</version>
    </dependency>
    <!--ClickHouse 依赖结束-->

      4) Add constants to the Constant class

    public static final String CLICKHOUSE_DB = "flinkdb";
    public static final String TABLE_VISITOR_STATS = "visitor_stats_2021";
    public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
    public static final String CLICKHOUSE_URL_PRE = "jdbc:clickhouse://hadoop164:8123";

      5) Encapsulate a FlinkSinkUtil utility class. Several later jobs also need to write to ClickHouse, so the sink construction is wrapped in FlinkSinkUtil for reuse.

    public static <T> SinkFunction<T> getClickHouseSink(String db,
                                                        String table,
                                                        Class<T> tClass) {
        Field[] fields = tClass.getDeclaredFields();

        String clickhouseDriver = Constant.CLICKHOUSE_DRIVER;
        String url = Constant.CLICKHOUSE_URL_PRE + "/" + db;

        StringBuilder sql = new StringBuilder();
        sql.append("insert into ")
                .append(table)
                .append("(");
        // collect the column names from the POJO's fields
        for (Field field : fields) {
            sql.append(field.getName())
                    .append(",");
        }
        sql.deleteCharAt(sql.length() - 1); // drop the trailing comma
        sql.append(") values(");
        for (Field field : fields) {
            sql.append("?,");
        }
        sql.deleteCharAt(sql.length() - 1);
        sql.append(")");

//        System.out.println(sql.toString());
        // build a ClickHouse sink on top of the JDBC sink
        return getJdbcSink(
                url,
                clickhouseDriver,
                sql.toString()
        );
    }

    private static <T> SinkFunction<T> getJdbcSink(String url, String clickhouseDriver, String sql) {
        return JdbcSink.sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement ps, T t) throws SQLException {
                        Class<?> aClass = t.getClass();
                        try {
                            Field[] fields = aClass.getDeclaredFields();
                            for (int i = 0; i < fields.length; i++) {
                                Field field = fields[i];
                                field.setAccessible(true);  // disable access checks so private fields can be read
                                Object v = field.get(t);
                                ps.setObject(i + 1, v);
                            }
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchIntervalMs(100)   // flush a batch every 100 ms
                        .withMaxRetries(3)  // max retries on failure
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName(clickhouseDriver)
                        .build()
        );
    }
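
      Note that this reflective sink assumes the POJO's declared fields appear in the same order as the columns of the target table, and that every declared field maps to a column, so VisitorStats and visitor_stats_2021 must be kept in sync. For POJOs with fields that should not be written (like the Set fields of ProductStats in Chapter 3), a @NoSink annotation is introduced in 3.2.1, along with a sketch of the corresponding filtering logic.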

      6) Write the result to ClickHouse

    @Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. parse the streams and union them into one
        DataStream<VisitorStats> visitorStatsDataStream = parseAndUnion(streams);
//        visitorStatsDataStream.print();
        // 2. window and aggregate
        SingleOutputStreamOperator<VisitorStats> aggregatedStream = aggWithWindow(visitorStatsDataStream);
//        aggregatedStream.print();
        // 3. write to ClickHouse
        writeToClickHouse(aggregatedStream);
    }

    private void writeToClickHouse(SingleOutputStreamOperator<VisitorStats> aggregatedStream) {
        // custom ClickHouse sink
        aggregatedStream.addSink(FlinkSinkUtil.getClickHouseSink(Constant.CLICKHOUSE_DB, Constant.TABLE_VISITOR_STATS, VisitorStats.class));
    }

      7) Package the jar and upload it to /opt/module/applog/ on the Linux server

      8) Add DwsVisitorStatsApp to the realtime.sh script

    com.yuange.flinkrealtime.app.dws.DwsVisitorStatsApp

      9) Run the script to start the jobs

    realtime.sh

      10) Generate log data

    cd /opt/software/mock/mock_log
    java -jar gmall2020-mock-log-2020-12-18.jar

      11) Check whether ClickHouse received the data

    clickhouse-client -m --host=hadoop164
    use flinkdb;
    select count(*) from visitor_stats_2021;

    Chapter 3 DWS Layer: Product Topic Wide Table

    | Topic   | Metric      | Output                    | Computed from                 | Layer |
    |---------|-------------|---------------------------|-------------------------------|-------|
    | Product | Clicks      | multidimensional analysis | directly from dwd_page_log    | dwd   |
    | Product | Displays    | multidimensional analysis | directly from dwd_display_log | dwd   |
    | Product | Favors      | multidimensional analysis | favor table                   | dwd   |
    | Product | Add to cart | multidimensional analysis | cart table                    | dwd   |
    | Product | Orders      | dashboard                 | order wide table              | dwm   |
    | Product | Payments    | multidimensional analysis | payment wide table            | dwm   |
    | Product | Refunds     | multidimensional analysis | refund table                  | dwd   |
    | Product | Reviews     | multidimensional analysis | review table                  | dwd   |

      Like the visitor topic's DWS wide table, this one also gathers detail data from multiple fact tables into a single wide table.

    3.1 Requirements Analysis and Approach

      1) Read the data streams from the Kafka topics

      2) Convert the JSON string streams into streams of a unified data object

      3) Union the unified streams into one stream

      4) Assign event time and watermarks

      5) Key by, window, and aggregate

      6) Write to ClickHouse

    3.2 Implementation

    3.2.1 Custom Annotation

    package com.yuange.flinkrealtime.app.annotation;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    /**
     * @author 袁哥
     * @date 2021/8/6 18:25
     */
    
    @Target(ElementType.FIELD)  // applies to fields of a class
    @Retention(RetentionPolicy.RUNTIME) // retained at runtime, so it is visible to reflection
    public @interface NoSink {
    }
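
      The annotation itself carries no behavior; the sink has to check for it when building the INSERT statement and when binding parameters. As a minimal sketch (an illustrative adaptation, not code listed in this section), the field collection in FlinkSinkUtil.getClickHouseSink from 2.2.4 could be made @NoSink-aware like this:

    import com.yuange.flinkrealtime.app.annotation.NoSink;
    
    import java.lang.reflect.Field;
    import java.util.ArrayList;
    import java.util.List;
    
    public class SinkFieldFilter {
        /**
         * Return the declared fields of tClass that are NOT annotated with @NoSink.
         * Both the SQL-building loop and the JdbcStatementBuilder in FlinkSinkUtil
         * would iterate over this list instead of tClass.getDeclaredFields().
         */
        public static List<Field> sinkableFields(Class<?> tClass) {
            List<Field> result = new ArrayList<>();
            for (Field field : tClass.getDeclaredFields()) {
                if (!field.isAnnotationPresent(NoSink.class)) {
                    result.add(field);
                }
            }
            return result;
        }
    }

      With such a filter in place, the three Set fields of ProductStats below would never reach ClickHouse, while every remaining field still has to line up with a table column.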

    3.2.2 The ProductStats Entity Class for Product Statistics

    package com.yuange.flinkrealtime.bean;
    
    import com.yuange.flinkrealtime.app.annotation.NoSink;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.math.BigDecimal;
    import java.util.HashSet;
    import java.util.Set;
    
    /**
     * @author 袁哥
     * @date 2021/8/6 18:24
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class ProductStats {
    
        private String stt; // window start time
        private String edt; // window end time
        private Long sku_id; // sku id
        private String sku_name; // sku name
        private BigDecimal sku_price; // sku unit price
        private Long spu_id; // spu id
        private String spu_name; // spu name
        private Long tm_id; // trademark id
        private String tm_name; // trademark name
        private Long category3_id; // category id
        private String category3_name; // category name
    
        private Long click_ct = 0L;  // click count
        private Long display_ct = 0L; // display (exposure) count
    
        private Long favor_ct = 0L; // favor count
    
        private Long cart_ct = 0L;  // add-to-cart count
    
        private Long order_sku_num = 0L; // number of skus ordered
    
        // order amount for this sku, not the amount of the whole order
        private BigDecimal order_amount = BigDecimal.ZERO;
    
        private Long order_ct = 0L; // order count
    
        // payment amount
        private BigDecimal payment_amount = BigDecimal.ZERO;
    
        private Long paid_order_ct = 0L;  // paid order count
    
        private Long refund_order_ct = 0L; // refunded order count
    
        private BigDecimal refund_amount = BigDecimal.ZERO;
    
        private Long comment_ct = 0L; // review count
    
        private Long good_comment_ct = 0L; // positive review count
    
        @NoSink
        private Set<Long> orderIdSet = new HashSet<>();  // order ids seen in the window; the set's size gives the order count
    
        @NoSink
        private Set<Long> paidOrderIdSet = new HashSet<>(); // for counting paid orders
    
        @NoSink
        private Set<Long> refundOrderIdSet = new HashSet<>(); // for counting refunded orders
    
        private Long ts; // statistics timestamp
    }

    3.2.4 Add Constants to Constant

    public static final String TOPIC_DWD_FAVOR_INFO = "dwd_favor_info";
    public static final String TOPIC_DWD_CART_INFO = "dwd_cart_info";
    public static final String TOPIC_DWD_ORDER_REFUND_INFO = "dwd_order_refund_info";
    public static final String TOPIC_DWD_COMMENT_INFO = "dwd_comment_info";

    // appraise codes for 4-star and 5-star reviews; typed as String since they
    // are compared against the JSON "appraise" field
    public static final String FOUR_STAR_COMMENT = "1204";
    public static final String FIVE_STAR_COMMENT = "1205";

    3.2.5 Consume the Kafka Data and Merge into One Stream

      1) The code is as follows; once it is written, start it in IDEA

    package com.yuange.flinkrealtime.app.dws;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.yuange.flinkrealtime.app.BaseAppV2;
    import com.yuange.flinkrealtime.bean.OrderWide;
    import com.yuange.flinkrealtime.bean.PaymentWide;
    import com.yuange.flinkrealtime.bean.ProductStats;
    import com.yuange.flinkrealtime.common.Constant;
    import com.yuange.flinkrealtime.util.YuangeCommonUtil;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.Map;
    
    /**
     * @author 袁哥
     * @date 2021/8/6 18:19
     */
    public class DwsProductStatsApp extends BaseAppV2 {
    
        public static void main(String[] args) {
            new DwsProductStatsApp().init(
                    4002,
                    1,
                    "DwsProductStatsApp",
                    "DwsProductStatsApp",
                    Constant.TOPIC_DWD_PAGE, Constant.TOPIC_DWD_DISPLAY,
                    Constant.TOPIC_DWD_FAVOR_INFO, Constant.TOPIC_DWD_CART_INFO,
                    Constant.TOPIC_DWM_ORDER_WIDE, Constant.TOPIC_DWM_PAYMENT_WIDE,
                    Constant.TOPIC_DWD_ORDER_REFUND_INFO, Constant.TOPIC_DWD_COMMENT_INFO
            );
        }
    
        @Override
        protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. union the 8 streams into one
            DataStream<ProductStats> productStatsDataStream = unionStreams(streams);
            productStatsDataStream.print();
        }
    
        private DataStream<ProductStats> unionStreams(Map<String, DataStreamSource<String>> streams) {
        // 1. clicks
            SingleOutputStreamOperator<ProductStats> clickStatsStream = streams.get(Constant.TOPIC_DWD_PAGE)
                    .flatMap(new FlatMapFunction<String, ProductStats>() {
                        @Override
                        public void flatMap(String value,
                                            Collector<ProductStats> out) throws Exception {
                        JSONObject obj = JSON.parseObject(value);   // parse the record as JSON
                        JSONObject page = obj.getJSONObject("page");    // get the page object
                        String page_id = page.getString("page_id"); // page_id from page
                        String item_type = page.getString("item_type"); // item_type from page
    
                            if ("good_detail".equals(page_id) && page.get("item") != null && "sku_id".equals(item_type)) {    //点击行为
                                Long item = page.getLong("item");
                                ProductStats productStats = new ProductStats();
                                productStats.setSku_id(item);
                                productStats.setClick_ct(1L);
                                productStats.setTs(obj.getLong("ts"));
                                out.collect(productStats);
                            }
                        }
                    });
    
        // 2. displays (exposures)
            SingleOutputStreamOperator<ProductStats> displayStatsStream = streams.get(Constant.TOPIC_DWD_DISPLAY)
                    .map(s -> {
                    JSONObject obj = JSON.parseObject(s);   // parse the record as JSON
                        Long skuId = obj.getLong("item");
                        Long ts = obj.getLong("ts");
    
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(skuId);
                        productStats.setTs(ts);
                        productStats.setDisplay_ct(1L);
                        return productStats;
                    });
    
        // 3. favors
            SingleOutputStreamOperator<ProductStats> favorStatsStream = streams.get(Constant.TOPIC_DWD_FAVOR_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setFavor_ct(1L);
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        return productStats;
                    });
    
        // 4. add-to-cart
            SingleOutputStreamOperator<ProductStats> cartStatsStream = streams.get(Constant.TOPIC_DWD_CART_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setCart_ct(1L);
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        return productStats;
                    });
    
        // 5. orders
            SingleOutputStreamOperator<ProductStats> orderWideStatsStream = streams.get(Constant.TOPIC_DWM_ORDER_WIDE)
                    .map(s -> {
                    OrderWide orderWide = JSON.parseObject(s, OrderWide.class); // parse the string into an OrderWide object
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(orderWide.getSku_id());
                        productStats.getOrderIdSet().add(orderWide.getOrder_id());
                        productStats.setTs(YuangeCommonUtil.toTs(orderWide.getCreate_time()));
                        productStats.setOrder_amount(orderWide.getSplit_total_amount());
                        productStats.setOrder_sku_num(orderWide.getSku_num());
                        return productStats;
                    });
    
        // 6. payments
            SingleOutputStreamOperator<ProductStats> paymentWideStatsStream = streams.get(Constant.TOPIC_DWM_PAYMENT_WIDE)
                    .map(s -> {
                        PaymentWide paymentWide = JSON.parseObject(s, PaymentWide.class);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(paymentWide.getSku_id());
                        productStats.setTs(YuangeCommonUtil.toTs(paymentWide.getPayment_create_time()));
                        productStats.getPaidOrderIdSet().add(paymentWide.getOrder_id());
                        productStats.setPayment_amount(paymentWide.getSplit_total_amount());
                        return productStats;
                    });
    
        // 7. refunds
            SingleOutputStreamOperator<ProductStats> refundStatsStream = streams.get(Constant.TOPIC_DWD_ORDER_REFUND_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setRefund_amount(obj.getBigDecimal("refund_amount"));
                        productStats.getRefundOrderIdSet().add(obj.getLongValue("order_id"));
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        return productStats;
                    });
    
        // 8. reviews
            SingleOutputStreamOperator<ProductStats> commentStatsStream = streams.get(Constant.TOPIC_DWD_COMMENT_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        productStats.setComment_ct(1L);
                        String appraise = obj.getString("appraise");
                        if (Constant.FOUR_STAR_COMMENT.equals(appraise) || Constant.FIVE_STAR_COMMENT.equals(appraise)) {
                            productStats.setGood_comment_ct(1L);
                        }
                        return productStats;
                    });
    
            return clickStatsStream.union(
                    displayStatsStream,
                    cartStatsStream,
                    favorStatsStream,
                    orderWideStatsStream,
                    paymentWideStatsStream,
                    commentStatsStream,
                    refundStatsStream
            );
        }
    }

      2) Start Hadoop

    hadoop.sh start

      3) Start ZooKeeper

    zk start

      4) Start Kafka

    kafka.sh start

      5) Start Maxwell

    maxwell.sh start

      6) Start the log server

    log-lg.sh start

      7) Start HBase

    start-hbase.sh

      8) Start Redis

    redis.sh

      9) Start the Flink yarn-session

    /opt/module/flink-yarn/bin/yarn-session.sh -d

      10) Submit the following programs to run on the yarn-session

      11) Generate log data and business data

      12) Check the data printed in the IDEA console

    3.2.6 Window and Aggregate

    package com.yuange.flinkrealtime.app.dws;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.yuange.flinkrealtime.app.BaseAppV2;
    import com.yuange.flinkrealtime.bean.OrderWide;
    import com.yuange.flinkrealtime.bean.PaymentWide;
    import com.yuange.flinkrealtime.bean.ProductStats;
    import com.yuange.flinkrealtime.common.Constant;
    import com.yuange.flinkrealtime.util.YuangeCommonUtil;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import java.text.SimpleDateFormat;
    import java.time.Duration;
    import java.util.Map;
    
    /**
     * @author 袁哥
     * @date 2021/8/6 18:19
     */
    public class DwsProductStatsApp extends BaseAppV2 {
    
        public static void main(String[] args) {
            new DwsProductStatsApp().init(
                    4002,
                    1,
                    "DwsProductStatsApp",
                    "DwsProductStatsApp",
                    Constant.TOPIC_DWD_PAGE, Constant.TOPIC_DWD_DISPLAY,
                    Constant.TOPIC_DWD_FAVOR_INFO, Constant.TOPIC_DWD_CART_INFO,
                    Constant.TOPIC_DWM_ORDER_WIDE, Constant.TOPIC_DWM_PAYMENT_WIDE,
                    Constant.TOPIC_DWD_ORDER_REFUND_INFO, Constant.TOPIC_DWD_COMMENT_INFO
            );
        }
    
        @Override
        protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. union the 8 streams into one
            DataStream<ProductStats> productStatsDataStream = unionStreams(streams);
    //        productStatsDataStream.print();
        // 2. window and aggregate
            SingleOutputStreamOperator<ProductStats> aggregatedStream = agg(productStatsDataStream);
            aggregatedStream.print();
        }
    
        private SingleOutputStreamOperator<ProductStats> agg(DataStream<ProductStats> productStatsDataStream) {
            return productStatsDataStream
                    .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<ProductStats>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner((ps,ts) -> ps.getTs())
                    )
                    .keyBy(ProductStats::getSku_id)
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .reduce(new ReduceFunction<ProductStats>() {
                                @Override
                                public ProductStats reduce(ProductStats value1, ProductStats value2) throws Exception {
    
                                    value1.setClick_ct(value1.getClick_ct() + value2.getClick_ct());
                                    value1.setDisplay_ct(value1.getDisplay_ct() + value2.getDisplay_ct());
                                    value1.setFavor_ct(value1.getFavor_ct() + value2.getFavor_ct());
                                    value1.setCart_ct(value1.getCart_ct() + value2.getCart_ct());
                                    value1.setComment_ct(value1.getComment_ct() + value2.getComment_ct());
                                    value1.setGood_comment_ct(value1.getGood_comment_ct() + value2.getGood_comment_ct());
    
                                    value1.setOrder_sku_num(value1.getOrder_sku_num() + value2.getOrder_sku_num());
                                    value1.setOrder_amount(value1.getOrder_amount().add(value2.getOrder_amount()));
    
                                    value1.setPayment_amount(value1.getPayment_amount().add(value2.getPayment_amount()));
                                    value1.setRefund_amount(value1.getRefund_amount().add(value2.getRefund_amount()));
    
                                    value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                                    value1.getPaidOrderIdSet().addAll(value2.getPaidOrderIdSet());
                                    value1.getRefundOrderIdSet().addAll(value2.getRefundOrderIdSet());
    
                                    return value1;
                                }
                            },
                            new ProcessWindowFunction<ProductStats, ProductStats, Long, TimeWindow>() {
                                SimpleDateFormat sdf;
                                @Override
                                public void open(Configuration parameters) throws Exception {
                                    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                }
    
                                @Override
                                public void process(Long key,
                                                    Context context,
                                                    Iterable<ProductStats> elements,
                                                    Collector<ProductStats> out) throws Exception {
                                    TimeWindow window = context.window();
                                    ProductStats productStats = elements.iterator().next();
                                    productStats.setStt(sdf.format(window.getStart()));
                                    productStats.setEdt(sdf.format(window.getEnd()));
                                    productStats.setOrder_ct((long)productStats.getOrderIdSet().size());
                                    productStats.setPaid_order_ct((long)productStats.getPaidOrderIdSet().size());
                                    productStats.setRefund_order_ct((long)productStats.getRefundOrderIdSet().size());
                                    out.collect(productStats);
                                }
                            }
                    );
        }
    
        private DataStream<ProductStats> unionStreams(Map<String, DataStreamSource<String>> streams) {
        // 1. clicks
            SingleOutputStreamOperator<ProductStats> clickStatsStream = streams.get(Constant.TOPIC_DWD_PAGE)
                    .flatMap(new FlatMapFunction<String, ProductStats>() {
                        @Override
                        public void flatMap(String value,
                                            Collector<ProductStats> out) throws Exception {
                        JSONObject obj = JSON.parseObject(value);   // parse the record as JSON
                        JSONObject page = obj.getJSONObject("page");    // get the page object
                        String page_id = page.getString("page_id"); // page_id from page
                        String item_type = page.getString("item_type"); // item_type from page
    
                            if ("good_detail".equals(page_id) && page.get("item") != null && "sku_id".equals(item_type)) {    //点击行为
                                Long item = page.getLong("item");
                                ProductStats productStats = new ProductStats();
                                productStats.setSku_id(item);
                                productStats.setClick_ct(1L);
                                productStats.setTs(obj.getLong("ts"));
                                out.collect(productStats);
                            }
                        }
                    });
    
        // 2. displays (exposures)
            SingleOutputStreamOperator<ProductStats> displayStatsStream = streams.get(Constant.TOPIC_DWD_DISPLAY)
                    .map(s -> {
                    JSONObject obj = JSON.parseObject(s);   // parse the record as JSON
                        Long skuId = obj.getLong("item");
                        Long ts = obj.getLong("ts");
    
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(skuId);
                        productStats.setTs(ts);
                        productStats.setDisplay_ct(1L);
                        return productStats;
                    });
    
        // 3. favors
            SingleOutputStreamOperator<ProductStats> favorStatsStream = streams.get(Constant.TOPIC_DWD_FAVOR_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setFavor_ct(1L);
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        return productStats;
                    });
    
        // 4. add-to-cart
            SingleOutputStreamOperator<ProductStats> cartStatsStream = streams.get(Constant.TOPIC_DWD_CART_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setCart_ct(1L);
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        return productStats;
                    });
    
        // 5. orders
            SingleOutputStreamOperator<ProductStats> orderWideStatsStream = streams.get(Constant.TOPIC_DWM_ORDER_WIDE)
                    .map(s -> {
                    OrderWide orderWide = JSON.parseObject(s, OrderWide.class); // parse the string into an OrderWide object
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(orderWide.getSku_id());
                        productStats.getOrderIdSet().add(orderWide.getOrder_id());
                        productStats.setTs(YuangeCommonUtil.toTs(orderWide.getCreate_time()));
                        productStats.setOrder_amount(orderWide.getSplit_total_amount());
                        productStats.setOrder_sku_num(orderWide.getSku_num());
                        return productStats;
                    });
    
        // 6. payments
            SingleOutputStreamOperator<ProductStats> paymentWideStatsStream = streams.get(Constant.TOPIC_DWM_PAYMENT_WIDE)
                    .map(s -> {
                        PaymentWide paymentWide = JSON.parseObject(s, PaymentWide.class);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(paymentWide.getSku_id());
                        productStats.setTs(YuangeCommonUtil.toTs(paymentWide.getPayment_create_time()));
                        productStats.getPaidOrderIdSet().add(paymentWide.getOrder_id());
                        productStats.setPayment_amount(paymentWide.getSplit_total_amount());
                        return productStats;
                    });
    
        // 7. refunds
            SingleOutputStreamOperator<ProductStats> refundStatsStream = streams.get(Constant.TOPIC_DWD_ORDER_REFUND_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setRefund_amount(obj.getBigDecimal("refund_amount"));
                        productStats.getRefundOrderIdSet().add(obj.getLongValue("order_id"));
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        return productStats;
                    });
    
        // 8. reviews
            SingleOutputStreamOperator<ProductStats> commentStatsStream = streams.get(Constant.TOPIC_DWD_COMMENT_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        productStats.setComment_ct(1L);
                        String appraise = obj.getString("appraise");
                        if (Constant.FOUR_STAR_COMMENT.equals(appraise) || Constant.FIVE_STAR_COMMENT.equals(appraise)) {
                            productStats.setGood_comment_ct(1L);
                        }
                        return productStats;
                    });
    
            return clickStatsStream.union(
                    displayStatsStream,
                    cartStatsStream,
                    favorStatsStream,
                    orderWideStatsStream,
                    paymentWideStatsStream,
                    commentStatsStream,
                    refundStatsStream
            );
        }
    }

    3.2.7 Enrich with Dimension Data

      1) The code is as follows; a sketch of the DimAsyncFunction helper it depends on is given first
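
      DimAsyncFunction comes from the project's function package and is not listed in this section. Roughly, it is a RichAsyncFunction that owns a Phoenix connection and a thread pool and delegates the actual lookup to an abstract addDim method. The sketch below is a speculative reconstruction, not the project's exact code: the Phoenix URL, Redis host, and pool size are assumptions.

    // Speculative sketch of the DimAsyncFunction base class used below.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import redis.clients.jedis.Jedis;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {
        private transient Connection phoenixConn;
        private transient ExecutorService threadPool;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // assumes the Phoenix JDBC driver is on the classpath; URL is an assumption
            phoenixConn = DriverManager.getConnection("jdbc:phoenix:hadoop162,hadoop163,hadoop164:2181");
            threadPool = Executors.newFixedThreadPool(4); // pool size is an assumption
        }
    
        @Override
        public void asyncInvoke(T input, ResultFuture<T> resultFuture) {
            threadPool.submit(() -> {
                // one short-lived Redis connection per lookup keeps Jedis usage single-threaded
                try (Jedis jedis = new Jedis("hadoop162", 6379)) {
                    addDim(phoenixConn, jedis, input, resultFuture);
                } catch (Exception e) {
                    resultFuture.completeExceptionally(e);
                }
            });
        }
    
        // subclasses enrich the input and complete the future
        public abstract void addDim(Connection phoenixConn,
                                    Jedis client,
                                    T input,
                                    ResultFuture<T> resultFuture) throws Exception;
    }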

    package com.yuange.flinkrealtime.app.dws;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.yuange.flinkrealtime.app.BaseAppV2;
    import com.yuange.flinkrealtime.bean.OrderWide;
    import com.yuange.flinkrealtime.bean.PaymentWide;
    import com.yuange.flinkrealtime.bean.ProductStats;
    import com.yuange.flinkrealtime.common.Constant;
    import com.yuange.flinkrealtime.function.DimAsyncFunction;
    import com.yuange.flinkrealtime.util.DimUtil;
    import com.yuange.flinkrealtime.util.YuangeCommonUtil;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import redis.clients.jedis.Jedis;
    
    import java.sql.Connection;
    import java.text.SimpleDateFormat;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author 袁哥
     * @date 2021/8/6 18:19
     */
    public class DwsProductStatsApp extends BaseAppV2 {
    
        public static void main(String[] args) {
            new DwsProductStatsApp().init(
                    4002,
                    1,
                    "DwsProductStatsApp",
                    "DwsProductStatsApp",
                    Constant.TOPIC_DWD_PAGE, Constant.TOPIC_DWD_DISPLAY,
                    Constant.TOPIC_DWD_FAVOR_INFO, Constant.TOPIC_DWD_CART_INFO,
                    Constant.TOPIC_DWM_ORDER_WIDE, Constant.TOPIC_DWM_PAYMENT_WIDE,
                    Constant.TOPIC_DWD_ORDER_REFUND_INFO, Constant.TOPIC_DWD_COMMENT_INFO
            );
        }
    
        @Override
        protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. union the 8 streams into one
            DataStream<ProductStats> productStatsDataStream = unionStreams(streams);
    //        productStatsDataStream.print();
        // 2. window and aggregate
            SingleOutputStreamOperator<ProductStats> aggregatedStream = agg(productStatsDataStream);
    //        aggregatedStream.print();
        // 3. enrich with dimension data
            SingleOutputStreamOperator<ProductStats> psStreamWithDim = joinDim(aggregatedStream);
            psStreamWithDim.print();
        }
    
        private SingleOutputStreamOperator<ProductStats> joinDim(SingleOutputStreamOperator<ProductStats> aggregatedStream) {
            return AsyncDataStream.unorderedWait(
                    aggregatedStream,
                    new DimAsyncFunction<ProductStats>() {
                        @Override
                        public void addDim(Connection phoenixConn,
                                           Jedis client,
                                           ProductStats ps,
                                           ResultFuture<ProductStats> resultFuture) throws Exception {
                            JSONObject skuInfo = DimUtil.readDim(phoenixConn, client, Constant.DIM_SKU_INFO, ps.getSku_id().toString());
                            ps.setSku_name(skuInfo.getString("SKU_NAME"));
                            ps.setSku_price(skuInfo.getBigDecimal("PRICE"));
    
                            ps.setSpu_id(skuInfo.getLong("SPU_ID"));
                            ps.setTm_id(skuInfo.getLong("TM_ID"));
                            ps.setCategory3_id(skuInfo.getLong("CATEGORY3_ID"));
    
                            JSONObject spuInfo = DimUtil.readDim(phoenixConn, client, Constant.DIM_SPU_INFO, ps.getSpu_id().toString());
                            ps.setSpu_name(spuInfo.getString("SPU_NAME"));
    
                            JSONObject tmInfo = DimUtil.readDim(phoenixConn, client, Constant.DIM_BASE_TRADEMARK, ps.getTm_id().toString());
                            ps.setTm_name(tmInfo.getString("TM_NAME"));
    
                            JSONObject c3Info = DimUtil.readDim(phoenixConn, client, Constant.DIM_BASE_CATEGORY3, ps.getCategory3_id().toString());
                            ps.setCategory3_name(c3Info.getString("NAME"));
    
                            resultFuture.complete(Collections.singletonList(ps));
                        }
                    },
                    60,
                    TimeUnit.SECONDS
            );
        }
    
        private SingleOutputStreamOperator<ProductStats> agg(DataStream<ProductStats> productStatsDataStream) {
            return productStatsDataStream
                    .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<ProductStats>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner((ps,ts) -> ps.getTs())
                    )
                    .keyBy(ProductStats::getSku_id)
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .reduce(new ReduceFunction<ProductStats>() {
                                @Override
                                public ProductStats reduce(ProductStats value1, ProductStats value2) throws Exception {
    
                                    value1.setClick_ct(value1.getClick_ct() + value2.getClick_ct());
                                    value1.setDisplay_ct(value1.getDisplay_ct() + value2.getDisplay_ct());
                                    value1.setFavor_ct(value1.getFavor_ct() + value2.getFavor_ct());
                                    value1.setCart_ct(value1.getCart_ct() + value2.getCart_ct());
                                    value1.setComment_ct(value1.getComment_ct() + value2.getComment_ct());
                                    value1.setGood_comment_ct(value1.getGood_comment_ct() + value2.getGood_comment_ct());
    
                                    value1.setOrder_sku_num(value1.getOrder_sku_num() + value2.getOrder_sku_num());
                                    value1.setOrder_amount(value1.getOrder_amount().add(value2.getOrder_amount()));
    
                                    value1.setPayment_amount(value1.getPayment_amount().add(value2.getPayment_amount()));
                                    value1.setRefund_amount(value1.getRefund_amount().add(value2.getRefund_amount()));
    
                                    value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                                    value1.getPaidOrderIdSet().addAll(value2.getPaidOrderIdSet());
                                    value1.getRefundOrderIdSet().addAll(value2.getRefundOrderIdSet());
    
                                    return value1;
                                }
                            },
                            new ProcessWindowFunction<ProductStats, ProductStats, Long, TimeWindow>() {
                                SimpleDateFormat sdf;
                                @Override
                                public void open(Configuration parameters) throws Exception {
                                    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                }
    
                                @Override
                                public void process(Long key,
                                                    Context context,
                                                    Iterable<ProductStats> elements,
                                                    Collector<ProductStats> out) throws Exception {
                                    TimeWindow window = context.window();
                                    ProductStats productStats = elements.iterator().next();
                                    productStats.setStt(sdf.format(window.getStart()));
                                    productStats.setEdt(sdf.format(window.getEnd()));
                                    productStats.setOrder_ct((long)productStats.getOrderIdSet().size());
                                    productStats.setPaid_order_ct((long)productStats.getPaidOrderIdSet().size());
                                    productStats.setRefund_order_ct((long)productStats.getRefundOrderIdSet().size());
                                    out.collect(productStats);
                                }
                            }
                    );
        }
    
        private DataStream<ProductStats> unionStreams(Map<String, DataStreamSource<String>> streams) {
            //1. 点击
            SingleOutputStreamOperator<ProductStats> clickStatsStream = streams.get(Constant.TOPIC_DWD_PAGE)
                    .flatMap(new FlatMapFunction<String, ProductStats>() {
                        @Override
                        public void flatMap(String value,
                                            Collector<ProductStats> out) throws Exception {
                            JSONObject obj = JSON.parseObject(value);   //将string类型转化为JSON类型
                            JSONObject page = obj.getJSONObject("page");    //获取page
                            String page_id = page.getString("page_id"); //从page中获取page_id
                            String item_type = page.getString("item_type"); //从page中获取item_type
    
                            if ("good_detail".equals(page_id) && page.get("item") != null && "sku_id".equals(item_type)) {    //点击行为
                                Long item = page.getLong("item");
                                ProductStats productStats = new ProductStats();
                                productStats.setSku_id(item);
                                productStats.setClick_ct(1L);
                                productStats.setTs(obj.getLong("ts"));
                                out.collect(productStats);
                            }
                        }
                    });
    
            //2. 曝光
            SingleOutputStreamOperator<ProductStats> displayStatsStream = streams.get(Constant.TOPIC_DWD_DISPLAY)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);   //将string类型转化为JSON类型
                        Long skuId = obj.getLong("item");
                        Long ts = obj.getLong("ts");
    
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(skuId);
                        productStats.setTs(ts);
                        productStats.setDisplay_ct(1L);
                        return productStats;
                    });
    
            //3.收藏
            SingleOutputStreamOperator<ProductStats> favorStatsStream = streams.get(Constant.TOPIC_DWD_FAVOR_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setFavor_ct(1L);
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        return productStats;
                    });
    
            //4.购物车cart
            SingleOutputStreamOperator<ProductStats> cartStatsStream = streams.get(Constant.TOPIC_DWD_CART_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setCart_ct(1L);
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        return productStats;
                    });
    
            //5.订单order
            SingleOutputStreamOperator<ProductStats> orderWideStatsStream = streams.get(Constant.TOPIC_DWM_ORDER_WIDE)
                    .map(s -> {
                        OrderWide orderWide = JSON.parseObject(s, OrderWide.class); //将string类型转为Json类型,然后封装在OrderWide对象中
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(orderWide.getSku_id());
                        productStats.getOrderIdSet().add(orderWide.getOrder_id());
                        productStats.setTs(YuangeCommonUtil.toTs(orderWide.getCreate_time()));
                        productStats.setOrder_amount(orderWide.getSplit_total_amount());
                        productStats.setOrder_sku_num(orderWide.getSku_num());
                        return productStats;
                    });
    
            //6.支付
            SingleOutputStreamOperator<ProductStats> paymentWideStatsStream = streams.get(Constant.TOPIC_DWM_PAYMENT_WIDE)
                    .map(s -> {
                        PaymentWide paymentWide = JSON.parseObject(s, PaymentWide.class);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(paymentWide.getSku_id());
                        productStats.setTs(YuangeCommonUtil.toTs(paymentWide.getPayment_create_time()));
                        productStats.getPaidOrderIdSet().add(paymentWide.getOrder_id());
                        productStats.setPayment_amount(paymentWide.getSplit_total_amount());
                        return productStats;
                    });
    
            //7.退款
            SingleOutputStreamOperator<ProductStats> refundStatsStream = streams.get(Constant.TOPIC_DWD_ORDER_REFUND_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setRefund_amount(obj.getBigDecimal("refund_amount"));
                        productStats.getRefundOrderIdSet().add(obj.getLongValue("order_id"));
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        return productStats;
                    });
    
            //8.评价
            SingleOutputStreamOperator<ProductStats> commentStatsStream = streams.get(Constant.TOPIC_DWD_COMMENT_INFO)
                    .map(s -> {
                        JSONObject obj = JSON.parseObject(s);
                        ProductStats productStats = new ProductStats();
                        productStats.setSku_id(obj.getLong("sku_id"));
                        productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                        productStats.setComment_ct(1L);
                        String appraise = obj.getString("appraise");
                        if (Constant.FOUR_STAR_COMMENT.equals(appraise) || Constant.FIVE_STAR_COMMENT.equals(appraise)) {
                            productStats.setGood_comment_ct(1L);
                        }
                        return productStats;
                    });
    
            return clickStatsStream.union(
                    displayStatsStream,
                    cartStatsStream,
                    favorStatsStream,
                    orderWideStatsStream,
                    paymentWideStatsStream,
                    commentStatsStream,
                    refundStatsStream
            );
        }
    }

      2)Idea中启动该程序

      3)生产日志数据和业务数据

    cd /opt/software/mock/mock_db
    java -jar gmall2020-mock-db-2020-12-23.jar
    cd /opt/software/mock/mock_log
    java -jar gmall2020-mock-log-2020-12-18.jar

      4)查看Idea控制台

    3.2.5 写入到ClickHouse

      1)在ClickHouse中创建主题宽表

    use flinkdb;
    create table product_stats_2021 (
       stt DateTime,
       edt DateTime,
       sku_id  UInt64,
       sku_name String,
       sku_price Decimal64(2),
       spu_id UInt64,
       spu_name String ,
       tm_id UInt64,
       tm_name String,
       category3_id UInt64,
       category3_name String ,
       display_ct UInt64,
       click_ct UInt64,
       favor_ct UInt64,
       cart_ct UInt64,
       order_sku_num UInt64,
       order_amount Decimal64(2),
       order_ct UInt64 ,
       payment_amount Decimal64(2),
       paid_order_ct UInt64,
       refund_order_ct UInt64,
       refund_amount Decimal64(2),
       comment_ct UInt64,
       good_comment_ct UInt64 ,
       ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt, sku_id);
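
  说明: ReplacingMergeTree会在后台分区合并时按order by的键去重,保留版本列ts最大的一行,用来兜底可能发生的重复写入。去重是异步的,如果查询时需要立刻看到去重后的结果,可以在表名后加final:

select * from product_stats_2021 final;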

      2)写数据到ClickHouse

        (1)在Constant中添加常量

    public static final String TABLE_PRODUCT_STATS = "product_stats_2021";

        (2)承接上面代码,将其补充完整

    @Override
        protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
            //1.把8个流union在一起
            DataStream<ProductStats> productStatsDataStream = unionStreams(streams);
    //        productStatsDataStream.print();
            //2.开窗聚合
            SingleOutputStreamOperator<ProductStats> aggregatedStream = agg(productStatsDataStream);
    //        aggregatedStream.print();
            //3.补齐维度
            SingleOutputStreamOperator<ProductStats> psStreamWithDim = joinDim(aggregatedStream);
    //        psStreamWithDim.print();
            //4.写入到ClickHouse中
            writeToClickHouse(psStreamWithDim);
        }
    
        private void writeToClickHouse(SingleOutputStreamOperator<ProductStats> psStreamWithDim) {
            psStreamWithDim.addSink(
                    FlinkSinkUtil.getClickHouseSink(
                            Constant.CLICKHOUSE_DB,
                            Constant.TABLE_PRODUCT_STATS,
                            ProductStats.class
                    )
            );
        }

        (3)在Idea中启动程序

        (4)生产日志数据和业务数据

    cd /opt/software/mock/mock_db
    java -jar gmall2020-mock-db-2020-12-23.jar
    cd /opt/software/mock/mock_log
    java -jar gmall2020-mock-log-2020-12-18.jar

        (5)查看clickhouse中是否有数据

    select * from product_stats_2021;

      3)使用Maven将程序打包上传至Linux

      4)修改realtime.sh脚本,将DwsProductStatsApp 提交至yarn-session上运行

    #!/bin/bash
    flink=/opt/module/flink-yarn/bin/flink
    jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar
    
    apps=(
            com.yuange.flinkrealtime.app.dwd.DwdLogApp
            com.yuange.flinkrealtime.app.dwd.DwdDbApp
            #com.yuange.flinkrealtime.app.dwm.DwmUvApp
            #com.yuange.flinkrealtime.app.dwm.DwmJumpDetailApp_Two
            com.yuange.flinkrealtime.app.dwm.DwmOrderWideApp_Cache_Async
            com.yuange.flinkrealtime.app.dwm.DwmPaymentWideApp
            #com.yuange.flinkrealtime.app.dws.DwsVisitorStatsApp
            com.yuange.flinkrealtime.app.dws.DwsProductStatsApp
            #com.yuange.flinkrealtime.app.dws.DwsProvinceStatsApp
    )
    
    for app in ${apps[*]} ; do
            $flink run -d -c $app $jar
    done

      5)提交程序,运行realtime.sh脚本

      6)生产日志数据和业务数据

    cd /opt/software/mock/mock_db
    java -jar gmall2020-mock-db-2020-12-23.jar
    cd /opt/software/mock/mock_log
    java -jar gmall2020-mock-log-2020-12-18.jar

      7)查看clickhouse中是否有数据增加
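
  例如,观察总行数是否持续增长:

select count(*) from product_stats_2021;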

    第4章 DWS层:地区主题宽表

  前面的所有操作都是使用StreamAPI完成的,这次我们采用FlinkSQL来完成。

    4.1 需求分析与思路

      1)定义Table流环境

      2)把数据源定义为动态表

      3)通过SQL查询出结果表

      4)把结果表转换为数据流

      5)把数据流写入目标数据库

      6)直接把结果表的数据写入到ClickHouse中

  如果是Flink官方支持的数据库,也可以直接把目标数据表定义为动态表,用insert into写入。但Flink官方的JDBC连接器目前尚不支持ClickHouse(目前仅支持MySQL、PostgreSQL、Derby),阿里云提供了实现好的connector,我们就使用这个connector。参考地址: https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.574.d9c541ea3J78mc

    4.2 功能实现

      数据来源于topic: dwm_order_wide

    4.2.1 导入ClickHouse连接器(此步骤直接跳过即可,因为我们下面自定义连接器)

    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>flink-connector-clickhouse</artifactId>
        <version>1.11.0</version>
    </dependency>

      注意: 

        1)由于该连接器目前在远程的maven仓库中找不到, 我们需要下载该连接器, 然后安装到本地仓库使用

        2)下载地址: https://clickhouse-release-open-access.oss-cn-shanghai.aliyuncs.com/doc-data/flink-connector-clickhouse-1.11.0.jar?spm=a2c4g.11186623.2.6.d9c541ea3J78mc&file=flink-connector-clickhouse-1.11.0.jar 

        3)假设jar包被下载在如下目录: C:\Users\lzc\Desktop\connector

        4)安装jar到本地仓库(需要保证mvn命令已经配置到了path中):

    mvn install:install-file -Dfile=C:\Users\lzc\Desktop\connector\flink-connector-clickhouse-1.11.0.jar -DgroupId=com.aliyun -DartifactId=flink-connector-clickhouse -Dversion=1.11.0 -Dpackaging=jar

    4.2.2 ClickHouse中创建表

    use flinkdb;
    create table province_stats_2021 (
       stt DateTime,
       edt DateTime,
       province_id  UInt64,
       province_name String,
       area_code String ,
       iso_code String,
       iso_3166_2 String , 
       order_amount Decimal64(2),
       order_count UInt64, 
       ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt, province_id);

    4.2.3 编写抽象类

    package com.yuange.flinkrealtime.app;
    
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    /**
     * @作者:袁哥
     * @时间:2021/8/6 21:01
     */
    public abstract class BaseSqlApp {
    
        public void init(int port, int p, String ck){
            System.setProperty("HADOOP_USER_NAME","atguigu");
            Configuration configuration = new Configuration();
            configuration.setInteger("rest.port",port);
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration).setParallelism(p);
    
            environment.enableCheckpointing(5000);  //检查点之间的时间间隔,单位是毫秒
            environment.setStateBackend(new HashMapStateBackend()); //定义状态后端,以保证将检查点状态写入远程(HDFS)
            environment.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/flinkparent/ck/" + ck);   //配置检查点存放地址
    
            environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置检查点模式:精准一次
        environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);   //设置同时进行的检查点最多为1个
            environment.getCheckpointConfig()
                    .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  //设置检查点持久化:取消作业时保留外部化检查点
    
            StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
    
            run(tableEnvironment);
    
            try {
                environment.execute(ck);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        protected abstract void run(StreamTableEnvironment tableEnvironment);
    }

    4.2.4 封装数据的对象

    package com.yuange.flinkrealtime.bean;
    
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.math.BigDecimal;
    
    /**
     * @作者:袁哥
     * @时间:2021/8/6 21:23
     */
    @Data
    @NoArgsConstructor
    public class ProvinceStats {
        private String stt;
        private String edt;
        private Long province_id;
        private String province_name;
        private String area_code;
        private String iso_code;
        private String iso_3166_2;
        private BigDecimal order_amount;
        private Long order_count;
        private Long ts;
    }

    4.2.5 Constant中添加常量

    public static final String TABLE_PROVINCE_STATS = "province_stats_2021";

    4.2.6 具体实现代码

    package com.yuange.flinkrealtime.app.dws;
    
    import com.yuange.flinkrealtime.app.BaseSqlApp;
    import com.yuange.flinkrealtime.bean.ProvinceStats;
    import com.yuange.flinkrealtime.common.Constant;
    import com.yuange.flinkrealtime.util.FlinkSinkUtil;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    /**
     * @作者:袁哥
     * @时间:2021/8/6 21:01
     */
    public class DwsProvinceStatsApp extends BaseSqlApp {
    
        public static void main(String[] args) {
            new DwsProvinceStatsApp().init(
                    4003,
                    1,
                    "DwsProvinceStatsApp"
            );
        }
    
        @Override
        protected void run(StreamTableEnvironment tableEnvironment) {
            // 1. 使用ddl建动态表, 与kafka的topic进行关联  dwm_order_wide
            tableEnvironment.executeSql("create table order_wide(" +
                    "   province_id bigint, " +
                    "   province_name string, " +
                    "   province_area_code string, " +
                    "   province_iso_code string, " +
                    "   province_3166_2_code string, " +
                    "   order_id bigint, " +
                    "   split_total_amount decimal(20,2), " +
                    "   create_time string, " +
                    "   et as to_timestamp(create_time), " +
                    "   watermark for et as et - interval '20' second " +
                    ")with(" +
                    "   'connector' = 'kafka', " +
                    "   'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092, hadoop164:9092', " +
                    "   'properties.group.id' = 'DwsProvinceStatsApp', " +
                    "   'topic' = '" + Constant.TOPIC_DWM_ORDER_WIDE + "', " +
                    "   'scan.startup.mode' = 'earliest-offset', " +
                    "   'format' = 'json' " +
                    ")"
            );
    
            //2.在order_wide这张表执行连续查询
            Table table = tableEnvironment.sqlQuery("select" +
                    "   date_format(tumble_start(et,interval '5' second), 'yyyy-MM-dd HH:mm:ss') stt, " +
                    "   date_format(tumble_end(et,interval '5' second), 'yyyy-MM-dd HH:mm:ss') edt, " +
                    "   province_id, " +
                    "   province_name, " +
                    "   province_area_code area_code, " +
                    "   province_iso_code iso_code, " +
                    "   province_3166_2_code iso_3166_2, " +
                    "   sum(split_total_amount) order_amount, " +
                    "   count(distinct(order_id)) order_count, " +
                    "   unix_timestamp() * 1000 ts " +
                    " from order_wide " +
                    " group by " +
                    "   province_id, " +
                    "   province_name, " +
                    "   province_area_code, " +
                    "   province_iso_code, " +
                    "   province_3166_2_code, " +
                    "   tumble(et,interval '5' second)" +
                    "");
            //3.写入到ClickHouse中
            tableEnvironment.toRetractStream(table, ProvinceStats.class)
                .filter(t -> t.f0)  //只保留标志为true的插入数据, 过滤掉撤回(false)数据
                .map(t -> t.f1) //取出数据本身
                    .addSink(
                            FlinkSinkUtil.getClickHouseSink(
                                        Constant.CLICKHOUSE_DB,
                                        Constant.TABLE_PROVINCE_STATS,
                                        ProvinceStats.class
                                    )
                    );
        }
    }

    4.2.7 修改FlinkSinkUtil中代码,将getClickHouseSink和getJdbcSink修改一下

    public static <T> SinkFunction<T> getClickHouseSink(String db,
                                                            String table,
                                                            Class<T> tClass) {
            Field[] fields = tClass.getDeclaredFields();
    
            String clickhouseDriver = Constant.CLICKHOUSE_DRIVER;
            String url = Constant.CLICKHOUSE_URL_PRE + "/" + db;
    
            StringBuilder sql = new StringBuilder();
            sql.append("insert into ")
                    .append(table)
                    .append("(");
            //找到字段名,如果这个字段有NoSink这个注解,则不要拼接
            for (Field field : fields) {
                NoSink noSink = field.getAnnotation(NoSink.class);
                if (noSink == null){
                    sql.append(field.getName())
                            .append(",");
                }
            }
            sql.deleteCharAt(sql.length() - 1); //把最后一个逗号删除
            sql.append(") values(");
            for (Field field : fields) {
                NoSink noSink = field.getAnnotation(NoSink.class);
                if (noSink == null) {
                    sql.append("?,");
                }
            }
            sql.deleteCharAt(sql.length() - 1);
            sql.append(")");
    
    //        System.out.println(sql.toString());
            //借助jdbc sink封装一个ClickHouse sink
            return getJdbcSink(
                    url,
                    clickhouseDriver,
                    sql.toString()
            );
        }
    
        private static <T> SinkFunction<T> getJdbcSink(String url, String clickhouseDriver, String sql) {
            return JdbcSink.sink(
                    sql,
                    new JdbcStatementBuilder<T>() {
                        @Override
                        public void accept(PreparedStatement ps, T t) throws SQLException {
                            // 类名.class   对象.getClass()  Class.forName("...")
                            Class<?> tClass = t.getClass();
                            try {
                                Field[] fields = tClass.getDeclaredFields();
                                for (int i = 0, position = 1; i < fields.length; i++) {
    
                                    Field field = fields[i];
                                    NoSink noSink = field.getAnnotation(NoSink.class);
                                    if (noSink == null) {
                                        field.setAccessible(true);
                                        Object v = field.get(t);
                                        ps.setObject(position++, v);
                                    }
                                }
                            } catch (IllegalAccessException e) {
                                e.printStackTrace();
                            }
                        }
                    },
                    new JdbcExecutionOptions.Builder()
                        .withBatchIntervalMs(100)   //最多每100毫秒刷写一次批量数据
                            .withMaxRetries(3)  //失败后最大重试次数
                            .build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl(url)
                            .withDriverName(clickhouseDriver)
                            .build()
            );
        }
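
  上面反射时用到的NoSink注解用于标记不写入ClickHouse的字段(例如ProductStats中的几个订单Id集合)。如果工程中还没有这个注解,下面给出一个可行的定义草图(包名为假设),关键是保留策略必须为RUNTIME,否则运行期通过getAnnotation拿不到注解:

package com.yuange.flinkrealtime.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 标记在POJO的字段上, 表示该字段不写入ClickHouse
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)     //必须为RUNTIME, 运行期反射才能读取到
public @interface NoSink {
}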

    4.2.8 测试

      1)将程序打包上传至Linux

      2)启动hadoop

    hadoop.sh start

      3)启动ZK

    zk start

      4)启动Kafka

    kafka.sh start

      5)启动maxwell

    maxwell.sh start

      6)启动redis

    redis.sh 

      7)启动HBase

    start-hbase.sh

      8)修改realtime.sh脚本

    #!/bin/bash
    flink=/opt/module/flink-yarn/bin/flink
    jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar
    
    apps=(
            #com.yuange.flinkrealtime.app.dwd.DwdLogApp
            com.yuange.flinkrealtime.app.dwd.DwdDbApp
            #com.yuange.flinkrealtime.app.dwm.DwmUvApp
            #com.yuange.flinkrealtime.app.dwm.DwmJumpDetailApp_Two
            com.yuange.flinkrealtime.app.dwm.DwmOrderWideApp_Cache_Async
            #com.yuange.flinkrealtime.app.dwm.DwmPaymentWideApp
            #com.yuange.flinkrealtime.app.dws.DwsVisitorStatsApp
        #com.yuange.flinkrealtime.app.dws.DwsProductStatsApp
        com.yuange.flinkrealtime.app.dws.DwsProvinceStatsApp
)

for app in ${apps[*]} ; do
        $flink run -d -c $app $jar
done

      9)启动yarn-session

    /opt/module/flink-yarn/bin/yarn-session.sh -d

      10)提交realtime.sh脚本,将程序提交至yarn-session上运行

    realtime.sh

  11)启动ClickHouse

    sudo systemctl start clickhouse-server

      12)生产业务数据

    cd /opt/software/mock/mock_db
    java -jar gmall2020-mock-db-2020-12-23.jar

      13)查看Clickhouse是否生成数据

    select count(*) from province_stats_2021;

    第5章 DWS层:搜索关键词主题宽表

    5.1 需求分析与思路

  关键词主题主要服务于大屏展示中的字符云效果,让大屏观看者直观感知当前用户更关心哪些商品和关键词。关键词的展示本质上也是一种维度聚合的结果,根据聚合值的大小决定关键词字体的大小。关键词的第一重要来源是用户在搜索栏的搜索,另外就是从以商品为主题的统计中获取关键词。

    5.2 关于分词

  因为无论是用户搜索栏中的内容,还是商品名称,文字都可能比较长,且由多个关键词组成。

  所以我们需要把长文本分割成一个一个的词,这种分词技术在搜索引擎中被广泛使用。对于中文分词,现在的搜索引擎基本上都使用第三方分词器,我们在实时计算中也可以使用和搜索引擎一致的分词器:IK分词器。

    5.3 搜索关键词功能实现

    5.3.1 导入IK分词器依赖

    <dependency>
        <groupId>com.janeluo</groupId>
        <artifactId>ikanalyzer</artifactId>
        <version>2012_u6</version>
    </dependency>

    5.3.2 封装分词工具类

    package com.yuange.flinkrealtime.util;
    
    import org.wltea.analyzer.core.IKSegmenter;
    import org.wltea.analyzer.core.Lexeme;
    
    import java.io.IOException;
    import java.io.StringReader;
    import java.util.HashSet;
    import java.util.Set;
    
    /**
     * @作者:袁哥
     * @时间:2021/8/7 11:42
     */
    public class IkUtil {
        public static void main(String[] args) {
            System.out.println(analyzer("我是中国人"));
        }
    
        public static Set<String> analyzer(String text) {
        //把字符串变成内存流
        StringReader reader = new StringReader(text);
        IKSegmenter ikSegmenter = new IKSegmenter(reader, true);   //第二个参数为true表示使用智能(粗粒度)分词模式
            HashSet<String> hashSet = new HashSet<>();
    
            try {
                Lexeme next = ikSegmenter.next();
                while (next != null) {
                    String lexemeText = next.getLexemeText();
                    hashSet.add(lexemeText);
                    next = ikSegmenter.next();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return hashSet;
        }
    }

    5.4 自定义函数

      有了分词器,那么另外一个要考虑的问题就是如何把分词器的使用揉进FlinkSQL中。因为SQL的语法和相关的函数都是Flink内定的,想要使用外部工具,就必须结合自定义函数

    5.4.1 Flink自定义函数概述

      自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。

      函数分类:

        1)Scalar functions:类似于spark的udf

    2)Table functions:类似于spark的udtf

        3)Aggregate functions:类似于spark的udaf

        4)Table aggregate functions

    5)Async table functions

  考虑到一个词条包含多个词语,分词实际上是一种一对多的拆分。对于这种一拆多的情况,我们应该选择Table Function。

    5.4.2 实现自定义的Table Function

      1)参考: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/functions/udfs/

      2)代码

    package com.yuange.flinkrealtime.function;
    
    import com.yuange.flinkrealtime.util.IkUtil;
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    
    import java.util.Set;
    
    /**
     * @作者:袁哥
     * @时间:2021/8/7 11:32
     */
    @FunctionHint(output = @DataTypeHint("row<word string>"))   //输出数据的类型
    public class KWUdtf extends TableFunction<Row> {
        public void eval(String kw){
            Set<String> words = IkUtil.analyzer(kw);
            for (String word : words) {
                collect(Row.of(word));
            }
        }
    }

    5.5 主题宽表具体实现代码

    5.5.1 ClickHouse中建表

    use flinkdb;
    create table keyword_stats_2021 (
        stt DateTime,
        edt DateTime,
        keyword String ,
        source String ,
        ct UInt64 ,
        ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt, keyword, source);

    5.5.2 代码清单

      1)封装数据的POJO

    package com.yuange.flinkrealtime.bean;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    /**
     * @作者:袁哥
     * @时间:2021/8/7 15:40
     */
    @AllArgsConstructor
    @Data
    @NoArgsConstructor
    public class KeywordStats {
        private String stt;
        private String edt;
        private String keyword;
        private String source;
        private Long ct;
        private Long ts;
    }

      2)在Constant中新建常量

    public static final String TABLE_KEYWORD_STATS = "keyword_stats_2021";

      3)DwsKeyWordSearchStatsApp代码

    package com.yuange.flinkrealtime.app.dws;
    
    import com.yuange.flinkrealtime.app.BaseSqlApp;
    import com.yuange.flinkrealtime.bean.KeywordStats;
    import com.yuange.flinkrealtime.common.Constant;
    import com.yuange.flinkrealtime.function.KWUdtf;
    import com.yuange.flinkrealtime.util.FlinkSinkUtil;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    /**
     * @作者:袁哥
     * @时间:2021/8/7 11:19
     */
    public class DwsKeyWordSearchStatsApp extends BaseSqlApp {
    
        public static void main(String[] args) {
            new DwsKeyWordSearchStatsApp().init(
                    4004,
                    1,
                    "DwsKeyWordSearchStatsApp"
            );
        }
    
        @Override
        protected void run(StreamTableEnvironment tableEnvironment) {
            tableEnvironment.executeSql("create table page_log(" +
                    "   common map<string,string>, " +
                    "   page map<string,string>, " +
                    "   ts bigint, " +
                    "   et as to_timestamp(from_unixtime(ts/1000)), " +
                    "   watermark for et as et - interval '10' second " +
                    ")with(" +
                    "   'connector' = 'kafka', " +
                    "   'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092, hadoop164:9092', " +
                    "   'properties.group.id' = 'DwsKeyWordSearchStatsApp', " +
                    "   'topic' = '" + Constant.TOPIC_DWD_PAGE + "', " +
                    "   'scan.startup.mode' = 'earliest-offset', " +
                    "   'format' = 'json' " +
                    ")");
    
            //1. 过滤出搜索记录
            Table table = tableEnvironment.sqlQuery("select" +
                    " page['item'] keyword, " +
                    " et " +
                    "from page_log " +
                    "where page['page_id'] = 'good_list' and page['item'] is not null");
            tableEnvironment.createTemporaryView("t1",table);
    
            //2.注册自定义函数
            tableEnvironment.createTemporaryFunction("ik_analyzer", KWUdtf.class);
    
            Table table2 = tableEnvironment.sqlQuery("select" +
                    " word, " +
                    " et " +
                    "from t1 " +
                    "join lateral table(ik_analyzer(keyword)) on true");
            tableEnvironment.createTemporaryView("t2",table2);
    
            //3.统计每个窗口,每个关键词搜索的次数
            Table table3 = tableEnvironment.sqlQuery("select" +
                    " date_format(tumble_start(et,interval '5' second),'yyyy-MM-dd HH:mm:ss') stt, " +
                    " date_format(tumble_end(et,interval '5' second),'yyyy-MM-dd HH:mm:ss') edt, " +
                    " word keyword, " +
                    " 'search' source, " +
                    " count(*) ct, " +
                    " unix_timestamp() * 1000 ts " +
                    " from t2 " +
                    " group by word, tumble(et,interval '5' second)");
    
            //4.写入到ClickHouse中
            tableEnvironment.toRetractStream(table3, KeywordStats.class)
                    .filter(t -> t.f0)
                    .map(t -> t.f1)
                    .addSink(FlinkSinkUtil.getClickHouseSink(Constant.CLICKHOUSE_DB,Constant.TABLE_KEYWORD_STATS,KeywordStats.class));
        }
    }

    5.5.3 测试

      1)将程序打包上传至Linux

      2)启动hadoop

    hadoop.sh start

      3)启动ZK

    zk start

      4)启动Kafka

    kafka.sh start

      5)启动日志服务器

    log-lg.sh start

      6)修改realtime.sh脚本

    #!/bin/bash
    flink=/opt/module/flink-yarn/bin/flink
    jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar
    
    apps=(
            com.yuange.flinkrealtime.app.dwd.DwdLogApp
            #com.yuange.flinkrealtime.app.dwd.DwdDbApp
            #com.yuange.flinkrealtime.app.dwm.DwmUvApp
            #com.yuange.flinkrealtime.app.dwm.DwmJumpDetailApp_Two
            #com.yuange.flinkrealtime.app.dwm.DwmOrderWideApp_Cache_Async
            #com.yuange.flinkrealtime.app.dwm.DwmPaymentWideApp
            #com.yuange.flinkrealtime.app.dws.DwsVisitorStatsApp
            #com.yuange.flinkrealtime.app.dws.DwsProductStatsApp
            #com.yuange.flinkrealtime.app.dws.DwsProvinceStatsApp
            com.yuange.flinkrealtime.app.dws.DwsKeyWordSearchStatsApp
    )
    
    for app in ${apps[*]} ; do
            $flink run -d -c $app $jar
    done

      7)启动yarn-session

    /opt/module/flink-yarn/bin/yarn-session.sh -d

      8)运行脚本,提交程序至yarn-session上运行

    realtime.sh

      9)生产日志数据

cd /opt/software/mock/mock_log
    java -jar gmall2020-mock-log-2020-12-18.jar

      10)查看Clickhouse中是否有数据

    select count(*) from keyword_stats_2021;

    第6章 DWS层:商品行为关键词主题宽表

  从商品主题统计中获得商品关键词,统计其对应的点击次数、订单次数、加入购物车次数。

6.1 重构DwsProductStatsApp

  重构DwsProductStatsApp,增加把最终聚合数据写入Kafka的代码

    @Override
        protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
            //1.把8个流union在一起
            DataStream<ProductStats> productStatsDataStream = unionStreams(streams);
    //        productStatsDataStream.print();
            //2.开窗聚合
            SingleOutputStreamOperator<ProductStats> aggregatedStream = agg(productStatsDataStream);
    //        aggregatedStream.print();
            //3.补齐维度
            SingleOutputStreamOperator<ProductStats> psStreamWithDim = joinDim(aggregatedStream);
    //        psStreamWithDim.print();
            //4.写入到ClickHouse中
            writeToClickHouse(psStreamWithDim);
            // 5. 写入到Kafka中 为了给产品关键词准备数据
            writeToKafka(psStreamWithDim);
        }
    
        private void writeToKafka(SingleOutputStreamOperator<ProductStats> stream) {
            stream
                    .map(JSON::toJSONString)
                    .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWS_PRODUCT_STATS));
        }
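
  这里的FlinkSinkUtil.getKafkaSink在前面章节已经封装过。若需参考,下面是一个基于FlinkKafkaProducer的最简草图(类名为示意,broker地址沿用本文集群配置):

package com.yuange.flinkrealtime.util;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class KafkaSinkSketch {
    //返回一个把JSON字符串写入指定topic的Kafka sink
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092");
        //精准一次语义要求事务超时不能超过broker允许的上限(默认15分钟)
        props.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "");
        return new FlinkKafkaProducer<>(
                topic,
                (KafkaSerializationSchema<String>) (element, timestamp) ->
                        new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8)),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
    }
}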

    6.2 自定义UDTF函数

      实现点击次数、订单次数、添加购物次数的统计

    package com.yuange.flinkrealtime.function;
    
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    
    /**
     * @作者:袁哥
     * @时间:2021/8/7 16:29
     */
    @FunctionHint(output = @DataTypeHint("row<source string, ct bigint>"))
    public class KWProductUdtf extends TableFunction<Row> {
        public void eval(Long click_ct, Long cart_ct, Long order_ct) {
            if (click_ct > 0) {
                collect(Row.of("click", click_ct));
            }
    
            if (cart_ct > 0) {
                collect(Row.of("cart", cart_ct));
            }
    
            if (order_ct > 0) {
                collect(Row.of("order", order_ct));
            }
        }
    }

    6.3 具体实现代码

      1)在Constant中添加常量

    public static final String TOPIC_DWS_PRODUCT_STATS = "dws_product_stats";

      2)数据仍然写入到 关键词主题表: keyword_stats_2021

    package com.yuange.flinkrealtime.app.dws;
    
    import com.yuange.flinkrealtime.app.BaseSqlApp;
    import com.yuange.flinkrealtime.bean.KeywordStats;
    import com.yuange.flinkrealtime.common.Constant;
    import com.yuange.flinkrealtime.function.KWProductUdtf;
    import com.yuange.flinkrealtime.function.KWUdtf;
    import com.yuange.flinkrealtime.util.FlinkSinkUtil;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    /**
     * @作者:袁哥
     * @时间:2021/8/7 16:20
     */
    public class DwsKeyWordProductStatsApp extends BaseSqlApp {
    
        public static void main(String[] args) {
            new DwsKeyWordProductStatsApp().init(
                    4005,
                    1,
                    "DwsKeyWordProductStatsApp"
            );
        }
    
        @Override
        protected void run(StreamTableEnvironment tableEnvironment) {
            tableEnvironment.executeSql("create table product_stats(" +
                    "   stt string, " +
                    "   edt string, " +
                    "   sku_name string, " +
                    "   click_ct bigint, " +
                    "   cart_ct bigint, " +
                    "   order_ct bigint " +
                    ")with(" +
                    "   'connector' = 'kafka', " +
                    "   'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092, hadoop164:9092', " +
                    "   'properties.group.id' = 'DwsKeyWordProductStatsApp', " +
                    "   'topic' = '" + Constant.TOPIC_DWS_PRODUCT_STATS + "', " +
                    "   'scan.startup.mode' = 'latest-offset', " +
                    "   'format' = 'json' " +
                    ")");
    
            // 1. 过滤出来有效数据
            Table table = tableEnvironment.sqlQuery("select * " +
                    "from product_stats " +
                    "where click_ct > 0 " +
                    "or cart_ct > 0 " +
                    "or order_ct > 0");
            tableEnvironment.createTemporaryView("t1",table);
    
            tableEnvironment.createTemporaryFunction("ik_analyzer", KWUdtf.class);
    
            // 2. 对关键词进行分词
            Table table2 = tableEnvironment.sqlQuery("select " +
                    " stt,edt,word, " +
                    " sum(click_ct) click_ct, " +
                    " sum(cart_ct) cart_ct, " +
                    " sum(order_ct) order_ct " +
                    " from (select " +
                    " stt, " +
                    " edt, " +
                    " word, " +
                    " click_ct, " +
                    " cart_ct, " +
                    " order_ct " +
                    "from t1, " +
                    " lateral table(ik_analyzer(sku_name))) t " +
                    " group by stt,edt,word");
            tableEnvironment.createTemporaryView("t2",table2);
    
            // 3. 把3个指标分别拆分到3行
            tableEnvironment.createTemporaryFunction("kw_product", KWProductUdtf.class);
    
            Table table3 = tableEnvironment.sqlQuery("select " +
                    "   stt, " +
                    "   edt, " +
                    "   word keyword, " +
                    "   source, " +
                    "   ct, " +
                    "   unix_timestamp() * 1000 ts " +
                    "from t2 " +
                    "join lateral table(kw_product(click_ct, cart_ct, order_ct)) on true");
    
            tableEnvironment.toRetractStream(table3, KeywordStats.class)
                    .filter(t -> t.f0)
                    .map(t -> t.f1)
                    .addSink(
                            FlinkSinkUtil.getClickHouseSink(
                                    Constant.CLICKHOUSE_DB,
                                    Constant.TABLE_KEYWORD_STATS,
                                    KeywordStats.class
                            )
                    );
        }
    }

    6.4 测试

      1)将程序打包上传至Linux

      2)启动hadoop

    hadoop.sh start

      3)启动ZK

    zk start

      4)启动Kafka

    kafka.sh start

      5)启动日志服务器

    log-lg.sh start

      6)启动Redis

    redis.sh 

      7)启动Maxwell

    maxwell.sh start

      8)启动HBase

    start-hbase.sh

      9)启动ClickHouse

    sudo systemctl start clickhouse-server

      10)启动yarn-session

    /opt/module/flink-yarn/bin/yarn-session.sh -d

      11)修改realtime.sh启动脚本

    #!/bin/bash
    flink=/opt/module/flink-yarn/bin/flink
    jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar
    
    apps=(
            com.yuange.flinkrealtime.app.dwd.DwdLogApp
            com.yuange.flinkrealtime.app.dwd.DwdDbApp
            #com.yuange.flinkrealtime.app.dwm.DwmUvApp
            #com.yuange.flinkrealtime.app.dwm.DwmJumpDetailApp_Two
            com.yuange.flinkrealtime.app.dwm.DwmOrderWideApp_Cache_Async
            com.yuange.flinkrealtime.app.dwm.DwmPaymentWideApp
            #com.yuange.flinkrealtime.app.dws.DwsVisitorStatsApp
            com.yuange.flinkrealtime.app.dws.DwsProductStatsApp
            #com.yuange.flinkrealtime.app.dws.DwsProvinceStatsApp
            #com.yuange.flinkrealtime.app.dws.DwsKeyWordSearchStatsApp
            com.yuange.flinkrealtime.app.dws.DwsKeyWordProductStatsApp
    )
    
    for app in ${apps[*]} ; do
            $flink run -d -c $app $jar
    done

      12)运行脚本,将程序提交至yarn-session上运行

    realtime.sh

      13)生产日志数据和业务数据

    cd /opt/software/mock/mock_db
    java -jar gmall2020-mock-db-2020-12-23.jar
    cd /opt/software/mock/mock_log
    java -jar gmall2020-mock-log-2020-12-18.jar

      14)查看ClickHouse中是否有新增数据
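
  例如,按source字段确认商品行为关键词已经写入(source取值为click、cart、order):

select source, count(*) ct from keyword_stats_2021 group by source;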

    第7章 DWS层:总结

      1)DWS层主要是基于DWD和DWM层的数据进行轻度聚合统计。

      2)掌握利用union操作实现多流的合并

      3)掌握窗口聚合操作

      4)掌握对clickhouse数据库的写入操作

      5)掌握用FlinkSQL实现业务

      6)掌握分词器的使用

      7)掌握在FlinkSQL中自定义函数的使用

  8)截至目前,所有数据的走向流程图可参考高清版链接:https://www.processon.com/view/link/6111ce5a0e3e7407d391cbf5
