zoukankan      html  css  js  c++  java
  • netty 自定义通讯协议

    Netty中,通讯的双方建立连接后,会把数据按照ByteBuf的方式进行传输,例如http协议中,就是通过HttpRequestDecoder对ByteBuf数据流进行处理,转换成http的对象。基于这个思路,我自定义一种通讯协议:Server和客户端直接传输java对象。

    实现的原理是通过Encoder把java对象转换成ByteBuf流进行传输,通过Decoder把ByteBuf转换成java对象进行处理,处理逻辑如下图所示:

    传输的java bean为Person:

    [java] view plain copy
    1. package com.guowl.testobjcoder;  
    2.   
    3. import java.io.Serializable;  
    4.   
    5. // 必须实现Serializable接口  
    6. public class Person implements Serializable{  
    7.     private static final long   serialVersionUID    = 1L;  
    8.     private String  name;  
    9.     private String  sex;  
    10.     private int     age;  
    11.   
    12.     public String toString() {  
    13.         return "name:" + name + " sex:" + sex + " age:" + age;  
    14.     }  
    15.   
    16.     public String getName() {  
    17.         return name;  
    18.     }  
    19.   
    20.     public void setName(String name) {  
    21.         this.name = name;  
    22.     }  
    23.   
    24.     public String getSex() {  
    25.         return sex;  
    26.     }  
    27.   
    28.     public void setSex(String sex) {  
    29.         this.sex = sex;  
    30.     }  
    31.   
    32.     public int getAge() {  
    33.         return age;  
    34.     }  
    35.   
    36.     public void setAge(int age) {  
    37.         this.age = age;  
    38.     }  
    39. }  

    Server端类:Server PersonDecoder BusinessHandler

    1、Server:启动netty服务

    [java] view plain copy
    1. package com.guowl.testobjcoder;  
    2.   
    3. import io.netty.bootstrap.ServerBootstrap;  
    4. import io.netty.channel.ChannelFuture;  
    5. import io.netty.channel.ChannelInitializer;  
    6. import io.netty.channel.ChannelOption;  
    7. import io.netty.channel.EventLoopGroup;  
    8. import io.netty.channel.nio.NioEventLoopGroup;  
    9. import io.netty.channel.socket.SocketChannel;  
    10. import io.netty.channel.socket.nio.NioServerSocketChannel;  
    11.   
    12. public class Server {  
    13.     public void start(int port) throws Exception {  
    14.         EventLoopGroup bossGroup = new NioEventLoopGroup();   
    15.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
    16.         try {  
    17.             ServerBootstrap b = new ServerBootstrap();   
    18.             b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)   
    19.                     .childHandler(new ChannelInitializer<SocketChannel>() {   
    20.                                 @Override  
    21.                                 public void initChannel(SocketChannel ch) throws Exception {  
    22.                                     ch.pipeline().addLast(new PersonDecoder());  
    23.                                     ch.pipeline().addLast(new BusinessHandler());  
    24.                                 }  
    25.                             }).option(ChannelOption.SO_BACKLOG, 128)   
    26.                     .childOption(ChannelOption.SO_KEEPALIVE, true);   
    27.   
    28.             ChannelFuture f = b.bind(port).sync();   
    29.   
    30.             f.channel().closeFuture().sync();  
    31.         } finally {  
    32.             workerGroup.shutdownGracefully();  
    33.             bossGroup.shutdownGracefully();  
    34.         }  
    35.     }  
    36.   
    37.     public static void main(String[] args) throws Exception {  
    38.         Server server = new Server();  
    39.         server.start(8000);  
    40.     }  
    41. }  

    2、PersonDecoder:把ByteBuf流转换成Person对象,其中ByteBufToBytes是读取ButeBuf的工具类,上一篇文章中提到过,在此不在详述。ByteObjConverter是byte和obj的互相转换的工具。

    [java] view plain copy
    1. package com.guowl.testobjcoder;  
    2.   
    3. import io.netty.buffer.ByteBuf;  
    4. import io.netty.channel.ChannelHandlerContext;  
    5. import io.netty.handler.codec.ByteToMessageDecoder;  
    6.   
    7. import java.util.List;  
    8.   
    9. import com.guowl.utils.ByteBufToBytes;  
    10. import com.guowl.utils.ByteObjConverter;  
    11.   
    12. public class PersonDecoder extends ByteToMessageDecoder {  
    13.     @Override  
    14.     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {  
    15.         ByteBufToBytes read = new ByteBufToBytes();  
    16.         Object obj = ByteObjConverter.ByteToObject(read.read(in));  
    17.         out.add(obj);  
    18.     }  
    19.   
    20. }  

    3、BusinessHandler 读取Person信息,并打印

    [java] view plain copy
    1. package com.guowl.testobjcoder;  
    2.   
    3. import io.netty.channel.ChannelHandlerContext;  
    4. import io.netty.channel.ChannelInboundHandlerAdapter;  
    5.   
    6. import org.slf4j.Logger;  
    7. import org.slf4j.LoggerFactory;  
    8.   
    9. public class BusinessHandler extends ChannelInboundHandlerAdapter {  
    10.     private Logger  logger  = LoggerFactory.getLogger(BusinessHandler.class);  
    11.   
    12.     @Override  
    13.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
    14.         Person person = (Person) msg;  
    15.         logger.info("BusinessHandler read msg from client :" + person);  
    16.     }  
    17.   
    18.     @Override  
    19.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
    20.         ctx.flush();  
    21.     }  
    22.       
    23.     @Override  
    24.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
    25.           
    26.     }  
    27. }  

    Client端的类:Client ClientInitHandler PersonEncoder

    1、Client 建立与Server的连接

    [java] view plain copy
    1. package com.guowl.testobjcoder;  
    2.   
    3. import io.netty.bootstrap.Bootstrap;  
    4. import io.netty.channel.ChannelFuture;  
    5. import io.netty.channel.ChannelInitializer;  
    6. import io.netty.channel.ChannelOption;  
    7. import io.netty.channel.EventLoopGroup;  
    8. import io.netty.channel.nio.NioEventLoopGroup;  
    9. import io.netty.channel.socket.SocketChannel;  
    10. import io.netty.channel.socket.nio.NioSocketChannel;  
    11.   
    12. public class Client {  
    13.     public void connect(String host, int port) throws Exception {  
    14.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
    15.   
    16.         try {  
    17.             Bootstrap b = new Bootstrap();   
    18.             b.group(workerGroup);   
    19.             b.channel(NioSocketChannel.class);   
    20.             b.option(ChannelOption.SO_KEEPALIVE, true);   
    21.             b.handler(new ChannelInitializer<SocketChannel>() {  
    22.                 @Override  
    23.                 public void initChannel(SocketChannel ch) throws Exception {  
    24.                     ch.pipeline().addLast(new PersonEncoder());  
    25.                     ch.pipeline().addLast(new ClientInitHandler());  
    26.                 }  
    27.             });  
    28.   
    29.             ChannelFuture f = b.connect(host, port).sync();  
    30.             f.channel().closeFuture().sync();  
    31.         } finally {  
    32.             workerGroup.shutdownGracefully();  
    33.         }  
    34.   
    35.     }  
    36.   
    37.     public static void main(String[] args) throws Exception {  
    38.         Client client = new Client();  
    39.         client.connect("127.0.0.1", 8000);  
    40.     }  
    41. }  

    2、ClientInitHandler 向Server发送Person对象

    [java] view plain copy
    1. package com.guowl.testobjcoder;  
    2.   
    3. import io.netty.channel.ChannelHandlerContext;  
    4. import io.netty.channel.ChannelInboundHandlerAdapter;  
    5.   
    6. import org.slf4j.Logger;  
    7. import org.slf4j.LoggerFactory;  
    8.   
    9. public class ClientInitHandler extends ChannelInboundHandlerAdapter {  
    10.     private static Logger   logger  = LoggerFactory.getLogger(ClientInitHandler.class);  
    11.     @Override  
    12.     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
    13.         logger.info("HelloClientIntHandler.channelActive");  
    14.         Person person = new Person();  
    15.         person.setName("guowl");  
    16.         person.setSex("man");  
    17.         person.setAge(30);  
    18.         ctx.write(person);  
    19.         ctx.flush();  
    20.     }  
    21. }  

    3、PersonEncoder 把Person对象转换成ByteBuf进行传送

    [java] view plain copy
    1. package com.guowl.testobjcoder;  
    2.   
    3. import com.guowl.utils.ByteObjConverter;  
    4.   
    5. import io.netty.buffer.ByteBuf;  
    6. import io.netty.channel.ChannelHandlerContext;  
    7. import io.netty.handler.codec.MessageToByteEncoder;  
    8.   
    9. public class PersonEncoder extends MessageToByteEncoder<Person> {  
    10.   
    11.     @Override  
    12.     protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {  
    13.         byte[] datas = ByteObjConverter.ObjectToByte(msg);  
    14.         out.writeBytes(datas);  
    15.         ctx.flush();  
    16.     }  
    17. }  

    工具类:ByteObjConverter

    [java] view plain copy
    1. package com.guowl.utils;  
    2.   
    3. import java.io.ByteArrayInputStream;  
    4. import java.io.ByteArrayOutputStream;  
    5. import java.io.IOException;  
    6. import java.io.ObjectInputStream;  
    7. import java.io.ObjectOutputStream;  
    8.   
    9. public class ByteObjConverter {  
    10.     public static Object ByteToObject(byte[] bytes) {  
    11.         Object obj = null;  
    12.         ByteArrayInputStream bi = new ByteArrayInputStream(bytes);  
    13.         ObjectInputStream oi = null;  
    14.         try {  
    15.             oi = new ObjectInputStream(bi);  
    16.             obj = oi.readObject();  
    17.         } catch (Exception e) {  
    18.             e.printStackTrace();  
    19.         } finally {  
    20.             try {  
    21.                 bi.close();  
    22.             } catch (IOException e) {  
    23.                 e.printStackTrace();  
    24.             }  
    25.             try {  
    26.                 oi.close();  
    27.             } catch (IOException e) {  
    28.                 e.printStackTrace();  
    29.             }  
    30.         }  
    31.         return obj;  
    32.     }  
    33.   
    34.     public static byte[] ObjectToByte(Object obj) {  
    35.         byte[] bytes = null;  
    36.         ByteArrayOutputStream bo = new ByteArrayOutputStream();  
    37.         ObjectOutputStream oo = null;  
    38.         try {  
    39.             oo = new ObjectOutputStream(bo);  
    40.             oo.writeObject(obj);  
    41.             bytes = bo.toByteArray();  
    42.         } catch (Exception e) {  
    43.             e.printStackTrace();  
    44.         } finally {  
    45.             try {  
    46.                 bo.close();  
    47.             } catch (IOException e) {  
    48.                 e.printStackTrace();  
    49.             }  
    50.             try {  
    51.                 oo.close();  
    52.             } catch (IOException e) {  
    53.                 e.printStackTrace();  
    54.             }  
    55.         }  
    56.         return (bytes);  
    57.     }  
    58. }  

    通过上述代码,实现了Server端与Client端直接使用person对象进行通信的目的。基于此,可以构建更为复杂的场景:Server端同时支撑多种协议,不同的协议采用不同的Decoder进行解析,解析结果保持统一,这样业务处理类可以保持接口一致。下一节将编写这样一个案例。

    本例中需要注意的事项是:

    1、Person对象必须实现Serializable接口,否则不能进行序列化。

    2、PersonDecoder读取ByteBuf数据的时候,并没有对多次流式数据进行处理,而是简单的一次性接收,如果数据量大的情况下,可能会出现数据不完整,这个问题会在后续的学习中解决。

  • 相关阅读:
    idea spring boot 1.x junit单元测试
    linux oracle/jdk启用大页面
    jdk8之CompletableFuture与CompletionService
    gc日志深入解析-覆盖CMS、并行GC、G1、ZGC、openj9
    h2 web console使用
    LockSupport工具类详解
    反射、Unsafe、直接调用性能大比拼
    spring boot druid动态多数据源监控集成
    Linux网络
    org.springframework.jdbc.CannotGetJdbcConnectionException: Could not get JDBC Connection总结
  • 原文地址:https://www.cnblogs.com/zeroone/p/8490904.html
Copyright © 2011-2022 走看看