zoukankan      html  css  js  c++  java
  • 分享一个异步任务在遇到IO异常时支持递归回调的辅助方法

    /// <summary>
    /// Starts <paramref name="asyncAction"/> and routes its outcome: an <see cref="IOException"/>,
    /// a cancelled task, or a result whose status is <c>AsyncOperationResultStatus.IOException</c>
    /// re-enters <paramref name="mainAction"/> with the retry count incremented by one; any other
    /// exception is passed to <paramref name="failedAction"/>; every other result is passed to
    /// <paramref name="successAction"/>.
    /// </summary>
    /// <remarks>
    /// While the retry count is below <c>_immediatelyRetryTimes</c> the retry is invoked directly;
    /// otherwise it is scheduled through <c>Task.Factory.StartDelayedTask</c> with
    /// <c>_retryIntervalForIOException</c> (presumably a delay before execution; that helper is
    /// defined outside this block).
    /// </remarks>
    /// <typeparam name="TAsyncResult">Result type produced by the asynchronous action.</typeparam>
    /// <param name="asyncActionName">Name used to identify the action in log messages.</param>
    /// <param name="asyncAction">Factory that starts the asynchronous operation.</param>
    /// <param name="mainAction">Retry entry point; receives the next retry count.</param>
    /// <param name="successAction">Optional callback that receives the result; may be null.</param>
    /// <param name="getContextInfoFunc">Produces the context text embedded in log messages.</param>
    /// <param name="failedAction">Optional callback for non-IO exceptions; may be null.</param>
    /// <param name="retryTimes">Retry count of the current attempt.</param>
    public void TryAsyncActionRecursively<TAsyncResult>(
        string asyncActionName,
        Func<Task<TAsyncResult>> asyncAction,
        Action<int> mainAction,
        Action<TAsyncResult> successAction,
        Func<string> getContextInfoFunc,
        Action<Exception> failedAction,
        int retryTimes) where TAsyncResult : AsyncOperationResult
    {
        // Re-enters mainAction with the incremented count, either directly or via the delayed-task helper.
        var retryAction = new Action<int>(currentRetryTimes =>
        {
            if (currentRetryTimes >= _immediatelyRetryTimes)
            {
                Task.Factory.StartDelayedTask(_retryIntervalForIOException, () => mainAction(currentRetryTimes + 1));
            }
            else
            {
                mainAction(currentRetryTimes + 1);
            }
        });

        // Invokes the optional failure callback; an exception thrown by the callback is logged, not propagated.
        var executeFailedAction = new Action<Exception>(ex =>
        {
            try
            {
                if (failedAction != null)
                {
                    failedAction(ex);
                }
            }
            catch (Exception unknownEx)
            {
                _logger.Error(string.Format("Failed to execute the failedCallbackAction of asyncAction:{0}, contextInfo:{1}",
                    asyncActionName, getContextInfoFunc()), unknownEx);
            }
        });

        // IOException => retry; anything else => failure callback.
        var processTaskException = new Action<Exception, int>((ex, currentRetryTimes) =>
        {
            if (ex is IOException)
            {
                _logger.Error(string.Format("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}",
                    asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
                // Use the count that was passed in (and logged) rather than the captured outer variable.
                retryAction(currentRetryTimes);
            }
            else
            {
                _logger.Error(string.Format("Async task '{0}' has unknown exception, contextInfo:{1}, current retryTimes:{2}",
                    asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
                executeFailedAction(ex);
            }
        });

        var completeAction = new Action<Task<TAsyncResult>>(t =>
        {
            // The task returned by ContinueWith is discarded below, so an exception escaping this
            // delegate would never be observed. Catch and log it here instead.
            try
            {
                if (t.Exception != null)
                {
                    processTaskException(t.Exception.InnerException, retryTimes);
                    return;
                }
                if (t.IsCanceled)
                {
                    _logger.ErrorFormat("Async task '{0}' was cancelled, contextInfo:{1}, current retryTimes:{2}",
                        asyncActionName, getContextInfoFunc(), retryTimes);
                    retryAction(retryTimes);
                    return;
                }
                var result = t.Result;
                if (result.Status == AsyncOperationResultStatus.IOException)
                {
                    _logger.ErrorFormat("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}, errorMsg:{3}",
                        asyncActionName, getContextInfoFunc(), retryTimes, result.ErrorMessage);
                    retryAction(retryTimes);
                    return;
                }
                if (successAction != null)
                {
                    successAction(result);
                }
            }
            catch (Exception continuationEx)
            {
                _logger.Error(string.Format("Failed to execute the continuation of async task '{0}', contextInfo:{1}, current retryTimes:{2}",
                    asyncActionName, getContextInfoFunc(), retryTimes), continuationEx);
            }
        });

        try
        {
            asyncAction().ContinueWith(completeAction);
        }
        catch (IOException ex)
        {
            // asyncAction threw synchronously with an IO error: treat it like a faulted task and retry.
            _logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}",
                asyncActionName, getContextInfoFunc(), retryTimes), ex);
            retryAction(retryTimes);
        }
        catch (Exception ex)
        {
            _logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}",
                asyncActionName, getContextInfoFunc(), retryTimes), ex);
            executeFailedAction(ex);
        }
    }

    该函数的功能是:执行一个异步任务(返回Task的方法),如果执行时出现IO异常,则重试当前主函数(mainAction);用户的mainAction中会再次调用TryAsyncActionRecursively方法,从而实现遇到IO异常时不断重试。另外,只有在指定的次数以内才会立即重试;超过该次数后不再立即重试,而是等待一定的时间间隔后再次执行。该函数还提供asyncAction执行成功或失败后的回调函数,并允许传入描述当前上下文的说明信息,以便记录有意义的错误日志信息。

    下面是使用示例:

    /// <summary>
    /// Publishes <paramref name="eventStream"/> via TryAsyncActionRecursively, passing this method
    /// itself as the retry entry point, and completes <paramref name="processingCommand"/> with a
    /// Success or Failed command result depending on which callback is invoked.
    /// </summary>
    /// <param name="processingCommand">Command that is completed once publishing succeeds or fails.</param>
    /// <param name="eventStream">Event stream handed to the event publisher.</param>
    /// <param name="retryTimes">Retry count of the current attempt.</param>
    private void PublishEventAsync(ProcessingCommand processingCommand, EventStream eventStream, int retryTimes)
    {
        TryAsyncActionRecursively<AsyncOperationResult>(
            asyncActionName: "PublishEventAsync",
            asyncAction: () =>
            {
                return _eventPublisher.PublishAsync(eventStream);
            },
            mainAction: nextRetryTimes =>
            {
                // Retry by re-entering this method with the count supplied by the helper.
                PublishEventAsync(processingCommand, eventStream, nextRetryTimes);
            },
            successAction: publishResult =>
            {
                _logger.DebugFormat("Publish events success, {0}", eventStream);
                var successResult = new CommandResult(CommandStatus.Success, processingCommand.Command.Id);
                processingCommand.Complete(successResult);
            },
            getContextInfoFunc: () =>
            {
                return string.Format("[eventStream:{0}]", eventStream);
            },
            failedAction: error =>
            {
                var failedResult = new CommandResult(CommandStatus.Failed, processingCommand.Command.Id);
                processingCommand.Complete(failedResult);
            },
            retryTimes: retryTimes);
    }
    // Start the first attempt with a retry count of 0.
    PublishEventAsync(processingCommand, eventStream, 0);
  • 相关阅读:
    对象池使用时要注意几点
    Flash3D学习计划(一)——3D渲染的一般管线流程
    714. Best Time to Buy and Sell Stock with Transaction Fee
    712. Minimum ASCII Delete Sum for Two Strings
    647. Palindromic Substrings(马拉车算法)
    413. Arithmetic Slices
    877. Stone Game
    338. Counting Bits
    303. Range Sum Query
    198. House Robber
  • 原文地址:https://www.cnblogs.com/netfocus/p/4301379.html
Copyright © 2011-2022 走看看