zoukankan      html  css  js  c++  java
  • java和golang通过protobuf协议相互通信

    因为设备的通信协议准备采用protobuf,所以准备这篇protobuf的使用入门,golang作为客户端,java作为服务端,这才能真正体现出protobuf的无关语言特性。

    本文采用protobuf2,注重于如何快速入门使用,并不会涉及到具体的细节知识点。


    整体结构说明

    golang作为客户端,java作为服务端,protobuf2为两者的通信协议格式。


    protobuf2文件

    • protobuf2简介

      详细说明

    • helloworld.proto

      syntax = "proto2";
      
      package proto;
      
      // ProtocolMessage is the single envelope type exchanged between client and
      // server. Every request/response variant is nested inside it and exposed
      // through one optional field each; in this tutorial the client fills one
      // request field and the server answers with the matching response field.
      message ProtocolMessage {
          // Request payload for the "search" operation.
          message SearchRequest{
              required string name = 1;
              optional int32 search = 2 ;
          }
      
          // Request payload for the "action" operation.
          message ActionRequest{
              required string name = 1;
              optional int32 action = 2 ;
          }
      
          // Reply payload for the "search" operation.
          message SearchResponse{
              required string name = 1;
              optional int32 search = 2 ;
          }
      
          // Reply payload for the "action" operation.
          message ActionResponse{
              required string name = 1;
              optional int32 action = 2 ;
          }
      
      
          // Envelope slots; field numbers 1-4 are part of the wire format.
          optional SearchRequest searchRequest = 1;
          optional ActionRequest actionRequest = 2;
          optional SearchResponse searchResponse = 3;
          optional ActionResponse actionResponse = 4;
      }
      
      • SearchRequest 和 SearchResponse 为对应的请求和响应message;
      • ActionRequest 和 ActionResponse 为对应的请求和响应message;
      • 由于服务端使用netty框架,其protobuf解码器只能针对一种message类型进行编码解码,所以把 SearchRequest、SearchResponse、ActionRequest、ActionResponse 都内嵌到 ProtocolMessage 中,通过对 ProtocolMessage 编码解码进行数据交互。

    golang客户端

    目录结构

    client_proto/
    ├── api
    │   ├── proto # 存放proto协议文件以及生成的pb.go文件
    │   	├── helloworld.pb.go
    │   	└── helloworld.proto
    ├── cmd
    │   	├── main.go
    │   	├── util
    │   		└── util.go
    

    采用go mod 进行开发

    生成pb.go文件

    • 安装proto

      自行百度......

    • 在.proto文件处,输入protoc --go_out=./ helloworld.proto

    • 即可生成helloworld.pb.go文件

    main.go

    package main
    
    import (
    	"github.com/gin-gonic/gin"
    	proto "grpc/api/grpc_proto"
    	"grpc/cmd/demo3/util"
    	"net/http"
    	"time"
    )
    
    // init opens the shared transfer connection before main starts serving.
    func init() {
    	util.InitTransfer()
    }
    
    // main exposes two demo HTTP endpoints on :8090 that each perform one
    // protobuf round trip over the shared TCP transfer:
    //
    //	GET /search  sends a SearchRequest and replies with the SearchResponse name
    //	GET /action  sends an ActionRequest and replies with the ActionResponse name
    //
    // Any transport failure, or a reply that lacks the expected sub-message, is
    // reported as HTTP 500 with an "err" field.
    //
    // NOTE(review): every request shares the single util.G_transfer connection and
    // its buffer; concurrent requests could interleave frames. Confirm whether
    // access needs to be serialized.
    func main() {
    	router := gin.Default()

    	// Search round trip.
    	router.GET("/search", func(c *gin.Context) {
    		name := "search"
    		search := int32(12)
    		message := &proto.ProtocolMessage{
    			SearchRequest: &proto.ProtocolMessage_SearchRequest{
    				Name:   &name,
    				Search: &search,
    			},
    		}

    		if !roundTrip(c, message) {
    			return
    		}
    		// The server may answer with an envelope that has no search reply;
    		// dereferencing it blindly would panic.
    		if message.SearchResponse == nil {
    			c.JSON(http.StatusInternalServerError, gin.H{
    				"err": "server reply carries no SearchResponse",
    			})
    			return
    		}

    		c.JSON(http.StatusOK, gin.H{
    			"message": message.SearchResponse.Name,
    		})
    	})

    	// Action round trip.
    	router.GET("/action", func(c *gin.Context) {
    		name := "action"
    		action := int32(34)
    		message := &proto.ProtocolMessage{
    			ActionRequest: &proto.ProtocolMessage_ActionRequest{
    				Name:   &name,
    				Action: &action,
    			},
    		}

    		if !roundTrip(c, message) {
    			return
    		}
    		if message.ActionResponse == nil {
    			c.JSON(http.StatusInternalServerError, gin.H{
    				"err": "server reply carries no ActionResponse",
    			})
    			return
    		}

    		c.JSON(http.StatusOK, gin.H{
    			"message": message.ActionResponse.Name,
    		})
    	})

    	s := &http.Server{
    		Addr:           ":8090",
    		Handler:        router,
    		ReadTimeout:    60 * time.Second,
    		WriteTimeout:   60 * time.Second,
    		MaxHeaderBytes: 1 << 20,
    	}

    	// ListenAndServe only returns on failure (for example when the port is
    	// already taken); surface the error instead of exiting silently.
    	if err := s.ListenAndServe(); err != nil {
    		panic(err)
    	}
    }

    // roundTrip sends message through the shared transfer and then overwrites it
    // with the server's reply. On any failure it writes an HTTP 500 JSON error to
    // c and reports false, so the caller must simply return.
    func roundTrip(c *gin.Context, message *proto.ProtocolMessage) bool {
    	transfer := util.G_transfer
    	if transfer == nil {
    		// G_transfer stays nil when the connection could not be established
    		// during start-up.
    		c.JSON(http.StatusInternalServerError, gin.H{
    			"err": "transfer is not initialized",
    		})
    		return false
    	}

    	if err := transfer.SendMsg(message); err != nil {
    		c.JSON(http.StatusInternalServerError, gin.H{
    			"err": err.Error(),
    		})
    		return false
    	}

    	if err := transfer.ReadResponse(message); err != nil {
    		c.JSON(http.StatusInternalServerError, gin.H{
    			"err": err.Error(),
    		})
    		return false
    	}
    	return true
    }
    

    util.go

    package util
    
    import (
    	"encoding/binary"
    	"errors"
    	"io"
    	"net"

    	"github.com/gogo/protobuf/proto"
    	grpc_proto "grpc/api/grpc_proto"
    )
    // G_transfer is the package-wide Transfer instance. It is nil until
    // InitTransfer has successfully connected to the server.
    var G_transfer *Transfer
    
    // InitTransfer connects to the protobuf server at 127.0.0.1:3210 and, on
    // success, publishes the connection through G_transfer.
    //
    // It returns the address-resolution or dial error instead of silently
    // dropping it; on failure G_transfer is left unchanged. Call sites that
    // invoke InitTransfer as a bare statement keep compiling and may ignore the
    // result.
    func InitTransfer() error {
    	addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:3210")
    	if err != nil {
    		return err
    	}

    	conn, err := net.DialTCP("tcp", nil, addr)
    	if err != nil {
    		return err
    	}

    	G_transfer = &Transfer{Conn: conn}
    	return nil
    }
    
    // Transfer bundles the server connection with a fixed-size scratch buffer.
    type Transfer struct {
    	// Conn is the connection to the server.
    	Conn net.Conn
    	// Buf is the buffer used while transferring data.
    	Buf [2 * 1024]byte
    }
    
    
    // ReadResponse reads one length-prefixed frame from the server and decodes it
    // into response.
    //
    // Wire format: a 4-byte big-endian body length followed by that many bytes of
    // protobuf-encoded ProtocolMessage.
    //
    // It returns the read error (io.ErrUnexpectedEOF when the stream ends inside a
    // frame), an error when the announced length is implausibly large, or the
    // protobuf decoding error.
    func (transfer *Transfer) ReadResponse(response *grpc_proto.ProtocolMessage) (err error) {
    	// Sanity bound so a corrupt header cannot trigger a huge allocation. It is
    	// the same 1048576-byte value the tutorial's server uses for inbound frames.
    	const maxFrameLen = 1 << 20

    	// A single Read on a stream socket may return fewer bytes than requested;
    	// ReadFull keeps reading until the slice is filled or the stream fails.
    	header := transfer.Buf[:4]
    	if _, err = io.ReadFull(transfer.Conn, header); err != nil {
    		return err
    	}

    	pkgLen := binary.BigEndian.Uint32(header)
    	if pkgLen > maxFrameLen {
    		return errors.New("response frame exceeds maximum length")
    	}

    	// Reuse the fixed buffer when the body fits; otherwise allocate, because
    	// slicing Buf beyond its capacity would panic.
    	body := transfer.Buf[:]
    	if int(pkgLen) > len(body) {
    		body = make([]byte, pkgLen)
    	}
    	body = body[:pkgLen]

    	if _, err = io.ReadFull(transfer.Conn, body); err != nil {
    		return err
    	}

    	return proto.Unmarshal(body, response)
    }
    
    // SendMsg encodes action and writes it to the server as one length-prefixed
    // frame: a 4-byte big-endian body length followed by the protobuf bytes.
    //
    // Header and body are sent with a single Write so the frame cannot be split
    // by another writer on the same connection, and the underlying write error is
    // returned as-is rather than being replaced by a generic message.
    //
    // It returns the marshalling error, an error when the transfer has no
    // connection or the frame is larger than the server accepts, or the write
    // error.
    func (transfer *Transfer) SendMsg(action *grpc_proto.ProtocolMessage) (err error) {
    	// The tutorial's server frame decoder is configured with a 1048576-byte
    	// maximum frame length (header included); larger frames are rejected there.
    	const (
    		headerLen   = 4
    		maxFrameLen = 1 << 20
    	)

    	if transfer.Conn == nil {
    		return errors.New("transfer has no connection")
    	}

    	var payload []byte
    	if payload, err = proto.Marshal(action); err != nil {
    		return err
    	}
    	if headerLen+len(payload) > maxFrameLen {
    		return errors.New("message exceeds maximum frame length")
    	}

    	frame := make([]byte, headerLen+len(payload))
    	binary.BigEndian.PutUint32(frame[:headerLen], uint32(len(payload)))
    	copy(frame[headerLen:], payload)

    	// Write reports a non-nil error whenever it writes fewer bytes than asked,
    	// so no separate byte-count check is needed.
    	_, err = transfer.Conn.Write(frame)
    	return err
    }
    
    • 这里发送消息和读取消息都需要先发送/解析数据的长度,然后发送/解析数据本身;
    • 这里与服务端怎么样解析/发送数据有关,这是由于netty框架中定义的编码解码器决定的。

    java服务端

    目录结构

    server_proto/
    ├── src
    │   ├── main 
    │   	├── java
    │   		├── com
    │   			├── dust
    │   				├── proto_server
    │   					├── config
    │   						└── NettyConfig.java 
    │   					├── netty
    │   						└── NettyServerListener.java 
    │   						└── SocketServerHandler.java 
    │   					├── proto
    │   						└── Helloworld.java 
    │   						└── helloworld.proto # proto配置文件
    │   					└── Application.java # 启动配置类
    │   	├── resources
    │   		└── application.yml #配置文件
    │   ├── test
    └── pom.xml # maven配置文件
    

    采用springBoot+netty+maven开发

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.6.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.dust</groupId>
        <artifactId>proto_server</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>proto_server</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <!-- protobuf依赖-->
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>3.8.0</version>
            </dependency>
    
            <dependency>
                <groupId>com.googlecode.protobuf-java-format</groupId>
                <artifactId>protobuf-java-format</artifactId>
                <version>1.2</version>
            </dependency>
    
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.19.Final</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    • 注意:protobuf-java的版本为3.8.0,必须和本机安装的protoc(protoc.exe)版本保持一致。

    application.yml

    # netty配置
    netty:
      # 端口号
      port: 3210
      # 最大线程数
      maxThreads: 1024
      # 数据包的最大长度
      max_frame_length: 65535
    

    NettyConfig.java

    package com.dust.proto_server.config;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    /**
     * Configuration holder bound to properties under the {@code netty} prefix.
     * Accessors are generated by Lombok's {@code @Data}.
     *
     * NOTE(review): only {@code port} is declared here; other keys under the
     * {@code netty} prefix have no matching field in this class - confirm
     * whether they are meant to be bound.
     */
    @Data
    @Component
    @ConfigurationProperties(prefix = "netty")
    public class NettyConfig {
        /** Port the Netty server listens on ({@code netty.port}). */
        private int port;
    }
    

    生成Helloworld.java

    • 在.proto文件处,输入protoc --java_out=./ helloworld.proto
    • 即可生成Helloworld.java文件

    SocketServerHandler.java

    package com.dust.proto_server.netty;
    
    import com.dust.proto_server.proto.Helloworld;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    
    /**
     * Inbound handler that receives decoded {@code ProtocolMessage} requests and
     * writes back the matching response.
     *
     * <p>The handler is marked {@code @Sharable} and registered as a Spring
     * component, so one instance serves every channel. Channels are tracked in
     * {@link #CHANNEL_GROUP} while the handler is installed on them.
     */
    @Component
    @ChannelHandler.Sharable
    public class SocketServerHandler extends ChannelInboundHandlerAdapter {

        private static final Logger LOGGER = LoggerFactory.getLogger(SocketServerHandler.class);

        /** Channels that currently have this handler installed. */
        public ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

        /** Registers the channel when the handler is added to its pipeline. */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            LOGGER.info(channel.id().toString() + "加入");
            CHANNEL_GROUP.add(channel);
        }

        /** Unregisters the channel when the handler is removed from its pipeline. */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            LOGGER.info(channel.id().toString() + "退出");
            CHANNEL_GROUP.remove(channel);
        }

        /** Logs the failure through the class logger and closes the channel. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            LOGGER.error("channel {} failed, closing it", ctx.channel().id(), cause);
            ctx.close();
        }

        /**
         * Handles one decoded request envelope and replies with one response envelope.
         *
         * <p>A search request is answered with a {@code SearchResponse}, an action
         * request with an {@code ActionResponse}. If neither is present an empty
         * envelope is still sent so the peer is not left waiting for a reply.
         *
         * @param ctx context of the channel the request arrived on
         * @param msg the inbound object; cast to {@code Helloworld.ProtocolMessage}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            LOGGER.info("开始读取客户端发送过来的数据");
            Helloworld.ProtocolMessage protocolMessage = (Helloworld.ProtocolMessage) msg;
            Helloworld.ProtocolMessage.Builder builder = Helloworld.ProtocolMessage.newBuilder();

            // Use the generated presence accessors rather than probing the serialized size.
            if (protocolMessage.hasSearchRequest()) {
                Helloworld.ProtocolMessage.SearchRequest searchRequest = protocolMessage.getSearchRequest();
                LOGGER.info("searchRequest--{}", searchRequest);

                Helloworld.ProtocolMessage.SearchResponse searchResponse =
                        Helloworld.ProtocolMessage.SearchResponse.newBuilder()
                                .setName("i am SearchResponse")
                                .setSearch(45)
                                .build();
                builder.setSearchResponse(searchResponse);

            } else if (protocolMessage.hasActionRequest()) {
                Helloworld.ProtocolMessage.ActionRequest actionRequest = protocolMessage.getActionRequest();
                LOGGER.info("actionRequest--{}", actionRequest);

                Helloworld.ProtocolMessage.ActionResponse actionResponse =
                        Helloworld.ProtocolMessage.ActionResponse.newBuilder()
                                .setName("i am ActionResponse")
                                .setAction(67)
                                .build();
                builder.setActionResponse(actionResponse);

            } else {
                LOGGER.warn("request carries neither searchRequest nor actionRequest, replying with an empty message");
            }

            // Write only the message itself. The 4-byte length header is added by the
            // pipeline's length-prepending encoder; writing a bare Integer here is not
            // a supported outbound message type and would just fail the write.
            ctx.channel().writeAndFlush(builder.build());
        }
    }
    

    NettyServerListener.java

    package com.dust.proto_server.netty;
    
    import com.dust.proto_server.config.NettyConfig;
    
    import com.dust.proto_server.proto.Helloworld;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PreDestroy;
    import javax.annotation.Resource;
    
    /**
     * Owns the Netty server: builds the channel pipeline, binds the configured
     * port and shuts the event loops down when the bean is destroyed.
     */
    @Component
    public class NettyServerListener {
        /**
         * Logger for NettyServerListener.
         */
        private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);
        /**
         * Server bootstrap.
         */
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        /**
         * Boss event loop group (passed as the parent group).
         */
        EventLoopGroup boss = new NioEventLoopGroup();
        /**
         * Worker event loop group (passed as the child group).
         */
        EventLoopGroup work = new NioEventLoopGroup();

        @Resource
        private SocketServerHandler socketServerHandler;

        /**
         * Netty server configuration.
         */
        @Resource
        private NettyConfig nettyConfig;

        /**
         * Shuts the server down gracefully when the bean is destroyed.
         */
        @PreDestroy
        public void close() {
            LOGGER.info("关闭服务器....");
            // Graceful shutdown of both event loop groups.
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }

        /**
         * Configures the pipeline, binds the port from {@link NettyConfig} and then
         * blocks the calling thread until the server channel is closed.
         *
         * <p>If the thread is interrupted while waiting, both event loop groups are
         * shut down and the thread's interrupt flag is restored.
         */
        public void start() {
            // Listening port comes from the configuration (application.yml).
            int port = nettyConfig.getPort();
            serverBootstrap.group(boss, work).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Splits the inbound stream into frames using the 4-byte length header,
                            // which is stripped from the frame.
                            pipeline.addLast("frameDecoder",
                                    new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
                            // Turns each complete frame into a ProtocolMessage object.
                            pipeline.addLast("protobufDecoder",
                                    new ProtobufDecoder(Helloworld.ProtocolMessage.getDefaultInstance()));
                            // Prepends a 4-byte length header to every outbound body.
                            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                            // Serializes outbound ProtocolMessage objects to protobuf bytes.
                            pipeline.addLast("protobufEncoder", new ProtobufEncoder());

                            pipeline.addLast(socketServerHandler);
                        }

                    })
                    // SO_BACKLOG belongs to the listening socket.
                    .option(ChannelOption.SO_BACKLOG, 100)
                    // Keep-alive is a property of accepted connections, so it must be a
                    // child option; option() would only target the server channel.
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new LoggingHandler(LogLevel.INFO));
            try {
                LOGGER.info("netty服务器在[{}]端口启动监听", port);
                ChannelFuture f = serverBootstrap.bind(port).sync();
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                LOGGER.info("[出现异常] 释放资源");
                boss.shutdownGracefully();
                work.shutdownGracefully();
                // Preserve the interruption for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
    }
    
    • 这个类定义了服务端如何处理数据的接收和发送;

    • frameDecoder 和 protobufDecoder 对应的handler用于解码Protobuf package数据包,它们都是Upstream Handler:先处理长度,然后再处理数据本身;

    • frameEncoder 和 protobufEncoder 对应的handler用于编码Protobuf package数据包,它们都是Downstream Handler;

    • 此外还有一个handler socketServerHandler,它是一个自定义的Upstream Handler,用于开发者从网络数据中解析得到自己所需的数据;

    • 上例Handler的执行顺序为

      upstream:frameDecoder,protobufDecoder,handler   //解码从Socket收到的数据 
      downstream:frameEncoder,protobufEncoder         //编码要通过Socket发送出去的数据
      

    Application.java

    package com.dust.proto_server;
    
    import com.dust.proto_server.netty.NettyServerListener;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import javax.annotation.Resource;
    
    /**
     * Spring Boot entry point. As a {@code CommandLineRunner}, its {@link #run}
     * callback starts the Netty server listener.
     */
    @SpringBootApplication
    public class Application implements CommandLineRunner {

        /** Listener that owns the Netty server. */
        @Resource
        private NettyServerListener nettyServerListener;

        /**
         * Launches the Spring application.
         *
         * @param args command-line arguments forwarded to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }

        /**
         * Starts the Netty server.
         *
         * @param args command-line arguments (unused)
         */
        @Override
        public void run(String... args) throws Exception {
            nettyServerListener.start();
        }
    }
    

    测试

    • 先启动服务端,再启动客户端

    • search测试

    • action测试

  • 相关阅读:
    C语言-第32课
    typeof和clamp
    C语言void*和*p的使用
    C语言二级指针和一级指针
    C语言结构体变量成员之指针变量成员的坑
    控制硬件三部曲
    C语言const char*使用
    jiffies是什么
    TPO3-1 Architecture
    相关关系|相关系数|线性关系|
  • 原文地址:https://www.cnblogs.com/dust90/p/11236581.html
Copyright © 2011-2022 走看看