zoukankan      html  css  js  c++  java
  • .net IO异步和Producer/Consumer队列实现一分钟n次http请求

    简介

    最近工作中有一个需求:要求发送http请求到某站点获取相应的数据,但对方网站限制了请求的次数:一分钟最多200次请求。

    搜索之后,在stackoverflow网站查到一个类似的问题.。但里面用到了Reactive Extensions,权衡之下最后还是决定自己简单实现一分钟最多200次请求。

    思路

    思路很简单,一分钟200次,平均下来一次请求300ms,大概3次的时候将近一秒,所以一次异步发送三个请求,然后线程暂停900ms。

    这里的关键是运行代码时尽量不要堵塞线程,可以速度很快执行发送请求之前的代码。

    实现

    异步请求

    http请求属于IO请求,其异步可以调用HttpWebRequest.BeginGetResponse方法实现,但现在流行TPL,方法TaskFactory.FromAsync更加方便简介。

    request = (HttpWebRequest)WebRequest.Create(addUrl);
    request.Method = "POST";
    request.Timeout = timeOut;
    request.Proxy = null;
    request.Accept = "application/xml, */*";
    request.ContentType = "application/xml";
    
    XElement inputElem = BuildRequestInputXml(userName, pwd, ctripPolicy);
    byte[] inputBytes = Encoding.UTF8.GetBytes(inputStr);
    inputBytes = ms.ToArray();
    using (var stream = request.GetRequestStream())
    {
        stream.Write(inputBytes, 0, inputBytes.Length);
    }
    Task.Factory.FromAsync<WebResponse>(request.BeginGetResponse, request.EndGetResponse, null, TaskCreationOptions.None).ContinueWith(
        t =>
        {
            HttpWebResponse response = t.Result;
            using (StreamReader reader = new StreamReader(responseStream))
            {
                responseStr = reader.ReadToEnd();
            }
            DBAccessHelper.UpdateDB(responseStr);
    
        });
    

     异步实现之后就是发送三次请求,然后暂停900ms:

    for (int i = 0; i < datas.Length; i++)
    {
        StartOneQueryAsync(datas[i].t1, datas[i].t2);
    
        if ((i + 1) % 3 == 0)
        {
        Thread.Sleep(TimeSpan.FromMilliseconds(900));
        }            
     }
    

     测试和改进

    简单实现之后就开始测试,但后来发现代码在发送了200次请求之后,后续请求就会被堵塞很长的时间(3~4s),最后测试结果是4000个请求大概30分钟才完成,这个和我们理想的20分钟有很大的差距。

    一开始的分析是发送请求次数过多,因为是异步发送,后续处理的线程可能不够而导致线程被堵塞。

    这里的解决方法就是使用 生产者/消费者队列,如网上的MSMQ,RabbitMQ等。

    不过在.net 4.0中添加了一些异步集合类:ConcurrentStack<T>,ConcurrentQueue<T>,ConcurrentBag<T>,ConcurrentDictionary<TKey,TValue>。

    所以这里的思路就是用异步队列ConcurrentQueue<Action>将要执行的方法Action添加到异步队列中,然后开启2到3个格外的线程从异步队列中获取Action再执行之。

    这种ProducerConsumer模式在.net 4.0中也已存在,其中有BlockingCollection<T>类就实现了IProducerConsumerCollection<T>接口,有了这些之后我们就可以实现一个Producer/Consumer 队列:

    public class UpdateDBQueue : IDisposable
    {
        BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
    
        public UpdateDBQueue(int workerCount)
        {
            // 创建格外的线程来执行task
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }
    
        public void Enqueue(Action action) { _taskQ.Add(action); }
    
        void Consume()
        {
            // 队列中没有数据就会被堵塞,在方法CompleteAdding被调用之后就会自动结束
            foreach (Action action in _taskQ.GetConsumingEnumerable())
            {
                action(); // Perform task.
            }
        }
    
        public void Dispose()
        {
            _taskQ.CompleteAdding();
        }     
    }
    

     这里如果还是 .net 2.0的同学可以参考stackoverflow的这篇文章,里面有介绍2.0如何实现Producer/Consumer 队列

    下面就是新加了队列后的代码。

    Task.Factory.FromAsync<WebResponse>(request.BeginGetResponse, request.EndGetResponse, null, TaskCreationOptions.None).ContinueWith(
        t =>
        {
            HttpWebResponse response = t.Result;
            using (StreamReader reader = new StreamReader(responseStream))
            {
                responseStr = reader.ReadToEnd();
            }
          //数据更新推送到队列
           updateUBQueue.Enqueue(() =>{
               DBAccessHelper.UpdateDB(responseStr);
            });
        });
    

     持续测试和改进

    加上队列之后再次测试,发现在200次请求之后还是有堵塞的情况发生,这样看样子应该不是后续处理线程不够,应该是请求的时候线程被堵塞。

    代码比较简单,后来发现request.GetRequestStream方法也有对应的HttpWebRequest.GetRequestStream,看样子也是IO请求,所以最后也写成异步:

    Task.Factory.FromAsync<Stream>(request.BeginGetRequestStream, request.EndGetRequestStream, null, TaskCreationOptions.None)
    .ContinueWith(streamTask => 
    {
        using (var stream = streamTask.Result)
        {
            stream.Write(inputBytes, 0, inputBytes.Length);
        }
    
        Task.Factory.FromAsync<WebResponse>(request.BeginGetResponse, request.EndGetResponse, null, TaskCreationOptions.None)
                .ContinueWith(responseTask =>
                {
                    #region Get Response
                    HttpWebResponse response = null;
                    try
                    {
                        response = (HttpWebResponse)responseTask.Result;
                        string responseStr = string.Empty;
                        using (Stream responseStream = response.GetResponseStream())
                        {
                                using (StreamReader reader = new StreamReader(responseStream))
                                {
                                    responseStr = reader.ReadToEnd();
                                }
                        }
                    updateUBQueue.Enqueue(() =>
                    {
                        DBAccessHelper.UpdateDB(responseStr);
                    });
                    }
                    catch (Exception ex)
                    {
                        LogHelper.Log("*****", ex);
                    }
                    finally
                    {
                        if (response != null)
                        {
                            response.Close();
                        }
                        if (request != null)
                        {
                            request.Abort();
                        }
                    }
    
                });
    
    });
    

     最后测试的时候发现4000多个请求在21分钟就可以完成,测试通过。

    总结:

    http异步请求的时候GetResponse也需要异步,.net 4.0已经包含了异步队列ConcurrentQueue<T>,使用BlockingCollection<T>可以实现自己的Producer/Consumer 队列。

  • 相关阅读:
    7-1 N个数求和
    3662. 最大上升子序列和
    树状数组
    堆优化Dijkstra java模板
    皮亚诺曲线距离
    最长公共子序列(计数问题)
    最小路径覆盖
    极角排序
    2619. 询问
    Hessian矩阵与局部极小值
  • 原文地址:https://www.cnblogs.com/julyluo/p/4905361.html
Copyright © 2011-2022 走看看