zoukankan      html  css  js  c++  java
  • 封装Socket.BeginReceive/EndReceive支持Timeout简介

    .NET中的Socket类提供了网络通信常用的方法,分别提供了同步和异步两个版本,其中异步的实现是基于APM异步模式实现,即BeginXXX/EndXXX的方式。异步方法由于其非阻塞的特性,在需考虑程序性能和伸缩性的情况下,一般会选择使用异步方法。但使用过Socket提供的异步方法的同学,应该都会注意到了Socket的异步方法是无法设置Timeout的。以Receive操作为例,Socket提供了一个ReceiveTimeout属性,但该属性设置的是同步版本的Socket.Receive()方法的Timeout值,该设置对异步的Socket.BeginReceive()无效:如果对方没有返回任何消息,则BeginReceive操作将无法完成,其中提供的回调函数也将不会调用。如下示例代码所示:

    private static void TestSocketBeginReceive()


    {


        Socket socket = new Socket(AddressFamily.InterNetwork,


            SocketType.Dgram, ProtocolType.Udp);


        byte[] content = Encoding.ASCII.GetBytes("Hello world");




        IPAddress ip = Dns.Resolve("www.google.com").AddressList[0];


        IPEndPoint receiver = new IPEndPoint(ip, 80);




        socket.BeginSendTo(content, 0, content.Length, SocketFlags.None,


            receiver, SendToCb, socket);


        Console.WriteLine("Sent bytes: " + content.Length);


    }




    private static void SendToCb(IAsyncResult ar)


    {


        var socket = ar.AsyncState as Socket;


        socket.EndSendTo(ar);


        byte[] buffer = new byte[1024];




        IAsyncResult receiveAr = socket.BeginReceive(buffer, 0, buffer.Length,


            SocketFlags.None, null, null);


        int received = socket.EndReceive(receiveAr);


        Console.WriteLine("Received bytes: " + received);


    }

    由于接收方不会返回任何消息,Socket.BeginReceive将永远不会完成,SentToCb方法中的socket.EndReceive()调用将永远阻塞,应用程序也无法得知操作的状态。

    支持Timeout

    在个别的应用场景下,我们希望既能使用Socket的异步通信方法,保证程序的性能,同时又希望能指定Timeout值,当操作没有在指定的时间内完成时,应用程序能得到通知,以进行下一步的操作,如retry等。以下介绍的就是一种支持Timeout的Socket异步Receive操作的实现,方式如下:

    1.基于APM异步模式封装Socket.BeginReceive/EndReceive方法。

    2.使用ThreadPool提供的RegisterWaitForSingleObject()方法注册一个WaitOrTimerCallback,如果指定时间内操作未完成,则结束操作,并设置状态为Timeout。

    3.将上述封装实现为Socket的扩展方法方便调用。

    以下代码简化了所有的参数检查和异常处理,实际使用中需添加相关逻辑。

    AsyncResultWithTimeout

    首先看一下IAsyncResult接口的实现:

    public class AsyncResultWithTimeout : IAsyncResult


    {


        private ManualResetEvent m_waitHandle = new ManualResetEvent(false);


        public AsyncResultWithTimeout(AsyncCallback cb, object state)


        {


            this.AsyncState = state;


            this.Callback = cb;


        }




        #region IAsyncResult




        public object AsyncState { get; private set; }


        public WaitHandle AsyncWaitHandle { get { return m_waitHandle; } }


        public bool CompletedSynchronously { get { return false; } }


        public bool IsCompleted { get; private set; }




        #endregion




        public AsyncCallback Callback { get; private set; }


        public int ReceivedCount { get; private set; }


        public bool TimedOut { get; private set; }


        public void SetResult(int count)


        {


            this.IsCompleted = true;


            this.ReceivedCount = count;


            this.m_waitHandle.Set();




            if (Callback != null) Callback(this);


        }




        public void SetTimeout()


        {


            this.TimedOut = true;


            this.IsCompleted = true;


            this.m_waitHandle.Set();


        }


    }

    AsyncResultWithTimeOut类中包含了IAsyncResult接口中4个属性的实现、用户传入的AsyncCallback委托、接收到的字节数ReceivedCount以及两个额外的方法:

    1.SetResult(): 用于正常接收到消息时设置结果,标记操作完成以及执行回调。

    2.SetTimeout():当超时时,标记操作完成以及设置超时状态。

    StateInfo

    StateInfo类用于保存相关的状态信息,该对象会作为Socket.BeginReceive()的最后一个参数传入。当接收到消息时,接收到的字节数会保存到AsyncResult属性中,并设置操作完成。当超时时,WatchTimeOut方法会将AsyncResult设置为TimeOut状态,并通过RegisteredWaitHandle属性取消注册的WaitOrTimerCallback.

    public class StateInfo


    {


        public StateInfo(AsyncResultWithTimeout result, Socket socket)


        {


            this.AsycResult = result;


            this.Socket = socket;


        }




        public Socket Socket { get; private set; }


        public AsyncResultWithTimeout AsycResult { get; private set; }


        public RegisteredWaitHandle RegisteredWaitHandle { get; set; }


    }

    封装Socket.BeginReceive

    与Socket.BeginReceive方法相比,BeginReceive2添加了一个参数timeout,可以设置该操作的超时时间,单位为毫秒。BeginReceive2中调用Socket.BeginReceive()方法,其中指定的ReceiveCb回调将在正常接收到消息后将结果保存在stateInfo对象的AsyncResult属性中,该属性中的值就是BeginReceive2()方法返回的IAsyncResult。BeginReceive2调用Socket.BeginReceive后,在ThreadPool中注册了一个WaitOrTimerCallback委托。ThreadPool将在Receive操作完成或者Timeout时调用该委托。

    public static class SocketExtension


    {




        public static int EndReceive2(IAsyncResult ar)


        {


            var result = ar as AsyncResultWithTimeout;


            result.AsyncWaitHandle.WaitOne();




            return result.ReceivedCount;


        }




        public static AsyncResultWithTimeout BeginReceive2


        (


            this Socket socket,


            int timeout,


            byte[] buffer,


            int offset,


            int size,


            SocketFlags flags,


            AsyncCallback callback,


            object state


        )


        {


            var result = new AsyncResultWithTimeout(callback, state);




            var stateInfo = new StateInfo(result, socket);




            socket.BeginReceive(buffer, offset, size, flags, ReceiveCb, state);




            var registeredWaitHandle =


                ThreadPool.RegisterWaitForSingleObject(


                    result.AsyncWaitHandle,


                    WatchTimeOut,


                    stateInfo, // 作为state传递给WatchTimeOut


                    timeout,


                    true);




            // stateInfo中保存RegisteredWaitHandle,以方便在úWatchTimeOut


            // 中unregister.


            stateInfo.RegisteredWaitHandle = registeredWaitHandle;




            return result;


        }




        private static void WatchTimeOut(object state, bool timeout)


        {


            var stateInfo = state as StateInfo;


            // 设置的timeout前,操作未完成,则设置为操作Timeout


            if (timeout)


            {


                stateInfo.AsycResult.SetTimeout();


            }




            // 取消之前注册的WaitOrTimerCallback


            stateInfo.RegisteredWaitHandle.Unregister(


                stateInfo.AsycResult.AsyncWaitHandle);


        }




        private static void ReceiveCb(IAsyncResult result)


        {


            var state = result.AsyncState as StateInfo;


            var asyncResultWithTimeOut = state.AsycResult;


            var count = state.Socket.EndReceive(result);


            state.AsycResult.SetResult(count);


        }


    }

    试一下

    以下代码演示了如何使用BeginReceive2:

    private static void TestSocketBeginReceive2()


    {


        Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);


        byte[] content = Encoding.ASCII.GetBytes("Hello world");




        IPAddress ip = Dns.Resolve("www.google.com").AddressList[0];


        IPEndPoint receiver = new IPEndPoint(ip, 80);




        socket.BeginSendTo(content, 0, content.Length, SocketFlags.None, receiver, SendToCb2, socket);


        Console.WriteLine("Sent bytes: " + content.Length);


    }




    private static void SendToCb2(IAsyncResult ar)


    {


        var socket = ar.AsyncState as Socket;


        socket.EndSendTo(ar);


        byte[] buffer = new byte[1024];




        AsyncResultWithTimeout receiveAr = socket.BeginReceive2(2000, buffer, 0, buffer.Length, SocketFlags.None, null, null);


        receiveAr.AsyncWaitHandle.WaitOne();


        if (receiveAr.TimedOut)


        {


            Console.WriteLine("Operation timed out.");


        }


        else


        {


            int received = socket.EndReceive(ar);


            Console.WriteLine("Received bytes: " + received);


        }


    }

    输出结果如下:

                           

     

    上述实现是针对BeginReceive的封装,还可以以相同的方式将Send/Receive封装以支持Timeout, 或者更进一步支持retry操作。

    附示例代码:files.cnblogs.com/dytes/SocketAsyncOpWithTimeOut.zip

    本文转自:http://www.csharpwin.com/csharpspace/13263r8436.shtml

  • 相关阅读:
    ThinkPHP 3.2.2 实现持久登录 ( 记住我 )
    Java实现 LeetCode 20 有效的括号
    Java实现 LeetCode 20 有效的括号
    Java实现 LeetCode 19删除链表的倒数第N个节点
    Java实现 LeetCode 19删除链表的倒数第N个节点
    Java实现 LeetCode 19删除链表的倒数第N个节点
    Java实现 LeetCode 18 四数之和
    Java实现 LeetCode 18 四数之和
    Java实现 LeetCode 18 四数之和
    Java实现 LeetCode 17 电话号码的字母组合
  • 原文地址:https://www.cnblogs.com/lxxhome/p/5614819.html
Copyright © 2011-2022 走看看