zoukankan      html  css  js  c++  java
  • 居于Netty实现的一个简单的RPC

    Netty:基于java NIO 网络通信框架,具有高效、简单、快速的应用特点。在当下互联网高并发场景下得到很好地应用,现在用java写的高并发产品(如dubbo 、zookeeper、hadoop、rocketmq)大都应用了netty作为底层的通信技术。
    RPC:远程调用,通过RPC框架,使得我们可以像调用本地方法一样地调用远程机器上的方法。
    1.定义常量类,包含主机地址,端口号。
    1 /**
    2  * @author hsl 2020-07-02 13:26
    3  * 一个常亮工具类 主机地址,端口号
    4  */
    5 public class Constants {
    6     public static String REMOTE_HOST = "localhost";
    7     public static int PORT = 6789;
    8 }

    2.RPC框架,使得我们可以像调用本地方法一样地调用远程机器上的方法,那么我们就定义接口并实现。

    1 package com.netty.nettyOne.service.dateTimeService;
    2 
    3 /**
    4  * @author hsl 2020-07-02 13:29
    5  */
    6 public interface DateTimeService {
    7     public String getNow(String param);
    8 }
     1 package com.netty.nettyOne.service.dateTimeService;
     2 
     3 import java.text.SimpleDateFormat;
     4 import java.util.Date;
     5 
     6 /**
     7  * @author hsl 2020-07-02 13:29
     8  */
     9 public class DateTimeServiceImpl implements DateTimeService {
    10 
    11     @Override
    12     public String getNow(String param) {
    13         return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date());
    14     }
    15 }

    3.NettyServer

     1 package com.netty.nettyOne.server;
     2 
     3 import com.netty.nettyOne.service.dateTimeService.DateTimeServiceImpl;
     4 import io.netty.channel.ChannelHandlerContext;
     5 import io.netty.channel.ChannelInboundHandlerAdapter;
     6 
     7 /**
     8  * @author hsl 2020-07-02 14:00
     9  * 服务处理类实现
    10  */
    11 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    12     @Override
    13     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    14         //System.out.println("Server has received Message: [" + msg + "]");
    15         //处理客户端发送来的信息
    16         System.out.println("客户端发送的信息:" + msg.toString());
    17         //响应
    18         String result = new DateTimeServiceImpl().getNow(msg.toString());
    19         System.out.println("服务器响应的信息:" + result);
    20         ctx.writeAndFlush(result);
    21     }
    22 
    23 //    @Override
    24 //    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    25 //
    26 //    }
    27 
    28     @Override
    29     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    30         ctx.close();    //抛出关闭通信channel
    31     }
    32 }
     1 package com.netty.nettyOne.server;
     2 
     3 
     4 import io.netty.bootstrap.ServerBootstrap;
     5 import io.netty.channel.ChannelFuture;
     6 import io.netty.channel.ChannelInitializer;
     7 import io.netty.channel.ChannelPipeline;
     8 import io.netty.channel.nio.NioEventLoopGroup;
     9 import io.netty.channel.socket.SocketChannel;
    10 import io.netty.channel.socket.nio.NioServerSocketChannel;
    11 import io.netty.handler.codec.string.StringDecoder;
    12 import io.netty.handler.codec.string.StringEncoder;
    13 
    14 /**
    15  * @author hsl 2020-07-02 13:39
    16  */
    17 public class NettyServer {
    18     //主机名称 端口号
    19     private String host;
    20     private int port;
    21 
    22     public NettyServer(String host, int port) {
    23         this.host = host;
    24         this.port = port;
    25     }
    26 
    27     /**
    28      * 对外公开的方法
    29      */
    30     public void serverStart() {
    31         serverStart0(host, port);
    32     }
    33 
    34     /**
    35      * @param host 主机名称
    36      * @param port     端口号
    37      */
    38     private void serverStart0(String host, int port) {
    39         //处理 ACCEPT 事件的线程工作组
    40         NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    41 
    42         //处理 READ/WRITER 时间的线程工作组
    43         NioEventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
    44 
    45         ServerBootstrap server = new ServerBootstrap();
    46 
    47         server.group(bossGroup, workerGroup)
    48                 .channel(NioServerSocketChannel.class)
    49                 .childHandler(new ChannelInitializer<SocketChannel>() {
    50                     @Override
    51                     protected void initChannel(SocketChannel socketChannel) throws Exception {
    52                         ChannelPipeline pipeline = socketChannel.pipeline();
    53                         pipeline.addLast(new StringEncoder())
    54                                 .addLast(new StringDecoder())
    55                                 .addLast(new NettyServerHandler());  // 此处添加业务处理Handler
    56                     }
    57                 });
    58 
    59         try {
    60             //启动服务
    61             ChannelFuture channelFuture = server.bind(host, port).sync();
    62             System.out.println("NettyServer is started ..... ");
    63 
    64             //关闭服务
    65             channelFuture.channel().closeFuture().sync();
    66         } catch (InterruptedException e) {
    67             e.printStackTrace();
    68         } finally {
    69             bossGroup.shutdownGracefully();
    70             workerGroup.shutdownGracefully();
    71         }
    72     }
    73 }

    netty服务器启动类

     1 package com.netty.nettyOne.server;
     2 
     3 import com.netty.nettyOne.util.Constants;
     4 
     5 /**
     6  * @author hsl 2020-07-02 14:29
     7  */
     8 public class NettyServerBootstrap {
     9     public static void main(String[] args) {
    10         //初始化服务器
    11         NettyServer nettyServer = new NettyServer(Constants.REMOTE_HOST, Constants.PORT);
    12         nettyServer.serverStart();
    13     }
    14 }

    4.NettyClient

     1 package com.netty.nettyOne.client;
     2 
     3 import io.netty.channel.ChannelHandlerContext;
     4 import io.netty.channel.ChannelInboundHandlerAdapter;
     5 
     6 import java.util.concurrent.Callable;
     7 
     8 /**
     9  * @author hsl 2020-07-02 14:48
    10  */
    11 public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    12     //上下文对象
    13     /**
    14      * {@link #channelActive}
    15      */
    16     private ChannelHandlerContext context;
    17     //存放服务器返回结果
    18     private String result;
    19     //存放请求参数
    20     private String requestParam;
    21 
    22     public void setRequestParam(String requestParam) {
    23         this.requestParam = requestParam;
    24     }
    25 
    26     @Override
    27     public synchronized Object call() throws Exception {
    28         System.out.println("NettyClientHandler call() Invoacated ... ");
    29         //发送请求
    30         context.writeAndFlush(requestParam);
    31         //发送完了请求之后,等待....
    32         wait();
    33         //被唤醒之后,继续处理(返回结果即可)
    34         return result;
    35     }
    36 
    37     /**
    38      * @param ctx
    39      * @throws Exception
    40      * 当 该 NettyClientHandler 被初始化的时候,就会调用 该 channelActive 方法一次
    41      */
    42     @Override
    43     public void channelActive(ChannelHandlerContext ctx) throws Exception {
    44         System.out.println(" channelActive 被调用  ");
    45         // 向服务端发送请求。
    46         this.context = ctx;
    47     }
    48 
    49     /**
    50      * @param ctx
    51      * @param msg
    52      * @throws Exception
    53      * channelRead 用来处理服务端返回来的数据。
    54      */
    55     @Override
    56     public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    57         result = msg.toString();
    58         System.out.println("Client received result: " + result);
    59 
    60         //唤醒线程
    61         notify();
    62     }
    63 
    64     @Override
    65     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    66         ctx.close();
    67     }
    68 }
    package com.netty.nettyOne.client;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author hsl 2020-07-02 14:43
     */
    public class NettyClient {
        private String host;
        private int port;
    
        public NettyClient(String host, int port) {
            this.host = host;
            this.port = port;
            //启动nettyCLient
            clientStart();
        }
    
        /**
         * 初始化一个线程池,用来处理服务端返回的结果数据
         */
        private static int cpu_cores = Runtime.getRuntime().availableProcessors();
        private static ExecutorService threadPool = Executors.newFixedThreadPool(cpu_cores);
    
        private NettyClientHandler clientHandler;
    
        /**
         * 客户端的启动方法
         */
        private void clientStart() {
            clientStart0(host, port);
        }
    
        private void clientStart0(String host, int port) {
            //服务工作组
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            //客户端服务启动类
            Bootstrap client = new Bootstrap();
            clientHandler = new NettyClientHandler();
    
            //绑定服务参数
            client.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder())
                                    .addLast(new StringDecoder())
                                    .addLast(clientHandler);
                        }
                    });
    
            try {
                //客户端启动
                client.connect(host, port).sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * NettyClient 端提供一个 代理方法,获取服务类的一个代理类
         *
         * @param serviceClass
         */
        public Object getProxy(final Class<?> serviceClass) {
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                    new Class<?>[]{serviceClass}, new InvocationHandler() {
                        @Override
                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                            System.out.println("客户端发送的请求参数是:" + args[0].toString());
                            clientHandler.setRequestParam(args[0].toString());
                            return threadPool.submit(clientHandler).get();
                        }
                    });
        }
    }
     1 package com.netty.nettyOne.client;
     2 
     3 import com.netty.nettyOne.service.dateTimeService.DateTimeService;
     4 import com.netty.nettyOne.util.Constants;
     5 
     6 /**
     7  * @author hsl 2020-07-02 15:30
     8  */
     9 public class NettyClientBootstrap {
    10     public static void main(String[] args) {
    11         //启动客户端
    12         NettyClient client = new NettyClient(Constants.REMOTE_HOST, Constants.PORT);
    13 
    14         //获取一个服务端 服务对象的 代理对象
    15         DateTimeService dateTimeService = (DateTimeService) client.getProxy(DateTimeService.class);
    16         //调用服务
    17         String result = dateTimeService.getNow("Netty牛逼");
    18         // 输出结果
    19         System.out.println("Client Received Result From Server: [" + result + "]");
    20     }
    21 }

    6.实现结果

    服务器打印

    客户端打印

  • 相关阅读:
    2013上半年中国CRM市场分析报告
    windows运行命令大全
    JVM探索(二)
    JVM探索(一)
    1.数据结构和算法的基础笔记
    java程序性能优化
    http状态码
    mongodb清洗数据
    List的数据结构
    Foundation Data Structure
  • 原文地址:https://www.cnblogs.com/ualong/p/13259416.html
Copyright © 2011-2022 走看看