针对有些耗时比较长的任务,我们一般会想到使用异步化的方式来进行优化逻辑。即客户端先发起一次任务请求并携带回调地址callbackUrl,然后服务端收到请求后立即返回成功,然后在后台处理具体事务,等任务完成后再回调客户端,通知完成。
首先这个方案是值得肯定的,但是我们得注意几点:1. 客户端回调是否可靠?2. 是否接受客户端的主动查询,从而从另一角度弥补各种环境的不确定性?
实际上,要提供一个状态查询的服务很简单,只需查询具体状态值返回即可。但要实现一个可靠的回调却是有点难度的,今天我们就来提供一个实现思路和实现,希望能帮助到需要的同学。
1. 要实现的目标
需要先给自己定个小目标,否则就没了方向。总体上是:要求稳定、可靠、不积压。细化如下:
1. 正常情况下能够及时通知到客户端结果状态;
2. 客户端服务短暂异常的情况下,仍然能够接到通知;
3. 服务端服务短暂异常的情况下,仍然能推送结果到客户端;
4. 网络环境异常时,仍然能尽可能通知到客户端;
5. 服务端回调尽量不要积压太多;
2. 实现思路
要达到以上目标,我们主要做的事也就相应出来了:
1. 使用重试机制保证尽量通知;
2. 使用次数限制保证积压不会太严重;
尽管看起来只是一个重试而已,但如果控制不好,要么给自己带来巨大的服务压力,要么就是进行无效地重试。所以,我们使用一种退避算法,提供一些重试测试,保证重试的合理性。
具体点说就是,回调失败后会进行重试,但每次重试都会有一定的延时控制,越往后延时越大,直到达到最大重试次数后结束。比如:
1. 第1次回调失败后,设置下一个回调时间间隔为30秒;
2. 第2次回调也失败后,设置下一个回调时间间隔为1分钟;
3. 第3次回调也失败后,设置下一个回调时间间隔为3分钟;
...
那么退避策略配置就为 30/60/180...
另外,我们需要借助于db的持久化,保证回调的可靠性,不至于因为机器宕机而丢失回调信息。
3. 具体代码实现
我们将此实现全部封装到一个类中,对外仅暴露一个 submitNewJobCallbackTask() 方法。如下:
import com.alibaba.fastjson.JSONObject; import com.ctrip.framework.apollo.Config; import com.ctrip.framework.apollo.ConfigService; import com.github.pagehelper.PageHelper; import com.my.common.util.HttpUtils; import com.my.common.util.SleepUtil; import com.my.enums.CallbackStatusEnum; import com.my.dao.entity.DistributeLock; import com.my.dao.entity.JobDataCallbackInfo; import com.my.dao.mapper.JobDataCallbackInfoMapper; import com.my.model.enums.DataJobStatus; import lombok.extern.log4j.Log4j2; import org.apache.lucene.util.NamedThreadFactory; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.*; import java.util.concurrent.*; /** * 功能描述: 查询结果成功执行回调处理任务 * */ @Log4j2 @Component public class ResultCallbackWorker implements Runnable { @Resource private JobDataCallbackInfoMapper jobDataCallbackInfoMapper; @Resource private LockService lockService; /** * 正在运行的回调任务容器,方便进行close */ private CallbackTaskWrapperContainer runningCallbacksContainer = new CallbackTaskWrapperContainer(); /** * 执行回调的线程池(队列无限,需外部限制) */ private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("JobCallbackWorker"), new ThreadPoolExecutor.CallerRunsPolicy()); /** * 自动运行该任务 */ @PostConstruct public void init() { new Thread(this, "ResultCallbackWorker") .start(); } @Override public void run() { Random random = new Random(); int baseSleepSec = 50; int maxRangeSec = 120; while (!Thread.currentThread().isInterrupted()) { // dataNumLevel 代表任务饱和度,该值越大,说明等待任务越多,需要更频繁执行 int dataNumLevel = 0; try { if(!tryCallbackTaskLock()) { dataNumLevel = random.nextInt(30); continue; } try { dataNumLevel = pullCallbackInfoFromDb(); } finally { unlock(); } } catch (Throwable e) { log.error("【任务回调】执行数据查询时发生了异常", e); } finally { // 添加一个随机10s, 避免集群机器同时请求,从而导致分配不均衡 int rndSleepSec = random.nextInt(10); int realSleepSec = baseSleepSec + rndSleepSec + maxRangeSec * (100 - dataNumLevel) * 100 / 100; SleepUtil.sleepSecs(realSleepSec); } } log.warn("【任务回调】任务结束"); } /** * 获取回调运行分布式锁() * * @return true: 成功, false: 未获取,不执行后续逻辑 */ private boolean tryCallbackTaskLock() { String methodName = "ResultCallbackWorker"; // 悲观锁实现, 乐观锁实现 return lockService.lock(methodName); } /** * 释放分布式锁 * * @return true:成功 */ private boolean unlock() { String methodName = "ResultCallbackWorker"; return lockService.unlock(methodName); } /** * 从db中拉取待回调列表并处理 * * @return 更新数据的饱和度: 满分100, 用于后续更新拉取速率 */ private Integer pullCallbackInfoFromDb() { Integer dealNums = getNoHandlerTaskAndUpdate(10); log.info("【任务回调】本次处理无handler的任务数:{}", dealNums); dealNums = getCallbackStatusTimeoutTaskAndUpdate(10); log.info("【任务回调】本次处理回调超时的任务数:{}", dealNums); return dealNums * 100 / 20; } /** * 获取未被任何机器处理的回调任务 * * @return 处理行数 */ private Integer getNoHandlerTaskAndUpdate(int limit) { PageHelper.startPage(1, limit, false); String[] statusEnums = { CallbackStatusEnum.WAIT_HANDLER.name() }; Map<String, Object> cond = new HashMap<>(); cond.put("statusList", statusEnums); // 拉取 5小时 ~ 1分钟 前应该回调的数据, 进行重试 cond.put("nextRetryTimeGt", new Date(System.currentTimeMillis() - 5 * 3600_000)); cond.put("nextRetryTimeLt", new Date(System.currentTimeMillis() - 60_000)); List<JobDataCallbackInfo> waitingCallbackInfos = jobDataCallbackInfoMapper.getExpiredCallbackTaskInfo(cond); return addRequeueCallbackTaskFromDb(waitingCallbackInfos); } /** * 获取未被任何机器处理的回调任务 * * @return 处理行数 */ private Integer getCallbackStatusTimeoutTaskAndUpdate(int limit) { PageHelper.startPage(1, limit, false); String[] statusEnums = { CallbackStatusEnum.HANDLER_RETRYING.name() }; Map<String, Object> cond = new HashMap<>(); cond.put("statusList", statusEnums); // 只处理6小时前的数据 cond.put("updateTimeGt", new Date(System.currentTimeMillis() - 6 * 3600_000L)); // 5小时 ~ 1分钟 前应该回调的数据 cond.put("nextRetryTimeGt", new Date(System.currentTimeMillis() - 5 * 3600_000)); cond.put("nextRetryTimeLt", new Date(System.currentTimeMillis() - 60_000)); cond.put("nowMinusUpdateTimeGt", 600); List<JobDataCallbackInfo> waitingCallbackInfos = jobDataCallbackInfoMapper.getExpiredCallbackTaskInfo(cond); return addRequeueCallbackTaskFromDb(waitingCallbackInfos); } /** * 将从db捞取出的待回调的任务,放入本地队列进行回调 * * @param waitingCallbackInfos 待处理的任务(from db) */ private Integer addRequeueCallbackTaskFromDb(List<JobDataCallbackInfo> waitingCallbackInfos) { int submittedTaskNum = 0; for (JobDataCallbackInfo callbackInfo : waitingCallbackInfos) { // 队列已满,不再添加数据 if(!submitTaskImmediately(callbackInfo)) { return submittedTaskNum; } submittedTaskNum++; updateCallbackFinalStatus(callbackInfo, CallbackStatusEnum.HANDLER_RETRYING, false); } return submittedTaskNum; } /** * 提交一个新的job回调任务 * * @param jobId 异步任务id * @param jobStatus 任务状态 * @param callbackUrl 回调地址 * @param bizId 业务id */ public void submitNewJobCallbackTask(String jobId, DataJobStatus jobStatus, String callbackUrl, String bizId) { JobDataCallbackInfo callbackInfo = new JobDataCallbackInfo(); callbackInfo.setJobId(jobId); callbackInfo.setBizId(bizId); callbackInfo.setJobStatus(jobStatus.name()); callbackInfo.setCallbackStatus(CallbackStatusEnum.HANDLER_RETRYING); int retryTimes = 0; callbackInfo.setNextRetryTime( getNextRetryTimeWithPolicy(retryTimes)); callbackInfo.setRetryTimes(retryTimes); callbackInfo.setCallbackUrl(callbackUrl); jobDataCallbackInfoMapper.insert(callbackInfo); if(!submitTaskImmediately(callbackInfo)) { updateCallbackFinalStatus(callbackInfo, CallbackStatusEnum.WAIT_HANDLER); } } /** * 立即提交一个任务到 * * @param callbackInfo 回调任务信息 * @return true: 提交成功, false: 提交失败 */ private boolean submitTaskImmediately(JobDataCallbackInfo callbackInfo) { if(runningCallbacksContainer.reachMaxQueue()) { return true; } Future<?> taskFuture = executorService.submit(() -> callback(callbackInfo)); boolean addSuccess = runningCallbacksContainer.addTask(callbackInfo, taskFuture); assert addSuccess; return true; } /** * 执行某个回调任务的处理逻辑 * * @param callbackInfo 回调参数信息 */ private void callback(JobDataCallbackInfo callbackInfo) { boolean callSuccess = false; try { callSuccess = doCallback(callbackInfo.getCallbackUrl(), callbackInfo.getJobId(), callbackInfo.getBizId(), DataJobStatus.valueOf(callbackInfo.getJobStatus())); } catch (Throwable e) { log.error("【回调任务】回调调用方失败,稍后将进行重试, jobId:{}", callbackInfo.getBizId(), e); } finally { log.info("【回调任务】回调完成:{}, jobId:{}", callbackInfo.getCallbackUrl(), callbackInfo.getJobId()); if(callSuccess) { updateCallbackFinalStatus(callbackInfo, CallbackStatusEnum.SUCCESS); } else { requeueFailedCallbackTaskIfNecessary(callbackInfo); } } } /** * 关机时,保存当前任务状态 */ public void shutdown() { runningCallbacksContainer.cancelAllTask(); } /** * 重新入队回调失败的队列(延时自行断定) * * @param callbackInfo 上一次回调信息 */ private void requeueFailedCallbackTaskIfNecessary(JobDataCallbackInfo callbackInfo) { Config config = ConfigService.getAppConfig(); Integer maxRetryTimes = config.getIntProperty( "_job_finish_callback_retry_max_times", 7); if(callbackInfo.getRetryTimes() >= maxRetryTimes) { updateCallbackFinalStatus(callbackInfo, CallbackStatusEnum.FAILED); return; } nextRetryCallback(callbackInfo); } /** * 进入下一次回调重试操作 * * @param callbackInfo 回调任务信息 */ private void nextRetryCallback(JobDataCallbackInfo callbackInfo) { int retryTimes = callbackInfo.getRetryTimes() + 1; callbackInfo.setRetryTimes(retryTimes); Date nextRetryTime = getNextRetryTimeWithPolicy(retryTimes); callbackInfo.setNextRetryTime(nextRetryTime); jobDataCallbackInfoMapper.update(callbackInfo); // 延时调度 Future<?> taskFuture = executorService.schedule(() -> callback(callbackInfo), nextRetryTime.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS); boolean addSuccess = runningCallbacksContainer.addTask(callbackInfo, taskFuture); assert !addSuccess; } /** * 回调任务终态更新(SUCCESS, FAILED, CANCELED) * * 或者不再被本次调用的任务,都会更新当前状态 * * @param callbackInfo 回调任务基本信息 * @param callbackStatus 当次回调结果 */ private void updateCallbackFinalStatus(JobDataCallbackInfo callbackInfo, CallbackStatusEnum callbackStatus) { updateCallbackFinalStatus(callbackInfo, callbackStatus, true); } /** * 更新db状态,同时处理本地队列 * * @param removeRunningTask 是否移除本地队列 * @see #updateCallbackFinalStatus(JobDataCallbackInfo, CallbackStatusEnum) */ private void updateCallbackFinalStatus(JobDataCallbackInfo callbackInfo, CallbackStatusEnum callbackStatus, boolean removeRunningTask) { callbackInfo.setCallbackStatus(callbackStatus); jobDataCallbackInfoMapper.update(callbackInfo); if(removeRunningTask) { runningCallbacksContainer.taskFinish(callbackInfo); } } /** * 回调客户端,通知任务结果 * * @param jobId 任务jobId * @param jobStatus 执行状态 * @return true: 成功 */ private boolean doCallback(String callbackUrl, String jobId, String bizId, DataJobStatus jobStatus) throws Exception { log.info("【回调任务】回调客户端:{} jobId:{}, jobStatus:{}", callbackUrl, jobId, jobStatus); Map<String, Object> params = new HashMap<>(); params.put("jobId", jobId); params.put("jobStatus", jobStatus); params.put("bizId", bizId); String response = HttpUtils.post(callbackUrl, JSONObject.toJSONString(params)); log.info("【回调任务】回调成功:{}, response:{}", callbackUrl, response); // 业务收到请求,应尽快响应成功结果, 响应 success 则成功 return "success".equals(response); } /** * 根据重试次数,获取相应的延时策略生成下一次重试时间 * * 退避算法实现1 * * @param retryTimes 重试次数, 0, 1, 2... * @return 下一次重试时间 */ private Date getNextRetryTimeWithPolicy(int retryTimes) { if(retryTimes < 1) { retryTimes = 1; } Config config = ConfigService.getAppConfig(); String retryIntervalPolicy = config.getProperty( "job_finish_callback_retry_policy", "30/60/180/1800/1800/1800/3600"); String[] retryIntervalArr = retryIntervalPolicy.split("/"); if(retryTimes > retryIntervalArr.length) { retryTimes = retryIntervalArr.length; } String hitPolicy = retryIntervalArr[retryTimes - 1]; return new Date(System.currentTimeMillis() + Integer.valueOf(hitPolicy) * 1000L); } /** * 回调任务管理容器 */ private class CallbackTaskWrapperContainer { /** * 正在运行的回调任务容器,方便进行close */ private Map<Long, CallbackTaskWrapper> runningCallbacksContainer = new ConcurrentHashMap<>(); /** * 添加一个回调任务(正在执行) * * @param callbackInfo 回调信息 * @param taskFuture 异步任务实例 */ boolean addTask(JobDataCallbackInfo callbackInfo, Future<?> taskFuture) { CallbackTaskWrapper oldTaskWrapper = runningCallbacksContainer.put(callbackInfo.getId(), new CallbackTaskWrapper(callbackInfo, taskFuture)); return oldTaskWrapper == null; } /** * 某任务完成处理 */ void taskFinish(JobDataCallbackInfo callbackInfo) { runningCallbacksContainer.remove(callbackInfo.getId()); } /** * 某任务取消处理 */ void cancelTask(JobDataCallbackInfo callbackInfo) { taskWrapper.cancel(); updateCallbackFinalStatus(callbackInfo, CallbackStatusEnum.CANCELED); taskFinish(callbackInfo); } /** * 取消所有内存任务, 重新放入等待队列 */ void cancelAllTask() { // 遍历 running task, 更新为 WAIT_HANDLER for (CallbackTaskWrapper taskWrapper : runningCallbacksContainer.values()) { taskWrapper.cancel(); updateCallbackFinalStatus(taskWrapper.getCallbackInfo(), CallbackStatusEnum.WAIT_HANDLER); taskFinish(taskWrapper.getCallbackInfo()); } } /** * 检查回调任务队列是否达到最大值 * * @return true:已到最大值, false:还可以接收新数据 */ boolean reachMaxQueue() { int retryQueueMaxSize = 4096; return runningCallbacksContainer.size() > retryQueueMaxSize; } } /** * 回调任务包装器 */ private class CallbackTaskWrapper { /** * 任务信息实体 */ private JobDataCallbackInfo callbackInfo; /** * 异步任务控制 */ private Future<?> taskFuture; CallbackTaskWrapper(JobDataCallbackInfo callbackInfo, Future<?> taskFuture) { this.callbackInfo = callbackInfo; this.taskFuture = taskFuture; } void rolloverFuture(Future<?> taskFuture) { this.taskFuture = taskFuture; } JobDataCallbackInfo getCallbackInfo() { return callbackInfo; } Future<?> getTaskFuture() { return taskFuture; } void cancel() { taskFuture.cancel(true); callbackInfo = null; } } }
其中,有一个重要的回调任务信息的数据结构参考如下:
CREATE TABLE `t_job_data_callback_info` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id', `job_id` varchar(64) NOT NULL COMMENT '任务id', `callback_status` varchar(30) NOT NULL DEFAULT 'WAIT_HANDLER' COMMENT '回调状态,SUCCESS:回调成功,HANDLER_RETRYING:被执行回调中, WAIT_HANDLER:回调任务等待被接收处理, FAILED:回调最终失败, CANCELED:主动取消', `callback_url` varchar(300) DEFAULT '' COMMENT '回调地址', `job_status` varchar(30) DEFAULT NULL COMMENT '任务执行状态,冗余字段,回调时使用', `retry_times` int(6) NOT NULL DEFAULT '0' COMMENT '已重试次数', `biz_id` varchar(200) DEFAULT '' COMMENT '业务id, 看业务作用', `next_retry_time` datetime NOT NULL COMMENT '下一次执行回调重试的时间', `err_msg` varchar(3000) DEFAULT '' COMMENT '错误信息描述', `server_ip` varchar(32) NOT NULL DEFAULT '' COMMENT '执行任务的机器', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), KEY `job_id` (`job_id`), KEY `next_retry_time` (`next_retry_time`,`callback_status`), KEY `update_time` (`update_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='异步任务回调客户端信息表';
以上基本就是整个的可靠优雅的回调实现了,其中一基础的db操作,枚举类之类的就不用细化了。
核心大部分可以简单描述为前面所说的重试机制. 但还有一点值得说明的是, 为了避免任务在集群环境中分布不均匀, 所以使用了一个饱和度+随机值延时的方式, 让每个机器都有差不多的机会执行回调任务.(不过具体的分布均匀性, 还需要实践去验证才行, 可以通过统计server_ip查看)
4. 时序图
下面以一个时序图, 展示整体工作流程的全貌: