Nodejs使用gRPC与Java进行远程通信
Java代码
加入依赖
plugins {
id 'java'
id 'com.google.protobuf' version '0.8.8'
}
group 'com.sakura'
version '1.0'
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
testCompile (
"junit:junit:4.12"
)
compile (
"io.netty:netty-all:4.1.46.Final",
'com.google.protobuf:protobuf-java:3.11.4',
'com.google.protobuf:protobuf-java-util:3.11.4',
'io.grpc:grpc-netty-shaded:1.28.0',
'io.grpc:grpc-protobuf:1.28.0',
'io.grpc:grpc-stub:1.28.0'
)
}
//构建protobuf插件配置
protobuf {
//输出目录的根目录名,生成的java文件的位置,会在指定的目录下的main下生成
generateProtoTasks.generatedFilesBaseDir = "$projectDir/src"
protoc {
artifact = "com.google.protobuf:protoc:3.11.0"
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.28.0'
}
}
generateProtoTasks {
all()*.plugins {
grpc {
//grpc的接口类的生成目录,默认是grpc
outputSubDir = 'java'
}
}
}
}
编写一个Proto文件
syntax = "proto3";//定义使用的proto版本
package com.sakura.proto;//所有语言适用的包路径定义语法
option java_package = "com.sakura.proto";//java包路径 优先级高于package
option java_outer_classname = "Student";//生成的外部类名
option java_multiple_files = true;//是否生成多个文件
//定义rpc的方法
service StudentService{
//1.客户端发出一个普通的请求,服务器的返回一个普通的响应
rpc GetRealNameByUsername(MyRequest) returns (MyResponse);
//grpc的请求以及响应不能是基本数据类型,必须是一个message类型,不管请求里有几个参数
//他必须是你定义的一个message类型的
//2.根据学生的年龄获取与这个年龄相等的学生对象客户端发生一个普通的请求,服务器的以流的形式返回
rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse);
//3.以流式的方式请求一个StudentRequest服务器会返回一个StudentResponseList
rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList){}
//4.客户源与服务端都以流式的方式,双向的数据流传递
rpc BiTalk(stream StreamRequest) returns (stream StreamResponse);
}
//消息
message MyRequest{
string username = 1;
}
message MyResponse{
string realname = 2;
}
//单向流使用的消息
message StudentResponse{
string name = 1;
int32 age = 2;
string city = 3;
}
message StudentRequest{
int32 age = 1;
}
message StudentResponseList{
repeated StudentResponse studentResponse = 1;
}
//双向数据流传递使用的消息
message StreamRequest{
string request_info = 1;
}
message StreamResponse{
string response_info = 1;
}
服务端代码
public class GrpcServer {
private Server server;
public static void main(String[] args) throws Exception{
GrpcServer server = new GrpcServer();
server.start();
server.awaitTermination();
}
private void start()throws Exception{
//创建服务通道配置端口,传入映射的方法的实现类,然后构建并启动
this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl())
.build().start();
System.out.println("server started!");
//设置一个回调钩子
Runtime.getRuntime().addShutdownHook(new Thread(() ->{
System.out.println("JVM 关闭");
try {
this.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
System.out.println("执行到这里");
}
private void stop() throws InterruptedException {
if(this.server != null){
//关闭服务
this.server.shutdown();
}
}
private void awaitTermination() throws InterruptedException {
if(this.server != null){
//等待终止,让服务不停止,可以设置超时时长
this.server.awaitTermination();
//this.server.awaitTermination(3000, TimeUnit.MILLISECONDS);
}
}
}
服务接口实现类
/**
* @ClassName : StudentServiceImpl
* @Description : 远程调用的方法的具体实现,实现生成代码中的内部抽象类
*/
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
/**
* 重写父类的方法
* @param request 客户端发来的数据
* @param responseObserver 响应观察者 用于响应客户端的对象
*/
@Override
public void getRealNameByUsername(MyRequest request, StreamObserver<MyResponse> responseObserver) {
System.out.println("接收到客户端信息:" + request.getUsername());
/*
onCompleted() 标示这个方法调用结束,只能调用一次
onError() 异常时调用
onNext() 接下来要做什么事,可以用于结果返回
*/
//构造响应对象,并返回
responseObserver.onNext(MyResponse.newBuilder().setRealname("星空").build());
//标示服务器处理结束
responseObserver.onCompleted();
}
@Override
public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
System.out.println("接受到客户端信息:" + request.getAge());
responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海").setAge(18).setCity("北京").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海2").setAge(20).setCity("上海").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海3").setAge(22).setCity("广州").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海4").setAge(18).setCity("深圳").build());
responseObserver.onCompleted();
}
@Override
public StreamObserver<StudentRequest> getStudentsWrapperByAges(StreamObserver<StudentResponseList> responseObserver) {
//实现StreamObserver接口,实现方法当特定的事件触发时,回调方法就会的到调用
return new StreamObserver<StudentRequest>() {
/**
* 接收客户端的请求,请求到来时被调用
* 每来一次请求,onNext()方法就会被调用一次
* 因为请求是流式的,onNext会被调用多次
* @param value
*/
@Override
public void onNext(StudentRequest value) {
System.out.println("onNext:" + value.getAge());
}
/**
* 出现异常时被调用
* @param t
*/
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
/**
* 表示客户端将流式数据全部发给服务器端之后,客户端就会有一个onCompleted事件,服务器端就会感知到
* 然后服务器端在onCompleted中为客户端返回最终结果
*/
@Override
public void onCompleted() {
StudentResponse.Builder studentResponse =
StudentResponse.newBuilder().setName("彩虹海1").setAge(18).setCity("宇宙");
StudentResponse.Builder studentResponse2 =
StudentResponse.newBuilder().setName("彩虹海2").setAge(20).setCity("宇宙");
StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse)
.addStudentResponse(studentResponse2).build();
//返回结果
responseObserver.onNext(studentResponseList);
//表示处理完成
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
return new StreamObserver<StreamRequest>() {
/**
* 客户端发来请求时被调用,每请求一次则被调用一次
* @param value 客户端发来的数据
*/
@Override
public void onNext(StreamRequest value) {
//打印客户端发来的数据
System.out.println(value.getRequestInfo());
//向客户端返回数据
responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
/**
* 客户端的onCompleted方法被调用时,被调用
*/
@Override
public void onCompleted() {
//双向的数据流传递,虽然是在两个不同的流中传递互不干扰
//但是当一方的流被关闭时另一方也要关闭与之交互的流
responseObserver.onCompleted();
}
};
}
}
客户端代码
/**
* @ClassName : GrpcClient
* @Description : grpc client
*/
public class GrpcClient {
public static void main(String[] args) throws InterruptedException {
//usePlaintext()使用纯文本的方式,不加密
ManagedChannel managedChannel =
ManagedChannelBuilder.forTarget("localhost:8899").usePlaintext().build();
//客户端与服务端交互的对象 server与client通信的对象
//blockingStub 阻塞的方式/同步 发出一个请求一定要等到另一端返回了响应才继续往下执行
StudentServiceGrpc.StudentServiceBlockingStub blockingStub =
StudentServiceGrpc.newBlockingStub(managedChannel);
//只要是客户端是以流式的方式向服务器发送请求,这种请求一定以异步的
//blockingStub是同步的阻塞的,则不会被提供方法
//获取一个异步的通信对象
//创建一个支持该服务的所有呼叫类型的新异步存根,不会等待对方响应会一直向下执行
StudentServiceGrpc.StudentServiceStub stub =
StudentServiceGrpc.newStub(managedChannel);
//构建消息
MyRequest request = MyRequest.newBuilder().setUsername("出发,目标彩虹海").build();
//调用具体方法,接收到响应
MyResponse response = blockingStub.getRealNameByUsername(request);
System.out.println("接收到服务器信息:" + response.getRealname());
System.out.println("--------------------普通请求与响应 结束----------------------");
/*
//返回一个流式的响应就是一个迭代器,每返回一个对象就进入到迭代器中,再返回对象再进入迭代器,以此类推
Iterator<StudentResponse> iter =
blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(18).build());
//iter.hasNext() 还有没有下一个
while (iter.hasNext()){
StudentResponse studentResponse = iter.next();
System.out.println(studentResponse.getName() + " , " +
studentResponse.getAge() + " , " + studentResponse.getCity());
}
System.out.println("-----------------------普通请求 流式响应 结束-------------------");
*/
//客户端请求一个steam(流式) blockingStub(同步)无法使用 只有使用异步形式
//构造接收服务端信息的方法
/* StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
/**
* 服务端向客户端响应结果时会被调用
* 服务端返回的数据,每返回一次数据则被调用一次
* 如果服务器端也是流式的并且返回了多个数据,那么每次返回数据的时候都会被调用一次
* @param value
*//*
@Override
public void onNext(StudentResponseList value) {
value.getStudentResponseList().forEach(studentResponse -> {
System.out.println(studentResponse.getName() + " , " +
studentResponse.getAge() + " , " + studentResponse.getCity());
System.out.println("**************");
});
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
//构造客户端向服务端发送的数据
//getStudentsWrapperByAges(传入处理服务端返回数据的回调对象)
StreamObserver<StudentRequest> studentsWrapperByAges =
stub.getStudentsWrapperByAges(studentResponseListStreamObserver);
//发送数据
studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(18).build());
studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(28).build());
studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(38).build());
studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(48).build());
//表示客户端调用结束
studentsWrapperByAges.onCompleted();
System.out.println("-----------------------流式请求 普通响应 结束-------------------");
*/
/*
StreamObserver<StreamRequest> streamRequestStreamObserver =
stub.biTalk(new StreamObserver<StreamResponse>() {
/**
* 收到服务器响应结果时,被调用
* @param value 服务器返回的数据
*//*
@Override
public void onNext(StreamResponse value) {
System.out.println(value.getResponseInfo());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
/**
* 服务器端onCompleted()被调用时,被触发
*//*
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
});
for (int i = 0; i < 10; i++) {
streamRequestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
Thread.sleep(1000);
}
streamRequestStreamObserver.onCompleted();
System.out.println("-----------------------双向数据流传递 结束-------------------");
*/
//客户端向服务器端发送数据,数据还没发送就继续向下执行走到了onCompleted(),
//然后在数据还未发出时程序就正常执行完成结束了
//线程睡眠,强制让程序等待,等待studentsWrapperByAges将数据全部发送给服务器端
//如果不睡眠的话,数据还没发送给服务器端,jvm就停止了,也就发送不出去了
Thread.sleep(5000);
}
}
安装Nodejs
ArchLinux通过nvm安装node npm
nvm ls-remote //显示所有远端node版本
nvm install 版本号 //安装node
nvm use 已安装node版本号 //切换node
nvm alias default 已安装node版本号 //设置默认使用的node版本
配置package.json
{
"name": "grpc-examples",
"version": "0.1.0",
"dependencies": {
"@grpc/proto-loader": "^0.1.0",
"async": "^1.5.2",
"google-protobuf": "^3.0.0",
"grpc": "^1.11.0",
"lodash": "^4.6.1",
"minimist": "^1.2.0"
}
}
安装grpc代码环境
npm config set registry https://registry.npm.taobao.org //npm设置淘宝源
sudo npm install //下载依赖,在项目目录下 `package.json`同目录
Nodejs动态代码生成
编写代码时是不需要提前使用proto文件生成对应的js代码的,只需要指定proto文件的位置,在运行的过程当中会动态的生成js文件,客户端与服务端都可以使用动态生成
服务端代码
//*.proto文件的绝对路径
var PROTO_FILE_PATH = "/home/ideaHome/netty/nodejs/grpc_nodejs/proto/Student.proto";
//引入grpc
var grpc = require("grpc");
//加载rpc的方法
var grpcService = grpc.load(PROTO_FILE_PATH).com.sakura.proto;
//通过grpc获取server
var server = new grpc.Server();
server.addService(grpcService.StudentService.service,{
//函数: 服务(想要自定义)
getRealNameByUsername: getRealNameByUsername,
getStudentsByAge: getStudentsByAge,
getStudentsWrapperByAges: getStudentsWrapperByAges,
biTalk: biTalk
});
//grpc.ServerCredentials.createInsecure() 使用纯文本的方式传输 不使用加密的
server.bind("localhost:8899",grpc.ServerCredentials.createInsecure());
//启动服务器
server.start();
//实现具体方法被调用时,处理的方法(服务)
//参数(客户端的请求,回调(函数,请求收到之后最后调用这个回调把结果返回给客户端))
function getRealNameByUsername(call, callback){
console.log("username:" + call.request.username);
//参数(错误对象,真正要给客户端返回的结果值)
callback(null,{realname: "星空"});
}
//用不到 空实现
function getStudentsByAge() {}
function getStudentsWrapperByAges() {}
function biTalk() {}
客户端代码
//*.proto文件的绝对路径
var PROTO_FILE_PATH = "/home/ideaHome/netty/nodejs/grpc_nodejs/proto/Student.proto";
//引入grpc
var grpc = require("grpc");
var grpcService = grpc.load(PROTO_FILE_PATH).com.sakura.proto;
//grpc.credentials.createInsecure() 不使用加密的
var client = new grpcService.StudentService("localhost:8899",grpc.credentials.createInsecure());
//node是一个异步的框架,绝大多数都是通过回调的方式获取对端的响应,也有同步的操作,
//但是对与node来说绝大多数都是通过异步的方式来获取对方返回的结果
//动态的代码生成就是以一个json对象的方式传输给对端
//方法 (参数,回调方法(异常,对端返回的数据))
client.getRealNameByUsername({username: "麦当"},function(error,respData){
console.log(respData);
});
测试
Java服务端 node客户端
可通过 node xxxx/xxxx.js //运行nodejs代码
//java服务端显示
server started!
执行到这里
接收到客户端信息:麦当
//node客户端显示
{ realname: '星空' }
node服务端 Java客户端
//node服务端显示
username:出发,目标彩虹海
//java客户端显示
接收到服务器信息:星空
--------------------普通请求与响应 结束----------------------
Nodejs静态代码生成
安装js代码生成工具
npm install -g grpc-tools //通过npm全局安装grpc-tools
生成js代码
格式:grpc_tools_node_protoc --js_out=import_style=commonjs,binary:js普通代码生成的文件位置 --grpc_out=调用接口的生成位置 --plugin=protoc-gen-grpc=grpc_tools_node_protoc_plugin插件的位置可通过which查看 proto文件
例如:grpc_tools_node_protoc --js_out=import_style=commonjs,binary:static_codegen/ --grpc_out=static_codegen/ --plugin=protoc-gen-grpc=/home/miki/.nvm/versions/node/v12.17.0/bin/grpc_tools_node_protoc_plugin proto/Student.proto
服务端代码
var service = require('../static_codegen/proto/Student_grpc_pb');
var messages = require('../static_codegen/proto/Student_pb');
var grpc = require('grpc');
var server = new grpc.Server();
server.addService(service.StudentServiceService,{
//函数: 服务(想要自定义)
getRealNameByUsername: getRealNameByUsername,
getStudentsByAge: getStudentsByAge,
getStudentsWrapperByAges: getStudentsWrapperByAges,
biTalk: biTalk
});
server.bind("localhost:8899",grpc.ServerCredentials.createInsecure());
server.start();
//参数(客户端的请求,回调(函数,请求收到之后最后调用这个回调把结果返回给客户端))
function getRealNameByUsername(call,callback) {
console.log("request:" + call.request.getUsername());
var myResponse = new messages.MyResponse();
myResponse.setRealname("目标,彩虹海");
callback(null, myResponse);
}
//用不到 空实现
function getStudentsByAge() {}
function getStudentsWrapperByAges() {}
function biTalk() {}
客户端代码
var service = require('../static_codegen/proto/Student_grpc_pb');
var messages = require('../static_codegen/proto/Student_pb');
var grpc = require('grpc');
//创建一个客户端
var client = new service.StudentServiceClient("localhost:8899",
grpc.credentials.createInsecure());
var request = new messages.MyRequest();
request.setUsername("米龙");
//发出请求
client.getRealNameByUsername(request, function (error, respData) {
console.log(respData.getRealname());
});
测试
Java服务端 node客户端
可通过 node xxxx/xxxx.js //运行nodejs代码
//java服务端显示
server started!
执行到这里
接收到客户端信息:米龙
//node客户端显示
星空
node服务端 Java客户端
//node服务端显示
request:出发,目标彩虹海
//java客户端显示
接收到服务器信息:目标,彩虹海
--------------------普通请求与响应 结束----------------------