zoukankan      html  css  js  c++  java
  • canal脚手架升级

    前情

    原来写过一个canal的脚手架,但是没有使用MQ
    Canal现在已经支持了直接发送到MQ,所以又写了一个脚手架

    过程

    我们需要监控的只是数据库的update、delete和insert方法,所以这里抽象出来一个接口

    public interface BaseService {
        void update(FlatMessage message);
    
        void delete(FlatMessage message);
    
        void insert(FlatMessage message);
    }
    

    接着由于不同的表处理逻辑不同,所以需要指定每一个实现类处理的是哪张表,这里采用注解的方式

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    @Documented
    public @interface Table {
        
        String value();
    }
    

    在实现类中只需要添加注解和实现接口即可

    @Service
    @Table("goods_info")
    public class goodsInfoServiceImpl implements BaseService {
        @Override
        public void update(FlatMessage message) {
    
        }
    
        @Override
        public void delete(FlatMessage message) {
    
        }
    
        @Override
        public void insert(FlatMessage message) {
    
        }
    }
    

    这里的实体类是官方提供的,下面是代码

    @Data
    public class FlatMessage implements Serializable {
    
        private static final long serialVersionUID = -3386650678735860050L;
    
        private long id;
        private String database;
        private String table;
        private List<String> pkNames;
        private Boolean isDdl;
        private String type;
        // binlog executeTime
        private Long es;
        // dml build timeStamp
        private Long ts;
        private String sql;
        private Map<String, Integer> sqlType;
        private Map<String, String> mysqlType;
        private List<Map<String, String>> data;
        private List<Map<String, String>> old;
    }
    

    接着我们需要在spring启动时获取所有实现的类,并建立表和类的对应关系,同时接收到消息后进行消费
    由于脚手架使用的是RocketMQ,公司使用的是Kafka这里抽象出一个类

    @Service
    public class BaseConsumer implements CommandLineRunner {
    
        @Autowired
        private ApplicationContext applicationContext;
    
        private Map<String, BaseService> baseServiceMap;
    
    
        /**
         * 启动时获取所有baseService实现类
         * 获取类上的注解,得到此类处理的表
         * 存储到baseServiceMap
         *
         * @param args 命令行参数
         * @throws Exception 父类异常
         */
        @Override
        public void run(String... args) throws Exception {
            Map<String, BaseService> beansOfType = applicationContext.getBeansOfType(BaseService.class);
            baseServiceMap = new HashMap<>();
            for (BaseService classObject : beansOfType.values()) {
                Table annotation = classObject.getClass().getAnnotation(Table.class);
                if (annotation != null) {
                    baseServiceMap.put(annotation.value().toLowerCase(), classObject);
                }
            }
        }
    
    
        /**
         * 接收到消息后,获取对应的表并执行
         * @param message 接收到的消息
         */
        public void doConsumption(FlatMessage message) {
            String table = message.getTable().toLowerCase();
            if (baseServiceMap.containsKey(table)) {
                BaseService baseService = baseServiceMap.get(table);
                String type = message.getType().toLowerCase();
                switch (type) {
                    case "update":
                        baseService.update(message);
                        break;
                    case "delete":
                        baseService.delete(message);
                        break;
                    case "insert":
                        baseService.insert(message);
                        break;
                }
            }
        }
    }
    

    接着实现RocketMQ的消费者

    @Service //由于rocketMQ 没有配置是否开启的开关,如果使用rocketMQ则添加此类到IOC中
    @RocketMQMessageListener(topic = "goods_goods_info", consumerGroup = "canal-Group")
    @Slf4j
    public class RocketMQConsumer implements RocketMQListener<FlatMessage> {
    
        @Autowired
        private BaseConsumer baseConsumer;
    
        /**
         * 接收到消息进行处理
         *
         * @param message canal监控到的消息 json格式
         */
        @Override
        public void onMessage(FlatMessage message) {
            log.info("canal监控到消息{}", message);
            baseConsumer.doConsumption(message);
        }
    }
    

    接着启动即可

    GitHub连接
    [Github]: https://github.com/ingxx/canal-starter

  • 相关阅读:
    poj 1743 Musical Theme 后缀数组
    poj 1743 Musical Theme 后缀数组
    cf 432D Prefixes and Suffixes kmp
    cf 432D Prefixes and Suffixes kmp
    hdu Data Structure? 线段树
    关于position和anchorPoint之间的关系
    ios POST 信息
    CALayers的代码示例
    CALayers详解
    ios中得sqlite使用基础
  • 原文地址:https://www.cnblogs.com/ingxx/p/14240111.html
Copyright © 2011-2022 走看看