zoukankan      html  css  js  c++  java
  • [Spring cloud 一步步实现广告系统] 16. 增量索引实现以及投送数据到MQ(kafka)

    实现增量数据索引

    上一节中,我们为实现增量索引的加载做了充足的准备,使用到mysql-binlog-connector-java 开源组件来实现MySQL 的binlog监听,关于binlog的相关知识,大家可以自行网络查阅。或者可以mailto:magicianisaac@gmail.com

    本节我们将根据binlog 的数据对象,来实现增量数据的处理,我们构建广告的增量数据,其实说白了就是为了在后期能把广告投放到索引服务,实现增量数据到增量索引的生成。Let's code.

    • 定义一个投递增量数据的接口(接收参数为我们上一节定义的binlog日志的转换对象)
    /**
     * ISender for 投递增量数据 方法定义接口
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     */
    public interface ISender {
    
        void sender(MysqlRowData rowData);
    }
    
    • 创建增量索引监听器
    /**
     * IncrementListener for 增量数据实现监听
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     * @since 2019/6/27
     */
    @Slf4j
    @Component
    public class IncrementListener implements Ilistener {
    
        private final AggregationListener aggregationListener;
    
        @Autowired
        public IncrementListener(AggregationListener aggregationListener) {
            this.aggregationListener = aggregationListener;
        }
    
        //根据名称选择要注入的投递方式
        @Resource(name = "indexSender")
        private ISender sender;
    
        /**
         * 标注为 {@link PostConstruct},
         * 即表示在服务启动,Bean完成初始化之后,立刻初始化
         */
        @Override
        @PostConstruct
        public void register() {
            log.info("IncrementListener register db and table info.");
            Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));
        }
    
        @Override
        public void onEvent(BinlogRowData eventData) {
            TableTemplate table = eventData.getTableTemplate();
            EventType eventType = eventData.getEventType();
    
            //包装成最后需要投递的数据
            MysqlRowData rowData = new MysqlRowData();
            rowData.setTableName(table.getTableName());
            rowData.setLevel(eventData.getTableTemplate().getLevel());
            //将EventType转为OperationTypeEnum
            OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
            rowData.setOperationTypeEnum(operationType);
    
            //获取模版中该操作对应的字段列表
            List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);
            if (null == fieldList) {
                log.warn("{} not support for {}.", operationType, table.getTableName());
                return;
            }
    
            for (Map<String, String> afterMap : eventData.getAfter()) {
                Map<String, String> _afterMap = new HashMap<>();
                for (Map.Entry<String, String> entry : afterMap.entrySet()) {
                    String colName = entry.getKey();
                    String colValue = entry.getValue();
    
                    _afterMap.put(colName, colValue);
                }
    
                rowData.getFieldValueMap().add(_afterMap);
            }
            sender.sender(rowData);
        }
    }
    
    开启binlog监听
    • 首先来配置监听binlog的数据库连接信息
    adconf:
      mysql:
        host: 127.0.0.1
        port: 3306
        username: root
        password: 12345678
        binlogName: ""
        position: -1 # 从当前位置开始监听
    

    编写配置类:

    /**
     * BinlogConfig for 定义监听Binlog的配置信息
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     */
    @Component
    @ConfigurationProperties(prefix = "adconf.mysql")
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class BinlogConfig {
        private String host;
        private Integer port;
        private String username;
        private String password;
        private String binlogName;
        private Long position;
    }
    

    在我们实现 监听binlog那节,我们实现了一个自定义client CustomBinlogClient,需要实现binlog的监听,这个监听的客户端就必须是一个独立运行的线程,并且要在程序启动的时候进行监听,我们来实现运行当前client的方式,这里我们会使用到一个新的Runnerorg.springframework.boot.CommandLineRunner,let's code.

    @Slf4j
    @Component
    public class BinlogRunner implements CommandLineRunner {
    
        @Autowired
        private CustomBinlogClient binlogClient;
    
        @Override
        public void run(String... args) throws Exception {
            log.info("BinlogRunner is running...");
            binlogClient.connect();
        }
    }
    
    增量数据投递

    在binlog监听的过程中,我们看到针对于int, String 这类数据字段,mysql的记录是没有问题的,但是针对于时间类型,它被格式化成了字符串类型:Fri Jun 21 15:07:53 CST 2019

    --------Insert-----------
    WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
    [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
    --------Update-----------
    UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
        {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}
    

    对于这个时间格式,我们需要关注2点信息:

    • CST,这个时间格式会比我们的时间+ 8h(中国标准时间 China Standard Time UT+8:00)
    • 需要对这个日期进行解释处理

    当然,我们也可以通过设置mysql的日期格式来改变该行为,在此,我们通过编码来解析该时间格式:

      /**
       * Thu Jun 27 08:00:00 CST 2019
       */
      public static Date parseBinlogString2Date(String dateString) {
          try {
              DateFormat dateFormat = new SimpleDateFormat(
                      "EEE MMM dd HH:mm:ss zzz yyyy",
                      Locale.US
              );
              return DateUtils.addHours(dateFormat.parse(dateString), -8);
    
          } catch (ParseException ex) {
              log.error("parseString2Date error:{}", dateString);
              return null;
          }
      }
    

    因为我们在定义索引的时候,是根据表之间的层级关系(Level)来设定的,根据代码规范,不允许出现Magic Number, 因此我们定义一个数据层级枚举,来表达数据层级。

    /**
     * AdDataLevel for 广告数据层级
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     */
    @Getter
    public enum AdDataLevel {
    
        LEVEL2("2", "level 2"),
        LEVEL3("3", "level 3"),
        LEVEL4("4", "level 4");
    
        private String level;
        private String desc;
    
        AdDataLevel(String level, String desc) {
            this.level = level;
            this.desc = desc;
        }
    }
    
    实现数据投递

    因为增量数据可以投递到不同的位置以及用途,我们之前实现了一个投递接口com.sxzhongf.ad.sender.ISender,接下来我们实现一个投递类:

    @Slf4j
    @Component("indexSender")
    public class IndexSender implements ISender {
    
        /**
         * 根据广告级别,投递Binlog数据
         */
        @Override
        public void sender(MysqlRowData rowData) {
            if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
                Level2RowData(rowData);
            } else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
                Level3RowData(rowData);
            } else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
                Level4RowData(rowData);
            } else {
                log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
            }
        }
    
        private void Level2RowData(MysqlRowData rowData) {
    
            if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
                List<AdPlanTable> planTables = new ArrayList<>();
    
                for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {
                    AdPlanTable planTable = new AdPlanTable();
                    //Map的第二种循环方式
                    fieldValueMap.forEach((k, v) -> {
                        switch (k) {
                            case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
                                planTable.setPlanId(Long.valueOf(v));
                                break;
                            case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
                                planTable.setUserId(Long.valueOf(v));
                                break;
                            case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
                                planTable.setPlanStatus(Integer.valueOf(v));
                                break;
                            case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
                                planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
                                break;
                            case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
                                planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
                                break;
                        }
                    });
                    planTables.add(planTable);
                }
    
                //投递推广计划
                planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
            } else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
                List<AdCreativeTable> creativeTables = new LinkedList<>();
    
                rowData.getFieldValueMap().forEach(afterMap -> {
                    AdCreativeTable creativeTable = new AdCreativeTable();
                    afterMap.forEach((k, v) -> {
                        switch (k) {
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
                                creativeTable.setAdId(Long.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
                                creativeTable.setType(Integer.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
                                creativeTable.setMaterialType(Integer.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
                                creativeTable.setHeight(Integer.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
                                creativeTable.setWidth(Integer.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
                                creativeTable.setAuditStatus(Integer.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
                                creativeTable.setAdUrl(v);
                                break;
                        }
                    });
                    creativeTables.add(creativeTable);
                });
    
                //投递广告创意
                creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
            }
        }
    
        private void Level3RowData(MysqlRowData rowData) {
           ...
        }
    
        /**
         * 处理4级广告
         */
        private void Level4RowData(MysqlRowData rowData) {
            ...
        }
    }
    
    投放增量数据到MQ(kafka)

    为了我们的数据投放更加灵活,方便数据统计,分析等系统的需求,我们来实现一个投放到消息中的接口,其他服务可以订阅当前MQ 的TOPIC来实现数据订阅。

    配置文件中配置TOPIC
    adconf:
      kafka:
        topic: ad-search-mysql-data
    
    --------------------------------------
    /**
     * KafkaSender for 投递Binlog增量数据到kafka消息队列
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     * @since 2019/7/1
     */
    @Component(value = "kafkaSender")
    public class KafkaSender implements ISender {
    
        @Value("${adconf.kafka.topic}")
        private String topic;
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        /**
         * 发送数据到kafka队列
         */
        @Override
        public void sender(MysqlRowData rowData) {
            kafkaTemplate.send(
                    topic, JSON.toJSONString(rowData)
            );
        }
    
        /**
         * 测试消费kafka消息
         */
        @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
        public void processMysqlRowData(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMsg = Optional.ofNullable(record.value());
            if (kafkaMsg.isPresent()) {
                Object message = kafkaMsg.get();
                MysqlRowData rowData = JSON.parseObject(
                        message.toString(),
                        MysqlRowData.class
                );
                System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
                //sender.sender();
            }
    
        }
    }
    
  • 相关阅读:
    sublime text 4 vim 插件配置
    ssh-keygen 的使用
    distribution transaction solution
    bilibili 大数据 视频下载 you-get
    Deepin 20.2.1 安装 MS SQL 2019 容器版本
    【转】使用Linux下Docker部署MSSQL并加载主机目录下的数据库
    【转】You Can Now Use OneDrive in Linux Natively Thanks to Insync
    dotnet 诊断工具安装命令
    Linux 使用 xrandr 设置屏幕分辨率
    【转】CentOS 7.9 2009 ISO 官方原版镜像下载
  • 原文地址:https://www.cnblogs.com/zhangpan1244/p/11333229.html
Copyright © 2011-2022 走看看