zoukankan      html  css  js  c++  java
  • DotNetty实现高性能tcpserver

    DotNetty实现高性能tcpserver,超时断开链路,垃圾包,断包,粘包处理

    初始化类

    using DotNetty.Handlers.Timeout;
    using DotNetty.Transport.Bootstrapping;
    using DotNetty.Transport.Channels;
    using DotNetty.Transport.Channels.Sockets;
    using System;
    using System.Threading;
    
    namespace DotNettyUtil.tcpserver
    {
        public class TcpServerIntance
        {
            private int port;
            private IProtocolHandler handle;
            private static int READ_IDEL_TIME_OUT = 0; // 读超时
            private static int WRITE_IDEL_TIME_OUT = 0;// 写超时
            private static int ALL_IDEL_TIME_OUT = 90; // 所有超时
            //
            IEventLoopGroup bossGroup;
            IEventLoopGroup workerGroup;
            IChannel boundChannel;
    
            public TcpServerIntance(int port, IProtocolHandler handle)
            {
                this.port = port;
                this.handle = handle;
    
                ThreadPool.QueueUserWorkItem(new WaitCallback(RunThread));
            }
    
            async void RunThread(object obj)
            { 
                bossGroup = new MultithreadEventLoopGroup();
                workerGroup = new MultithreadEventLoopGroup();
    
                try
                {
                    var bootstrap = new ServerBootstrap();
                    bootstrap.Group(bossGroup, workerGroup);
                    bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                    {
                        IChannelPipeline pipeline = channel.Pipeline;
                        pipeline.AddLast(new IdleStateHandler(READ_IDEL_TIME_OUT, WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT)); // 需要定时心跳包
                        pipeline.AddLast(new TcpServerHandler(handle));
                    }));
                    bootstrap.Channel<TcpServerSocketChannel>()
                    .Option(ChannelOption.SoBacklog, 128)
                   .ChildOption(ChannelOption.SoKeepalive, true);
                    //
                    LogHelper.AppendLog("TcpServer 监听端口:" + port);
                    //
                    boundChannel = await bootstrap.BindAsync(port);
                }
                catch (Exception ex)
                {
                    LogHelper.AppendLog("[Error] TcpServer_RunThread,errmsg=" + ex.Message);
                }
            }
    
            void Close()
            {
                try
                {
                    boundChannel.CloseAsync();
                }
                catch (Exception ex)
                {
                    LogHelper.AppendLog("[Error] TcpServer_Close,errmsg=" + ex.Message);
                }
                finally
                {
                    bossGroup.ShutdownGracefullyAsync();
                    workerGroup.ShutdownGracefullyAsync();
                }
            }
        }
    }

    协议解析类

    using DotNetty.Buffers;
    using DotNetty.Common.Utilities;
    using DotNetty.Transport.Channels;
    using DotNettyUtil.tcpserver;
    using System;
    
    namespace TcpServer
    {
        public class ClientProtocol : IProtocolHandler
        {
            const int HEAD1 = 0x48;// H
            const int HEAD2 = 0x54;// T
            const int HEAD3 = 0x45;// E
            const int HEAD4 = 0x4D;// M
            const int HEAD5 = 0x50;// P
            const int HEAD6 = 0x3D;// =
            public const char SPLIT1 = '#';
            const char SPLIT2 = '@';
            const char SPLIT3 = '=';
            const char SPLIT4 = '+';
            const char SPLIT5 = ',';
    
            public void ChannelRead(IChannelHandlerContext ctx, object msg)
            {
                if (!ctx.Channel.Active) return;
    
                string data_content = "";
    
                try
                {
                    string sn = TcpServerMgr.GetSN(ctx);
                    UidEntity uidEntity = TcpServerMgr.GetUid(sn);
                    string uid = "";
    
                    if (null != uidEntity)
                    {
                        uid = uidEntity.Uid;
                        uidEntity.LastTime = Common.GetNowTimestamp();
                    }
    
                    if (!TcpServerMgr.dicSn2Buffer.ContainsKey(sn)) TcpServerMgr.dicSn2Buffer.Add(sn, Unpooled.Buffer(1024));
                    IByteBuffer oldBuffer = TcpServerMgr.dicSn2Buffer[sn];
    
                    if (null != msg)
                    {
                        IByteBuffer recvBuffer = (IByteBuffer)msg;
                        int size = recvBuffer.ReadableBytes;
                        if (size > 0)
                        {
                            //recvBuffer.markReaderIndex();
                            oldBuffer.WriteBytes(recvBuffer);
                            ReferenceCountUtil.Release(recvBuffer);
                            //recvBuffer.resetReaderIndex();
                            //byte[] recv = new byte[size];
                            //recvBuffer.readBytes(recv);
                            //CTxtHelp.AppendLog("接收数据:" + CDataHelper.ArrayByteToString(recv));
                        }
                    }
    
                    byte head1 = 0; byte head2 = 0; byte head3 = 0; byte head4 = 0; byte head5 = 0; byte head6 = 0; bool headok = false;
    
                    oldBuffer.MarkReaderIndex();
    
                    while (oldBuffer.IsReadable())
                    {
                        head1 = oldBuffer.ReadByte();
                        if (HEAD1 == head1)// 垃圾包处理
                        {
                            head2 = oldBuffer.ReadByte(); if (!oldBuffer.IsReadable()) { oldBuffer.ResetReaderIndex(); return; }
                            head3 = oldBuffer.ReadByte(); if (!oldBuffer.IsReadable()) { oldBuffer.ResetReaderIndex(); return; }
                            head4 = oldBuffer.ReadByte(); if (!oldBuffer.IsReadable()) { oldBuffer.ResetReaderIndex(); return; }
                            head5 = oldBuffer.ReadByte(); if (!oldBuffer.IsReadable()) { oldBuffer.ResetReaderIndex(); return; }
                            head6 = oldBuffer.ReadByte(); if (!oldBuffer.IsReadable()) { oldBuffer.ResetReaderIndex(); return; }
                            if (HEAD2 == head2 && HEAD3 == head3 && HEAD4 == head4 && HEAD5 == head5 && HEAD6 == head6)
                            {
                                headok = true;
                                break;
                            }
    
                            break;
                        }
                        else
                        {
                            oldBuffer.MarkReaderIndex();
                            LogHelper.AppendLog("Error,Unable to parse the data:" + head1 + " source:" + (string.IsNullOrEmpty(uid) ? sn : uid));
                        }
                    }
    
                    if (!oldBuffer.IsReadable())
                    {
                        if (headok) oldBuffer.ResetReaderIndex();
                        return;
                    }
    
                    //byte[] arrlen = bBuffer.GetByteArray(4); if (!analysis.IsRemainData(iPosition, bBuffer, analysis)) return;
    
                    byte[] arrlen = new byte[4]; oldBuffer.ReadBytes(arrlen); if (!oldBuffer.IsReadable()) { oldBuffer.ResetReaderIndex(); return; }
                    int len = Common.String2Int(Common.ByteToString(arrlen)); if (-1 == len) return;
    
                    if (TcpServerMgr.GetWaitRecvRemain(oldBuffer, len)) { oldBuffer.ResetReaderIndex(); return; }
    
                    byte[] source = new byte[len]; oldBuffer.ReadBytes(source);
    
                    string data = Common.ByteToString(source);
                    if (null == data || 0 == data.Length || data.Length - 1 != data.LastIndexOf(SPLIT1))
                    {
                        return;
                    }
                    data = data.Substring(1, data.Length - 2);
                    string[] item = data.Split(SPLIT1);
                    if (null == item || 4 != item.Length)
                    {
                        return;
                    }
    
                    uid = item[0];
                    string taskid = item[1];
                    int cmd = Common.String2Int(item[2]);
                    string content = item[3];
    
                    //Program.AddMessage("R: [" + sn + "] cmd=" + cmd.ToString() + " data=" + data);  
    
                    switch (cmd)
                    {
                        case 1:
                            //analysis.Msg = "ok";
                            TcpServerMgr.AddUid(sn, uid, ctx);
                            LogHelper.AppendLog("心跳反馈,uid=" + uid);
                            break;
                        case 2:
                            //analysis.Msg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
                            break;
                        case 3:
                            // HTEMP=0263#WaterMeter-001#1520557004#03#buildid=44@edmid=37@meterid=1228@senddate=2018-02-05 17:36:22@[{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}]#
                            //analysis.Msg = "ok";
                            break;
                    }
    
                    if (!oldBuffer.IsReadable())
                    {
                        oldBuffer.Clear();
                    }
                    else
                    {
                        ChannelRead(ctx, null);// 处理粘包
                    }
                }
                catch (Exception ex)
                {
                    LogHelper.AppendLog("[Error] ClientProtocol_ChannelRead,data_content=" + data_content + ",errmsg=" + ex.Message);
                } 
            }
        }
    }
    qq:505645074
  • 相关阅读:
    java 数组
    数组(二)
    JVM内存分配策略
    JVM垃圾收集算法
    LINUX 查看硬件配置命令
    遗传算法
    svn简单使用
    Several concepts in Data Mining
    JVM判断对象存活的算法
    JVM运行时数据区
  • 原文地址:https://www.cnblogs.com/chen1880/p/14934027.html
Copyright © 2011-2022 走看看