zoukankan      html  css  js  c++  java
  • 分布式事务10_最大努力通知形-copy

    实现
    业务活动的主动发,在完成业务处理后,向业务活动的被动方发送消息允许消息丢失
    业务活动的被动方根据定时策略,向业务活动的主动发查询,恢复丢失的业务消息
    约束
    被动方的处理结果不影响主动方的处理结果
    成本
    业务查询与校对系统的建设成本
    适用范围
    对业务最终一致性的时间敏感度低
    跨企业的业务活动
    用到的服务模式
    可查询操作
    方案特点
    业务活动的主动发在完成业务处理后,向业务活动被动方发送通知消息(允许消息丢失)
    主动发可以设置时间梯度通知规则,在通知失败后按照规则重复通知,直到通知N次后不再通知
    主动发提供校对查询接口给被动方按需校对查询,用于恢复丢失的业务消息
    行业应用案例
    银行通知、商户通知等(各大交易业务平台的商户通知:多次通知、查询校对、对账文件)
    设计
    数据库
    rp_notify_record
    DROP TABLE IF EXISTS `rp_notify_record`;
    CREATE TABLE `rp_notify_record` (
    `id` varchar(50) NOT NULL DEFAULT '' COMMENT '主键ID',
    `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本事情',
    `create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',
    `edit_time` datetime DEFAULT NULL COMMENT '最后修改时间',
    `notify_rule` varchar(255) DEFAULT NULL COMMENT '通知规则(单位:分钟)',
    `notify_times` int(11) NOT NULL DEFAULT '0' COMMENT '已通知次数',
    `limit_notify_times` int(11) NOT NULL DEFAULT '0' COMMENT '最大通知次数限制',
    `url` varchar(2000) NOT NULL DEFAULT '' COMMENT '通知请求链接(包含通知内容)',
    `merchant_order_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商户订单号',
    `merchant_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商户编号',
    `status` varchar(50) NOT NULL DEFAULT '' COMMENT '通知状态(对应枚举值)',
    `notify_type` varchar(30) DEFAULT NULL COMMENT '通知类型',
    PRIMARY KEY (`id`),
    KEY `AK_KEY_2` (`merchant_order_no`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='通知记录表 RP_NOTIFY_RECORD';
    rp_notify_record_log
    DROP TABLE IF EXISTS `rp_notify_record_log`;
    CREATE TABLE `rp_notify_record_log` (
    `id` varchar(50) NOT NULL DEFAULT '' COMMENT 'ID',
    `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号',
    `edit_time` datetime DEFAULT NULL COMMENT '最后修改时间',
    `create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',
    `notify_id` varchar(50) NOT NULL DEFAULT '' COMMENT '通知记录ID',
    `request` varchar(2000) NOT NULL DEFAULT '' COMMENT '请求内容',
    `response` varchar(2000) NOT NULL DEFAULT '' COMMENT '响应内容',
    `merchant_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商户编号',
    `merchant_order_no` varchar(50) NOT NULL COMMENT '商户订单号',
    `http_status` varchar(50) NOT NULL COMMENT 'HTTP状态',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='通知记录日志表 RP_NOTIFY_RECORD_LOG';
    接口
    RpNotifyService
    public interface RpNotifyService {

    /**
    * 创建消息通知
    * @param rpNotifyRecord
    */
    public long createNotifyRecord(RpNotifyRecord rpNotifyRecord) throws NotifyBizException;

    /**
    * 修改消息通知
    * @param rpNotifyRecord
    */
    public void updateNotifyRecord(RpNotifyRecord rpNotifyRecord) throws NotifyBizException;

    /**
    * 创建消息通知记录
    * @param rpNotifyRecordLog
    * @return
    */
    public long createNotifyRecordLog(RpNotifyRecordLog rpNotifyRecordLog) throws NotifyBizException;

    /**
    * 发送消息通知
    * @param notifyUrl 通知地址
    * @param merchantOrderNo 商户订单号
    * @param merchantNo 商户编号
    */
    public void notifySend(String notifyUrl,String merchantOrderNo,String merchantNo) throws NotifyBizException;


    /**
    * 通过ID获取通知记录
    * @param id
    * @return
    */
    public RpNotifyRecord getNotifyRecordById(String id) throws NotifyBizException;

    /**
    * 根据商户编号,商户订单号,通知类型获取通知记录
    * @param merchantNo 商户编号
    * @param merchantOrderNo 商户订单号
    * @param notifyType 消息类型
    * @return
    */
    public RpNotifyRecord getNotifyByMerchantNoAndMerchantOrderNoAndNotifyType(String merchantNo , String merchantOrderNo , String notifyType) throws NotifyBizException;

    /**
    * 按条件分页查询通知记录.
    */
    public PageBean<RpNotifyRecord> queryNotifyRecordListPage(PageParam pageParam , Map<String, Object> paramMap) throws NotifyBizException;


    }
    RpNotifyServiceImpl
    public class RpNotifyServiceImpl implements RpNotifyService {

    @Autowired
    private JmsTemplate notifyJmsTemplate;

    @Autowired
    private RpNotifyRecordDao rpNotifyRecordDao;

    @Autowired
    private RpNotifyRecordLogDao rpNotifyRecordLogDao;

    /**
    * 创建消息通知
    *
    * @param rpNotifyRecord
    */
    @Override
    public long createNotifyRecord(RpNotifyRecord rpNotifyRecord) {
    return rpNotifyRecordDao.insert(rpNotifyRecord);
    }

    /**
    * 修改消息通知
    *
    * @param rpNotifyRecord
    */
    @Override
    public void updateNotifyRecord(RpNotifyRecord rpNotifyRecord) {
    rpNotifyRecordDao.update(rpNotifyRecord);
    }

    /**
    * 创建消息通知记录
    *
    * @param rpNotifyRecordLog
    * @return
    */
    @Override
    public long createNotifyRecordLog(RpNotifyRecordLog rpNotifyRecordLog) {
    return rpNotifyRecordLogDao.insert(rpNotifyRecordLog);
    }



    /**
    * 发送消息通知
    *
    * @param notifyUrl 通知地址
    * @param merchantOrderNo 商户订单号
    * @param merchantNo 商户编号
    */
    @Override
    public void notifySend(String notifyUrl, String merchantOrderNo, String merchantNo) {

    RpNotifyRecord record = new RpNotifyRecord();
    record.setNotifyTimes(0);
    record.setLimitNotifyTimes(5);
    record.setStatus(NotifyStatusEnum.CREATED.name());
    record.setUrl(notifyUrl);
    record.setMerchantOrderNo(merchantOrderNo);
    record.setMerchantNo(merchantNo);
    record.setNotifyType(NotifyTypeEnum.MERCHANT.name());

    Object toJSON = JSONObject.toJSON(record);
    final String str = toJSON.toString();

    notifyJmsTemplate.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(str);
    }
    });
    }

    /**
    * 通过ID获取通知记录
    *
    * @param id
    * @return
    */
    @Override
    public RpNotifyRecord getNotifyRecordById(String id) {
    return rpNotifyRecordDao.getById(id);
    }

    /**
    * 根据商户编号,商户订单号,通知类型获取通知记录
    *
    * @param merchantNo 商户编号
    * @param merchantOrderNo 商户订单号
    * @param notifyType 消息类型
    * @return
    */
    @Override
    public RpNotifyRecord getNotifyByMerchantNoAndMerchantOrderNoAndNotifyType(String merchantNo, String merchantOrderNo, String notifyType) {
    return rpNotifyRecordDao.getNotifyByMerchantNoAndMerchantOrderNoAndNotifyType(merchantNo,merchantOrderNo,notifyType);
    }

    @SuppressWarnings("unchecked")
    @Override
    public PageBean<RpNotifyRecord> queryNotifyRecordListPage(PageParam pageParam, Map<String, Object> paramMap) {
    return rpNotifyRecordDao.listPage(pageParam,paramMap);
    }


    }
    延时执行
    import java.util.concurrent.DelayQueue;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    import com.roncoo.pay.app.notify.core.NotifyPersist;
    import com.roncoo.pay.app.notify.core.NotifyTask;
    import com.roncoo.pay.service.notify.aip.RpNotifyService;

    /**
    * 商户通知应用启动类.
    *
    */
    public class App
    {
    private static final Log LOG = LogFactory.getLog(App.class);

    /**
    * 通知任务延时队列,对象只能在其到期时才能从队列中取走。
    */
    public static DelayQueue<NotifyTask> tasks = new DelayQueue<NotifyTask>();

    private static ClassPathXmlApplicationContext context;

    private static ThreadPoolTaskExecutor threadPool;

    public static RpNotifyService rpNotifyService;

    public static NotifyPersist notifyPersist;

    public static void main(String[] args) {
    try {
    context = new ClassPathXmlApplicationContext(new String[] { "spring/spring-context.xml" });
    context.start();
    threadPool = (ThreadPoolTaskExecutor) context.getBean("threadPool");
    rpNotifyService = (RpNotifyService) context.getBean("rpNotifyService");

    // 从数据库中取一次数据用来当系统启动时初始化
    notifyPersist = (NotifyPersist) context.getBean("notifyPersist");
    .initNotifyDataFromDB();

    startThread(); // 启动任务处理线程

    LOG.info("== context start");
    } catch (Exception e) {
    LOG.error("== application start error:", e);
    return;
    }
    synchronized (App.class) {
    while (true) {
    try {
    App.class.wait();
    } catch (InterruptedException e) {
    LOG.error("== synchronized error:", e);
    }
    }
    }
    }

    private static void startThread() {
    LOG.info("==>startThread");

    threadPool.execute(new Runnable() {
    public void run() {
    try {
    while (true) {
    LOG.info("==>threadPool.getActiveCount():" + threadPool.getActiveCount());
    LOG.info("==>threadPool.getMaxPoolSize():" + threadPool.getMaxPoolSize());
    // 如果当前活动线程等于最大线程,那么不执行
    if (threadPool.getActiveCount() < threadPool.getMaxPoolSize()) {
    LOG.info("==>tasks.size():" + tasks.size());
    //使用take方法获取过期任务,如果获取不到,就一直等待,知道获取到数据
    final NotifyTask task = tasks.take();
    if (task != null) {
    threadPool.execute(new Runnable() {
    public void run() {
    tasks.remove(task);
    task.run(); // 执行通知处理
    LOG.info("==>tasks.size():" + tasks.size());
    }
    });
    }
    }
    }
    } catch (Exception e) {
    LOG.error("系统异常;",e);
    }
    }
    });
    }

    }
    ————————————————
    版权声明:本文为CSDN博主「chenshiying007」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_27384769/article/details/79331027

  • 相关阅读:
    crash收集上报方案
    keychain的使用
    自定义Xcode文件模板
    iOS实现一个简单的扫码功能
    tableView渲染延迟
    iOS app icons
    fastlane自动打包
    iOS pod封装和升级
    手写代码 -- 数组扁平化
    手写代码 -- Promise
  • 原文地址:https://www.cnblogs.com/hanease/p/14471449.html
Copyright © 2011-2022 走看看