zoukankan      html  css  js  c++  java
  • 分享一个异步任务在遇到IO异常时支持递归回调的辅助方法

    /// <summary>
    /// Starts <paramref name="asyncAction"/> and, when it fails with an IO error, calls
    /// <paramref name="mainAction"/> again so the caller can re-run the whole operation.
    /// </summary>
    /// <remarks>
    /// A retry is triggered when the async action throws an <see cref="IOException"/> synchronously,
    /// when the returned task faults with an <see cref="IOException"/>, when the task is cancelled,
    /// or when the result status is <c>AsyncOperationResultStatus.IOException</c>.
    /// Any other exception is logged and routed to <paramref name="failedAction"/>.
    /// While the retry count is below <c>_immediatelyRetryTimes</c> the retry is invoked inline;
    /// afterwards it is scheduled through <c>Task.Factory.StartDelayedTask</c> with
    /// <c>_retryIntervalForIOException</c> (presumably a delay before running the action; confirm
    /// against that helper's implementation).
    /// Exceptions thrown by either callback are caught and logged so they cannot be lost in the
    /// discarded continuation task.
    /// </remarks>
    /// <typeparam name="TAsyncResult">Result type produced by the async action.</typeparam>
    /// <param name="asyncActionName">Name used to identify the action in log messages.</param>
    /// <param name="asyncAction">Factory that starts the asynchronous operation.</param>
    /// <param name="mainAction">Invoked with the incremented retry count when a retry is needed.</param>
    /// <param name="successAction">Optional callback invoked with the result when no IO error occurred.</param>
    /// <param name="getContextInfoFunc">Produces context text that is embedded in log messages.</param>
    /// <param name="failedAction">Optional callback invoked with a non-IO exception.</param>
    /// <param name="retryTimes">Number of retries already performed for this operation.</param>
    public void TryAsyncActionRecursively<TAsyncResult>(
        string asyncActionName,
        Func<Task<TAsyncResult>> asyncAction,
        Action<int> mainAction,
        Action<TAsyncResult> successAction,
        Func<string> getContextInfoFunc,
        Action<Exception> failedAction,
        int retryTimes) where TAsyncResult : AsyncOperationResult
    {
        // Re-enters the caller's main action, inline for the first attempts and via a delayed task afterwards.
        var retryAction = new Action<int>(currentRetryTimes =>
        {
            if (currentRetryTimes >= _immediatelyRetryTimes)
            {
                Task.Factory.StartDelayedTask(_retryIntervalForIOException, () => mainAction(currentRetryTimes + 1));
            }
            else
            {
                mainAction(currentRetryTimes + 1);
            }
        });

        // Runs the optional failure callback; a throwing callback is logged instead of propagated.
        var executeFailedAction = new Action<Exception>(ex =>
        {
            try
            {
                if (failedAction != null)
                {
                    failedAction(ex);
                }
            }
            catch (Exception unknownEx)
            {
                _logger.Error(string.Format("Failed to execute the failedCallbackAction of asyncAction:{0}, contextInfo:{1}",
                    asyncActionName, getContextInfoFunc()), unknownEx);
            }
        });

        // Runs the optional success callback with the same protection as the failure callback.
        // Without this guard an exception would fault the continuation task, which nobody observes.
        var executeSuccessAction = new Action<TAsyncResult>(result =>
        {
            try
            {
                if (successAction != null)
                {
                    successAction(result);
                }
            }
            catch (Exception unknownEx)
            {
                _logger.Error(string.Format("Failed to execute the successCallbackAction of asyncAction:{0}, contextInfo:{1}",
                    asyncActionName, getContextInfoFunc()), unknownEx);
            }
        });

        // Decides between retrying (IO exception) and failing (anything else) for a faulted task.
        var processTaskException = new Action<Exception, int>((ex, currentRetryTimes) =>
        {
            if (ex is IOException)
            {
                _logger.Error(string.Format("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}",
                    asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
                retryAction(currentRetryTimes);
            }
            else
            {
                _logger.Error(string.Format("Async task '{0}' has unknown exception, contextInfo:{1}, current retryTimes:{2}",
                    asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
                executeFailedAction(ex);
            }
        });

        // Continuation that inspects the finished task and dispatches to retry, failure or success.
        var completeAction = new Action<Task<TAsyncResult>>(t =>
        {
            if (t.Exception != null)
            {
                // Task exceptions arrive wrapped in an AggregateException; classify the inner one.
                processTaskException(t.Exception.InnerException, retryTimes);
                return;
            }
            if (t.IsCanceled)
            {
                _logger.ErrorFormat("Async task '{0}' was cancelled, contextInfo:{1}, current retryTimes:{2}",
                    asyncActionName, getContextInfoFunc(), retryTimes);
                retryAction(retryTimes);
                return;
            }
            var result = t.Result;
            if (result.Status == AsyncOperationResultStatus.IOException)
            {
                _logger.ErrorFormat("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}, errorMsg:{3}",
                    asyncActionName, getContextInfoFunc(), retryTimes, result.ErrorMessage);
                retryAction(retryTimes);
                return;
            }
            executeSuccessAction(result);
        });

        try
        {
            asyncAction().ContinueWith(completeAction);
        }
        catch (IOException ex)
        {
            // The action failed synchronously, before a task was returned, with an IO error: retry.
            _logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}",
                asyncActionName, getContextInfoFunc(), retryTimes), ex);
            retryAction(retryTimes);
        }
        catch (Exception ex)
        {
            // Any other synchronous failure is treated as final.
            _logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}",
                asyncActionName, getContextInfoFunc(), retryTimes), ex);
            executeFailedAction(ex);
        }
    }

    该函数的功能是:执行一个异步任务(返回Task的方法),如果执行出现IO异常,则重试当前主函数(mainAction);用户的mainAction中会再次调用TryAsyncActionRecursively方法,从而实现当遇到IO异常时,能做到不断重试。另外,只有前若干次(指定的次数)重试是立即执行的;超过指定次数后,不再立即重试,而是暂停一定间隔后再次执行。该函数还提供当asyncAction执行成功或失败后的回调函数,以及允许传入当前上下文的一些说明信息,以便记录有意义的错误日志信息。

    下面是使用示例:

    /// <summary>
    /// Publishes <paramref name="eventStream"/> through <c>_eventPublisher</c> using
    /// <c>TryAsyncActionRecursively</c>, then completes <paramref name="processingCommand"/>
    /// with a success or failed result depending on the outcome.
    /// </summary>
    /// <param name="processingCommand">Command that is completed once publishing finishes.</param>
    /// <param name="eventStream">Event stream to publish.</param>
    /// <param name="retryTimes">Retry count forwarded to <c>TryAsyncActionRecursively</c>.</param>
    private void PublishEventAsync(ProcessingCommand processingCommand, EventStream eventStream, int retryTimes)
    {
        TryAsyncActionRecursively<AsyncOperationResult>(
            asyncActionName: "PublishEventAsync",
            asyncAction: () => _eventPublisher.PublishAsync(eventStream),
            // On retry, re-enter this method with the retry count supplied by the helper.
            mainAction: nextRetryTimes => PublishEventAsync(processingCommand, eventStream, nextRetryTimes),
            successAction: publishResult =>
            {
                _logger.DebugFormat("Publish events success, {0}", eventStream);
                var succeeded = new CommandResult(CommandStatus.Success, processingCommand.Command.Id);
                processingCommand.Complete(succeeded);
            },
            getContextInfoFunc: () => string.Format("[eventStream:{0}]", eventStream),
            failedAction: ex =>
            {
                var failed = new CommandResult(CommandStatus.Failed, processingCommand.Command.Id);
                processingCommand.Complete(failed);
            },
            retryTimes: retryTimes);
    }
    // Example entry call: start the first attempt, passing 0 as the initial retryTimes.
    PublishEventAsync(processingCommand, eventStream, 0);
  • 相关阅读:
    Sublime : python环境
    shell:遍历目录和子目录的所有文件
    Java:方法的参数是传值还是传引用
    Sublime Text:Windows下配置C 编译环境和GDB调试环境
    代码优化的一些尝试
    go:关于变量地址的疑惑
    go:结构体的可访问性
    go:channel(未完)
    H5常见问题及解决方案。
    谷歌插件大全地址
  • 原文地址:https://www.cnblogs.com/netfocus/p/4301379.html
Copyright © 2011-2022 走看看