zoukankan      html  css  js  c++  java
  • Netty学习笔记(一)

    在互联网发达的今天,网络已经深入到生活的方方面面,一个高效、性能可靠的网络通信已经成为一个重要的诉求,在Java方面需要寻求一种高性能网络编程的实践。

    一、简介

    当前JDK(本文使用的JDK 1.8)中已经有网络编程相关的API,使用过程中或多或少会存在以下几个问题:

    • 阻塞:早期JDK里的API是用阻塞式的实现方式,在读写数据调用时数据还没有准备好,或者目前不可写,操作就会被阻塞直到数据准备好或目标可写为止。虽然可以采用每一个连接创建一个线程进行处理,但是可能会造成大量线程得不到释放,消耗资源。从JDK 1.4开始提供非阻塞的实现。
    • 处理和调度IO烦琐:偏底层的API实现暴露了更多的与业务无关的操作细节,使得在高负载下实现一个可靠和高效的逻辑就变得复杂和烦琐。

    Netty是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器和客户端。它拥有简单而强大的设计模型,易于使用,拥有比Java API更高的性能等特点,它屏蔽了底层实现的细节,使开发人员更关注业务逻辑的实现。

    二、组件和设计

    • Channel:屏蔽底层网络传输细节,提供简单易用的诸如bind、connect、read、write方法。
    • EventLoop:线程模型。处理连接生命周期过程中发生的事件,以及其他一些任务。
    • ChannelFuture:异步接口,用于注册Listener以便在某个操作完成时得到通知。
    • ChannelHandler:处理入站和出站数据的的一系列接口和抽象类,开发人员扩展这些类型来完成业务逻辑。
    • ChannelPipline:管理ChannelHandler的容器,将多个ChannelHandler以链式的方式管理,数据将在这个链上依次流动并被ChannelHandler逐个处理。
    • 引导(Bootstrap、ServerBootstrap):初始化客户端和服务端的入口类。

    三、一个简单的Demo

    创建一个maven工程,引入Netty。为了方便调试,Demo中引入了日志和junit5。

     1 <!-- pom.xml -->
     2 
     3 <dependencies>
     4     <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
     5     <dependency>
     6         <groupId>io.netty</groupId>
     7         <artifactId>netty-all</artifactId>
     8         <version>4.1.50.Final</version>
     9     </dependency>
    10 
    11     <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    12     <dependency>
    13         <groupId>org.slf4j</groupId>
    14         <artifactId>slf4j-api</artifactId>
    15         <version>1.7.30</version>
    16     </dependency>
    17 
    18     <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
    19     <dependency>
    20         <groupId>ch.qos.logback</groupId>
    21         <artifactId>logback-classic</artifactId>
    22         <version>1.2.3</version>
    23     </dependency>
    24 
    25     <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-core -->
    26     <dependency>
    27         <groupId>ch.qos.logback</groupId>
    28         <artifactId>logback-core</artifactId>
    29         <version>1.2.3</version>
    30     </dependency>
    31 
    32     <dependency>
    33         <groupId>org.junit.jupiter</groupId>
    34         <artifactId>junit-jupiter</artifactId>
    35         <version>5.5.2</version>
    36         <scope>test</scope>
    37     </dependency>
    38 </dependencies>

    创建Client和Server

     1 package com.niklai.demo;
     2 
     3 import io.netty.bootstrap.Bootstrap;
     4 import io.netty.buffer.ByteBuf;
     5 import io.netty.buffer.Unpooled;
     6 import io.netty.channel.ChannelFuture;
     7 import io.netty.channel.ChannelHandlerContext;
     8 import io.netty.channel.ChannelInboundHandlerAdapter;
     9 import io.netty.channel.ChannelInitializer;
    10 import io.netty.channel.nio.NioEventLoopGroup;
    11 import io.netty.channel.socket.SocketChannel;
    12 import io.netty.channel.socket.nio.NioSocketChannel;
    13 import io.netty.util.CharsetUtil;
    14 import org.slf4j.Logger;
    15 import org.slf4j.LoggerFactory;
    16 
    17 import java.net.InetSocketAddress;
    18 
    19 public class Client {
    20     private static final Logger logger = LoggerFactory.getLogger(Client.class.getSimpleName());
    21 
    22     public static void init() {
    23         try {
    24             Bootstrap bootstrap = new Bootstrap();              // 初始化客户端引导
    25             NioEventLoopGroup group = new NioEventLoopGroup();
    26             bootstrap.group(group)                              // 指定适用于NIO的EventLoop
    27                     .channel(NioSocketChannel.class)            // 适用于NIO的Channel
    28                     .remoteAddress(new InetSocketAddress("localhost", 9999))    // 指定要绑定的IP和端口
    29                     .handler(new ChannelInitializer<SocketChannel>() {
    30                         protected void initChannel(SocketChannel socketChannel) throws Exception {
    31                             socketChannel.pipeline().addLast(new ClientHandler());      // 添加ChannelHandler到ChannelPipline
    32                         }
    33                     });
    34             ChannelFuture future = bootstrap.connect().sync();      // 阻塞直到连接到远程节点
    35             future.channel().closeFuture().sync();                  // 阻塞直到关闭Channel
    36             group.shutdownGracefully().sync();                      // 释放资源
    37         } catch (InterruptedException e) {
    38             logger.error(e.getMessage(), e);
    39         }
    40     }
    41 
    42     static class ClientHandler extends ChannelInboundHandlerAdapter {
    43         @Override
    44         public void channelActive(ChannelHandlerContext ctx) throws Exception {
    45             logger.info("channel active....");
    46 
    47             String msg = "Client message!";
    48             logger.info("send message: {}....", msg);
    49             ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    50         }
    51 
    52         @Override
    53         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    54             ByteBuf buf = (ByteBuf) msg;
    55             logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
    56         }
    57     }
    58 }
     1 package com.niklai.demo;
     2 
     3 import io.netty.bootstrap.ServerBootstrap;
     4 import io.netty.buffer.ByteBuf;
     5 import io.netty.buffer.Unpooled;
     6 import io.netty.channel.*;
     7 import io.netty.channel.nio.NioEventLoopGroup;
     8 import io.netty.channel.socket.SocketChannel;
     9 import io.netty.channel.socket.nio.NioServerSocketChannel;
    10 import io.netty.util.CharsetUtil;
    11 import org.slf4j.Logger;
    12 import org.slf4j.LoggerFactory;
    13 
    14 import java.net.InetSocketAddress;
    15 
    16 public class Server {
    17     private static final Logger logger = LoggerFactory.getLogger(Server.class.getSimpleName());
    18 
    19     public static void init() {
    20         try {
    21             ServerBootstrap serverBootstrap = new ServerBootstrap();        // 初始化客户端引导
    22             NioEventLoopGroup group = new NioEventLoopGroup();
    23             serverBootstrap.group(group)                                    // 指定适用于NIO的EventLoop
    24                     .channel(NioServerSocketChannel.class)                  // 适用于NIO的Channel
    25                     .localAddress(new InetSocketAddress("localhost", 9999))     // 指定要绑定的IP和端口
    26                     .childHandler(new ChannelInitializer<SocketChannel>() {
    27                         protected void initChannel(SocketChannel socketChannel) throws Exception {
    28                             socketChannel.pipeline().addLast(new ServerHandler());      // 添加ChannelHandler到ChannelPipline
    29                         }
    30                     });
    31 
    32             ChannelFuture future = serverBootstrap.bind().sync();           // 异步绑定阻塞直到完成
    33             future.channel().closeFuture().sync();                          // 阻塞直到关闭Channel
    34             group.shutdownGracefully().sync();                              // 释放资源
    35         } catch (InterruptedException e) {
    36             logger.error(e.getMessage(), e);
    37         }
    38     }
    39 
    40     static class ServerHandler extends ChannelInboundHandlerAdapter {
    41         @Override
    42         public void channelActive(ChannelHandlerContext ctx) throws Exception {
    43             logger.info("channel active.....");
    44         }
    45 
    46         @Override
    47         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    48             ByteBuf buf = (ByteBuf) msg;
    49             logger.info("read message: {}.....", buf.toString(CharsetUtil.UTF_8));
    50         }
    51 
    52         @Override
    53         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    54             logger.info("read complete.....");
    55             ctx.writeAndFlush(Unpooled.copiedBuffer("receive message!", CharsetUtil.UTF_8))
    56                     .addListener(ChannelFutureListener.CLOSE);
    57         }
    58     }
    59 }

    日志配置文件

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 
     3 <configuration>
     4     <!-- 定义控制台输出 -->
     5     <appender name="consoleOut" class="ch.qos.logback.core.ConsoleAppender">
     6         <encoder>
     7             <pattern>%date %level [%thread] %class#%method [%file:%line] %msg%n</pattern>
     8         </encoder>
     9     </appender>
    10 
    11     <root level="info">
    12         <appender-ref ref="consoleOut" />
    13     </root>
    14 </configuration>
    logback.xml

    单元测试代码

     1 package com.niklai.demo;
     2 
     3 import org.junit.jupiter.api.Test;
     4 
     5 public class NettyTest {
     6 
     7     @Test
     8     public void test1() throws InterruptedException {
     9         new Thread(() -> {
    10             // 服务端
    11             Server.init();
    12         }).start();
    13         Thread.sleep(1000);
    14 
    15         // 客户端
    16         Client.init();
    17 
    18         Thread.sleep(5000);
    19     }
    20 }

     运行结果如下图

    从控制台日志中可以看到当Client连接到Server后, ServerHandler 和 ClientHandler 的 channerActive 方法都会被调用, ClientHandler 会调用 ctx.writeAndFlush() 方法给Server发送一条消息, ServerHandler 的 channelRead 方法被调用读取到消息,消息读取完毕后 channelReadComplete 方法被调用,发送应答消息给Client, ClientHandler 的 channelRead 方法被调用获取到应答消息。到此一个完整的发送--应答流程就结束了。

  • 相关阅读:
    javascript-jquery对象的事件处理
    javascript-jquery对象的动画处理
    vue.config.js配置 移动端配置自适应rem响应
    vue封装axios
    js工具类
    git解决空文件夹无法提交
    vue的axios
    vue中axios跨域问题
    全局控制vue的依赖
    linux搭建php环境
  • 原文地址:https://www.cnblogs.com/niklai/p/12934177.html
Copyright © 2011-2022 走看看