zoukankan      html  css  js  c++  java
  • 封装Socket.BeginReceive/EndReceive支持Timeout简介

    .NET中的Socket类提供了网络通信常用的方法,分别提供了同步和异步两个版本,其中异步的实现是基于APM异步模式实现,即BeginXXX/EndXXX的方式。异步方法由于其非阻塞的特性,在需考虑程序性能和伸缩性的情况下,一般会选择使用异步方法。但使用过Socket提供的异步方法的同学,应该都会注意到了Socket的异步方法是无法设置Timeout的。以Receive操作为例,Socket提供了一个ReceiveTimeout属性,但该属性设置的是同步版本的Socket.Receive()方法的Timeout值,该设置对异步的Socket.BeginReceive()无效:如果对方没有返回任何消息,则BeginReceive操作将无法完成,其中提供的回调函数也将不会调用。如下示例代码所示:

    private static void TestSocketBeginReceive()


    {


        Socket socket = new Socket(AddressFamily.InterNetwork,


            SocketType.Dgram, ProtocolType.Udp);


        byte[] content = Encoding.ASCII.GetBytes("Hello world");




        IPAddress ip = Dns.Resolve("www.google.com").AddressList[0];


        IPEndPoint receiver = new IPEndPoint(ip, 80);




        socket.BeginSendTo(content, 0, content.Length, SocketFlags.None,


            receiver, SendToCb, socket);


        Console.WriteLine("Sent bytes: " + content.Length);


    }




    private static void SendToCb(IAsyncResult ar)


    {


        var socket = ar.AsyncState as Socket;


        socket.EndSendTo(ar);


        byte[] buffer = new byte[1024];




        IAsyncResult receiveAr = socket.BeginReceive(buffer, 0, buffer.Length,


            SocketFlags.None, null, null);


        int received = socket.EndReceive(receiveAr);


        Console.WriteLine("Received bytes: " + received);


    }

    由于接收方不会返回任何消息,Socket.BeginReceive将永远不会完成,SentToCb方法中的socket.EndReceive()调用将永远阻塞,应用程序也无法得知操作的状态。

    支持Timeout

    在个别的应用场景下,我们希望既能使用Socket的异步通信方法,保证程序的性能,同时又希望能指定Timeout值,当操作没有在指定的时间内完成时,应用程序能得到通知,以进行下一步的操作,如retry等。以下介绍的就是一种支持Timeout的Socket异步Receive操作的实现,方式如下:

    1.基于APM异步模式封装Socket.BeginReceive/EndReceive方法。

    2.使用ThreadPool提供的RegisterWaitForSingleObject()方法注册一个WaitOrTimerCallback,如果指定时间内操作未完成,则结束操作,并设置状态为Timeout。

    3.将上述封装实现为Socket的扩展方法方便调用。

    以下代码简化了所有的参数检查和异常处理,实际使用中需添加相关逻辑。

    AsyncResultWithTimeout

    首先看一下IAsyncResult接口的实现:

    public class AsyncResultWithTimeout : IAsyncResult


    {


        private ManualResetEvent m_waitHandle = new ManualResetEvent(false);


        public AsyncResultWithTimeout(AsyncCallback cb, object state)


        {


            this.AsyncState = state;


            this.Callback = cb;


        }




        #region IAsyncResult




        public object AsyncState { get; private set; }


        public WaitHandle AsyncWaitHandle { get { return m_waitHandle; } }


        public bool CompletedSynchronously { get { return false; } }


        public bool IsCompleted { get; private set; }




        #endregion




        public AsyncCallback Callback { get; private set; }


        public int ReceivedCount { get; private set; }


        public bool TimedOut { get; private set; }


        public void SetResult(int count)


        {


            this.IsCompleted = true;


            this.ReceivedCount = count;


            this.m_waitHandle.Set();




            if (Callback != null) Callback(this);


        }




        public void SetTimeout()


        {


            this.TimedOut = true;


            this.IsCompleted = true;


            this.m_waitHandle.Set();


        }


    }

    AsyncResultWithTimeOut类中包含了IAsyncResult接口中4个属性的实现、用户传入的AsyncCallback委托、接收到的字节数ReceivedCount以及两个额外的方法:

    1.SetResult(): 用于正常接收到消息时设置结果,标记操作完成以及执行回调。

    2.SetTimeout():当超时时,标记操作完成以及设置超时状态。

    StateInfo

    StateInfo类用于保存相关的状态信息,该对象会作为Socket.BeginReceive()的最后一个参数传入。当接收到消息时,接收到的字节数会保存到AsyncResult属性中,并设置操作完成。当超时时,WatchTimeOut方法会将AsyncResult设置为TimeOut状态,并通过RegisteredWaitHandle属性取消注册的WaitOrTimerCallback.

    public class StateInfo


    {


        public StateInfo(AsyncResultWithTimeout result, Socket socket)


        {


            this.AsycResult = result;


            this.Socket = socket;


        }




        public Socket Socket { get; private set; }


        public AsyncResultWithTimeout AsycResult { get; private set; }


        public RegisteredWaitHandle RegisteredWaitHandle { get; set; }


    }

    封装Socket.BeginReceive

    与Socket.BeginReceive方法相比,BeginReceive2添加了一个参数timeout,可以设置该操作的超时时间,单位为毫秒。BeginReceive2中调用Socket.BeginReceive()方法,其中指定的ReceiveCb回调将在正常接收到消息后将结果保存在stateInfo对象的AsyncResult属性中,该属性中的值就是BeginReceive2()方法返回的IAsyncResult。BeginReceive2调用Socket.BeginReceive后,在ThreadPool中注册了一个WaitOrTimerCallback委托。ThreadPool将在Receive操作完成或者Timeout时调用该委托。

    public static class SocketExtension


    {




        public static int EndReceive2(IAsyncResult ar)


        {


            var result = ar as AsyncResultWithTimeout;


            result.AsyncWaitHandle.WaitOne();




            return result.ReceivedCount;


        }




        public static AsyncResultWithTimeout BeginReceive2


        (


            this Socket socket,


            int timeout,


            byte[] buffer,


            int offset,


            int size,


            SocketFlags flags,


            AsyncCallback callback,


            object state


        )


        {


            var result = new AsyncResultWithTimeout(callback, state);




            var stateInfo = new StateInfo(result, socket);




            socket.BeginReceive(buffer, offset, size, flags, ReceiveCb, state);




            var registeredWaitHandle =


                ThreadPool.RegisterWaitForSingleObject(


                    result.AsyncWaitHandle,


                    WatchTimeOut,


                    stateInfo, // 作为state传递给WatchTimeOut


                    timeout,


                    true);




            // stateInfo中保存RegisteredWaitHandle,以方便在úWatchTimeOut


            // 中unregister.


            stateInfo.RegisteredWaitHandle = registeredWaitHandle;




            return result;


        }




        private static void WatchTimeOut(object state, bool timeout)


        {


            var stateInfo = state as StateInfo;


            // 设置的timeout前,操作未完成,则设置为操作Timeout


            if (timeout)


            {


                stateInfo.AsycResult.SetTimeout();


            }




            // 取消之前注册的WaitOrTimerCallback


            stateInfo.RegisteredWaitHandle.Unregister(


                stateInfo.AsycResult.AsyncWaitHandle);


        }




        private static void ReceiveCb(IAsyncResult result)


        {


            var state = result.AsyncState as StateInfo;


            var asyncResultWithTimeOut = state.AsycResult;


            var count = state.Socket.EndReceive(result);


            state.AsycResult.SetResult(count);


        }


    }

    试一下

    以下代码演示了如何使用BeginReceive2:

    private static void TestSocketBeginReceive2()


    {


        Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);


        byte[] content = Encoding.ASCII.GetBytes("Hello world");




        IPAddress ip = Dns.Resolve("www.google.com").AddressList[0];


        IPEndPoint receiver = new IPEndPoint(ip, 80);




        socket.BeginSendTo(content, 0, content.Length, SocketFlags.None, receiver, SendToCb2, socket);


        Console.WriteLine("Sent bytes: " + content.Length);


    }




    private static void SendToCb2(IAsyncResult ar)


    {


        var socket = ar.AsyncState as Socket;


        socket.EndSendTo(ar);


        byte[] buffer = new byte[1024];




        AsyncResultWithTimeout receiveAr = socket.BeginReceive2(2000, buffer, 0, buffer.Length, SocketFlags.None, null, null);


        receiveAr.AsyncWaitHandle.WaitOne();


        if (receiveAr.TimedOut)


        {


            Console.WriteLine("Operation timed out.");


        }


        else


        {


            int received = socket.EndReceive(ar);


            Console.WriteLine("Received bytes: " + received);


        }


    }

    输出结果如下:

                           

     

    上述实现是针对BeginReceive的封装,还可以以相同的方式将Send/Receive封装以支持Timeout, 或者更进一步支持retry操作。

    附示例代码:files.cnblogs.com/dytes/SocketAsyncOpWithTimeOut.zip

    本文转自:http://www.csharpwin.com/csharpspace/13263r8436.shtml

  • 相关阅读:
    Spark、BulkLoad Hbase、单列、多列
    centos 根目录扩容
    VMware Workstation 添加磁盘 挂载目录(centos)
    maven+scala+idea 环境构建
    Spark 调优之ShuffleManager、Shuffle
    Spark 调优之数据倾斜
    SparkSession、SparkContext、SQLContext和HiveContext之间的区别。
    Spark 自定义函数(udf,udaf)
    树莓派环境下使用python将h264格式的视频转为mp4
    C# NanUI WinFormium监听页面加载开始结束
  • 原文地址:https://www.cnblogs.com/lxxhome/p/5614819.html
Copyright © 2011-2022 走看看