Netty实现RPC
RPC(Remote Procedure Call)远程过程调用,一种计算机通信协议
- 即:一台计算机的程序调用另一台计算机的子程序,并且不需要对这个交互,进行额外的编程;
RPC机制
RPC调用者要调用远程API,首先调用RPCProxy代理,再通过RCPInvoker调用者,打开RCPConnector连接;
这里的RPCChannel就是Netty的通信方式(SocketChannel);
RPC的协议,我们进行自定义,可以通过字符串来定义一个协议头,代表,要调用的目标方法服务;
常用RPC框架
阿里Dubbo、Apache的thrift、Spring的Spring Cloud、google的gRPC
实现流程
- 创建接口,定义抽象方法,用于约定消费者和提供者;
- 创建提供者,需要监听消费者请求,并按照约定返回数据;
- 创建消费者,该类需要透明的调用自己不存在的方法,使用Netty来发送请求;
公用接口
RPC的两端,都要有此接口;
客户端目的就是要调用服务端的实现;所以客户端不实现此接口;
server端实现此接口;
public interface HelloService {
String hello(String msg);
}
RPC Server
- 在server端,实现接口,以供远程调用
// 多次调用,同一个实例,会创建多个实例,而非同一个!
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String msg) {
if (null != msg) {
return "RPC returns message :" + msg;
} else {
return "message is null";
}
}
}
- Server端通过Netty实现
public class NettyServer {
//netty server的启动和初始化
public static void startServer(String hostName, int port) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
System.out.println("-------- Netty server starts --------");
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
-
Server端自定义Handler
主要是读取客户端的消息;
通过消息,获取规定好的协议头,来判断,具体调用哪个方法;
这里的协议头为:HelloService#hello#,后面跟上方法参数;
拿到协议头后,截取出方法参数,调用对应方法,拿到返回结果;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
// read:读取客户端消息,并调用服务
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("client msg : " + msg);
/**
* 规定RPC调用协议:
* 要想调用某个服务,在这里自定义协议头:必须以字符串"HelloService#hello#"开头(接口#方法)
*/
if (msg.toString().startsWith("HelloService#hello#")) {
String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
// exception
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
- 最后是Server端的启动类:启动Netty Server
public class RPCServerBootstrap {
public static void main(String[] args) {
NettyServer.startServer("localhost", 8899);
}
}
RPC Client
-
客户端是通过一个线程调用RPC的;也就是call方法,发送RPC请求给Server;
-
请求发送之后,消息并不是立刻返回的,手动将线程阻塞;
-
Server端将RPC调用结果,返回给Client,是首先进入Handler的channelRead方法中;
在此方法中,拿到msg数据,并执行notify,唤醒等待的线程;
-
阻塞的线程,被唤醒,并拿到数据,返回RPC调用结果;
执行流程:
1. call线程发送RPC请求,以及参数,发送之后,阻塞等待被唤醒;
2. Server端根据参数,确定要调用的方法,以及参数,执行方法,返回结果
3. client的Handler的channelRead方法,读取到返回的结果;
4. 唤醒call线程,拿到RPC结果,返回;
-
客户端这边,我们先完成自定义的ClientHandler
注意:call方法与channelRead必须一起同步,否则无法唤醒线程;
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
// ctx上下文,需要在call方法中使用ctx
private ChannelHandlerContext context;
// RPC返回结果
private String result;
// client调用RPC传入参数:即方法参数
private String para;
void setPara(String para) {
this.para = para;
}
/**
* call方法发送请求数据,并阻塞,等待被唤醒;
* 发送RPC请求---> wait等待唤醒---> channelRead notify唤醒 --> 获取RPC结果
*/
@Override
public Object call() throws Exception {
lock.lock();
try {
System.out.println("------ NettyClientHandler.call is called ---------");
// 发送RPC请求参数
context.writeAndFlush(para);
// 等待结果
condition.await();
// 唤醒之后,返回结果
System.out.println("----- NettyClientHandler.call is notified ------");
return result;
} finally {
lock.unlock();
}
}
// 与server创建连接时,调用 channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------- channelActive is called ---------");
context = ctx;
}
// 收到server数据后,必须与call方法同步!
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
lock.lock();
try {
// RPC返回结果
result = msg.toString();
// 唤醒等待的线程 call方法
condition.signalAll();
} finally {
lock.unlock();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
-
Netty客户端的初始化代码
这里通过提交线程池的方法启动RPC请求的发送;
通过代理的方式,调用并返回结果;
public class NettyClient {
// 创建一个线程池 线程数 = CPU处理器数量
private static int nThread = Runtime.getRuntime().availableProcessors();
private static ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(nThread, nThread, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
private static NettyClientHandler clientHandler;
/**
* 使用代理模式
* 反射获取一个代理对象
*/
public Object getBean(final Class<?> serivceClass, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serivceClass}, (proxy, method, args) -> {
System.out.println("------------ Proxy ------------");
if (clientHandler == null) {
// 初始化
initClient();
}
//设置要发给服务器端的信息
//providerName 协议头 args[0] 就是客户端调用api hello(???), 参数
clientHandler.setPara(providerName + args[0]);
// 提交线程到线程池,返回结果
return threadPoolExecutor.submit(clientHandler).get();
});
}
// 初始化客户端
public static void initClient() {
// 必须提前实例化!
clientHandler = new NettyClientHandler();
try {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
}
});
bootstrap.connect("localhost", 8899).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 最后是客户端启动类
public class RPCClientBootstrap {
// 相当于协议头,代表要调用哪个方法
public static final String PROVIDER_NAME = "HelloService#hello#";
public static void main(String[] args) {
// 创建client
NettyClient client = new NettyClient();
// 获取proxy代理
HelloService helloServiceProxy = (HelloService) client.getBean(HelloService.class, PROVIDER_NAME);
// 代理对象调用PRC获取结果
String res = helloServiceProxy.hello("RPC Request");
System.out.println("RPC result: " + res);
}
}