zoukankan      html  css  js  c++  java
  • C#异步案例一则

    场景

      生产者和消费者队列, 生产者有多个, 消费者也有多个, 生产到消费需要异步.

    下面用一个Asp.NetCore Web-API项目来模拟

      创建两个API, 一个Get(), 一个Set(), Get返回一个字符串, Set放入一个字符串, Get返回的就是Set进去的字符串.

      实现如下:  

    [Route("api/[controller]/[action]")]
    public class FooController : Control
    {
        IMessageQueue _mq;
        public FooController(IMessageQueue mq)
        {
            _mq = mq;
        }
    
        [HttpGet]
        public string Get()
        {
            string str = _mq.ReadOne<string>();
            return str;
        }
    
        [HttpGet]
        public void Set(string v)
        {
            _mq.WriteOne(v);
        }
    }
    
    public interface IMessageQueue
    {
        T ReadOne<T>();
        void WriteOne<T>(T value);
    }
    
    public class MessageQueue: IMessageQueue
    {
        private object _value;
    
        public T ReadOne<T>()
        {
            return (T)_value;
        }
    
        public void WriteOne<T>(T value)
        {
            _value = value;
    
        }
    }

    接着在StartUp中把IMessageQueue给注入了.

    services.AddSingleton<IMessageQueue, MessageQueue>();

    运行后, 先调用/api/foo/set/?v=xxx, 再调用/api/foo/get/

    可以看到成功返回了xxx

    第二步, value字段改为队列:

    使set进去的值不会被下一个覆盖, get取队列最前的值

    为了线程安全, 这里使用了ConcurrentQueue<T>

    代码如下:

    public class MessageQueue: IMessageQueue
    {
        private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>();
    
        public T ReadOne<T>()
        {
            _queue.TryDequeue(out object str);
            return (T)str ;
        }
    
        public void WriteOne<T>(Tvalue)
        {
            _queue.Enqueue(value);
        }
    }

    那么此时, 只要get不断地轮询, 就可以取到set生产出来的数据了.

    调用/api/foo/set/

    三, 异步阻塞

    再增加需求, 调换get和set的顺序,先get后set模拟异步, (我这里的demo是个web-api会有http请求超时之类的...假装不存在)我想要get调用等待有数据时才返回.

    也就是说我想要在浏览器地址栏输入http://localhost:5000/api/foo/get/之后会不断地转圈直到我用set接口放入一个值

    方案A: while(true), 根本无情简直无敌, 死等Read() != null时break; 为防单核满转加个Thread.Sleep();

    方案B: Monitor, 一个Wait()一个Exit/Release();

    但是以上两个方案都是基于Thread的, .Net4.0之后伴随ConcurrentQueue一起来的还有个BlockingCollection<T>相当好用

    方案C: 修改后代码如下:

    public class MessageQueue : IMessageQueue
    {
        private readonly BlockingCollection<object> _queue = new BlockingCollection<object>(new ConcurrentQueue<object>());
    
        public T ReadOne<T>()
        {
            var obj = _queue.Take();
            return (T)obj;
        }
    
        public void WriteOne<T>(T value)
        {
            _queue.Add(value);
        }
    }

    此时, 如果先get, 会阻塞等待set; 如果已经有set过数据就会直接返回队列中的数据. get不会无功而返了. 基于这个类型, 可以实现更像样的订阅模型.

    扩展RPC

    这里的set是生产者, get是消费者, 那如果我的这个生产者并不单纯产生数据返回void而是需要等待一个结果的呢? 此时订阅模型不够用了, 我需要一个异步的RPC .

    比如有个Ask请求会携带参数发起请求, 并等待, 知道另外有个地方处理了这个任务产生结果, ask结束等待返回这个结果answer. 

    我可以回头继续用方案A或B, 但连.net4.0都已经过去很久了, 所以应该用更好的基于Task的异步方案.

    代码如下, 首先新增两个接口:

    public interface IMessageQueue
    {
        void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func);
        Task<TResponse> Rpc<TRequest, TResponse>(TRequest req);
    
        T ReadOne<T>();
        void WriteOne<T>(T data);
    }

    接着定义一个特殊的任务类:

    public class RpcTask<TRequest, TResponse>
    {
        public TaskCompletionSource<TResponse> Tcs { get; set; }
        public TRequest Request { get; set; }
    }

    实现刚才新加的两个接口:

    public Task<TResponse> Rpc<TRequest, TResponse>(TRequest req)
    {
        TaskCompletionSource<TResponse> tcs = new TaskCompletionSource<TResponse>();
        _queue.Add(new RpcTask<TRequest, TResponse> { Request = req, Tcs = tcs});
        return tcs.Task;
    }
    
    public void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func)
    {
        var obj = _queue.Take();
        if(obj is RpcTask<TRequest, TResponse> t)
        {
            var response = func(t.Request);
            t.Tcs.SetResult(response);
        }
    }

    同样的, 写两个Web API接口, 一个请求等待结果 一个负责处理工作

    [HttpGet]
    public async Task<string> Ask(string v)
    {
        var response = await _mq.Rpc<MyRequest, MyResponse>(new MyRequest { Id = v });
        return $"[{response.DoneTime}] {response.Id}";
    }
    
    [HttpGet]
    public void Answer()
    {
        _mq.Respond<MyRequest, MyResponse>((req)=> new MyResponse { Id = req.Id, DoneTime = DateTime.Now });
    }

    上面还随便写了两个class作为请求和返回

    public class MyRequest
    {
        public string Id { get; set; }
    }
    public class MyResponse
    {
        public string Id { get; set; }
        public DateTime DoneTime { get; set; }
    }

    测试一下, 用浏览器或postman打开三个选项卡, 各发起一个Ask接口的请求, 参数v分别为1 2 3, 三个选项卡都开始转圈等待

    然后再打开一个选项卡访问answer接口, 处理刚才放进队列的任务, 发起一次之前的三个选项卡之中就有一个停止等待并显示返回数据. 需求实现.

    这里用到的关键类型是TaskCompletionSource<T>.

    再扩展

    如果是个分布式系统, 请求和处理逻辑不是在一个程序里呢? 那么这个队列可能也是一个单独的服务. 此时就要再加个返回队列了, 给队列中传输的每一个任务打上Id, 返回队列中取出返回之后再找到Id对于的TCS.SetResult()

      

  • 相关阅读:
    Unity shader 代码高亮+提示
    PTA --- L2-003 月饼
    PTA --- L2-002 链表去重
    计蒜客 —— 字符串p型编码
    计蒜客 —— 最好的草
    最近忙科研立项 & 对博客的优化
    计蒜客 —— 删除单词后缀
    Tensorflow 保存模型 & 在java中调用
    Tensorflow 用训练好的模型预测
    Tensorflow 保存和载入训练过程
  • 原文地址:https://www.cnblogs.com/pasoraku/p/11971944.html
Copyright © 2011-2022 走看看