zoukankan      html  css  js  c++  java
  • Netty学习笔记

    Netty是什么?

    概述——JBOSS提供的一个开源的Java网络应用框架

    特点——异步,基于事件驱动。提供了TCP/IP、HTTP协议栈,方便定制开发私有协议栈

    本质——NIO框架

    应用——开发高性能高可靠的网络IO程序,例如在分布式系统中作为RPC的基础通信组件。另外在大数据领域也被广泛应用,Akka,Flink,Spark等项目都用到了Netty

    数据传输的基础——TCP协议

    下图可以帮助了解Netty在Java网络编程所处的位置

    image-20210315141613432

    三种经典的I/O模型

    BIO

    概述

    同步并阻塞,客户端每来一个连接请求,服务器就要启动一个对应的线程进行处理,高并发(同一时间出现大量请求)场景下,服务器资源消耗严重,压力很大。同时,如果连接什么也不做,服务器仍然会让线程维持,造成不必要的线程开销。最重要的是,这种模式下,数据的读取写入必须阻塞在一个线程内等待其完成。

    工作机制

    1)服务器启动一个ServerSocket监听连接请求

    2)客户端启动Socket对服务器发起连接请求,服务端默认情况下为每个客户建立一个线程与之进行通讯

    3)客户端发出请求后,咨询服务端是否有线程响应。有响应,客户端线程在请求结束后继续执行;无响应,进入等待,等待超时后连接请求被拒绝。

    image-20210315144516566

    改进措施:使用线程池,实现并发,但并不能减少线程的使用个数。

    应用场景:连接数目比较小且固定,对服务器资源要求比较高。

    实战

    服务端借助线程池实现,客户端用Telnet模拟

    package com.youzikeji.bio;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.*;
    
    public class BioServer {
        public static void main(String[] args) throws IOException {
            //利用线程池实现BIO的server端
            //创建一个线程池,通过七大参数
            ExecutorService threadPool = new ThreadPoolExecutor(
                    5,
                    10,
                    3,
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(5),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
    
            //1.创建ServerSocket
            ServerSocket serverSocket = new ServerSocket(6666);
            System.out.println("服务器启动了");
            while (true){
                //监听,等待客户端连接
                final Socket socket = serverSocket.accept();
                System.out.println("连接到一个客户端");
    
                //2.创建一个线程与之通讯
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        handler(socket);
                    }
                });
            }
        }
    
        //线程和客户端通信的方法
        public static void handler(Socket socket){
            System.out.println("线程id : " + Thread.currentThread().getId());
            InputStream is = null;
            try {
                //接收数据的缓冲区
                byte[] bytes = new byte[1024];
                //通过socket获取输入流
                is = socket.getInputStream();
    
                //3.循环读取客户端发送的数据
                while (true){
                    System.out.println("线程id : " + Thread.currentThread().getId());
                    int read = is.read(bytes);
                    if (read != -1){
                        System.out.println(new String(bytes, 0, read));
                    } else {
                        //读完break
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                //释放资源
                if (is != null){
                    try {
                        is.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (socket != null){
                    try {
                        socket.close();
                        System.out.println("关闭与客户端的连接");
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    NIO

    概述

    同步非阻塞,客户端发送的请求会注册到多路复用器(选择器)上,多路复用器轮询到连接有I/O请求才进行处理。对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

    特点:支持面向缓冲的,基于通道的 I/O 操作方法。

    应用场景:连接数目多且连接时间比较短,适用于聊天服务器,弹幕系统,服务器通信等场景。

    image-20210315165226877

    三大核心

    Channel
    • 每个Channel都会对应一个Buffer

    • Channel是双向的,可以返回底层操作系统的情况

    • Channel的切换是基于事件驱动的

    FileChannel + ByteBuffer——将字符串写入文本文件案例

    image-20210315195058080

    NioFileChannelDemo01.java

    package com.youzikeji.nio;
    
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    
    public class NioFileChannelDemo01 {
        public static void main(String[] args) throws IOException {
            String str = "hello, caoyusang";
    
            FileOutputStream os = new FileOutputStream("d:\file1.txt");
    
            //通过输出流获取对应的Channel,输出流把Channel包裹起来了
            FileChannel channel = os.getChannel();
    
            //为Channel创建一个对应的缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    
            //str放入缓冲区
            byteBuffer.put(str.getBytes());
    
            //对缓冲区进行flip,即position置0
            byteBuffer.flip();
    
            //将缓冲区数据写入到Channel
            channel.write(byteBuffer);
    
            //输出流关闭
            os.close();
    
        }
    }
    
    

    FileChannel + ByteBuffer——从文本文件读取文本,打印到控制台

    NioFileChannelDemo02.java

    package com.youzikeji.nio;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    
    public class NioFileChannelDemo02 {
        public static void main(String[] args) throws IOException {
            //创建文件的输入流
            File file = new File("d:\file1.txt");
            FileInputStream is = new FileInputStream(file);
    
            //通过输入流获取通道
            FileChannel channel = is.getChannel();
    
            //创建合适大小的缓冲区——根据文件大小
            ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
    
            //将Channel中的数据读到bytebuffer
            channel.read(byteBuffer);
    
            //将字节转成字符串,array方法return hb;
            System.out.println(new String(byteBuffer.array()));
    
        }
    }
    
    

    FileChannel+一个ByteBuffer实现文件的拷贝(需要读写)

    NioFileChannelDemo03.java

    package com.youzikeji.nio;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    
    public class NioFileChannelDemo03 {
        public static void main(String[] args) throws IOException {
            //获取被拷贝的文件的输入流
            File file = new File("d:\file1.txt");
            FileInputStream is = new FileInputStream(file);
    
            //获取输入流对应的channel
            FileChannel isChannel = is.getChannel();
    
            //获取输出流
            FileOutputStream os = new FileOutputStream("d:\file2.txt");
    
            //获取输出流对应的channel
            FileChannel osChannel = os.getChannel();
    
            //构建512大小的缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(512);
    
            //循环读取,防止读不完
            while (true){
                //清空buffer,复位操作,防止position==limit出现read一直为0的情况
                /*
                public Buffer clear() {
                    position = 0;
                    limit = capacity;
                    mark = -1;
                    return this;
                }
                */
                byteBuffer.clear();
    
                int read = isChannel.read(byteBuffer);
                //读到末尾
                if (read == -1){
                    break;
                }
                //读的同时,写,注意要先读写反转
                byteBuffer.flip();
                osChannel.write(byteBuffer);
            }
            //关闭输入流和输出流
            is.close();
            os.close();
    
    
    
    
        }
    }
    
    

    FileChannel.transferFrom()实现文件的拷贝

    NioFileChannelDemo04.java

    package com.youzikeji.nio;
    
    import java.io.*;
    import java.nio.channels.FileChannel;
    
    public class NioFileChannelDemo04 {
        public static void main(String[] args) throws IOException {
            //获取被拷贝的文件的输入流
            File file = new File("d:\file1.txt");
            FileInputStream is = new FileInputStream(file);
    
            //获取输入流对应的channel
            FileChannel isChannel = is.getChannel();
    
            //获取输出流
            FileOutputStream os = new FileOutputStream("d:\file3.txt");
    
            //获取输出流对应的channel
            FileChannel osChannel = os.getChannel();
    
            //使用transferFrom(src, begin, end)实现通道内数据的拷贝
            osChannel.transferFrom(isChannel, 0 , isChannel.size());
    
            //流的关闭
            is.close();
            os.close();
        }
    }
    
    
    Buffer
    • 内存块,可读可写,底层是一个数组
    • 普通buffer可以转换成只读buffer

    顶层父类Buffer抽象类的参数

    // Invariants: mark <= position <= limit <= capacity
    private int mark = -1;	
    private int position = 0;	 //下一个要被读或者写的数组元素的索引,每次读写完后都会更新
    private int limit;			//缓存区的当前终点,不能对缓冲区超过极限的位置进行读写操作,可修改
    private int capacity;		//缓冲区的容量,不可改变
    

    Buffer的一个子类ByteBuffer

    真正存放数据的是hb数组

    image-20210315172258183

    基本的buffer使用

    package com.youzikeji.nio;
    
    import java.nio.IntBuffer;
    
    public class BasicBuffer {
        public static void main(String[] args) {
            //创建buffer
            IntBuffer intBuffer = IntBuffer.allocate(5);
    
            //put向buffer中放数据
            for (int i = 0; i < intBuffer.capacity(); i++) {
                intBuffer.put(i * 2);
            }
    
            //从buffer中取数据
            //先将buffer转换,进行读写切换
            intBuffer.flip();
    
            while (intBuffer.hasRemaining()){
                System.out.println(intBuffer.get());
            }
    
        }
    }
    
    

    MappedByteBuffer类——允许文件直接在内存中修改,操作系统不需要再拷贝一次

    MappedByteBufferTest.java

    package com.youzikeji.nio;
    
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    
    public class MappedByteBufferTest {
        public static void main(String[] args) throws IOException {
            //创建一个随机访问的文件流,读写模式
            RandomAccessFile randomAccessFile = new RandomAccessFile("d:\file1.txt", "rw");
    
            //获取通道
            FileChannel channel = randomAccessFile.getChannel();
    
            /**
             * 参数1: 读写模式
             * 参数2:可以直接修改的起始位置
             * 参数3: 映射到内存的大小,即可以修改的字节数
             */
            MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
            mappedByteBuffer.put(0, (byte) 'H');
            mappedByteBuffer.put(3, (byte) '9');
    
            randomAccessFile.close();
    
    
        }
    }
    
    Buffer分散与聚集

    分散:将数据写入buffer时,可以采用buffer数组,依次写入

    聚集:从buffer读取数据时,可以采用buffer,依次读

    Selector
    • 一个Selector对应一个线程,同时可以对应多个不同的Channel
    • 能够检测多个注册的通道上是否有事件发生,有则获取事件并针对其进行处理
    • Selector可以在各个通道上进行切换,即单线程多路复用

    NIO vs BIO

    1)BIO以流的方式处理数据,NIO以块的方式处理数据,块IO的效率要高很多。

    2)BIO是阻塞的即数据的读写必须阻塞在一个线程内完成,NIO是非阻塞的,面向缓冲区。

    3)BIO基于字节流和字符流进行操作,NIP基于管道和缓冲区进行操作,数据总是从通道读取到缓冲区,或者从缓冲区写入通道,单个Selector线程负责轮询监听多个管道中的事件。

    AIO

    异步非阻塞,基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。AIO尚未得到广泛应用。

    应用场景:连接数目多且连接时间较长。

    Netty概述

    NIO存在的一些问题

    • NIO的类库和API繁杂,使用麻烦,需要熟练掌握Select、ServerSocketChannel、SocketChannel、ByteBuffer等
    • NIO编程涉及Reactor模式,必须对多线程和网络编程相当熟悉
    • 开发工作量和难度比较大,客户端面临断连重连、网络闪断、半包读写、失败缓存和网络拥塞等问题
    • JDK NIO的Epoll bug,会导致Selector空轮询,最终导致CPU100%直到JDK1.7还未解决

    Netty的优点

    • 对JDK自带的NIO的API进行了封装,解决了上述传统原生NIO网络编程出现的问题
    • 设计优雅、使用方便、安全、社区活跃、高性能、吞吐更高、延迟更低,减少了资源消耗和不必要的内存复制

    线程模型

    Reactor模式

    单Reactor单线程

    image-20210502101356806

    说明

    (1)select是I/O复用的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求

    (2)Reactor通过select监控客户端的请求事件,收到事件后通过dispatch进行请求的分发处理

    (3)如果是建立连接事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理

    (4)如果不是建立连接事件,则Reactor会分发调用连接对应的Handler响应不同类型的请求

    (5)最后由Handler完成read —> 业务处理 —> send的完整流程

    模式的优缺点

    (1)优点:模型简单,没有多线程、进程通信、竞争的问题

    (2)缺点:只有一个线程,无法完全发挥多核CPU的性能。因为Reactor和Handler在同一个线程中,即请求监听和请求处理在同一个线程中完成,并发高的情况下,Handler在处理某个业务时,整个线程无法处理其他请求事件。

    单Reactor多线程

    image-20210502104022933

    说明

    (1)Reactor通过select监控客户端的请求事件,收到事件后通过dispatch进行请求的分发处理

    (2)如果是建立连接事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理

    (3)如果不是建立连接事件,则Reactor会分发调用连接对应的Handler响应不同类型的请求,该模式下handler只负责事件响应,而不做任何的业务处理,只做read和send,而具体的业务处理交付给worker线程池的某个线程

    (4)worker线程池会分配独立线程完成真正的业务,并把结果返回给handler

    模式的优缺点

    (1)优点:可以充分利用多核CPU的处理能力

    (2)缺点:多线程数据共享和访问较为复杂;reactor还是负责所有事件的监听和响应,即连接的监听和响应仍然是在单线程中运行,高并发场景下容易出现性能瓶颈。

    主从Reactor多线程

    image-20210502110433664

    说明

    (1)Reactor主线程MainReactor通过select监控客户端的请求事件,收到事件后通过Acceptor处理连接事件

    (2)当Acceptor处理连接事件后,MainReactor将连接分配给下一级的SubReactor

    (3)SubReactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理

    (4)当有新的事件发生时,SubReactor就会调用对应的handler进行处理

    (5)handler通过read读取数据,将业务处理移交worker线程池

    (6)worker线程池分配独立的一个线程进行业务处理,返回结果给对应的handler

    (7)handler收到响应结果后,通过send将结果返回给client

    模式优缺点

    (1)优点:父线程和子线程数据交互简单职责明确,父线程只需要接受连接请求,子线程完成后续的I/O及业务处理

    (2)编程复杂度较高

    Netty线程模式

    网络图

    img

    我自己画的

    image-20210503094734623

    说明

    (1)Netty抽象出两组线程池,Boss Group专门负责接收客户端的连接请求,Worker Group专门负责网络的读写

    (2)Boss Group和worker Group的类型都是NioEventLoopGroup,相当于一个事件循环组,组中有多个事件循环,每个事件循环都是一个NioEventLoop

    (3)NioEventLoop表示一个不断循环执行的处理任务的线程,NioEventLoop通过Selector监听绑定在其上的socket的网络通讯

    (4)每个Boss Group中的NioEventLoop循环执行的过程分为三步:

    ​ 1)轮询accept事件,及连接请求事件

    ​ 2)处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个worker NioEventLoop中的Selector上

    ​ 3)处理任务队列中的任务,即runAllTasks

    (5)每个Woker Group中的NioEventLoop的执行逻辑是:

    ​ 1)轮询处理读写(R/W)事件

    ​ 2)处理I/O事件,在对应的NioSocketChannel中处理

    ​ 3)处理任务队列的任务

    (6)每个Worker Group的NioEventLoop处理具体的业务时,会使用管道Pipeline,可以通过Pipeline获取对应的Channel的处理器ChannelHandler,从而进行真正的业务处理

    Netty实战

    Netty实现简单的TCP通信

    NettyTcpServer.java

    package com.youzikeji.netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    
    public class NettyTcpServer {
        public static void main(String[] args) {
    
            //创建BossGroup(只处理连接请求)和WorkerGroup(处理真正的业务)
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                //创建服务端的启动对象
                ServerBootstrap bootstrap = new ServerBootstrap();
    
                //进行参数设置
                bootstrap.group(bossGroup, workerGroup)     //设置两个线程组
                        .channel(NioServerSocketChannel.class)      //使用NioServerSocketChannel作为服务器的通道实现
                        .option(ChannelOption.SO_BACKLOG, 128)      //标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
                        .childOption(ChannelOption.SO_KEEPALIVE, true)      //启用心跳保活机制
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyTcpServerHandler());
                            }
                        });
                System.out.println("服务器准备好了");
    
                //绑定端口并同步
                ChannelFuture cf = bootstrap.bind(7777).sync();
    
                //对关闭通道进行监听
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    }
    
    

    NettyTcpServerHandler.java

    package com.youzikeji.netty;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    //HandlerAdapter是Netty提供的适配器,规范
    public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         *
         * @param ctx : 上下文对象,含有pipeline, Channel等
         * @param msg : 客户端传来的数据
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server ctx: " + ctx);
    
            //打印msg看看,先将msg转化成netty提供的ByteBuf
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端的地址:" + ctx.channel().remoteAddress());
    
        }
    
        //数据读取完毕
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", CharsetUtil.UTF_8));
        }
    
        //异常处理
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    

    NettyTcpClient.java

    package com.youzikeji.netty;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyTcpClient {
        public static void main(String[] args) throws InterruptedException {
            //创建group
            EventLoopGroup group = new NioEventLoopGroup();
    
            try {
                //创建启动类
                Bootstrap bootstrap = new Bootstrap();
                //参数设置
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyTcpClientHandler());
                            }
                        });
                System.out.println("客户端就绪");
                //连接服务器并同步
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7777).sync();
    
                //关闭通道监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    
    }
    
    

    NettyTcpClientHandler.java

    package com.youzikeji.netty;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    public class NettyTcpClientHandler extends ChannelInboundHandlerAdapter {
    
        //客户端就绪就会触发
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client ctx: " + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器", CharsetUtil.UTF_8));
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("服务其消息: " + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("服务器地址: " + ctx.channel().remoteAddress());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    

    服务端

    image-20210318102229436

    客户端

    image-20210318102238524

    Netty实现基本的RPC框架

    RPC概述

    RPC原理图

    RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。比如两个不同的服务 A、B 部署在两台不同的机器上,那么服务 A 如果想要调用服务 B 中的某个方法该怎么办呢?使用 HTTP请求当然可以,但是可能会比较慢而且一些优化做的并不好。 RPC 的出现就是为了让计算机调用远程服务就像调用本地服务一样快。

    RPC的执行流程

    1. 服务消费方(client)调用以本地调用方式调用服务;
    2. client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
    3. client stub找到服务地址,并将消息发送到服务端;
    4. server stub收到消息后进行解码
    5. server stub根据解码结果调用本地的服务
    6. 本地服务执行并将结果返回给server stub;
    7. server stub将返回结果打包成消息并发送至消费方;
    8. client stub接收到消息,并进行解码
    9. 服务消费方得到最终结果

    框架设计

    image-20210318104129844

    具体实现

    项目结构

    image-20210503112605108

    代码剖析

    服务接口HelloService.java —— 定义了服务提供者提供服务的规范

    package com.youzikeji.rpc.service;
    
    public interface HelloService {
    
        String hello(String msg);
    }
    

    服务实现类HelloServiceImpl.java —— 服务的具体实现(业务,返回服务调用结果)

    package com.youzikeji.rpc.provider;
    
    import com.youzikeji.rpc.service.HelloService;
    
    public class HelloServiceImpl implements HelloService {
        public String hello(String msg) {
            System.out.println("收到客户端消息:" + msg);
            if (msg != null) {
                return "客户端您好,已收到您的消息[" + msg + "]";
            } else {
                return "客户端您好,已收到您的消息";
            }
        }
    }
    

    服务端

    • ServerBootstrap.java —— 服务端启动类

      package com.youzikeji.rpc.provider;
      
      import com.youzikeji.rpc.server.NettyServer;
      
      //ServerBootStrap会启动一个服务提供者,即NettyServer
      public class ServerBootstrap {
          public static void main(String[] args) {
              NettyServer.startServer("127.0.0.1", 7000);
          }
      }
      
    • NettyServerHandler.java —— 具体的业务处理器

      package com.youzikeji.rpc.server;
      
      import com.youzikeji.rpc.provider.HelloServiceImpl;
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.channel.ChannelInboundHandlerAdapter;
      
      public class NettyServerHandler extends ChannelInboundHandlerAdapter {
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
              ctx.close();
          }
      	
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              //获取客户端发送的消息,并根据约定好的的协议调用服务,这里简单地规定消息必须以某个字符串作为开头
              if (msg.toString().startsWith("HelloService#hello#")){
                  String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
                  ctx.writeAndFlush(result);
              }
          }
      }
      
    • NettyServer.java —— Netty构建服务端

      package com.youzikeji.rpc.server;
      
      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.channel.ChannelFuture;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.ChannelPipeline;
      import io.netty.channel.EventLoopGroup;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import io.netty.handler.codec.string.StringDecoder;
      import io.netty.handler.codec.string.StringEncoder;
      
      public class NettyServer {
      
          //对外暴露方法
          public static void startServer(String hostname, int port){
              startServer0(hostname, port);
          }
          /**
           * 启动服务
           * @param hostname 主机名
           * @param port 端口号
           */
          private static void startServer0(String hostname, int port) {
              //Boss线程池和Worker线程池
              EventLoopGroup bossGroup = new NioEventLoopGroup(1);
              EventLoopGroup workerGroup = new NioEventLoopGroup();
      
              try {
                  //创建启动类
                  ServerBootstrap serverBootstrap = new ServerBootstrap();
      
                  serverBootstrap.group(bossGroup, workerGroup)
                          .channel(NioServerSocketChannel.class)
                          .childHandler(new ChannelInitializer<SocketChannel>() {
                              @Override
                              protected void initChannel(SocketChannel socketChannel) throws Exception {
                                  ChannelPipeline pipeline = socketChannel.pipeline();
                                  pipeline.addLast(new StringDecoder());
                                  pipeline.addLast(new StringEncoder());
                                  pipeline.addLast(new NettyServerHandler());     //业务处理器
                              }
                          });
                  //绑定并同步监听主机端口,然后做异步处理
                  ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
                  System.out.println("服务提供方开始提供服务...");
                  channelFuture.channel().closeFuture().sync();
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  bossGroup.shutdownGracefully();
                  workerGroup.shutdownGracefully();
              }
          }
      }
      

    客户端

    • ClientBootstrap.java —— 客户端启动类,通过构建远程服务代理对象实现服务调用

      package com.youzikeji.rpc.customer;
      
      import com.youzikeji.rpc.client.NettyClient;
      import com.youzikeji.rpc.service.HelloService;
      
      public class ClientBootstrap {
      
          //定义协议头
          public static final String head = "HelloService#hello#";
      
          public static void main(String[] args) {
              //创建消费者
              NettyClient cus = new NettyClient();
      
              //创建代理对象
              HelloService service = (HelloService) cus.getBean(HelloService.class, head);
      
              //通过代理对象调用服务提供者的方法
              String res = service.hello("您好, RPC");
      
              System.out.println("调用的结果:" + res);
      
          }
      }
      
    • NettyClientHandler.java —— 将调用服务的参数等信息发送给服务器,等待服务代理对象返回调用结果

      package com.youzikeji.rpc.client;
      
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.channel.ChannelInboundHandlerAdapter;
      
      import java.util.concurrent.Callable;
      
      public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
      
          private ChannelHandlerContext context;
          private String result;  //返回的结果
          private String param;   //客户端调用方法时传入的参数
      
          /**
           * 被代理对象调用,发送数据给服务器,等待被唤醒,然后返回结果
           * @return 返回结果
           * @throws Exception 异常
           */
          @Override
          public synchronized Object call() throws Exception {
              context.writeAndFlush(param);
              //等待channelRead获取服务器返回的结果后,唤醒
              wait();
              return result;
          }
      
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
              ctx.close();
          }
      
          /**
           * 与服务器连接创建后就被调用
           * @param ctx
           * @throws Exception 异常
           */
          @Override
          public void channelActive(ChannelHandlerContext ctx) throws Exception {
              context = ctx;  //其他方法中会使用到ctx
          }
      
          /**
           * 收到服务器的数据后,调用方法
           * @param ctx
           * @param msg
           * @throws Exception 异常
           */
          @Override
          public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              result = msg.toString();
              //唤醒等待的线程
              notify();
          }
      
          void setParam(String param){
              this.param = param;
          }
      }
      
    • NettyClient.java

      package com.youzikeji.rpc.client;
      
      import io.netty.bootstrap.Bootstrap;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.ChannelOption;
      import io.netty.channel.ChannelPipeline;
      import io.netty.channel.EventLoopGroup;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;
      import io.netty.handler.codec.string.StringDecoder;
      import io.netty.handler.codec.string.StringEncoder;
      
      import java.lang.reflect.Proxy;
      import java.util.Objects;
      import java.util.concurrent.*;
      
      public class NettyClient {
          //创建线程池
          public static ExecutorService executor = new ThreadPoolExecutor(
                  3,
                  Runtime.getRuntime().availableProcessors(),
                  3,
                  TimeUnit.SECONDS,
                  new LinkedBlockingDeque<>(5),
                  Executors.defaultThreadFactory(),
                  new ThreadPoolExecutor.AbortPolicy()
          );
      
          private static NettyClientHandler client;
      
          //代理模式,获取代理对象
          public Object getBean(final Class<?> serviceClass, final String head) {
              return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                      new Class<?>[]{serviceClass}, ((proxy, method, args) -> {
                          if (client == null) {
                              initClient();
                          }
                          //设置要发给服务端的信息
                          client.setParam(head + args[0]);
                          return executor.submit(client).get();
                      }));
          }
      
          //初始化客户端
          public static void initClient() {
              client = new NettyClientHandler();
      
              EventLoopGroup group = new NioEventLoopGroup();
              Bootstrap bootstrap = new Bootstrap();
              bootstrap.group(group)
                      .channel(NioSocketChannel.class)
                      .option(ChannelOption.TCP_NODELAY, true)
                      .handler(
                              new ChannelInitializer<SocketChannel>() {
                                  @Override
                                  protected void initChannel(SocketChannel socketChannel) throws Exception {
                                      ChannelPipeline pipeline = socketChannel.pipeline();
                                      pipeline.addLast(new StringDecoder());
                                      pipeline.addLast(new StringEncoder());
                                      pipeline.addLast(client);
                                  }
                              }
                      );
              try {
                  bootstrap.connect("127.0.0.1", 7000).sync();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
      
          }
      
      }
      

    结果

    先后运行ServerBootstrap.java和ClientBootstrap.java

    服务端结果如下图:

    image-20210503150355519

    客户端结果如下:

    image-20210503150404678

  • 相关阅读:
    04-基本的mysql语句
    03-MySql安装和基本管理
    02-数据库的概述
    MySql的前戏
    Python3连接MySQL数据库之pymysql模块的使用
    mockjs简单易懂
    GitHub Android 开源项目汇总 (转)
    国内云计算的缺失环节: GPU并行计算(转)
    HDFS+MapReduce+Hive+HBase十分钟快速入门
    同步和异步
  • 原文地址:https://www.cnblogs.com/caoyusang/p/14906973.html
Copyright © 2011-2022 走看看