zoukankan      html  css  js  c++  java
  • 分布式事务

    1. 引言

      事务大家都知道,就是相当于一个原子操作,要么全部执行,要么发生异常全部回滚。但事务只限于本地事务,即各个数据库操作必须在同一数据库下执行。拿我最近的接手的项目来说,各个模块全部部署于不同的服务器,都有自己独立的数据库。前端想要删除一个用户,先调用用户平台的删除用户接口,再调用权限平台的删除权限接口。起初觉得这样操作没什么问题,后来有几次数据异常后,发现有的用户信息没有,但权限信息还存在,导致数据不一致。此时,就想到了用分布式事物来解决。所谓分布式事物,我个人理解是为了解决数据一致性的问题。

    2. kafka+本地事物表解决分布式事务

      消息队列的产生是为了解决各系统间通信问题,因为Kafka用的比较多,此处就想到用Kafka+本地事物表去解决分布式事务问题。关于Kafka+zookeeper的搭建此处不做详解。

      上图是自己基于Kafka+本地事物表实现的基本流程(图自己画的,可能不太清楚)代码后文贴出,(上图箭头只代表流程,和下文的1.2.3无关)此处讲一下自己的思路。先申明,kafka只能保证最终一致性,并不是强一致性。我们最终目的是保证上图2个蓝色方块的任务执行。方便说明,假定2个系统A,B 分别对应的2个数据库A库和B库。其中A库中的事务表叫做A事务表,B库中的事务表叫做B事务表。要执行的蓝色方块叫A业务和B业务。

      1. 在A系统中,启用A库的事物,执行如下2步操作。

        1)A系统执行A业务

        2)A系统在A库的A事物表中写一条状态为NEW的数据(此处数据的ID唯一)

        此处启用A库的事务,即2步操作要木全部执行,要木不执行。

      2. A系统中启用一个定时任务,5s中执行一次,轮训A库的A事物表,看是否有状态为NEW的数据,如果有,将此记录发送到Kafka消息队列中,并修改此条数据的状态为Published。此时A系统的操作全部执行完毕。

      3. B系统启用进程拉取kafka数据,如果发现有从A系统来的数据,将此数据记录到B系统的B事务表中,更新此数据在B系统的B事务表状态为NEW(因为ID唯一,此条数据的ID和存放在A库中的数据的ID相同,如果出现网络异常导致B系统重复收到数据,但看到自己库中已有此ID的数据,便会将重复消息弃用,此处是保证只执行一次),更新完成后,Kafka确认提交(此处要关闭Kafka的自动提交)

      4. B系统启用定时任务,5s执行一次,轮训B库的B事物表,看是否有状态为NEW的数据,如果有,执行如下2步操作。

        1)B系统执行B业务

        2)B系统更新B库的B事物表,将此条状态为New的数据改为状态为Published

        此处启用B库的事务,即2步操作要木全部执行,要木不执行。

    3. 实现代码 

      相对于Kafka来说,A系统相当于消息生产者,B系统相当于消息消费者。下面为SQL建表语句。

    -- A系统事务表 
    CREATE TABLE `kafka_event_publish` (
      `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
      `payload` varchar(2000) NOT NULL,
      `eventType` varchar(30) NOT NULL,
      `status` varchar(30) NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;
    
    -- B系统事务表
    CREATE TABLE `kafka_event_process` (
      `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
      `payload` varchar(2000) NOT NULL,
      `eventType` varchar(30) NOT NULL,
      `status` varchar(30) NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;

      Kafka用来发送消息,接收消息,下面为Kafka的配置类。

    package com.boot.util;
    
    // 消费者消息状态
    public enum EventProcessStatus {
        NEW,
        PROCESSED;
    
        private EventProcessStatus() {
        }
    }
    --------------------------------------
    package com.boot.util;
    
    // 生产者消息状态
    public enum EventPublishStatus {
        NEW,
        PUBLISHED;
    
        private EventPublishStatus() {
        }
    }
    ---------------------------------------
    package com.boot.util;
    
    // Kafka主题
    public enum EventType {
        USER_CREATED;
    
        private EventType() {
        }
    }
    package com.boot.util;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.function.Consumer;
    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    // kafka工具类
    public class KafkaUtil {
        private static Producer<String, String> producer;
        private static KafkaConsumer<String, String> consumer;
    
        public KafkaUtil() {
        }
        // Kafka发送消息,topic为主题,value为具体消息
        public static void sendSync(String topic, String value) throws ExecutionException, InterruptedException {
            producer.send(new ProducerRecord(topic, value)).get();
        }
        // Kafka接收消息
        public static void consume(Consumer<String> c) {
        // 订阅主题为USER_CREATED的消息
            consumer.subscribe(Arrays.asList(EventType.USER_CREATED.name()));
    
            while(true) {
                ConsumerRecords<String, String> records = consumer.poll(100L);
                Iterator var2 = records.iterator();
    
                while(var2.hasNext()) {
                    ConsumerRecord<String, String> record = (ConsumerRecord)var2.next();
                    System.out.println(record);
                    c.accept(record.value());
                }
    
                try {
                    consumer.commitSync();
                } catch (CommitFailedException var4) {
                    System.out.println("Kafka消费者提交offset失败");
                }
            }
        }
        // kafka基础配置
        static {
            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092");
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer(producerProps);
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092");
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("group.id", "VoucherGroup");
            consumerProps.put("enable.auto.commit", "false");
            consumer = new KafkaConsumer(consumerProps);
        }
    }

      A系统主要执行的操作有 1)执行业务操作,2)插入New消息到数据库,3)定时任务轮训数据库为New的数据,4)发送到Kafka中,5)修改数据库消息状态为Published。此处1),2)步操作不贴代码。下面为A系统中(即生产者)代码。

    import com.boot.kafka.transaction.EventPublishService;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * @Author  xiabing5
     * @Create  2019/8/2 10:13
     * @Desc    spring定时器,定时向kafka中发送事物消息
     **/
    @Component
    public class EventPublishSchedule {
    
        @Resource
        private EventPublishService eventPublishService;
    
        /*
        * 每N毫秒执行一次*/
        @Scheduled(fixedRate = 5000)
        private void publish() {
            eventPublishService.publish();
        }
    }
    import com.boot.mapper.KafkaEventPublishMapper;
    import com.boot.pojo.KafkaEventPublish;
    import com.boot.util.EventPublishStatus;
    import com.boot.util.KafkaUtil;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.util.CollectionUtils;
    
    import javax.annotation.Resource;
    import java.util.*;
    
    /**
     * @Author  xiabing5
     * @Create  2019/8/2 9:34
     * @Desc    kafka解决分布式事物(消息发送端)
     **/
    @Service
    public class EventPublishService {
        
        @Resource
        private KafkaEventPublishMapper eventPublishMapper; // 事务表的Mapper
    
        @Transactional(rollbackFor = Exception.class)
        public void publish() {
            // 查询所有状态为NEW的事件
            Map<String,Object> params = new HashMap<String,Object>();
            params.put("status", EventPublishStatus.NEW.name());
            List<KafkaEventPublish> eventPublishList = eventPublishMapper.selectEventPublish(params);
    
            if(!CollectionUtils.isEmpty(eventPublishList)) {
                // 发送消息队列
                List<Long> ids = sendEventPublish(eventPublishList);
    
                if (!CollectionUtils.isEmpty(ids)) {
                    //更新数据库状态为PUBLISHED
                    eventPublishMapper.updateEventStatus(ids, EventPublishStatus.PUBLISHED.name());
                }
            }
        }
    
        /**
         * @Author  xiabing5
         * @Create  2019/8/2 10:32
         * @Desc    发送EventPublish对象集合 返回发送成功的EventPublish的ID集合
         **/
        private static List<Long> sendEventPublish(List<KafkaEventPublish> kafkaEventPublishes) {
    
            if(CollectionUtils.isEmpty(kafkaEventPublishes)) {
                return Collections.emptyList();
            }
            List<Long> ids = new ArrayList<Long>();
            for(KafkaEventPublish kafkaEventPublish : kafkaEventPublishes) {
                try {
                    KafkaUtil.sendSync(kafkaEventPublish.getEventType().name(),kafkaEventPublish.getPayload());
                    ids.add(kafkaEventPublish.getId());
                    System.out.println("发送kafka消息成功");
                }
                catch (Exception e) {
                    System.out.println("发送kafka消息失败 "+ kafkaEventPublish);
                }
            }
            return ids;
        }
    }

      B系统主要执行的操作有,1)从kafka中拉取数据 ,2)将此数据放入数据库事务表,更新状态为New ,3) 定时任务轮询状态为New的数据,执行相应业务操作,4)更新New数据状态为Complete 。下面为B系统中(即消费者)代码。

    import com.boot.kafka.transaction.EventProcessService;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    // 消费者定时任务
    @Component
    public class EventProcessSchedule {
    
        @Resource
        private EventProcessService eventProcessService;
    
    
        @Scheduled(fixedRate = 5000)
        private void process() {
            eventProcessService.process();
        }
    
    }
    import com.boot.mapper.KafkaEventProcessMapper;
    import com.boot.pojo.KafkaEventProcess;
    import com.boot.util.EventProcessStatus;
    import com.boot.util.EventType;
    import com.boot.util.KafkaUtil;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.stream.Collectors;
    
    /**
     * @Author  xiabing5
     * @Create  2019/8/2 13:37
     * @Desc    接收kafka消息service类
     **/
    @Service
    public class EventProcessService {
    
        @Resource
        private KafkaEventProcessMapper kafkaEventProcessMapper;
    
        // 创建单一线程线程池
        @PostConstruct
        public void init() {
            ThreadFactory threadFactory = new ThreadFactoryBuilder()
                    .setNameFormat("MqMessageConsumerThread-%d")
                    .setDaemon(true)
                    .build();
            ExecutorService executorService = Executors.newSingleThreadExecutor(threadFactory);
            executorService.execute(new MqMessageConsumerThread());
        }
    
        // 自定义接收线程
        private class MqMessageConsumerThread implements Runnable {
    
            @Override
            public void run() {
                KafkaUtil.consume(consumerRecord -> {
                    KafkaEventProcess kafkaEventProcess = new KafkaEventProcess();
                    kafkaEventProcess .setPayload(consumerRecord);
                    kafkaEventProcess .setEventType(EventType.USER_CREATED);
                    kafkaEventProcess .setStatus(EventProcessStatus.NEW);
                    kafkaEventProcessMapper.insertEventProcess(kafkaEventProcess);
                });
            }
        }
    
        // 执行业务逻辑操作
       @Transactional(rollbackFor = Exception.class)
        public void process() {
    
            // 查询表中状态为new的事件
            Map<String,Object> params = new HashMap<String,Object>();
            params.put("status",EventProcessStatus.NEW.name());
    
            List<KafkaEventProcess> kafkaEventProcessList = kafkaEventProcessMapper.selectEventProcess(params);
    
           for(KafkaEventProcess kafkaEventProcess : kafkaEventProcessList) {
               // 执行业务操作
               System.out.println("删除你");
           }
    
            List<Long> ids = kafkaEventProcessList.stream().map(item -> item.getId()).collect(Collectors.toList());
           kafkaEventProcessMapper.updateEventStatus(ids,EventProcessStatus.PROCESSED.name());
    
        }
    
    }

      补充:此处没有贴事务表的sql语句(即Mapper.xml)无非是添加数据库记录,更新记录状态语句。此代码在我的实践中能运行。

    4. 总结

      分布式问题一直是我最近比较棘手问题,如分布式锁,定时任务在集群下重复执行等。自己也是个小白,希望通过每次实践后,能总结出点东西,便于以后去遍历。

      

  • 相关阅读:
    MySQL 获得 当前日期时间 函数
    07/26/2019 00:38:17 INFO macOS users need to install XQuartz. See https://support.apple.com/en-gb/HT201341
    ModuleNotFoundError: No module named 'pynvx'
    ModuleNotFoundError: No module named 'cv2'
    ModuleNotFoundError: No module named 'tqdm'
    WARNING: You are using pip version 19.1.1, however version 19.2.1 is available. You should consider upgrading via the 'pip install --upgrade pip' command.
    多进程和多线程的理解
    python 装饰器demo
    python 日志内容提取
    python 中 list 去重复
  • 原文地址:https://www.cnblogs.com/xiaobingblog/p/11540341.html
Copyright © 2011-2022 走看看