zoukankan      html  css  js  c++  java
  • DotNetty关键概念及简单示例(基于NET5)

    DotNetty关键概念及简单示例(基于NET5)

    1.DotNetty 设计的关键

    异步和事件驱动是Netty设计的关键。

    1.1 核心组件

    1.1.1 Channel

    Channel:一个连接就是一个Channel。
    Channel是Socket的封装,提供绑定,读,写等操作,降低了直接使用Socket的复杂性。Channel是Socket的抽象,可以被注册到一个EventLoop上,EventLoop相当于Selector,每一个EventLoop又有自己的处理线程。

    1.1.2 回调

    回调:通知的基础。

    1.1.3 EventLoop

    EventLoop
    我们之前就讲过EventLoop这里回顾一下:

    一个 EventLoopGroup 包含一个或者多个 EventLoop;
    一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
    所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
    一个 Channel 在它的生命周期内只注册于一个 EventLoop;
    一个 EventLoop 可能会被分配给一个或多个 Channel。

    1.1.4 ChannelHandler

    ChannelHandler是处理数据的逻辑容器

    ChannelInboundHandler是接收并处理入站事件的逻辑容器,可以处理入站数据以及给客户端以回复。

    1.1.5 ChannelPipeline

    ChannelPipeline是将ChannelHandler穿成一串的的容器。

    1.1.6 编码器和解码器

    编码器和解码器都实现了ChannelInboundHandler和 ChannelOutboundHandler接口用于处理入站或出站数据。

    1.1.7 Bootstrap引导类

    • Bootstrap用于引导客户端,ServerBootstrap用于引导服务器
    • 客户端引导类只需要一个EventLoopGroup服务器引导类需要两个EventLoopGroup。但是在简单使用中,也可以公用一个EventLoopGroup。为什么服务器需要两个EventLoopGroup呢?是因为服务器的第一个EventLoopGroup只有一个EventLoop,只含有一个SeverChannel用于监听本地端口,一旦连接建立,这个EventLoop就将Channel控制权移交给另一个EventLoopGroup,这个EventLoopGroup分配一个EventLoop给Channel用于管理这个Channel。

    1.1.8 AbstractByteBuffer IByteBuffer IByteBufferHolder

    字节级操作,工控协议的话大多都是字节流,我们以前的方式就是拼,大概就是:对照协议这两个字节是什么,后四个字节表示什么意思。现在DotNetty提供了一个ByteBuffer来简化我们对于字节流的操作。适用工控协议。json格式的,但是底层还是字节流。

    2 DotNetty Nuget包

    DotNetty由九个项目构成,在NuGet中都是单独的包,可以按需引用,其中比较重要的几个是以下几个:

    • DotNetty.Common 是公共的类库项目,包装线程池,并行任务和常用帮助类的封装
    • DotNetty.Transport 是DotNetty核心的实现
    • DotNetty.Buffers 是对内存缓冲区管理的封装
    • DotNetty.Codes 是对编码器解码器的封装,包括一些基础基类的实现,我们在项目中自定义的协议,都要继承该项目的特定基类和实现
    • DotNetty.Handlers 封装了常用的管道处理器,比如Tls编解码,超时机制,心跳检查,日志等,如果项目中没有用到可以不引用,不过一般都会用到

    3 一个例子

    3.1 服务端代码示例

    3.1.1 服务端配置

    配置成服务器管道,TcpServerSocketChannel,之所以配置成服务器管道原因是与客户端管道不同,服务器管道多了侦听服务。将服务端的逻辑处理代码Handler以pipeline形式添加到channel中去。

    using DotNetty.Transport.Bootstrapping;
    using DotNetty.Transport.Channels;
    using DotNetty.Transport.Channels.Sockets;
    using System;
    using System.Threading.Tasks;
    
    namespace EchoServer
    {
        class Program
        {
            static async Task RunServerAsync()
            {
                IEventLoopGroup eventLoop;
                eventLoop = new MultithreadEventLoopGroup();
                try
                {
                    // 服务器引导程序
                    var bootstrap = new ServerBootstrap();
                    bootstrap.Group(eventLoop);
                    bootstrap.Channel<TcpServerSocketChannel>()
                        // 保持长连接
                   .ChildOption(ChannelOption.SoKeepalive, true);
                    bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                    {
                        IChannelPipeline pipeline = channel.Pipeline;
                        pipeline.AddLast(new EchoServerHandler());
                    }));
                    IChannel boundChannel = await bootstrap.BindAsync(3000);
                    Console.ReadLine();
                    await boundChannel.CloseAsync();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }
                finally
                {
                    await eventLoop.ShutdownGracefullyAsync();
                }
            }
            static void Main(string[] args) => RunServerAsync().Wait();
        }
    }
    

    3.1.2 服务端处理逻辑代码

    接收连入服务端代码的客户端消息,并将此消息重新返回给客户端。

    using DotNetty.Buffers;
    using DotNetty.Transport.Channels;
    using System;
    using System.Text;
    
    namespace EchoServer
    {
        /// <summary>
        /// 因为服务器只需要响应传入的消息,所以只需要实现ChannelHandlerAdapter就可以了
        /// </summary>
        public class EchoServerHandler : ChannelHandlerAdapter
        {
            /// <summary>
            /// 每个传入消息都会调用
            /// 处理传入的消息需要复写这个方法
            /// </summary>
            /// <param name="ctx"></param>
            /// <param name="msg"></param>
            public override void ChannelRead(IChannelHandlerContext ctx, object msg)
            {
                IByteBuffer message = msg as IByteBuffer;
                Console.WriteLine("收到信息:" + message.ToString(Encoding.UTF8));
                ctx.WriteAsync(message);
            }
            /// <summary>
            /// 批量读取中的最后一条消息已经读取完成
            /// </summary>
            /// <param name="context"></param>
            public override void ChannelReadComplete(IChannelHandlerContext context)
            {
                context.Flush();
            }
            /// <summary>
            /// 发生异常
            /// </summary>
            /// <param name="context"></param>
            /// <param name="exception"></param>
            public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
            {
                Console.WriteLine(exception);
                context.CloseAsync();
            }
        }
    }
    

    3.2 客户端代码示例

    3.2.1 客户端服务配置

    配置需要连接的服务端ip地址及其端口号,并且配置客户端的处理逻辑代码以pipeline形式添加到channel中去。

    using DotNetty.Transport.Bootstrapping;
    using DotNetty.Transport.Channels;
    using DotNetty.Transport.Channels.Sockets;
    using System;
    using System.Net;
    using System.Threading.Tasks;
    
    namespace EchoClient
    {
        class Program
        {
            static async Task RunClientAsync()
            {
                var group = new MultithreadEventLoopGroup();
                try
                {
                    var bootstrap = new Bootstrap();
                    bootstrap
                        .Group(group)
                        .Channel<TcpSocketChannel>()
                        .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
                        {
                            IChannelPipeline pipeline = channel.Pipeline;
                            pipeline.AddLast(new EchoClientHandler());
                        }));
                    IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse("192.168.1.11"), 3000));
                    Console.ReadLine();
                    await clientChannel.CloseAsync();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }
                finally
                {
                    await group.ShutdownGracefullyAsync();
                }
            }
            static void Main(string[] args) => RunClientAsync().Wait();
        }
    }
    
    

    3.2.2 客户端处理逻辑代码

    客户端连接成功后,向服务端发送消息,接受到服务端消息,对消息计数再发还给服务端。

    using DotNetty.Buffers;
    using DotNetty.Transport.Channels;
    using System;
    using System.Collections.Generic;
    using System.Text;
    
    namespace EchoClient
    {
        public class EchoClientHandler : SimpleChannelInboundHandler<IByteBuffer>
        {
            public static int i=0;
            /// <summary>
            /// Read0是DotNetty特有的对于Read方法的封装
            /// 封装实现了:
            /// 1. 返回的message的泛型实现
            /// 2. 丢弃非该指定泛型的信息
            /// </summary>
            /// <param name="ctx"></param>
            /// <param name="msg"></param>
            protected override void ChannelRead0(IChannelHandlerContext ctx, IByteBuffer msg)
            {
                if (msg != null)
                {
                    i++;
                    Console.WriteLine($"Receive From Server {i}:" + msg.ToString(Encoding.UTF8));
                }
                ctx.WriteAsync(Unpooled.CopiedBuffer(msg));
            }
            public override void ChannelReadComplete(IChannelHandlerContext context)
            {
    
                context.Flush();
            }
            public override void ChannelActive(IChannelHandlerContext context)
            {
                Console.WriteLine($"发送客户端消息");
                context.WriteAndFlushAsync(Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes($"客户端消息!")));
            }
    
            public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
            {
                Console.WriteLine(exception);
                context.CloseAsync();
            }
        }
    }
    

    4 最终输出效果

    备注:红框表示接收到消息的序号。

    5 参考博客

    DotNetty完全教程

    .NET Core3.1 Dotnetty实战

    DotNetty系列

    dotNetty modbus系列

    编解码框架


    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://www.cnblogs.com/JerryMouseLi/p/14086199.html
  • 相关阅读:
    Xaml引用图片路径的方式
    并发概念模型:JMM(JAVA内存模型)
    并发组件之一:ThreadLocal线程本地变量
    并发锁之二:ReentrantReadWriteLock读写锁
    并发锁之一:ReentrantLock重入锁
    AQS同步队列器之二:等待通知机制
    AQS同步队列器之一:使用和原理
    css常用操作
    自动生成mybatis代码
    jdk动态代理源码分析(二)---依赖接口的实现
  • 原文地址:https://www.cnblogs.com/JerryMouseLi/p/14086199.html
Copyright © 2011-2022 走看看