/**
* 批量获取 可以消费的消息
* 先使用一个时间戳将被消费的消息锁定,然后再使用这个时间戳去查询锁定的数据。
*
* @param count
* @return
*/
public List<Queue> findActiveQueueNew(int count) {
//先去更新数据
String locker = String.valueOf(System.currentTimeMillis()) + random.nextInt(10000);
int lockCount = 0;
try {
//将status为1的更新为3,设置locker,先锁定消息
lockCount = queueDAO.updateActiveQueue(PayConstants.QUEUE_STATUS_LOCKED,
PayConstants.QUEUE_STATUS_ACTIVE, count, locker);
} catch (Exception e) {
logger.error(
"QueueDomainRepository.findActiveQueueNew error occured!"
+ e.getMessage(), e);
throw new TuanRuntimeException(
PayConstants.SERVICE_DATABASE_FALIURE,
"QueueDomainRepository.findActiveQueue error occured!", e);
}
//如果锁定的数量为0,则无需再去查询
if (lockCount == 0) {
return null;
}
//休息一会在再询,防止数据已经被更改
try {
Thread.sleep(1);
} catch (Exception e) {
logger.error("QueueDomainRepository.findActiveQueue error sleep occured!"
+ e.getMessage(), e);
}
List<Queue> activeList = null;
try {
activeList = queueDAO.getByLocker(locker);
} catch (Exception e) {
logger.error("QueueDomainRepository.findActiveQueue error occured!"
+ e.getMessage(), e);
throw new TuanRuntimeException(
PayConstants.SERVICE_DATABASE_FALIURE,
"QueueDomainRepository.findActiveQueue error occured!", e);
}
return activeList;
}
/**
* 验证队列modle 的合法性
*
* @param model
* @return boolean true,消息还可以消费。false,消息不允许消费。
*/
public boolean validateQueue(final QueueModel model){
int consumeCount = model.getConsumeCount();
if (consumeCount >= PayConstants.QUEUE_MAX_CONSUME_COUNT) {
//消费次数超过了最大次数
return false;
}
int consumeStatus = model.getConsumeStatus();
if(consumeStatus == PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS){
//消息已经被成功消费
return false;
}
QueueStatusEnum queueStatusEnum = model.getQueueStatusEnum();
if(queueStatusEnum == null || queueStatusEnum != QueueStatusEnum.LOCKED){
//消息状态不正确
return false;
}
String jsonData = model.getJsonData();
if(StringUtils.isEmpty(jsonData)){
//消息体为空
return false;
}
return true;
}
//修改数据库状态
public void consume(boolean isDelete, Long consumeMinTime,
String tradeNo,int consumeCount) {
QueueDO queueDO = new QueueDO();
if (!isDelete) {
//已经到了做大消费次数,消息作废 不再处理
if (consumeCount >= PayConstants.QUEUE_MAX_CONSUME_COUNT) {
//达到最大消费次数的也设置为消费成功
queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS);
queueDO.setStatus(PayConstants.QUEUE_STATUS_CANCEL);
} else {
queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_FAILED);
//设置为可用状态等待下次继续发送
queueDO.setStatus(PayConstants.QUEUE_STATUS_ACTIVE);
}
} else {
//第三方消费成功
queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS);
queueDO.setStatus(PayConstants.QUEUE_STATUS_DELETED);
}
queueDO.setNextConsumeTime(consumeMinTime == null ? QueueRuleUtil
.getNextConsumeTime(consumeCount) : consumeMinTime);
if (StringUtils.isNotBlank(tradeNo)) {
queueDO.setTradeNo(tradeNo);
}
long now = System.currentTimeMillis();
queueDO.setUpdateTime(now);
queueDO.setLastConsumeTime(now);
queueDO.setConsumeCount(consumeCount);
queueDO.setQueueID(id);
setQueueDOUpdate(queueDO);
}