zoukankan      html  css  js  c++  java
  • 分布式事务 (定时任务 + 事件表 + MQ)实现小例子

    业务流程图:

    Controller 模拟接收请求 生成 100条数据 插入事件表:

       
       @RequestMapping("/producerMq")
        public @ResponseBody String mq() throws Exception {
    
            for(int i = 0;i<100;i++){
            
                Long userId = SnowFlake.nextId();//雪花算法生成id
                AopExsInfo aopExsInfo = new AopExsInfo();
                aopExsInfo.setId(userId);
                aopExsInfo.setType("new"); //状态对应为  新建
                int Anum = aopExsInfoMapper.insert(aopExsInfo); //插入事件表
    //            System.out.println("新增结果: "+Anum);
            }
            return "新增完成";
        }

    服务A:

    package com...service_RabbitMQ;
    
    import com.alibaba.fastjson.JSONObject;
    import com...entity.AopExsInfo;
    import com...mapper.AopExsInfoMapper;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    import javax.annotation.PostConstruct;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    @Component
    public class SendMqService {
    
        @Autowired
        AopExsInfoMapper aopExsInfoMapper;
    
        @Autowired
        private AmqpTemplate template;
        
        //项目启动自动循环执行
        //无论哪一步出异常都全部回滚,不影响数据一致性
        @PostConstruct
        @Transactional(rollbackFor = Exception.class)
        public void sendData(){
    
            System.out.println("开始扫描未推送的办件....");
    
            ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    
            executor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
    
                    //查出新增的数据 
                    List<AopExsInfo> list = aopExsInfoMapper.selectNew("new");
    
                    for (AopExsInfo aopExsInfo : list){
    
                        //先改事件表数据状态
                        AopExsInfo old = new AopExsInfo();
                        old = aopExsInfo;
                        old.setCupSize("old"); //对应为 已发送
                        int i = aopExsInfoMapper.updateByPrimaryKey(old);
    //                    System.out.println("更改事件表状态返回: "+i);
    
                        //开始往MQ里推送
                        JSONObject json = new JSONObject();
                        json.put("id",aopExsInfo.getId());
                        json.put("cupSize",aopExsInfo.getCupSize());
    //                    System.out.println("未推送的办件信息 : "+json.toString());
    
                        template.convertAndSend(RabbitConstant.QUEUE_HELLOWORLD,json.toString());
    
                    }
                }
            },0,10, TimeUnit.SECONDS);  //每10秒 查询推送 一次
        }
    }

    服务B:

    package com...service_RabbitMQ;
    
    import com.alibaba.fastjson.JSONObject;
    import com.rabbitmq.client.Channel;
    import com...entity.XueHuaInfo;
    import com...mapper.XueHuaInfoMapper;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    import javax.annotation.PostConstruct;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    @Component
    public class AcceptMqService{
    
        @Autowired
        XueHuaInfoMapper xueHuaInfoMapper;
    
        //从MQ读取数据,插入事件表
       @RabbitListener(queues = RabbitConstant.QUEUE_HELLOWORLD)
       public void acceptData(String info, Channel channel, Message message){
    
           try {
               //手动 ACK
    //           channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
               System.out.println("监听到的数据: "+info);
    //           System.out.println("message : " +message);
               JSONObject JSON = JSONObject.parseObject(info);
               XueHuaInfo xueHuaInfo = new XueHuaInfo();
               xueHuaInfo.setId(JSON.getString("id")); //根据数据库主键唯一保证幂等性
               xueHuaInfo.setName("start"); //对应状态为 已接受
               xueHuaInfoMapper.insert(xueHuaInfo);
    
           } catch (Exception e) {
               e.getMessage();
           }
       }
    
        //项目启动自动执行
       @PostConstruct
       @Transactional(rollbackFor = Exception.class)
        public void updataData(){
    
           System.out.println("开始执行本地业务....");
    
           ScheduledExecutorService Service = Executors.newScheduledThreadPool(2);
    
           Service.scheduleAtFixedRate(new Runnable() {
               @Override
               public void run() {
    
                    //查出新增的数据
                   List<XueHuaInfo> infos = xueHuaInfoMapper.selectNew("start");
    
                   System.out.println("本次读取办数量: "+infos.size());
                   for(XueHuaInfo info : infos){
    
                    //业务处理 省略....
    
                       XueHuaInfo endInfo = info;
                       endInfo.setName("end");//对应状态为 已处理
                       xueHuaInfoMapper.updateEnd(endInfo);
                   }
               }
           },0,10, TimeUnit.SECONDS); ////每10秒查询一次
       }
    }
  • 相关阅读:
    SQL Server 2005 数据定义语言触发器(Data Definition Language Triggers)[翻译]
    sqlserver 存储过程例子
    微软CEO鲍尔默:科技产业终将成为经济救世主
    poj:2689用筛选法选素数求区间[L,U]的所有素数
    ZOJ Problem Set 1002 Fire Net
    去除多余括号
    模板元编程:求N的阶乘
    算法导论10.2习题
    奇数阶魔方算法
    TSQL 编程常用例子
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14710113.html
Copyright © 2011-2022 走看看