zoukankan      html  css  js  c++  java
  • 基于数据库的消息队列

    /**
    * 批量获取 可以消费的消息
    * 先使用一个时间戳将被消费的消息锁定,然后再使用这个时间戳去查询锁定的数据。
    *
    * @param count
    * @return
    */
    public List<Queue> findActiveQueueNew(int count) {
    //先去更新数据
    String locker = String.valueOf(System.currentTimeMillis()) + random.nextInt(10000);
    int lockCount = 0;
    try {
    //将status为1的更新为3,设置locker,先锁定消息
    lockCount = queueDAO.updateActiveQueue(PayConstants.QUEUE_STATUS_LOCKED,
    PayConstants.QUEUE_STATUS_ACTIVE, count, locker);
    } catch (Exception e) {
    logger.error(
    "QueueDomainRepository.findActiveQueueNew error occured!"
    + e.getMessage(), e);
    throw new TuanRuntimeException(
    PayConstants.SERVICE_DATABASE_FALIURE,
    "QueueDomainRepository.findActiveQueue error occured!", e);
    }

    //如果锁定的数量为0,则无需再去查询
    if (lockCount == 0) {
    return null;
    }

    //休息一会在再询,防止数据已经被更改
    try {
    Thread.sleep(1);
    } catch (Exception e) {
    logger.error("QueueDomainRepository.findActiveQueue error sleep occured!"
    + e.getMessage(), e);
    }
    List<Queue> activeList = null;
    try {
    activeList = queueDAO.getByLocker(locker);
    } catch (Exception e) {
    logger.error("QueueDomainRepository.findActiveQueue error occured!"
    + e.getMessage(), e);
    throw new TuanRuntimeException(
    PayConstants.SERVICE_DATABASE_FALIURE,
    "QueueDomainRepository.findActiveQueue error occured!", e);
    }
    return activeList;
    }
    
    
    /**
    * 验证队列modle 的合法性
    *
    * @param model
    * @return boolean true,消息还可以消费。false,消息不允许消费。
    */
    public boolean validateQueue(final QueueModel model){
    int consumeCount = model.getConsumeCount();
    if (consumeCount >= PayConstants.QUEUE_MAX_CONSUME_COUNT) {
    //消费次数超过了最大次数
    return false;
    }
    int consumeStatus = model.getConsumeStatus();
    if(consumeStatus == PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS){
    //消息已经被成功消费
    return false;
    }
    QueueStatusEnum queueStatusEnum = model.getQueueStatusEnum();
    if(queueStatusEnum == null || queueStatusEnum != QueueStatusEnum.LOCKED){
    //消息状态不正确
    return false;
    }
    String jsonData = model.getJsonData();
    if(StringUtils.isEmpty(jsonData)){
    //消息体为空
    return false;
    }
    return true;
    }


    //修改数据库状态

    public void consume(boolean isDelete, Long consumeMinTime,
    String tradeNo,int consumeCount) {
    QueueDO queueDO = new QueueDO();
    if (!isDelete) {
    //已经到了做大消费次数,消息作废 不再处理
    if (consumeCount >= PayConstants.QUEUE_MAX_CONSUME_COUNT) {
    //达到最大消费次数的也设置为消费成功
    queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS);
    queueDO.setStatus(PayConstants.QUEUE_STATUS_CANCEL);
    } else {
    queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_FAILED);
    //设置为可用状态等待下次继续发送
    queueDO.setStatus(PayConstants.QUEUE_STATUS_ACTIVE);
    }
    } else {
    //第三方消费成功
    queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS);
    queueDO.setStatus(PayConstants.QUEUE_STATUS_DELETED);
    }
    queueDO.setNextConsumeTime(consumeMinTime == null ? QueueRuleUtil
    .getNextConsumeTime(consumeCount) : consumeMinTime);
    if (StringUtils.isNotBlank(tradeNo)) {
    queueDO.setTradeNo(tradeNo);
    }
    long now = System.currentTimeMillis();
    queueDO.setUpdateTime(now);
    queueDO.setLastConsumeTime(now);
    queueDO.setConsumeCount(consumeCount);
    queueDO.setQueueID(id);
    setQueueDOUpdate(queueDO);
    }















  • 相关阅读:
    互操作
    Rx基础
    数据流块基础
    C# 一个帮您理解回调函数的例子(新手必看)
    C# 多线程之通过Timer开启线程的例子
    C# 利用委托事件进行窗体间的传值(新手必看)
    C#XML文件操作随笔
    C# 委托学习笔记
    c# 关于抓取网页源码后中文显示乱码的原因分析和解决方法
    c# 异步编程 使用回调函数例子
  • 原文地址:https://www.cnblogs.com/lalalazar/p/13922028.html
Copyright © 2011-2022 走看看