zoukankan      html  css  js  c++  java
  • UDP对像库加线程并行发送

     internal class IocpUdpServer
        {
            private Socket _listen;
            private bool _isRun;
            private readonly int _bufferLength;
            private readonly SaeQueuePool _saeQueue = new SaeQueuePool(512);
            /// <summary>
            /// 获取系统分配的UDP端口号
            /// </summary>
            public int Port { get; set; }
    
            //报文接收处理队列
            private readonly IUdpRequestHandler _handlerQueue;
    
            /// <summary>
            ///  发送缓存
            /// </summary>
            private readonly BufferBlock<SendInfo> _sendBlocking = new BufferBlock<SendInfo>(new DataflowBlockOptions()
            {
                BoundedCapacity = 256
            });
            private readonly ActionBlock<SendInfo> _sendAction; //操作 
            DefaultObjectPool<SocketAsyncEventArgs> recPool;
            /// <summary>
            /// 实例化UDP服务器,并直接启动监听
            /// </summary>
            /// <param name="handlerQueue">报文接收处理队列</param>
            /// <param name="bufferLength">缓冲区大小</param>
            /// <param name="port">监听的端口号</param>
            public IocpUdpServer(IUdpRequestHandler handlerQueue, int port, int bufferLength = 1024)
            {
                _handlerQueue = handlerQueue;
                _bufferLength = bufferLength;
                Port = port;
    
                var recPolicy = new DefaultPooledObjectPolicy<SocketAsyncEventArgs>();
                recPool = new DefaultObjectPool<SocketAsyncEventArgs>(recPolicy);
                _sendAction = new ActionBlock<SendInfo>(pkg =>
                {
                    try
                    {
                        var socketSendAsync = _saeQueue.Dequeue();
                        if (socketSendAsync == null)
                            return;
                        socketSendAsync.SetBuffer(pkg.bytes, 0, pkg.bytes.Length);
                        socketSendAsync.RemoteEndPoint = pkg.endPoint;
                        if (!_listen.SendToAsync(socketSendAsync))
                        {
                            _saeQueue.Enqueue(socketSendAsync);
                            LogHelper.WriteInfoLog("UDP发送失败报文:" + pkg.bytes.ToHexString());
                            Thread.Yield();
                        }
                    }
                    catch (Exception ex)
                    {
                        LogHelper.WriteErrLog("UDP发送失败:", ex);
                    }
                }, new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 3,
                });
    
                _sendBlocking.LinkTo(_sendAction);
            }
    
    
    
    
    
            /// <summary>
            /// 启动UPD监听,接收来自网络上的UDP消息
            /// </summary>
            public void Start()
            {
                if (!_isRun)
                {
                    var local = new IPEndPoint(IPAddress.Any, Port);
                    _listen = new Socket(local.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
                    _listen.Bind(local);
    
                    var socketAsync = recPool.Get();
                    socketAsync.SetBuffer(new byte[_bufferLength], 0, _bufferLength);
                    socketAsync.Completed += SocketAsync_Completed;
                    socketAsync.RemoteEndPoint = local;
                    _isRun = true;
                    StartReceive(socketAsync);
                }
            }
    
            public void Stop()
            {
                Close();
            }
    
            /// <summary>
            /// 开始异步接收UDP消息
            /// </summary>
            private void StartReceive(SocketAsyncEventArgs socketAsync)
            {
                if (!_isRun) return;
                if (!_listen.ReceiveFromAsync(socketAsync))
                    SocketAsync_Completed(_listen, socketAsync);
            }
    
            private void SocketAsync_Completed(object sender, SocketAsyncEventArgs e)
            {
                if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
                {
                    if (e.LastOperation == SocketAsyncOperation.ReceiveFrom)
                    {
                        var data = new byte[e.BytesTransferred];
                        Buffer.BlockCopy(e.Buffer, e.Offset, data, 0, data.Length);
                        StartReceive(e);
                        OnReceive((IPEndPoint)e.RemoteEndPoint, data);
                        recPool.Return(e);
                        return;
                    }
                }
                StartReceive(e);
            }
    
            /// <summary>
            /// 远程端点收到消息处理
            /// </summary>
            private void OnReceive(IPEndPoint iPEndPoint, byte[] data)
            {
                if (!_isRun)
                    return;
    
                //小于8字节的报文不处理
                //if (data.Length < 8 + 4)
                //{
                //    LogHelper.WriteInfoLog("报文过短,抛弃:" + data.ToHexString());
                //    return;
                //}
    
                //UDP的到达顺序不确定性可能导致问题
                //TODO:拆包
                var prefix = data.First();
                var subfix = data.Last();
                if (prefix != G.FrameHeader || subfix != G.FrameTail)
                {
                    LogHelper.WriteInfoLog("收尾错误,抛弃:" + data.ToHexString());
                    return;
                }
                _handlerQueue.Enqueue(new UdpRequestWrapper(data, iPEndPoint));
    
            }
    
            /// <summary>
            /// 向远程网络端点发送数据
            /// </summary>
            public async void SendAsync(EndPoint endPoint, byte[] bytes)
            {
                if (!_isRun || _listen == null)
                    return;
                var sendInfo = new SendInfo
                {
                    endPoint = endPoint,
                    bytes = bytes
                };
                await _sendBlocking.SendAsync(sendInfo);
    
    
    
    
                //优先从可复用队列中取  
                //var socketSendAsync = _saeQueue.Dequeue();
                //socketSendAsync.SetBuffer(bytes, 0, bytes.Length);
                //socketSendAsync.RemoteEndPoint = endPoint;
    
                ////如果发送失败,则强制归队
                //if (!_listen.SendToAsync(socketSendAsync))
                //    _saeQueue.Enqueue(socketSendAsync);
            }
    
            /// <summary>
            /// 关闭服务
            /// </summary>
            public void Close()
            {
                _isRun = false;
                _listen?.Close();
                _listen = null;
            }
        }
    
    
        internal class SendInfo
        {
            public EndPoint endPoint { get; set; }
            public byte[] bytes { get; set; }
    
        }
    !_listen.SendToAsync(socketSendAsync)

    如果I/O操作挂起,则为true。这个System.Net.Sockets接口.SocketAsyncEventArgs。已完成操作完成后将引发e参数上的事件。

    如果I/O操作同步完成,则返回false。

    在这种情况下System.Net.Sockets接口.SocketAsyncEventArgs。已完成不会引发e参数上的事件,并且可以在方法调用返回以检索操作结果后立即检查作为参数传递的e对象。

    public const byte FrameHeader = 0x7E;

    /// <summary>
    /// 命令帧帧尾(0x7F)
    /// </summary>
    public const byte FrameTail = 0x7F;

  • 相关阅读:
    微信小程序view标签以及display:flex的测试
    微信小程序简单入门理解
    spring+mybatis的简单配置示例
    反链与外链的区别汇总
    隐性URL与显性URL区别与SEO考虑
    你是如何为公司死心塌地卖命的?
    大三下学期十七周总结
    IP地址、子网掩码、网络号、主机号、网络地址、主机地址、IP段/数字
    大三下学期十六周总结
    图解高内聚与低耦合
  • 原文地址:https://www.cnblogs.com/robertyao/p/13959390.html
Copyright © 2011-2022 走看看