zoukankan      html  css  js  c++  java
  • Socket编程 (异步通讯,解决Udp丢包) Part4

    Socket编程 (异步通讯,解决Udp丢包)

      对于基于socket的udp协议通讯,丢包问题大家应该都见怪不怪了,但我们仍然希望在通讯方面使用Udp协议通讯,因为它即时,消耗资源少,响应迅速,灵活性强无需向Tcp那样建立连接消耗很长的时间等等很有优势的理由让我们对Udp通讯寄予了厚望。但它也存在一个不好的特点,经常丢包是时常发生的事。可能各位大侠已经有了很好的解决方案,本人在这也只是本着大家共同学习的目的,提供自己的解决方式。

    解决思路:模拟tcp三次握手协议,通过使用Timer定时器监视发送请求后接受数据的时间,如果一段时间内没有接受到数据包则判定丢包,并重新发送本次请求。

    下面我们将通过在客户端向服务器端发送文件以及请求下载文件...将该例子是如何解决Udp丢包的问题。

    个人优化:基于上章项目的Tcp协议下通讯,模拟聊天的同时发送文件。我们开辟两个端口,一个基于Tcp协议下进行聊天通讯,一个基于Udp协议下传送文件通讯。

    项目大致思路如下:

      发送文件:1.tcp客户端(文件发送端)先在本端初始化Udp服务端,并绑定好相应的端口,并将Ip地址以及端口号发送到tcp服务器端(文件接收端)。

             2.tcp服务端(文件接收端)接收到发送文件请求后,初始化Udp客户端,并向指定的Udp服务端发送已准备完毕信息。

           3.Udp服务器端接收到Udp客户端已准备好的信息后,初始化要发送文件对象,获取文件的基本参数(文件名、文件大小、数据大小、数据包总数)。

           4.Udp客户端收到文件的基本参数后想Udp服务端发送开始发送文件,以及此时接受到文件之后的偏移量,告诉Udp服务端应该从文件的哪个部分发送数据。此时启动Timer定时器。

           5.Udp服务器端收到文件继续发送的请求后,设置文件的偏移量,然后发送对应的数据包。此时关闭Timer定时器。

           6.循环4-5过程直到文件发送完毕后,Udp客户端向Udp服务端发送文件接受完毕消息,并与3秒后关闭Udp客户端套接字。

           7.Udp接受到文件接受完毕的消息后,关闭该套接字。

             8.Udp客户端一段时间没接受到消息则出发Timer的定时触发事件,重新发送本次请求。

      接收文件:1.tcp客户端(文件发送端)先在本端初始化Udp服务端,并绑定好相应的端口,并将Ip地址以及端口号发送到tcp服务器端(文件接收端)。

             2.tcp服务端(文件接收端)接收到发送文件请求后,初始化Udp客户端,并根据文件名初始化要发送文件对象,获取文件的基本参数(文件名、文件大小、数据大小、数据包总数),并将此信息转成协议信息发送到Udp服务端。

           后面的过程与上类似,我就不再马字了......

    下面的过程主要针对发送文件进行讲解:

      对于文件信息以及各种其他确认请求通过自定义协议发送后并解析的情况,在上一章都已经展示给大家看了。

      Udp服务端以及Udp客户端继承UdpCommon公用抽象类,主要存放两端共同所需对象及方法:  

       /// <summary>
        /// Udp异步通讯公有类
        /// 实现server与client相同方法
        /// </summary>
        public abstract class UdpCommon : IDisposable
        {
            #region 全局成员
            protected Socket worksocket;//当前套接字
            protected EndPoint sendEP;//发送端remote
            protected EndPoint reciveEP;//接受端remote
            protected int buffersize = 1024;//数据缓冲区大小
    
            protected RequestFile requestFile;//文件请求
    
            protected FileSendManager sendmanager;//文件发送管理
            protected FileReciveManager recivemanager;//文件接收管理
    
            protected HandlerMessage handlerMsg = new HandlerMessage();//消息协议处理类
    
            protected delegate void ReciveCallbackDelegate(int length, byte[] buffer);
            protected event ReciveCallbackDelegate ReciveCallbackEvent;//消息接收处理事件
    
            protected delegate void TimeOutHandlerDelegate(MutualMode mode, byte[] buffer);
            protected event TimeOutHandlerDelegate TimeOutHandlerEvent;//超时处理
            #endregion
    
            #region 构造函数
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="_requestFile">请求信息</param>
            public UdpCommon(RequestFile _requestFile)
            {
                requestFile = _requestFile;
            }
            #endregion
    
            #region 抽象方法
            /// <summary>
            /// 接受buffer处理
            /// </summary>
            /// <param name="length"></param>
            /// <param name="buffer"></param>
            public abstract void ReciveBufferHandler(int length, byte[] buffer);
            #endregion
    
            #region 异步接受/发送buffer
            /// <summary>
            /// 异步接受buffer
            /// </summary>
            protected void AsynRecive()
            {
                byte[] buffer = new byte[buffersize];
                worksocket.BeginReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref reciveEP,
                    asyncResult =>
                    {
                        if (asyncResult.IsCompleted)//信息接收完成
                        {
                            int length = worksocket.EndReceiveFrom(asyncResult, ref sendEP);
                            if (TimeOutHandlerEvent != null)
                            {
                                TimeOutHandlerEvent.Invoke(MutualMode.recive, null);
                            }
                            ReciveCallbackEvent.Invoke(length, buffer);
                        }
                    }, null);
            }
            /// <summary>
            /// 异步发送buffer
            /// </summary>
            /// <param name="buffer"></param>
            protected void AsynSend(byte[] buffer)
            {
                worksocket.BeginSendTo(buffer, 0, buffer.Length, SocketFlags.None, sendEP,
                    asyncResult =>
                    {
                        if (asyncResult.IsCompleted)//消息发送完成
                        {
                            worksocket.EndSendTo(asyncResult);
                            if (TimeOutHandlerEvent != null)
                            {
                                TimeOutHandlerEvent.Invoke(MutualMode.send, buffer);
                            }
                        }
                    }, null);
            }
            #endregion
    }

    Udp服务端初始化Socket套接字,并初始化相应的消息返回处理委托事件:

    using System;
    using System.Collections.Generic;
    using System.Text;
    #region 命名空间
    using System.Net;
    using System.Net.Sockets;
    using System.IO;
    using SocketCommon;
    #endregion
    
    namespace ClientConsole
    {
        public class UdpServer : UdpCommon
        {
            public UdpServer(RequestFile _requestFile)
                : base(_requestFile)
            {
                //初始化套接字
                IPEndPoint ipep = new IPEndPoint(IPAddress.Parse("192.168.1.108"), 0);
                base.worksocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
                base.worksocket.Bind(ipep);
                base.reciveEP = base.sendEP = (EndPoint)(new IPEndPoint(IPAddress.Any, 0));
    
                //返回Ip地址及端口信息
                IPEndPoint localEP = (IPEndPoint)base.worksocket.LocalEndPoint;
                requestFile.Address = localEP.Address.ToString();
                requestFile.Port = localEP.Port;
    
                //根据不同的请求初始化相应的事件
                if (requestFile.Mode == RequestMode.send)//发送准备
                {
                    base.ReciveCallbackEvent += new ReciveCallbackDelegate(ReadySendBuffer);
                }
                else //接受准备
                {
                    base.ReciveCallbackEvent += new ReciveCallbackDelegate(ReadyReciveBuffer);
                }
                base.AsynRecive();
            }
            /// <summary>
            /// 准备发送文件信息
            /// </summary>
            /// <param name="length"></param>
            /// <param name="buffer"></param>
            public void ReadySendBuffer(int length, byte[] buffer)
            {
                MessageProtocol msgPro = handlerMsg.HandlerObject(Encoding.UTF8.GetString(buffer, 0, length));
                if (msgPro.MessageType == MessageType.text && msgPro.MessageInfo.Content == "ready")
                {
                    ReciveCallbackEvent -= new ReciveCallbackDelegate(ReadySendBuffer);
                    ReciveCallbackEvent += new ReciveCallbackDelegate(ReciveBufferHandler);
                    if (sendmanager == null)
                    {
                        sendmanager = new FileSendManager(requestFile.FileObject);
                        Console.WriteLine("发送文件:{0}", sendmanager.fileobject.FileName);
                    }
                    msgPro = new MessageProtocol(RequestMode.send, sendmanager.fileobject);
                    AsynSend(msgPro.ToBytes());
                    AsynRecive();
                }
            }
            /// <summary>
            /// 接受下一个包请求或者发送下一个数据包
            /// </summary>
            /// <param name="length"></param>
            /// <param name="buffer"></param>
            public override void ReciveBufferHandler(int length, byte[] buffer)
            {
                if (requestFile.Mode == RequestMode.send) //发送文件
                {
                    SendFile(length, buffer);
                }
                else//写入文件
                {
                    ReciveFile(length, buffer);
                }
            }
        }
    }

      因为两端都存在SendFile以及ReciveFile的方法,我们在UdpCommon实现两个方法方便两端共同调用。

    #region 准备发送/接收文件
            /// <summary>
            /// 准备接收
            /// </summary>
            protected void ReciveReady()
            {
                recivemanager = new FileReciveManager(requestFile.FileObject);
                Console.WriteLine("接受文件:{0}", recivemanager.fileobject.FileName);
                RequestAction();
            }
            /// <summary>
            /// 准备接收文件Buffer
            /// </summary>
            /// <param name="length"></param>
            /// <param name="buffer"></param>
            protected void ReadyReciveBuffer(int length, byte[] buffer)
            {
                MessageProtocol msgPro = handlerMsg.HandlerObject(Encoding.UTF8.GetString(buffer, 0, length));
                ReciveCallbackEvent -= new ReciveCallbackDelegate(ReadyReciveBuffer);
                ReciveCallbackEvent += new ReciveCallbackDelegate(ReciveBufferHandler);
                requestFile = msgPro.RequestFile;
                ReciveReady();
            }
            #endregion
    
            #region 发送/接收文件
            /// <summary>
            /// 发送文件
            /// </summary>
            /// <param name="length"></param>
            /// <param name="buffer"></param>
            protected void SendFile(int length, byte[] buffer)
            {
                string protocol = Encoding.UTF8.GetString(buffer, 0, length);
                MessageProtocol msgPro = handlerMsg.HandlerObject(protocol);
                if (msgPro.MessageType == MessageType.text)
                {
                    string msg = msgPro.MessageInfo.Content;
                    string[] strArr = msg.Split('|');
                    Status status = (Status)Enum.Parse(typeof(Status), strArr[0]);
                    if (status == Status.keepon)
                    {
                        sendmanager.offset = Convert.ToInt32(strArr[1]);//文件偏移量
                        AsynSend(sendmanager.Read());//发送下一个包
                        Console.WriteLine("已发送:{0}%", sendmanager.GetPercent());
                        AsynRecive();
                    }
                    else Dispose();
                }
            }
            /// <summary>
            /// 接受文件
            ///     1.未接收完之前将文件保存为临时文件
            ///     2.完毕后通过moveTo重命名
            /// </summary>
            /// <param name="length"></param>
            /// <param name="buffer"></param>
            protected void ReciveFile(int length, byte[] buffer)
            {
                recivemanager.Write(buffer);
                Console.WriteLine("已接受:{0}%", recivemanager.GetPercent());
                RequestAction();
            }
            #endregion
    
            #region 发送请求下一个包
            /// <summary>
            /// 发送请求下一个包
            /// </summary>
            public void RequestAction()
            {
                //根据状态处理
                MessageProtocol msgPro = new MessageProtocol(
                        String.Format("{0}|{1}", (int)recivemanager.status, recivemanager.offset));
                if (recivemanager.status == Status.keepon)
                {
                    AsynSend(msgPro.ToBytes());
                    long tempsize = recivemanager.fileobject.FileLength - recivemanager.offset;
                    if (tempsize < recivemanager.fileobject.PacketSize)
                    {
                        buffersize = Convert.ToInt32(tempsize);
                    }
                    else buffersize = recivemanager.fileobject.PacketSize;
                    AsynRecive();
                }
                else
                {
                    TimeOutHandlerEvent = null;
                    AsynSend(msgPro.ToBytes());
                }
            }
            #endregion
    
            #region 释放资源
            /// <summary>
            /// 释放资源
            /// </summary>
            public void Dispose()
            {
                if (worksocket != null)
                {
                    Thread.Sleep(3);
                    worksocket.Close(3);
                }
            }
            #endregion

    下面我来看看Udp客户端是怎样工作的......

    using System;
    using System.Collections.Generic;
    using System.Text;
    #region 命名空间
    using System.Net;
    using System.Net.Sockets;
    using System.IO;
    using SocketCommon;
    using System.Timers;
    #endregion
    
    namespace ServerConsole
    {
        public class UdpClient : UdpCommon
        {
            /// <summary>
            /// 定时器辅助类
            /// </summary>
            TimerManager timermanager;//定时(用于超时重发)
            //当前发送请求数据缓存
            byte[] temp_buffer;
            public UdpClient(RequestFile _requestFile)
                : base(_requestFile)
            {
                //初始化套接字
                base.worksocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
                base.sendEP = new IPEndPoint(IPAddress.Parse(_requestFile.Address), requestFile.Port);
                base.reciveEP = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 0);
    
                //根据不同的请求返回对应的消息
                MessageProtocol msgPro;
                if (_requestFile.Mode == RequestMode.send)//接受准备
                {
                    base.ReciveCallbackEvent += new ReciveCallbackDelegate(ReadyReciveBuffer);
                    msgPro = new MessageProtocol("ready");
                }
                else //发送准备
                {
                    base.ReciveCallbackEvent += new ReciveCallbackDelegate(ReciveBufferHandler);
                    sendmanager = new FileSendManager(requestFile.FileObject);
                    msgPro = new MessageProtocol(RequestMode.recive, sendmanager.fileobject);
                    Console.WriteLine("发送文件:{0}", sendmanager.fileobject.FileName);
                }
                //初始化定时器
                timermanager = new TimerManager(3000);
                timermanager.ElapsedEvent += new TimerManager.ElapsedDelegate(ElapsedEvent);
                base.TimeOutHandlerEvent += new TimeOutHandlerDelegate(TimeOutHandler);
                base.AsynSend(msgPro.ToBytes());
                base.AsynRecive();
            }
            /// <summary>
            /// 请求下一个数据包或者发送下一个数据包
            /// </summary>
            /// <param name="length"></param>
            /// <param name="buffer"></param>
            public override void ReciveBufferHandler(int length, byte[] buffer)
            {
                if (requestFile.Mode == RequestMode.send)//接受来自客户端的文件
                {
                    ReciveFile(length, buffer);
                }
                else //向客户端发送文件
                {
                    SendFile(length, buffer);
                }
            }
    
            private void TimeOutHandler(MutualMode mode, byte[] buffer)
            {
                this.temp_buffer = buffer;
                if (mode == MutualMode.send)
                {
                    if (!timermanager.IsRuning)
                    {
                        timermanager.Start();
                    }
                }
                else
                {
                    if (timermanager.IsRuning)
                    {
                        timermanager.Stop();
                    }
                }
            }
            /// <summary>
            /// 超时后重发当前请求
            /// </summary>
            private void ElapsedEvent()
            {
                if (temp_buffer != null)
                {
                    Console.WriteLine("发生丢包,重新请求...");
                    base.AsynSend(temp_buffer);
                }
            }
        }
    }

    我们来看看运行效果:

      发送文件:

      接收文件:

    由此对于文件使用Udp传送的过程我们完成了,至于在上面的测试过程中丢包问题,我在同学的电脑上测试过丢包问题,由于不方便截图,就不放到上面了,上面测试过程中如果出现丢包会重新发送本次请求,并接受数据。

    附上源码:SocketProQuests.zip

    作者:曾庆雷
    出处:http://www.cnblogs.com/zengqinglei
    本页版权归作者和博客园所有,欢迎转载,但未经作者同意必须保留此段声明, 且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利
  • 相关阅读:
    从零搭建Spring Boot脚手架(4):手写Mybatis通用Mapper
    从零搭建Spring Boot脚手架(3):集成mybatis
    从零搭建Spring Boot脚手架(2):增加通用的功能
    从零搭建Spring Boot脚手架(1):开篇以及技术选型
    Hibernate Validator校验参数全攻略
    Spring Data R2DBC响应式操作MySQL
    Spring Security 实战干货:从零手写一个验证码登录
    Spring Security 实战干货:图解用户是如何登录的
    基于.NetCore3.1系列 —— 日志记录之日志核心要素揭秘
    基于.NetCore3.1系列 —— 日志记录之日志配置揭秘
  • 原文地址:https://www.cnblogs.com/zengqinglei/p/3079007.html
Copyright © 2011-2022 走看看