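I. Introduction to gRPC
gRPC is a high-performance, open-source, general-purpose RPC framework that puts mobile and HTTP/2 first. It was developed by Google and has implementations in many languages; Go is only one of them. With gRPC, a client can call methods on a server running on a different machine. The client and server can be written in different languages and run in different environments, covering most mainstream languages and platforms. The contract between the two sides is defined in a proto file, and tooling generates the message classes and stub code for both client and server from it. Messages are encoded with Google Protocol Buffers, which produces compact payloads that are fast to serialize and parse.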
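To make the "compact payload" claim concrete, here is a minimal sketch of how Protocol Buffers lays out a single string field on the wire. It is a hand-rolled illustration, not the generated code: the class name ProtoWireSketch and its methods are hypothetical, and it only handles one length-delimited field.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hand-rolled encoder for a single protobuf string field (illustration only). */
final class ProtoWireSketch {

    /** Writes a non-negative int as a base-128 varint, 7 bits per byte, low bits first. */
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // high bit set: more bytes follow
            value >>>= 7;
        }
        out.write(value);
    }

    /** Encodes a string field using wire type 2 (length-delimited). */
    static byte[] encodeStringField(int fieldNumber, String text) {
        byte[] utf8 = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, (fieldNumber << 3) | 2); // tag = field number plus wire type
        writeVarint(out, utf8.length);            // payload length
        out.write(utf8, 0, utf8.length);          // payload bytes
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] encoded = encodeStringField(1, "hello gRPC");
        StringBuilder hex = new StringBuilder();
        for (byte b : encoded) {
            hex.append(String.format("%02x ", b));
        }
        // prints: 12 bytes: 0a 0a 68 65 6c 6c 6f 20 67 52 50 43
        System.out.println(encoded.length + " bytes: " + hex.toString().trim());
    }
}
```

A message with one short string in field 1 therefore costs only two bytes of overhead on top of the text itself, with no field names on the wire.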
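gRPC has the following characteristics:
(1) It is based on HTTP/2, which provides connection multiplexing and header compression; gRPC can additionally compress message bodies. This saves bandwidth, reduces the number of TCP connections, lowers CPU usage, and extends battery life on mobile devices.
(2) It supports mainstream languages (C, C++, Python, PHP, Ruby, NodeJS, C#, Objective-C, Golang, Java).
(3) The IDL (Interface Definition Language) layer uses Protocol Buffers, which is well suited to designing interfaces across a team.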
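As a rough illustration of how a protobuf message travels over HTTP/2, gRPC prefixes each message with a 1-byte compressed flag and a 4-byte big-endian length before placing it in HTTP/2 DATA frames. The sketch below only builds and parses that 5-byte prefix; the class name GrpcFrameSketch is hypothetical, and real implementations handle this inside the transport layer.

```java
import java.nio.ByteBuffer;

/** Builds and parses gRPC's length-prefixed message framing (illustration only). */
final class GrpcFrameSketch {

    /** Prepends the compressed flag and the big-endian message length. */
    static byte[] frame(byte[] message, boolean compressed) {
        ByteBuffer buf = ByteBuffer.allocate(5 + message.length); // ByteBuffer is big-endian by default
        buf.put((byte) (compressed ? 1 : 0));
        buf.putInt(message.length);
        buf.put(message);
        return buf.array();
    }

    /** Reads the prefix and returns the message bytes that follow it. */
    static byte[] unframe(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        buf.get();                    // compressed flag, ignored in this sketch
        int length = buf.getInt();
        byte[] message = new byte[length];
        buf.get(message);
        return message;
    }

    public static void main(String[] args) {
        byte[] message = new byte[12]; // stands in for a 12-byte serialized message
        byte[] framed = frame(message, false);
        // prints: 5-byte prefix + 12-byte message = 17 bytes
        System.out.println("5-byte prefix + " + unframe(framed).length
                + "-byte message = " + framed.length + " bytes");
    }
}
```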
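II. Building the sample project
- Download and install protobuf (steps omitted).
- Create a new Maven project (steps omitted). [Figure: project structure]
- The complete pom.xml: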
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.gm</groupId>
      <artifactId>stream_grpc</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <name>stream_grpc</name>
      <url>http://maven.apache.org</url>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>io.grpc</groupId>
          <artifactId>grpc-all</artifactId>
          <version>1.17.1</version>
        </dependency>
      </dependencies>
      <build>
        <extensions>
          <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.4.1.Final</version>
          </extension>
        </extensions>
        <plugins>
          <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.5.0</version>
            <configuration>
              <!-- The version of protoc must match protobuf-java. If you don't depend on protobuf-java directly, you will be transitively depending on the protobuf-java version that grpc depends on. -->
              <protocArtifact>com.google.protobuf:protoc:3.0.0-beta-2:exe:${os.detected.classifier}</protocArtifact>
              <pluginId>grpc-java</pluginId>
              <pluginArtifact>io.grpc:protoc-gen-grpc-java:0.13.2:exe:${os.detected.classifier}</pluginArtifact>
              <protocExecutable>protoc.exe</protocExecutable>
            </configuration>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>compile-custom</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>
Note the protoc version here: mine is libprotoc 3.4.0, so I chose grpc-all 1.17.1.
- Write the proto file
Create a proto directory under src/main and add a stream.proto file to it (the file name can be anything):

    syntax = "proto3";

    package stream;

    service StreamService {
      // Simple RPC
      rpc SimpleFun(RequestData) returns (ResponseData) {}
      // Server-side streaming RPC
      rpc ServerSideStreamFun(RequestData) returns (stream ResponseData) {}
      // Client-side streaming RPC
      rpc ClientSideStreamFun(stream RequestData) returns (ResponseData) {}
      // Bidirectional streaming RPC
      rpc TwoWayStreamFun(stream RequestData) returns (stream ResponseData) {}
    }

    message RequestData {
      string text = 1;
    }

    message ResponseData {
      string text = 1;
    }
- Compile the protobuf
Run: mvn generate-sources
The generated files appear under the target directory. [Figure: generated files under target]
- Copy the files
Under src/main/java, create a stream package (the package name follows the package declared in the proto file) and copy the generated Java files into it.
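The four methods declared in stream.proto differ only in whether the stream keyword appears on the request, the response, both, or neither. A small sketch of that classification follows; the class RpcKindSketch and its enum are hypothetical names for illustration, although the four kinds correspond to the method types gRPC itself distinguishes.

```java
/** Classifies an rpc declaration by where the `stream` keyword appears (illustration only). */
final class RpcKindSketch {

    enum Kind { UNARY, SERVER_STREAMING, CLIENT_STREAMING, BIDI_STREAMING }

    static Kind classify(boolean requestIsStream, boolean responseIsStream) {
        if (requestIsStream) {
            return responseIsStream ? Kind.BIDI_STREAMING : Kind.CLIENT_STREAMING;
        }
        return responseIsStream ? Kind.SERVER_STREAMING : Kind.UNARY;
    }

    public static void main(String[] args) {
        // The flags mirror the four rpc lines in stream.proto.
        System.out.println("SimpleFun           -> " + classify(false, false));
        System.out.println("ServerSideStreamFun -> " + classify(false, true));
        System.out.println("ClientSideStreamFun -> " + classify(true, false));
        System.out.println("TwoWayStreamFun     -> " + classify(true, true));
    }
}
```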
III. Service type examples
gRPC has four service types:
Simple RPC
An ordinary RPC call: one request object maps to one response object.
Proto syntax (see stream.proto):

    // Simple RPC
    rpc SimpleFun(RequestData) returns (ResponseData) {}
- Service implementation

    @Override
    public void simpleFun(RequestData request, StreamObserver<ResponseData> responseObserver) {
        System.out.println("Request text: " + request.getText());
        // Send the single response, then signal that the call is complete.
        responseObserver.onNext(ResponseData.newBuilder().setText("hello gRPC").build());
        responseObserver.onCompleted();
    }
- Server implementation (the other three service types use the same server)

    package simple.fun;

    import java.io.IOException;

    import stream.ServiceImpl;
    import stream.StreamServiceGrpc;
    import stream.StreamServiceGrpc.StreamService;

    public class Server {
        private static int port = 8883;
        private static io.grpc.Server server;

        public void run() {
            StreamService serviceImpl = new ServiceImpl();
            // bindService is the registration API produced by the protoc-gen-grpc-java version in the pom above.
            server = io.grpc.ServerBuilder.forPort(port)
                    .addService(StreamServiceGrpc.bindService(serviceImpl))
                    .build();
            try {
                server.start();
                System.out.println("Server start success on port:" + port);
                // Block the main thread until the server is shut down.
                server.awaitTermination();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            Server server = new Server();
            server.run();
        }
    }
- Client implementation

    package simple.fun;

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import stream.Stream.RequestData;
    import stream.Stream.ResponseData;
    import stream.StreamServiceGrpc;
    import stream.StreamServiceGrpc.StreamServiceBlockingStub;

    public class Client {
        public static void main(String[] args) {
            // Plaintext (no TLS) channel to the local server.
            ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8883).usePlaintext(true).build();
            StreamServiceBlockingStub stub = StreamServiceGrpc.newBlockingStub(channel);
            long start = System.currentTimeMillis();
            for (int i = 0; i < 10; i++) {
                RequestData requestData = RequestData.newBuilder().setText("hello world").build();
                // The blocking stub returns only after the response has arrived.
                ResponseData responseData = stub.simpleFun(requestData);
                System.out.println(responseData.getText());
            }
            System.out.println(System.currentTimeMillis() - start + " MS");
            channel.shutdown();
        }
    }
- Execution result (figure: console output)
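Independently of the run above, the callback contract that the simple RPC relies on can be sketched without gRPC on the classpath: the service calls onNext exactly once and then onCompleted. The Observer interface below is a hypothetical stand-in with the same three callbacks as io.grpc.stub.StreamObserver, and plain strings replace the generated message classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Shows the unary call sequence: one onNext, then onCompleted (illustration only). */
final class UnaryContractSketch {

    /** Hypothetical stand-in for io.grpc.stub.StreamObserver. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Records every callback so the order of events can be inspected. */
    static final class RecordingObserver implements Observer<String> {
        final List<String> events = new ArrayList<>();

        @Override public void onNext(String value) { events.add("onNext:" + value); }
        @Override public void onError(Throwable t) { events.add("onError:" + t.getMessage()); }
        @Override public void onCompleted() { events.add("onCompleted"); }
    }

    /** Same shape as simpleFun in the service: a single response, then completion. */
    static void simpleFun(String request, Observer<String> responseObserver) {
        responseObserver.onNext("hello gRPC");
        responseObserver.onCompleted();
    }

    public static void main(String[] args) {
        RecordingObserver observer = new RecordingObserver();
        simpleFun("hello world", observer);
        // prints: [onNext:hello gRPC, onCompleted]
        System.out.println(observer.events);
    }
}
```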
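Server-side streaming RPC
The client sends one request object and the server can send back multiple response objects.
Proto syntax (see stream.proto):

    // Server-side streaming RPC
    rpc ServerSideStreamFun(RequestData) returns (stream ResponseData) {}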
- Service implementation

    @Override
    public void serverSideStreamFun(RequestData request, StreamObserver<ResponseData> responseObserver) {
        System.out.println("Request text: " + request.getText());
        // Push ten responses on the same call, then close the stream.
        for (int i = 0; i < 10; i++) {
            responseObserver.onNext(ResponseData.newBuilder().setText("Hello " + i).build());
        }
        responseObserver.onCompleted();
    }
- Client implementation

    package server.side.stream;

    import java.util.Iterator;

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import stream.Stream.RequestData;
    import stream.Stream.ResponseData;
    import stream.StreamServiceGrpc;
    import stream.StreamServiceGrpc.StreamServiceBlockingStub;

    public class Client {
        public static void main(String[] args) {
            RequestData requestData = RequestData.newBuilder().setText("hello world").build();
            ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8883).usePlaintext(true).build();
            StreamServiceBlockingStub stub = StreamServiceGrpc.newBlockingStub(channel);
            // The blocking stub exposes the response stream as an Iterator.
            Iterator<ResponseData> it = stub.serverSideStreamFun(requestData);
            long start = System.currentTimeMillis();
            while (it.hasNext()) {
                System.out.println(it.next().getText());
            }
            channel.shutdown();
            System.out.println(System.currentTimeMillis() - start + " MS");
        }
    }
- Execution result (figure: console output)
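The server-side streaming client above reads pushed responses through an Iterator. One way such a bridge can work is to buffer the pushed values in a blocking queue, as in the sketch below. This is an assumption-laden illustration, not grpc-java's actual implementation: Observer and QueueIterator are hypothetical names, strings replace the generated messages, and the stand-in service sends three messages rather than ten.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Bridges push-style callbacks to a pull-style Iterator (illustration only). */
final class ServerStreamSketch {

    /** Hypothetical stand-in for io.grpc.stub.StreamObserver. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Buffers pushed values and hands them out through hasNext()/next(). */
    static final class QueueIterator implements Observer<String>, Iterator<String> {
        private static final Object END = new Object(); // end-of-stream marker
        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        private Object next;

        @Override public void onNext(String value) { queue.add(value); }
        @Override public void onError(Throwable t) { queue.add(END); } // simplification: an error just ends the stream
        @Override public void onCompleted() { queue.add(END); }

        @Override public boolean hasNext() {
            if (next == null) {
                try {
                    next = queue.take(); // blocks until the producer pushes something
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    next = END;
                }
            }
            return next != END;
        }

        @Override public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String value = (String) next;
            next = null;
            return value;
        }
    }

    /** Same shape as serverSideStreamFun: several onNext calls, then onCompleted. */
    static void serverSideStreamFun(String request, Observer<String> responseObserver) {
        for (int i = 0; i < 3; i++) {
            responseObserver.onNext("hello " + i);
        }
        responseObserver.onCompleted();
    }

    public static void main(String[] args) {
        QueueIterator it = new QueueIterator();
        // The producer runs on its own thread, as a remote server would.
        new Thread(() -> serverSideStreamFun("hello world", it)).start();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```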
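Client-side streaming RPC
The client sends multiple request objects and the server returns a single response.
Proto syntax (see stream.proto):

    // Client-side streaming RPC
    rpc ClientSideStreamFun(stream RequestData) returns (ResponseData) {}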
- Service implementation

    @Override
    public StreamObserver<RequestData> clientSideStreamFun(StreamObserver<ResponseData> responseObserver) {
        // The returned observer receives each request the client sends.
        return new StreamObserver<RequestData>() {
            private ResponseData.Builder builder = ResponseData.newBuilder();

            @Override
            public void onNext(RequestData value) {
                System.out.println("Request text: " + value.getText());
            }

            @Override
            public void onError(Throwable t) {
                // Client-side errors are ignored in this example.
            }

            @Override
            public void onCompleted() {
                // The client has finished sending: reply once, then complete.
                builder.setText("All data received");
                responseObserver.onNext(builder.build());
                responseObserver.onCompleted();
            }
        };
    }
- Client implementation

    package client.side.stream;

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import io.grpc.stub.StreamObserver;
    import stream.Stream.RequestData;
    import stream.Stream.ResponseData;
    import stream.StreamServiceGrpc;
    import stream.StreamServiceGrpc.StreamServiceStub;

    public class Client {
        public static void main(String[] args) throws InterruptedException {
            ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8883).usePlaintext(true).build();
            // Streaming requests need the asynchronous stub.
            StreamServiceStub asyncStub = StreamServiceGrpc.newStub(channel);
            StreamObserver<ResponseData> responseData = new StreamObserver<ResponseData>() {
                @Override
                public void onNext(ResponseData value) {
                    System.out.println(value.getText());
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onCompleted() {
                    // Close the channel
                    channel.shutdown();
                }
            };
            StreamObserver<RequestData> requestData = asyncStub.clientSideStreamFun(responseData);
            long start = System.currentTimeMillis();
            for (int i = 0; i < 10; i++) {
                requestData.onNext(RequestData.newBuilder().setText("Hello " + i).build());
            }
            requestData.onCompleted();
            System.out.println(System.currentTimeMillis() - start + " MS");
            // The result arrives asynchronously, so sleep for 10 seconds
            Thread.sleep(10000);
        }
    }
- Execution result (figure: console output)
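The client-side streaming client waits for its asynchronous reply with a fixed 10-second sleep. A common alternative is a CountDownLatch that the response observer releases in onCompleted or onError, so the caller waits only as long as needed. The sketch below shows that pattern under stated assumptions: Observer is a hypothetical stand-in for io.grpc.stub.StreamObserver, and the stand-in service counts requests and replies once on another thread to imitate an asynchronous response.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Waits for an asynchronous reply with a latch instead of a fixed sleep (illustration only). */
final class AwaitCompletionSketch {

    /** Hypothetical stand-in for io.grpc.stub.StreamObserver. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Stand-in service: counts requests and replies once, asynchronously, when the client completes. */
    static Observer<String> clientSideStreamFun(Observer<String> responseObserver, ExecutorService executor) {
        return new Observer<String>() {
            private int received;

            @Override public void onNext(String value) { received++; }
            @Override public void onError(Throwable t) { responseObserver.onError(t); }
            @Override public void onCompleted() {
                final int total = received;
                executor.execute(() -> {
                    responseObserver.onNext("received " + total + " messages");
                    responseObserver.onCompleted();
                });
            }
        };
    }

    static String callAndWait(int messages) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> reply = new AtomicReference<>();
        Observer<String> responses = new Observer<String>() {
            @Override public void onNext(String value) { reply.set(value); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onCompleted() { done.countDown(); }
        };
        Observer<String> requests = clientSideStreamFun(responses, executor);
        for (int i = 0; i < messages; i++) {
            requests.onNext("hello " + i);
        }
        requests.onCompleted();
        try {
            // Returns as soon as the reply is complete; 10 seconds is only an upper bound.
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the reply");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdown();
        }
        return reply.get();
    }

    public static void main(String[] args) {
        // prints: received 10 messages
        System.out.println(callAndWait(10));
    }
}
```

In the real client, the same latch would be counted down in the response observer's onCompleted and onError, next to channel.shutdown().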
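Bidirectional streaming RPC
This combines client-side and server-side streaming: the client can send multiple request objects and the server can return multiple response objects.
Proto syntax (see stream.proto):

    // Bidirectional streaming RPC
    rpc TwoWayStreamFun(stream RequestData) returns (stream ResponseData) {}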
- Service implementation

    @Override
    public StreamObserver<RequestData> twoWayStreamFun(StreamObserver<ResponseData> responseObserver) {
        return new StreamObserver<RequestData>() {
            @Override
            public void onNext(RequestData value) {
                System.out.println("Request text: " + value.getText());
                // Answer each request as it arrives instead of waiting for the end of the stream.
                responseObserver.onNext(ResponseData.newBuilder().setText(value.getText() + ", welcome aboard").build());
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
- Client implementation

    package two.way.stream;

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import io.grpc.stub.StreamObserver;
    import stream.Stream.RequestData;
    import stream.Stream.ResponseData;
    import stream.StreamServiceGrpc;
    import stream.StreamServiceGrpc.StreamServiceStub;

    public class Client {
        public static void main(String[] args) throws InterruptedException {
            ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8883).usePlaintext(true).build();
            StreamServiceStub asyncStub = StreamServiceGrpc.newStub(channel);
            StreamObserver<ResponseData> responseData = new StreamObserver<ResponseData>() {
                @Override
                public void onNext(ResponseData value) {
                    System.out.println(value.getText());
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onCompleted() {
                    // Close the channel
                    channel.shutdown();
                }
            };
            StreamObserver<RequestData> requestData = asyncStub.twoWayStreamFun(responseData);
            long start = System.currentTimeMillis();
            for (int i = 0; i < 10; i++) {
                requestData.onNext(RequestData.newBuilder().setText("Hello " + i).build());
            }
            requestData.onCompleted();
            System.out.println(System.currentTimeMillis() - start + " MS");
            // The results arrive asynchronously, so sleep for 10 seconds
            Thread.sleep(10000);
        }
    }
- Execution result (figure: console output)
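What separates the bidirectional case from client-side streaming is that the service replies to each request as it arrives rather than once at the end. The sketch below shows that per-message behaviour with a hypothetical Observer stand-in for io.grpc.stub.StreamObserver. Because everything runs synchronously in one process here, sends and receives alternate strictly; over a real network the interleaving depends on timing.

```java
import java.util.ArrayList;
import java.util.List;

/** Shows per-message replies on a two-way stream (in-process illustration only). */
final class BidiSketch {

    /** Hypothetical stand-in for io.grpc.stub.StreamObserver. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Same shape as twoWayStreamFun: one response per request, completion forwarded at the end. */
    static Observer<String> twoWayStreamFun(Observer<String> responseObserver) {
        return new Observer<String>() {
            @Override public void onNext(String value) { responseObserver.onNext(value + ", welcome aboard"); }
            @Override public void onError(Throwable t) { responseObserver.onError(t); }
            @Override public void onCompleted() { responseObserver.onCompleted(); }
        };
    }

    /** Sends the given number of messages and returns the ordered log of events. */
    static List<String> run(int messages) {
        List<String> log = new ArrayList<>();
        Observer<String> responses = new Observer<String>() {
            @Override public void onNext(String value) { log.add("recv: " + value); }
            @Override public void onError(Throwable t) { log.add("error: " + t); }
            @Override public void onCompleted() { log.add("completed"); }
        };
        Observer<String> requests = twoWayStreamFun(responses);
        for (int i = 0; i < messages; i++) {
            log.add("send: hello " + i);
            requests.onNext("hello " + i);
        }
        requests.onCompleted();
        return log;
    }

    public static void main(String[] args) {
        for (String line : run(2)) {
            System.out.println(line);
        }
    }
}
```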
- Full source code
https://github.com/gm19900510/stream_grpc_java