  • [Spring Cloud Ad System, Step by Step] 16. Implementing the Incremental Index and Delivering Data to MQ (Kafka)

    Implementing the incremental data index

    In the previous section, we did the groundwork for loading the incremental index, using the open-source component mysql-binlog-connector-java to listen to MySQL's binlog. For background on the binlog itself, you can search online, or email me at magicianisaac@gmail.com.

    In this section we will process the incremental data based on the binlog data object. Building the ad system's incremental data is, plainly put, so that we can later deliver ads to the index service and turn incremental data into an incremental index. Let's code.

    • Define an interface for delivering incremental data (its parameter is the binlog conversion object we defined in the previous section)
    /**
     * ISender defines the delivery method for incremental data
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     */
    public interface ISender {
    
        void sender(MysqlRowData rowData);
    }
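
    For reference, the MysqlRowData transfer object (defined in the previous section) is a sketch reconstructed here from how the listener below uses it; the OperationTypeEnum values are my assumption, not the original definition:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Assumed values for OperationTypeEnum (actual definition is in the previous section)
enum OperationTypeEnum { ADD, UPDATE, DELETE, OTHER }

// Sketch of the binlog transfer object, reconstructed from how IncrementListener uses it
class MysqlRowData {
    private String tableName;
    private String level;
    private OperationTypeEnum operationTypeEnum;
    // one map of column name -> column value per affected row
    private final List<Map<String, String>> fieldValueMap = new ArrayList<>();

    public String getTableName() { return tableName; }
    public void setTableName(String tableName) { this.tableName = tableName; }
    public String getLevel() { return level; }
    public void setLevel(String level) { this.level = level; }
    public OperationTypeEnum getOperationTypeEnum() { return operationTypeEnum; }
    public void setOperationTypeEnum(OperationTypeEnum op) { this.operationTypeEnum = op; }
    public List<Map<String, String>> getFieldValueMap() { return fieldValueMap; }
}
```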
    
    • Create the incremental index listener
    /**
     * IncrementListener listens for incremental data
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     * @since 2019/6/27
     */
    @Slf4j
    @Component
    public class IncrementListener implements Ilistener {
    
        private final AggregationListener aggregationListener;
    
        @Autowired
        public IncrementListener(AggregationListener aggregationListener) {
            this.aggregationListener = aggregationListener;
        }
    
        //Select the sender implementation to inject, by bean name
        @Resource(name = "indexSender")
        private ISender sender;
    
        /**
         * Annotated with {@link PostConstruct}, meaning this runs
         * right after the service starts and the bean finishes initialization.
         */
        @Override
        @PostConstruct
        public void register() {
            log.info("IncrementListener register db and table info.");
            Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));
        }
    
        @Override
        public void onEvent(BinlogRowData eventData) {
            TableTemplate table = eventData.getTableTemplate();
            EventType eventType = eventData.getEventType();
    
            //Wrap into the data object we will ultimately deliver
            MysqlRowData rowData = new MysqlRowData();
            rowData.setTableName(table.getTableName());
            rowData.setLevel(eventData.getTableTemplate().getLevel());
            //Convert EventType to OperationTypeEnum
            OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
            rowData.setOperationTypeEnum(operationType);
    
            //Get the field list for this operation from the table template
            List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);
            if (null == fieldList) {
                log.warn("{} not support for {}.", operationType, table.getTableName());
                return;
            }
    
            for (Map<String, String> afterMap : eventData.getAfter()) {
                //Copy each after-image row into the delivery object
                rowData.getFieldValueMap().add(new HashMap<>(afterMap));
            }
            sender.sender(rowData);
        }
    }
    
    Enable binlog listening
    • First, configure the database connection info for the binlog listener
    adconf:
      mysql:
        host: 127.0.0.1
        port: 3306
        username: root
        password: 12345678
        binlogName: ""
        position: -1 # start listening from the current position
    

    Write the configuration class:

    /**
     * BinlogConfig defines the configuration for listening to the binlog
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     */
    @Component
    @ConfigurationProperties(prefix = "adconf.mysql")
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class BinlogConfig {
        private String host;
        private Integer port;
        private String username;
        private String password;
        private String binlogName;
        private Long position;
    }
    

    In the section where we implemented binlog listening, we wrote a custom client, CustomBinlogClient. To listen to the binlog, this client must run as an independent thread and start listening when the application boots. To run the client, we will use a new runner, org.springframework.boot.CommandLineRunner. Let's code.

    @Slf4j
    @Component
    public class BinlogRunner implements CommandLineRunner {
    
        @Autowired
        private CustomBinlogClient binlogClient;
    
        @Override
        public void run(String... args) throws Exception {
            log.info("BinlogRunner is running...");
            binlogClient.connect();
        }
    }
    
    Delivering the incremental data

    While listening to the binlog, we saw that MySQL records int and String fields without any problem, but time-typed columns are formatted as strings like: Fri Jun 21 15:07:53 CST 2019

    --------Insert-----------
    WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
    [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
    --------Update-----------
    UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
        {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}
    

    Two things to note about this time format:

    • CST: the time in this format ends up 8 hours off from ours (China Standard Time, UTC+8:00)
    • The date string needs to be parsed back into a Date

    Of course, we could also change this behavior by adjusting MySQL's date settings; here, we parse the time format in code instead:

      /**
       * Thu Jun 27 08:00:00 CST 2019
       */
      public static Date parseBinlogString2Date(String dateString) {
          try {
              DateFormat dateFormat = new SimpleDateFormat(
                      "EEE MMM dd HH:mm:ss zzz yyyy",
                      Locale.US
              );
              return DateUtils.addHours(dateFormat.parse(dateString), -8);
    
          } catch (ParseException ex) {
              log.error("parseString2Date error:{}", dateString);
              return null;
          }
      }
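
    A quick sanity check of this parsing as a pure-JDK sketch: Calendar stands in for commons-lang's DateUtils.addHours, and the -8h shift is the same as above. The class name BinlogDateDemo is mine:

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class BinlogDateDemo {

    // Same logic as parseBinlogString2Date, with Calendar replacing DateUtils.addHours
    public static Date parse(String dateString) throws Exception {
        DateFormat dateFormat = new SimpleDateFormat(
                "EEE MMM dd HH:mm:ss zzz yyyy", Locale.US);
        Calendar cal = Calendar.getInstance();
        cal.setTime(dateFormat.parse(dateString));
        cal.add(Calendar.HOUR_OF_DAY, -8);  // compensate for the 8h offset
        return cal.getTime();
    }

    public static void main(String[] args) throws Exception {
        // Format the result in UTC so the printed value does not depend on the local timezone
        DateFormat utc = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
        utc.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(utc.format(parse("Thu Jun 27 08:00:00 CST 2019")));
    }
}
```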
    

    Because our indexes are defined according to the hierarchy level between tables (Level), and our coding standard does not allow magic numbers, we define a data-level enum to express the hierarchy.

    /**
     * AdDataLevel represents the advertising data hierarchy levels
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     */
    @Getter
    public enum AdDataLevel {
    
        LEVEL2("2", "level 2"),
        LEVEL3("3", "level 3"),
        LEVEL4("4", "level 4");
    
        private String level;
        private String desc;
    
        AdDataLevel(String level, String desc) {
            this.level = level;
            this.desc = desc;
        }
    }
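
    Since the level arrives from the binlog as a plain string, a small lookup helper could replace the if/else chain in the sender below. The of() method here is an addition of mine, not part of the original enum:

```java
import java.util.Arrays;

// AdDataLevel as defined above, plus a hypothetical of() lookup helper
enum AdDataLevel {

    LEVEL2("2", "level 2"),
    LEVEL3("3", "level 3"),
    LEVEL4("4", "level 4");

    private final String level;
    private final String desc;

    AdDataLevel(String level, String desc) {
        this.level = level;
        this.desc = desc;
    }

    public String getLevel() { return level; }
    public String getDesc() { return desc; }

    // Resolve a binlog level string ("2"/"3"/"4") to the enum constant, or null if unknown
    public static AdDataLevel of(String level) {
        return Arrays.stream(values())
                     .filter(l -> l.level.equals(level))
                     .findFirst()
                     .orElse(null);
    }
}
```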
    
    Implementing the data delivery

    Because incremental data can be delivered to different destinations for different purposes, we defined the delivery interface com.sxzhongf.ad.sender.ISender earlier; now let's implement a delivery class:

    @Slf4j
    @Component("indexSender")
    public class IndexSender implements ISender {
    
        /**
         * Deliver binlog data according to the ad level
         */
        @Override
        public void sender(MysqlRowData rowData) {
            if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
                Level2RowData(rowData);
            } else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
                Level3RowData(rowData);
            } else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
                Level4RowData(rowData);
            } else {
                log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
            }
        }
    
        private void Level2RowData(MysqlRowData rowData) {
    
            if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
                List<AdPlanTable> planTables = new ArrayList<>();
    
                for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {
                    AdPlanTable planTable = new AdPlanTable();
                    //Another way to iterate a Map
                    fieldValueMap.forEach((k, v) -> {
                        switch (k) {
                            case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
                                planTable.setPlanId(Long.valueOf(v));
                                break;
                            case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
                                planTable.setUserId(Long.valueOf(v));
                                break;
                            case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
                                planTable.setPlanStatus(Integer.valueOf(v));
                                break;
                            case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
                                planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
                                break;
                            case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
                                planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
                                break;
                        }
                    });
                    planTables.add(planTable);
                }
    
            //Deliver the promotion plans
                planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
            } else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
                List<AdCreativeTable> creativeTables = new LinkedList<>();
    
                rowData.getFieldValueMap().forEach(afterMap -> {
                    AdCreativeTable creativeTable = new AdCreativeTable();
                    afterMap.forEach((k, v) -> {
                        switch (k) {
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
                                creativeTable.setAdId(Long.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
                                creativeTable.setType(Integer.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
                                creativeTable.setMaterialType(Integer.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
                                creativeTable.setHeight(Integer.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
                                creativeTable.setWidth(Integer.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
                                creativeTable.setAuditStatus(Integer.valueOf(v));
                                break;
                            case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
                                creativeTable.setAdUrl(v);
                                break;
                        }
                    });
                    creativeTables.add(creativeTable);
                });
    
            //Deliver the ad creatives
                creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
            }
        }
    
        private void Level3RowData(MysqlRowData rowData) {
           ...
        }
    
        /**
         * Handle level-4 ads
         */
        private void Level4RowData(MysqlRowData rowData) {
            ...
        }
    }
    
    Delivering incremental data to MQ (Kafka)

    To make our data delivery more flexible and to serve needs such as statistics and analytics systems, let's implement delivery to a message queue; other services can then subscribe to this MQ TOPIC to receive the data.

    Configure the TOPIC in the configuration file
    adconf:
      kafka:
        topic: ad-search-mysql-data
    
    --------------------------------------
    /**
     * KafkaSender delivers incremental binlog data to the kafka message queue
     *
     * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
     * @since 2019/7/1
     */
    @Component(value = "kafkaSender")
    public class KafkaSender implements ISender {
    
        @Value("${adconf.kafka.topic}")
        private String topic;
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        /**
         * Send data to the kafka queue
         */
        @Override
        public void sender(MysqlRowData rowData) {
            kafkaTemplate.send(
                    topic, JSON.toJSONString(rowData)
            );
        }
    
        /**
         * A test consumer for the kafka messages
         */
        @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
        public void processMysqlRowData(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMsg = Optional.ofNullable(record.value());
            if (kafkaMsg.isPresent()) {
                Object message = kafkaMsg.get();
                MysqlRowData rowData = JSON.parseObject(
                        message.toString(),
                        MysqlRowData.class
                );
                System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
                //sender.sender();
            }
    
        }
    }
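
    One assumption worth making explicit: the KafkaTemplate above relies on Spring Boot's Kafka auto-configuration, which still needs broker and serializer settings. A minimal addition to application.yml for that (the values are examples for a local setup, not from the original project) might look like:

```yaml
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ad-search
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```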
    
  • Original article: https://www.cnblogs.com/zhangpan1244/p/11333229.html