  • Subscribing to binlog changes with Canal and buffering the messages through Kafka

    If you are new to Alibaba's Canal project, start here: canal

    Binlog changes can arrive in large bursts. If the CanalClient consumed them directly from Canal, it could be overwhelmed in an instant. To avoid this, we put Kafka in between as a buffering layer.
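
    The resulting pipeline: MySQL binlog -> Canal server -> CanalClient (part 1 below) -> Kafka topic -> @BinlogListener consumers (part 2 below).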

    Our company has built a framework for this, shared here. Credit to the original author.

    1. Server side: wrap the Canal binlog subscription and push the messages to Kafka

    The binlogService server entry point:
    import java.util.concurrent.Executors
    
    import com.today.data.transfer.UTIL._
    import com.today.data.transfer.canal.CanalClient
    import com.today.data.transfer.kafka.BinlogKafkaProducer
    import com.today.data.transfer.util.SysEnvUtil
    import com.typesafe.config.ConfigFactory
    import org.slf4j.LoggerFactory
    
    /**
      *
      * Description: binlogService server entry point
      *
      * @author hz.lei
      * @since 2018-03-07 01:08
      */
    object BinLogServer {
      val logger = LoggerFactory.getLogger(getClass)
    
      def main(args: Array[String]) {
        startServer()
      }
    
      /**
        * Starts the server configured via Java environment variables (see SysEnvUtil).
        */
      def startServer(): Unit = {
        logger.info("starting binlogServer...")
    
        val producerBrokerHost = SysEnvUtil.CANAL_KAFKA_HOST
        val topic = SysEnvUtil.CANAL_KAFKA_TOPIC
    
        val canalServerIp = SysEnvUtil.CANAL_SERVER_IP
        val canalServerPort = SysEnvUtil.CANAL_SERVER_PORT.toInt
    
        val destination = SysEnvUtil.CANAL_DESTINATION
        val username = SysEnvUtil.CANAL_USERNAME
        val password = SysEnvUtil.CANAL_PASSWORD
    
        val kafkaProducer = new BinlogKafkaProducer(producerBrokerHost, topic)
        kafkaProducer.init()

        val canalClient = new CanalClient(canalServerIp, canalServerPort, destination, username, password)
        canalClient.registerBinlogListener(kafkaProducer)

        val executorService = Executors.newFixedThreadPool(1)
        executorService.execute(canalClient)

        logger.info("binlogService started successfully...")
    
    
      }
    
      /**
        * Starts the server configured via Typesafe Config (application.conf).
        */
      def startServerWithScala(): Unit = {
        logger.info("starting binlogServer...")
    
        val config = ConfigFactory.load()
    
        val producerBrokerHost = config.getStringProxy("kafka.producerBrokerHost")
        val topic = config.getStringProxy("kafka.topic")
    
        val canalServerIp = config.getStringProxy("canal.canalServerIp")
        val canalServerPort = config.getStringProxy("canal.canalServerPort").toInt
        val destination = config.getStringProxy("canal.destination")
        val username = config.getStringProxy("canal.username")
        val password = config.getStringProxy("canal.password")
    
        val kafkaProducer = new BinlogKafkaProducer(producerBrokerHost, topic)
        kafkaProducer.init()

        val canalClient = new CanalClient(canalServerIp, canalServerPort, destination, username, password)
        canalClient.registerBinlogListener(kafkaProducer)

        val executorService = Executors.newFixedThreadPool(1)
        executorService.execute(canalClient)

        logger.info("binlogService started successfully...")
      }
    
    }
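
    For the startServerWithScala variant, a matching application.conf might look like this (a sketch: the keys mirror the config.getStringProxy calls above, and every value is a placeholder — 11111 and "example" are simply Canal's defaults):

    kafka {
      producerBrokerHost = "127.0.0.1:9092"
      topic = "Binlog"
    }
    canal {
      canalServerIp = "127.0.0.1"
      canalServerPort = "11111"
      destination = "example"
      username = "canal"
      password = "canal"
    }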
    Forwarding received Canal messages to Kafka:
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.today.data.transfer.listener.CanalBinaryListener;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Properties;
    
    /**
     * Description: forwards received Canal messages to Kafka
     *
     * @author hz.lei
     * @date 2018-03-07 00:44
     */
    public class BinlogKafkaProducer implements CanalBinaryListener {
        private static Logger logger = LoggerFactory.getLogger(BinlogKafkaProducer.class);
        private String topic;
        private String host;
    
        protected Producer<Integer, byte[]> producer;
    
        public BinlogKafkaProducer(String kafkaHost, String topic) {
            this.topic = topic;
            this.host = kafkaHost;
        }
    
        public void init() {
            logger.info("[BinlogKafkaProducer] [init] broker-list(" + host + ")");
    
            Properties properties = KafkaConfigBuilder.defaultProducer().bootstrapServers(host)
                    .withKeySerializer(IntegerSerializer.class)
                    .withValueSerializer(ByteArraySerializer.class)
                    .build();
    
            producer = new KafkaProducer<>(properties);
        }
    
        /**
         * Sends a message asynchronously with a completion callback.
         *
         * @param topic
         * @param message
         */
        public void send(String topic, byte[] message) {
            producer.send(new ProducerRecord<>(topic, message), (metadata, e) -> {
                if (e != null) {
                    logger.error("[" + getClass().getSimpleName() + "]: failed to send message, cause: " + e.getMessage(), e);
                } else {
                    logger.info("[binlog]: message sent, topic:{}, offset:{}, partition:{}, time:{}",
                            metadata.topic(), metadata.offset(), metadata.partition(), metadata.timestamp());
                }
            });
        }
    
    
        @Override
        public void onBinlog(CanalEntry.Entry entry) {
            send(topic, entry.toByteArray());
        }
    }
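
    KafkaConfigBuilder above belongs to the author's framework and is not shown in the post. A minimal sketch of the producer configuration it presumably assembles, using only standard Kafka client settings:

    import java.util.Properties
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.kafka.common.serialization.{ByteArraySerializer, IntegerSerializer}

    object ProducerPropsSketch {
      // roughly: defaultProducer().bootstrapServers(host).withKeySerializer(...).withValueSerializer(...).build()
      def build(host: String): Properties = {
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[IntegerSerializer].getName)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
        props
      }
    }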
    The Canal client and its listening/processing logic:
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.Header;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.Message;
    import com.today.data.transfer.listener.CanalBinaryListener;
    import com.today.data.transfer.listener.CanalGsonListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.net.InetSocketAddress;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    
    /**
     * Description: Canal client; listening and processing logic
     *
     * @author hz.lei
     * @date 2018-03-06 20:21
     */
    public class CanalClient implements Runnable {
        private static final Logger logger = LoggerFactory.getLogger(CanalClient.class);
    
        private String hostname;
        private int port;
        private String destination;
        private String username;
        private String password;
    
    
        private CanalConnector connector;
    
        private final static int BATCH_SIZE = 1000;
        private final static long SLEEP_MS = 1000;
        private boolean running = false;
    
        private List<CanalGsonListener> gsonListeners = new ArrayList<>();
        private List<CanalBinaryListener> binaryListeners = new ArrayList<>();
    
    
        /**
         * Constructor.
         *
         * @param hostname    Canal server IP
         * @param port        Canal server port
         * @param destination Canal instance (destination) name
         * @param username    Canal username
         * @param password    Canal password
         */
        public CanalClient(String hostname, int port, String destination, String username, String password) {
            this.hostname = hostname;
            this.port = port;
            this.destination = destination;
            this.username = username;
            this.password = password;
            init();
        }
    
        public void init() {
            try {
                logger.info(new StringBuffer("[CanalClient] [init] ")
                        .append("hostname: (").append(hostname)
                        .append("), port: (").append(port)
                        .append("), destination: (").append(destination)
                        .append("), username: (").append(username)
                        .append("), password: (").append(password).append(")").toString());

                connector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, username, password);

                connector.connect();
                // note the escaped dot ("\\."): subscribe to every table in every schema
                connector.subscribe(".*\\..*");
            } catch (Exception e) {
                logger.error("[CanalClient] [init] " + e.getMessage(), e);
            }
        }
    
    
        public void registerBinlogListener(CanalBinaryListener listener) {
            if (listener != null) {
                binaryListeners.add(listener);
            }
        }
    
        public void unregisterBinlogListener(CanalBinaryListener listener) {
            if (listener != null) {
                binaryListeners.remove(listener);
            }
        }
    
        @Override
        public void run() {
            logger.info("[CanalClient] [run] ");
            running = true;
            work();
        }
    
        /**
         * The main polling loop.
         */
        private void work() {
            try {
                while (running) {
                    Message message = connector.getWithoutAck(BATCH_SIZE);

                    long batchId = message.getId();
                    int size = message.getEntries().size();

                    if (batchId == -1 || size == 0) {
                        try {
                            Thread.sleep(SLEEP_MS);
                        } catch (InterruptedException e) {
                            logger.error(e.getMessage(), e);
                        }
                    } else {
                        if (logger.isDebugEnabled()) {
                            logger.debug("read binlog batchId: {}, size: {}, name: {}, offset: {}", batchId, size,
                                    message.getEntries().get(0).getHeader().getLogfileName(),
                                    message.getEntries().get(0).getHeader().getLogfileOffset());
                        }
                        // process the entries
                        process(message.getEntries());
                    }
                    // acknowledge the batch
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                connector.disconnect();
                logger.error("[CanalClient] [work] " + e.getMessage(), e);
            } finally {
                reconnect();
            }
        }
    
        /**
         * Reconnect strategy: retry every 3 seconds until the connection is back.
         * Note that reconnect() calls work() again, so each reconnect adds a stack
         * frame; a loop-based retry would be safer for very long-running processes.
         */
        private void reconnect() {
            logger.info("[CanalClient reconnect] reconnecting ...");

            running = false;

            while (!running) {
                try {
                    connector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, username, password);
                    connector.connect();
                    connector.subscribe(".*\\..*");
                    connector.rollback();

                    running = true;
                } catch (Exception e) {
                    connector.disconnect();
                    logger.error("[CanalClient] [reconnect] " + e.getMessage(), e);
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e1) {
                        logger.error(e1.getMessage(), e1);
                    }
                }
            }
            logger.info("[CanalClient reconnect] reconnected!");
            work();
        }
    
    
        private void process(List<Entry> entries) {
            try {
                for (Entry entry : entries) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("mysql binlog : " + entry.getHeader().getLogfileName() + "=>" + entry.getHeader().getLogfileOffset());
                    }
                    // skip transaction-begin, transaction-end and QUERY events
                    if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND || entry.getHeader().getEventType() == EventType.QUERY) {
                        continue;
                    }
                    logger.info("parsed offset: " + entry.getHeader().getLogfileName() + "=>" + entry.getHeader().getLogfileOffset() + ", " +
                            "table[" + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + "], " +
                            "event type[" + entry.getHeader().getEventType() + "], " +
                            "executed at: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(entry.getHeader().getExecuteTime())));

                    RowChange rowChange;
                    try {
                        rowChange = RowChange.parseFrom(entry.getStoreValue());
                    } catch (Exception e) {
                        logger.error("[CanalClient] [process] failed to parse RowChange event: " + e.getMessage() + ", entry: " + entry, e);
                        continue;
                    }

                    log(entry.getHeader(), rowChange);

                    if (!gsonListeners.isEmpty()) {
                        GsonEntry binlog = new GsonEntry(entry.getHeader(), rowChange);

                        for (CanalGsonListener listener : gsonListeners) {
                            listener.onBinlog(binlog);
                        }
                    }

                    for (CanalBinaryListener listener : binaryListeners) {
                        listener.onBinlog(entry);
                    }
                }
            } catch (Exception e) {
                logger.error("[CanalClient] [process] " + e.getMessage(), e);
            }
        }
    
        private void log(Header header, RowChange rowChange) {
            EventType eventType = rowChange.getEventType();
    
            if(logger.isDebugEnabled()){
                logger.debug(String.format("binlog[%s:%s], name[%s,%s], eventType : %s",
                        header.getLogfileName(), header.getLogfileOffset(),
                        header.getSchemaName(), header.getTableName(),
                        eventType));
            }
    
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    log(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    log(rowData.getAfterColumnsList());
                } else {
                    log(rowData.getBeforeColumnsList());
                    log(rowData.getAfterColumnsList());
                }
            }
        }
    
        private void log(List<Column> columns) {
            for (Column column : columns) {
                if(logger.isDebugEnabled()){
                    logger.debug(new StringBuffer()
                            .append(column.getName()).append(" = ").append(column.getValue())
                            .append(" update[").append(column.getUpdated()).append("]").toString());
                }
            }
        }
    }
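
    A note on connector.subscribe(".*\\..*"): that regex subscribes to every table in every schema. Canal's filter syntax takes comma-separated "schema.table" regexes, so the subscription can be narrowed; a sketch (address, destination and table names are placeholders):

    import java.net.InetSocketAddress
    import com.alibaba.otter.canal.client.CanalConnectors

    object FilterSketch {
      def main(args: Array[String]): Unit = {
        val connector = CanalConnectors.newSingleConnector(
          new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal")
        connector.connect()
        // only the sku and order* tables of goods_db, instead of everything
        connector.subscribe("goods_db\\.sku,goods_db\\.order.*")
      }
    }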

    The Kafka message entity definition:

    import java.sql.Timestamp
    
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType
    
    /**
      * desc: BinlogEvent bean
      *
      * @author hz.lei  2018-03-07 15:43
      */
    case class BinlogEvent(schema: String, tableName: String, eventType: EventType, timestamp: Timestamp, before: String, after: String)
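
    For illustration, an INSERT on goods_db.sku (the table the consumer below cares about) would arrive roughly as the following hypothetical instance; the exact shape of the before/after JSON strings depends on the framework's serialization:

    import java.sql.Timestamp
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType

    val sample = BinlogEvent(
      schema = "goods_db",
      tableName = "sku",
      eventType = EventType.INSERT,
      timestamp = new Timestamp(System.currentTimeMillis()),
      before = "{}",                                    // no before image for an INSERT
      after = """{"id": 1001, "sku_no": "SKU-001"}"""   // hypothetical row
    )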

    2. Client side: subscribe to the Kafka messages to pick out the binlog changes you care about

    The binlog listener class:

    import com.today.binlog.BinlogEvent
    import com.today.eventbus.annotation.{BinlogListener, KafkaConsumer}
    import com.today.service.binlog.action._
    import org.springframework.transaction.annotation.Transactional
    
    import scala.collection.JavaConverters._
    
    /**
      *
      * Description: binlog listener
      *
      * @author hz.lei
      * @since 2018-03-08 19:18
      */
    @KafkaConsumer(groupId = "GOODS_0.0.1_EVENT", topic = "Binlog")
    @Transactional(readOnly = true)
    class GoodsBinlogListener {
      @BinlogListener
      def onBinlog(event: java.util.List[BinlogEvent]): Unit = {
        event.asScala.foreach(new GoodsOnBinlogAction(_).action())
      }
    }

    Note that both of the following annotations are required:

    @KafkaConsumer(groupId = "GOODS_0.0.1_EVENT", topic = "Binlog")
    @BinlogListener
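
    Besides groupId and topic, @KafkaConsumer also carries kafkaHostKey and sessionTimeout attributes; the post-processor below copies them onto the registered endpoint.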

    Register the annotation-scanning post-processor with Spring:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:soa="http://soa-springtag.dapeng.com/schema/service"
           xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://soa-springtag.dapeng.com/schema/service
            http://soa-springtag.dapeng.com/schema/service/service.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
        <bean id="postProcessor" class="com.today.eventbus.spring.MsgAnnotationBeanPostProcessor"/>
    </beans>

    The annotation post-processor definition:

    import com.today.eventbus.ConsumerEndpoint;
    import com.today.eventbus.annotation.BinlogListener;
    import com.today.eventbus.annotation.KafkaConsumer;
    import com.today.eventbus.annotation.KafkaListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.aop.framework.Advised;
    import org.springframework.aop.support.AopUtils;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.BeanFactoryAware;
    import org.springframework.beans.factory.SmartInitializingSingleton;
    import org.springframework.beans.factory.config.BeanPostProcessor;
    import org.springframework.beans.factory.support.BeanDefinitionBuilder;
    import org.springframework.beans.factory.support.DefaultListableBeanFactory;
    import org.springframework.core.MethodIntrospector;
    import org.springframework.core.Ordered;
    import org.springframework.core.annotation.AnnotationUtils;
    import org.springframework.util.ReflectionUtils;
    import com.today.eventbus.utils.Constant;
    
    import java.lang.reflect.Method;
    import java.util.*;
    
    
    /**
     * Description: MsgAnnotationBeanPostProcessor, a bean post-processor that scans for the
     * custom annotations @KafkaConsumer / @KafkaListener / @BinlogListener
     *
     * @author hz.lei
     * @see KafkaListenerRegistrar,BeanFactory,BeanPostProcessor,SmartInitializingSingleton
     * @since 2018-03-01 21:36
     */
    public class MsgAnnotationBeanPostProcessor implements BeanPostProcessor, BeanFactoryAware, Ordered, SmartInitializingSingleton {
        /**
         * logger
         */
        private final Logger logger = LoggerFactory.getLogger(getClass());
        /**
         * holds the beanFactory; the real implementation is {@link DefaultListableBeanFactory},
         * used to create beans dynamically, e.g. the {@link KafkaListenerRegistrar} bean
         */
        private BeanFactory beanFactory;
        /**
         * handles registration and creation of kafka consumers
         */
        private KafkaListenerRegistrar registrar;
    
    
        /**
         * BeanFactory callback that hands this bean a reference to the container.
         *
         * @param beanFactory
         * @throws BeansException
         */
        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.beanFactory = beanFactory;
            createKafkaRegistryBean();
        }
    
        /**
         * Dynamically creates the KafkaListenerRegistrar bean.
         */
        private void createKafkaRegistryBean() {
            // cast the bean factory to DefaultListableBeanFactory
            DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
            // build a bean definition via BeanDefinitionBuilder
            BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(KafkaListenerRegistrar.class);
            // register the bean definition
            defaultListableBeanFactory.registerBeanDefinition(Constant.KAFKA_LISTENER_REGISTRAR_BEAN_NAME, beanDefinitionBuilder.getRawBeanDefinition());

            this.registrar = (KafkaListenerRegistrar) beanFactory.getBean(Constant.KAFKA_LISTENER_REGISTRAR_BEAN_NAME);
        }
    
        /**
         * Called once all singleton beans have been instantiated.
         */
        @Override
        public void afterSingletonsInstantiated() {
            this.registrar.afterPropertiesSet();
        }
    
    
        /**
         * Called after instantiation and dependency injection, before any init callbacks
         * (such as an init-method declared in the configuration).
         *
         * @param bean
         * @param beanName
         * @return
         * @throws BeansException
         */
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
            return bean;
        }
    
        /**
         * Called after instantiation and dependency injection, after any init callbacks
         * (such as an init-method declared in the configuration).
         *
         * @param bean
         * @param beanName
         * @return
         * @throws BeansException
         */
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            logger.debug("access to postProcessAfterInitialization bean {}, beanName {}", bean, beanName);
    
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            // is the class annotated with @KafkaConsumer?
            Optional<KafkaConsumer> kafkaConsumer = findListenerAnnotations(targetClass);
            final boolean hasKafkaConsumer = kafkaConsumer.isPresent();
    
            if (hasKafkaConsumer) {
                // collect the methods annotated with @KafkaListener
                Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                        (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                            Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                            return (!listenerMethods.isEmpty() ? listenerMethods : null);
                        });

                // collect the methods annotated with @BinlogListener
                Map<Method, Set<BinlogListener>> binlogMethods = MethodIntrospector.selectMethods(targetClass,
                        (MethodIntrospector.MetadataLookup<Set<BinlogListener>>) method -> {
                            Set<BinlogListener> listenerMethods = findBinlogListenerAnnotations(method);
                            return (!listenerMethods.isEmpty() ? listenerMethods : null);
                        });

                if (annotatedMethods.isEmpty() && binlogMethods.isEmpty()) {
                    throw new IllegalArgumentException("@KafkaConsumer found on the class, " +
                            "but no method is annotated with @KafkaListener or @BinlogListener; please annotate a method");
                }

                if (!annotatedMethods.isEmpty() && !binlogMethods.isEmpty()) {
                    throw new IllegalArgumentException("@KafkaListener and @BinlogListener must not both be present on the same bean class");
                }
    
                if (!annotatedMethods.isEmpty()) {
                    // non-empty set of methods
                    for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                        Method method = entry.getKey();
                        for (KafkaListener listener : entry.getValue()) {
                            // process annotation information
                            processKafkaListener(kafkaConsumer.get(), listener, method, bean, beanName);
                        }
                    }
                    logger.info("there are {} @KafkaListener methods on this bean", annotatedMethods.size());
                }

                if (!binlogMethods.isEmpty()) {
                    // non-empty set of methods
                    for (Map.Entry<Method, Set<BinlogListener>> entry : binlogMethods.entrySet()) {
                        Method method = entry.getKey();
                        // process annotation information
                        processBinlogListener(kafkaConsumer.get(), method, bean);
                    }
                    logger.info("there are {} @BinlogListener methods on this bean", binlogMethods.size());
                }
    
    
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
                }
            } else {
                this.logger.info("No @KafkaConsumer annotations found on bean type: " + bean.getClass());
            }
            return bean;
        }
    
    
        /**
         * Checks whether the bean class carries @KafkaConsumer; only such classes are kafka message consumers.
         */
        private Optional<KafkaConsumer> findListenerAnnotations(Class<?> clazz) {
            KafkaConsumer ann = AnnotationUtils.findAnnotation(clazz, KafkaConsumer.class);
            return Optional.ofNullable(ann);
        }
    
        /**
         * Finds @KafkaListener annotations on a method.
         *
         * @param method
         * @return
         */
        private Set<KafkaListener> findListenerAnnotations(Method method) {
            Set<KafkaListener> listeners = new HashSet<>();
            KafkaListener ann = AnnotationUtils.findAnnotation(method, KafkaListener.class);
            if (ann != null) {
                listeners.add(ann);
            }
    
            return listeners;
        }
    
    
        /**
         * Finds @BinlogListener annotations on a method.
         *
         * @param method
         * @return
         */
        private Set<BinlogListener> findBinlogListenerAnnotations(Method method) {
            Set<BinlogListener> listeners = new HashSet<>();
            BinlogListener ann = AnnotationUtils.findAnnotation(method, BinlogListener.class);
            if (ann != null) {
                listeners.add(ann);
            }
    
            return listeners;
        }
    
    
        /**
         * Processes the annotation metadata of a @KafkaListener method, wraps it into a
         * ConsumerEndpoint and registers it.
         *
         * @param consumer
         * @param listener
         * @param method
         * @param bean
         * @param beanName
         */
        protected void processKafkaListener(KafkaConsumer consumer, KafkaListener listener, Method method, Object bean, String beanName) {
            Method methodToUse = checkProxy(method, bean);
            ConsumerEndpoint endpoint = new ConsumerEndpoint();
            endpoint.setMethod(methodToUse);
            endpoint.setBean(bean);
            endpoint.setParameterTypes(Arrays.asList(method.getParameterTypes()));
            // class annotation information
            endpoint.setGroupId(consumer.groupId());
            endpoint.setTopic(consumer.topic());
            endpoint.setKafkaHostKey(consumer.kafkaHostKey());
            // method annotation information
            endpoint.setSerializer(listener.serializer());
            // session timeout
            if (consumer.sessionTimeout() < Constant.DEFAULT_SESSION_TIMEOUT) {
                throw new RuntimeException("kafkaConsumer sessionTimeout is too small; set it to at least 10000 ms");
            }
            endpoint.setTimeout(consumer.sessionTimeout());
    
            this.registrar.registerEndpoint(endpoint);
        }
    
    
        private void processBinlogListener(KafkaConsumer consumer, Method method, Object bean) {
            Method methodToUse = checkProxy(method, bean);
    
            ConsumerEndpoint endpoint = new ConsumerEndpoint();
            endpoint.setMethod(methodToUse);
            endpoint.setBean(bean);
            // class annotation information
            endpoint.setGroupId(consumer.groupId());
            endpoint.setTopic(consumer.topic());
            endpoint.setKafkaHostKey(consumer.kafkaHostKey());
            // method annotation information
            endpoint.setBinlog(true);
    
            this.registrar.registerEndpoint(endpoint);
        }
    
        /**
         * Returns the method to invoke; if the bean is a JDK proxy, resolves the method
         * on one of the proxied interfaces.
         *
         * @param methodArg
         * @param bean
         * @return
         */
        private Method checkProxy(Method methodArg, Object bean) {
            Method method = methodArg;
            if (AopUtils.isJdkDynamicProxy(bean)) {
                try {
                    // Found a @KafkaListener method on the target class for this JDK proxy ->
                    // is it also present on the proxy itself?
                    method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                    Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                    for (Class<?> iface : proxiedInterfaces) {
                        try {
                            method = iface.getMethod(method.getName(), method.getParameterTypes());
                            break;
                        } catch (NoSuchMethodException noMethod) {
                            // not declared on this interface; try the next one
                        }
                    }
                } catch (SecurityException ex) {
                    ReflectionUtils.handleReflectionException(ex);
                } catch (NoSuchMethodException ex) {
                    throw new IllegalStateException(String.format(
                            "@KafkaListener method '%s' found on bean target class '%s', " +
                                    "but not found in any interface(s) for bean JDK proxy. Either " +
                                    "pull the method up to an interface or switch to subclass (CGLIB) " +
                                    "proxies by setting proxy-target-class/proxyTargetClass " +
                                    "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()), ex);
                }
            }
            return method;
        }
    
        @Override
        public int getOrder() {
            return LOWEST_PRECEDENCE;
        }
    
    
    }
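
    In short: setBeanFactory() dynamically registers the KafkaListenerRegistrar bean, postProcessAfterInitialization() wraps every annotated method into a ConsumerEndpoint and registers it, and afterSingletonsInstantiated() finally calls registrar.afterPropertiesSet(), which presumably creates the actual consumers from the registered endpoints.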


    Handling the buffered binlog listener events:
    import com.github.dapeng.core.SoaException;
    import com.today.eventbus.common.MsgConsumer;
    import com.today.eventbus.common.retry.BinlogRetryStrategy;
    import com.today.eventbus.ConsumerEndpoint;
    import com.today.eventbus.config.KafkaConfigBuilder;
    import com.today.eventbus.serializer.KafkaIntDeserializer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    
    import java.lang.reflect.InvocationTargetException;
    import java.util.List;
    import java.util.Properties;
    import java.util.stream.Collectors;
    
    /**
     * Description: handles buffered binlog listener events
     *
     * @author hz.lei
     * @since 2018-03-07 01:42
     */
    public class BinlogKafkaConsumer extends MsgConsumer<Integer, byte[], ConsumerEndpoint> {
    
    
        /**
         * @param kafkaHost host1:port1,host2:port2,...
         * @param groupId
         * @param topic
         */
        public BinlogKafkaConsumer(String kafkaHost, String groupId, String topic) {
            super(kafkaHost, groupId, topic);
        }
    
        @Override
        protected void init() {
            logger.info("[KafkaConsumer] [init] " +
                    "kafkaConnect(" + kafkaConnect +
                    ") groupId(" + groupId +
                    ") topic(" + topic + ")");
    
            KafkaConfigBuilder.ConsumerConfiguration builder = KafkaConfigBuilder.defaultConsumer();
    
            final Properties props = builder.bootstrapServers(kafkaConnect)
                    .group(groupId)
                    .withKeyDeserializer(KafkaIntDeserializer.class)
                    .withValueDeserializer(ByteArrayDeserializer.class)
                    .withOffsetCommitted(false)
                    .excludeInternalTopic(false)
                    .maxPollSize("100")
                    .build();
    
            consumer = new KafkaConsumer<>(props);
        }
    
        @Override
        protected void buildRetryStrategy() {
            retryStrategy = new BinlogRetryStrategy();
        }
    
    
        @Override
        protected void dealMessage(ConsumerEndpoint consumer, byte[] value, Integer keyId) throws SoaException {
            List<BinlogEvent> binlogEvents = BinlogMsgProcessor.process(value);
            // only invoke the listener when there is something to process
            if (binlogEvents.size() > 0) {
                try {
                    consumer.getMethod().invoke(consumer.getBean(), binlogEvents);
                } catch (IllegalAccessException e) {
                    logger.error("BinlogConsumer::failed to invoke the @BinlogListener-annotated method", e);
                } catch (InvocationTargetException e) {
                    throwRealException(e, consumer.getMethod().getName());
                }

                logger.info("BinlogConsumer::[dealMessage(id: {})] end, method: {}, groupId: {}, topic: {}, bean: {}",
                        keyId, consumer.getMethod().getName(), groupId, topic, consumer.getBean());
            }
        }
    }
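
    BinlogMsgProcessor is also part of the framework and not shown in the post. Conceptually it turns the raw Canal bytes back into BinlogEvent beans; a naive sketch (the real implementation presumably renders typed JSON rather than quoting every column value as a string):

    import java.sql.Timestamp
    import com.alibaba.otter.canal.protocol.CanalEntry
    import com.today.binlog.BinlogEvent
    import scala.collection.JavaConverters._

    object BinlogMsgProcessorSketch {
      def process(value: Array[Byte]): java.util.List[BinlogEvent] = {
        val entry = CanalEntry.Entry.parseFrom(value)
        val header = entry.getHeader
        val rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue)
        rowChange.getRowDatasList.asScala.map { rowData =>
          BinlogEvent(
            header.getSchemaName,
            header.getTableName,
            header.getEventType,
            new Timestamp(header.getExecuteTime),
            toJson(rowData.getBeforeColumnsList.asScala),
            toJson(rowData.getAfterColumnsList.asScala))
        }.asJava
      }

      // naive JSON rendering: every value becomes a string
      private def toJson(columns: Seq[CanalEntry.Column]): String =
        columns.map(c => s""""${c.getName}": "${c.getValue}"""").mkString("{", ", ", "}")
    }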

    Matching binlog changes and extracting the before/after row data:

    import com.alibaba.otter.canal.protocol.CanalEntry
    import com.today.binlog.BinlogEvent
    import com.today.service.GoodsDataSource
    import com.today.service.binlog.bean.SkuBean
    import com.today.service.commons.cache.MemcacheProcessor
    import com.today.service.commons.cache.dto.RedisBean
    import redis.clients.jedis.JedisPool
    import spray.json._
    import wangzx.scala_commons.sql._
    import com.today.service.commons.help.BizHelp.withJedis
    import com.today.service.commons.help.SqlHelp
    import com.today.service.commons.util.DateTools
    import com.today.service.commons.`implicit`.Implicits._
    
    import scala.collection.JavaConverters._
    
    class GoodsOnBinlogAction(binlogEvent: BinlogEvent) extends MemcacheProcessor("goods_db_sku") {
      def action() = {
        logger.info(s"${getClass.getSimpleName} onBinlog ")
        logger.info(s"binlogEvent:$binlogEvent")
    
        binlogEvent match {
          case BinlogEvent("goods_db", "sku", CanalEntry.EventType.INSERT, timestamp, before,
          json"""{"id": $id,"sku_no":${skuNoJsValue}}"""
          ) => {
            logger.info(s"${getClass.getSimpleName} onInsert...")
            val skuNo = toStringValue(skuNoJsValue)
            val skuBean = reloadBySkuNo(skuNo)
            if (skuBean.isDefined) insertSortSet(skuBean.get)
    
          }
          case BinlogEvent("goods_db", "sku", CanalEntry.EventType.UPDATE, timestamp, json"""{"id": $beforeSkuId,"sku_no":${beforeSkuNoJsValue}}""",
          json"""{"id": $afterSkuId,"sku_no":${afterSkuNoJsValue}}"""
          ) => {
            logger.info(s"${getClass.getSimpleName} onUpdate...")
            val beforeSkuNo = toStringValue(beforeSkuNoJsValue)
            val afterSkuNo = toStringValue(afterSkuNoJsValue)
            if (!beforeSkuNo.equals(afterSkuNo)) {
              deleteSortSet(beforeSkuNo)
            }
            val skuBean = reloadBySkuNo(afterSkuNo)
            if (skuBean.isDefined) updateSortSet(skuBean.get.primaryKey, skuBean.get)
          }
          case BinlogEvent("goods_db", "sku", CanalEntry.EventType.DELETE, timestamp, json"""{"id": $id,"sku_no":${skuNo}}""", after) => {
            logger.info(s"${getClass.getSimpleName} onDelete...")
            deleteSortSet(toStringValue(skuNo))
          }
          case _ =>
        }
      }
    }
  • Original article: https://www.cnblogs.com/barrywxx/p/10850675.html