Netty:基于java NIO 网络通信框架,具有高效、简单、快速的应用特点。在当下互联网高并发场景下得到很好地应用,现在用java写的高并发产品
(如dubbo 、zookeeper、hadoop、rocketmq)
大都应用了netty作为底层的通信技术。RPC:远程调用,通过RPC框架,使得我们可以像调用本地方法一样地调用远程机器上的方法。
1.定义常量类,包含主机地址,端口号。
1 /** 2 * @author hsl 2020-07-02 13:26 3 * 一个常亮工具类 主机地址,端口号 4 */ 5 public class Constants { 6 public static String REMOTE_HOST = "localhost"; 7 public static int PORT = 6789; 8 }
2.RPC框架,使得我们可以像调用本地方法一样地调用远程机器上的方法,那么我们就定义接口并实现。
1 package com.netty.nettyOne.service.dateTimeService; 2 3 /** 4 * @author hsl 2020-07-02 13:29 5 */ 6 public interface DateTimeService { 7 public String getNow(String param); 8 }
1 package com.netty.nettyOne.service.dateTimeService; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 6 /** 7 * @author hsl 2020-07-02 13:29 8 */ 9 public class DateTimeServiceImpl implements DateTimeService { 10 11 @Override 12 public String getNow(String param) { 13 return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date()); 14 } 15 }
3.NettyServer
1 package com.netty.nettyOne.server; 2 3 import com.netty.nettyOne.service.dateTimeService.DateTimeServiceImpl; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelInboundHandlerAdapter; 6 7 /** 8 * @author hsl 2020-07-02 14:00 9 * 服务处理类实现 10 */ 11 public class NettyServerHandler extends ChannelInboundHandlerAdapter { 12 @Override 13 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 14 //System.out.println("Server has received Message: [" + msg + "]"); 15 //处理客户端发送来的信息 16 System.out.println("客户端发送的信息:" + msg.toString()); 17 //响应 18 String result = new DateTimeServiceImpl().getNow(msg.toString()); 19 System.out.println("服务器响应的信息:" + result); 20 ctx.writeAndFlush(result); 21 } 22 23 // @Override 24 // public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 25 // 26 // } 27 28 @Override 29 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 30 ctx.close(); //抛出关闭通信channel 31 } 32 }
1 package com.netty.nettyOne.server; 2 3 4 import io.netty.bootstrap.ServerBootstrap; 5 import io.netty.channel.ChannelFuture; 6 import io.netty.channel.ChannelInitializer; 7 import io.netty.channel.ChannelPipeline; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.codec.string.StringDecoder; 12 import io.netty.handler.codec.string.StringEncoder; 13 14 /** 15 * @author hsl 2020-07-02 13:39 16 */ 17 public class NettyServer { 18 //主机名称 端口号 19 private String host; 20 private int port; 21 22 public NettyServer(String host, int port) { 23 this.host = host; 24 this.port = port; 25 } 26 27 /** 28 * 对外公开的方法 29 */ 30 public void serverStart() { 31 serverStart0(host, port); 32 } 33 34 /** 35 * @param host 主机名称 36 * @param port 端口号 37 */ 38 private void serverStart0(String host, int port) { 39 //处理 ACCEPT 事件的线程工作组 40 NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); 41 42 //处理 READ/WRITER 时间的线程工作组 43 NioEventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors()); 44 45 ServerBootstrap server = new ServerBootstrap(); 46 47 server.group(bossGroup, workerGroup) 48 .channel(NioServerSocketChannel.class) 49 .childHandler(new ChannelInitializer<SocketChannel>() { 50 @Override 51 protected void initChannel(SocketChannel socketChannel) throws Exception { 52 ChannelPipeline pipeline = socketChannel.pipeline(); 53 pipeline.addLast(new StringEncoder()) 54 .addLast(new StringDecoder()) 55 .addLast(new NettyServerHandler()); // 此处添加业务处理Handler 56 } 57 }); 58 59 try { 60 //启动服务 61 ChannelFuture channelFuture = server.bind(host, port).sync(); 62 System.out.println("NettyServer is started ..... "); 63 64 //关闭服务 65 channelFuture.channel().closeFuture().sync(); 66 } catch (InterruptedException e) { 67 e.printStackTrace(); 68 } finally { 69 bossGroup.shutdownGracefully(); 70 workerGroup.shutdownGracefully(); 71 } 72 } 73 }
netty服务器启动类
1 package com.netty.nettyOne.server; 2 3 import com.netty.nettyOne.util.Constants; 4 5 /** 6 * @author hsl 2020-07-02 14:29 7 */ 8 public class NettyServerBootstrap { 9 public static void main(String[] args) { 10 //初始化服务器 11 NettyServer nettyServer = new NettyServer(Constants.REMOTE_HOST, Constants.PORT); 12 nettyServer.serverStart(); 13 } 14 }
4.NettyClient
1 package com.netty.nettyOne.client; 2 3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelInboundHandlerAdapter; 5 6 import java.util.concurrent.Callable; 7 8 /** 9 * @author hsl 2020-07-02 14:48 10 */ 11 public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { 12 //上下文对象 13 /** 14 * {@link #channelActive} 15 */ 16 private ChannelHandlerContext context; 17 //存放服务器返回结果 18 private String result; 19 //存放请求参数 20 private String requestParam; 21 22 public void setRequestParam(String requestParam) { 23 this.requestParam = requestParam; 24 } 25 26 @Override 27 public synchronized Object call() throws Exception { 28 System.out.println("NettyClientHandler call() Invoacated ... "); 29 //发送请求 30 context.writeAndFlush(requestParam); 31 //发送完了请求之后,等待.... 32 wait(); 33 //被唤醒之后,继续处理(返回结果即可) 34 return result; 35 } 36 37 /** 38 * @param ctx 39 * @throws Exception 40 * 当 该 NettyClientHandler 被初始化的时候,就会调用 该 channelActive 方法一次 41 */ 42 @Override 43 public void channelActive(ChannelHandlerContext ctx) throws Exception { 44 System.out.println(" channelActive 被调用 "); 45 // 向服务端发送请求。 46 this.context = ctx; 47 } 48 49 /** 50 * @param ctx 51 * @param msg 52 * @throws Exception 53 * channelRead 用来处理服务端返回来的数据。 54 */ 55 @Override 56 public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 57 result = msg.toString(); 58 System.out.println("Client received result: " + result); 59 60 //唤醒线程 61 notify(); 62 } 63 64 @Override 65 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 66 ctx.close(); 67 } 68 }
package com.netty.nettyOne.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author hsl 2020-07-02 14:43 */ public class NettyClient { private String host; private int port; public NettyClient(String host, int port) { this.host = host; this.port = port; //启动nettyCLient clientStart(); } /** * 初始化一个线程池,用来处理服务端返回的结果数据 */ private static int cpu_cores = Runtime.getRuntime().availableProcessors(); private static ExecutorService threadPool = Executors.newFixedThreadPool(cpu_cores); private NettyClientHandler clientHandler; /** * 客户端的启动方法 */ private void clientStart() { clientStart0(host, port); } private void clientStart0(String host, int port) { //服务工作组 NioEventLoopGroup workerGroup = new NioEventLoopGroup(); //客户端服务启动类 Bootstrap client = new Bootstrap(); clientHandler = new NettyClientHandler(); //绑定服务参数 client.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringEncoder()) .addLast(new StringDecoder()) .addLast(clientHandler); } }); try { //客户端启动 client.connect(host, port).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * NettyClient 端提供一个 代理方法,获取服务类的一个代理类 * * @param serviceClass */ public Object getProxy(final Class<?> serviceClass) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { System.out.println("客户端发送的请求参数是:" + args[0].toString()); clientHandler.setRequestParam(args[0].toString()); return threadPool.submit(clientHandler).get(); } }); } }
1 package com.netty.nettyOne.client; 2 3 import com.netty.nettyOne.service.dateTimeService.DateTimeService; 4 import com.netty.nettyOne.util.Constants; 5 6 /** 7 * @author hsl 2020-07-02 15:30 8 */ 9 public class NettyClientBootstrap { 10 public static void main(String[] args) { 11 //启动客户端 12 NettyClient client = new NettyClient(Constants.REMOTE_HOST, Constants.PORT); 13 14 //获取一个服务端 服务对象的 代理对象 15 DateTimeService dateTimeService = (DateTimeService) client.getProxy(DateTimeService.class); 16 //调用服务 17 String result = dateTimeService.getNow("Netty牛逼"); 18 // 输出结果 19 System.out.println("Client Received Result From Server: [" + result + "]"); 20 } 21 }
6.实现结果
服务器打印
客户端打印