zoukankan      html  css  js  c++  java
  • 高性能NIO框架Netty-对象传输

    http://cxytiandi.com/blog/detail/17403

    上篇文章高性能NIO框架Netty入门篇我们对Netty做了一个简单的介绍,并且写了一个入门的Demo,客户端往服务端发送一个字符串的消息,服务端回复一个字符串的消息,今天我们来学习下在Netty中怎么使用对象来传输数据。

    上篇文章中传输字符串我们用的是框架自带的StringEncoder,StringDecoder编解码器,现在想要通过对象来传输数据,该怎么弄呢?

    既然StringEncoder和StringDecoder可以传输字符串,我们来看看这2个类的源码不就知道它们到底做了一些什么工作。

    StringEncoder

    1. public class StringEncoder extends MessageToMessageEncoder<CharSequence> {
    2. // TODO Use CharsetEncoder instead.
    3. private final Charset charset;
    4. /**
    5. * Creates a new instance with the current system character set.
    6. */
    7. public StringEncoder() {
    8. this(Charset.defaultCharset());
    9. }
    10. /**
    11. * Creates a new instance with the specified character set.
    12. */
    13. public StringEncoder(Charset charset) {
    14. if (charset == null) {
    15. throw new NullPointerException("charset");
    16. }
    17. this.charset = charset;
    18. }
    19. @Override
    20. protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
    21. if (msg.length() == 0) {
    22. return;
    23. }
    24. out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
    25. }
    26. }

    通过继承MessageToMessageEncoder,重写encode方法来进行编码操作,就是将字符串进行输出即可

    StringDecoder

    1. public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
    2. // TODO Use CharsetDecoder instead.
    3. private final Charset charset;
    4. /**
    5. * Creates a new instance with the current system character set.
    6. */
    7. public StringDecoder() {
    8. this(Charset.defaultCharset());
    9. }
    10. /**
    11. * Creates a new instance with the specified character set.
    12. */
    13. public StringDecoder(Charset charset) {
    14. if (charset == null) {
    15. throw new NullPointerException("charset");
    16. }
    17. this.charset = charset;
    18. }
    19. @Override
    20. protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
    21. out.add(msg.toString(charset));
    22. }
    23. }

    继承MessageToMessageDecoder,重写decode方法,将ByteBuf数据直接转成字符串进行输出,解码完成。

    通过上面的源码分析,我们发现编解码的原理无非就是在数据传输前进行一次处理,接收后进行一次处理,在网络中传输的数据都是字节,我们现在想要传PO对象,那么必然需要进行编码和解码2个步骤,我们可以自定义编解码器来对对象进行序列化,然后通过ByteBuf的形式进行传输, 传输对象需要实现java.io.Serializable接口。

    首先我们定义一个传输对象,实现序列化接口,暂时先定义2个字段,一个ID,用来标识客户端,一个内容字段,代码如下:

    1. public class Message implements Serializable {
    2. private static final long serialVersionUID = -7543514952950971498L;
    3. private String id;
    4. private String content;
    5. public String getId() {
    6. return id;
    7. }
    8. public void setId(String id) {
    9. this.id = id;
    10. }
    11. public String getContent() {
    12. return content;
    13. }
    14. public void setContent(String content) {
    15. this.content = content;
    16. }
    17. }

    传输对象定好后,定义对象的编解码器。

    对象编码器

    对象序列化成字节,通过ByteBuf形式进行传输,ByteBuf是一个byte存放的缓冲区,提供了读写操作。

    1. public class MessageEncoder extends MessageToByteEncoder<Message> {
    2. @Override
    3. protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception {
    4. byte[] datas = ByteUtils.objectToByte(message);
    5. out.writeBytes(datas);
    6. ctx.flush();
    7. }
    8. }

    对象解码器

    接收ByteBuf数据,将ByteBuf反序列化成对象

    1. public class MessageDecoder extends ByteToMessageDecoder {
    2. @Override
    3. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    4. Object obj = ByteUtils.byteToObject(ByteUtils.read(in));
    5. out.add(obj);
    6. }
    7. }

    将上篇文章中服务端的编解码器改成对象编解码器:

    1. public class ImServer {
    2. public void run(int port) {
    3. EventLoopGroup bossGroup = new NioEventLoopGroup();
    4. EventLoopGroup workerGroup = new NioEventLoopGroup();
    5. ServerBootstrap bootstrap = new ServerBootstrap();
    6. bootstrap.group(bossGroup, workerGroup)
    7. .channel(NioServerSocketChannel.class)
    8. .childHandler(new ChannelInitializer<SocketChannel>() {
    9. @Override
    10. public void initChannel(SocketChannel ch) throws Exception {
    11. //实体类传输数据,jdk序列化
    12. ch.pipeline().addLast("decoder", new MessageDecoder());
    13. ch.pipeline().addLast("encoder", new MessageEncoder());
    14. ch.pipeline().addLast(new ServerPoHandler());
    15. //字符串传输数据
    16. /*ch.pipeline().addLast("decoder", new StringDecoder());
    17. ch.pipeline().addLast("encoder", new StringEncoder());
    18. ch.pipeline().addLast(new ServerStringHandler());*/
    19. }
    20. })
    21. .option(ChannelOption.SO_BACKLOG, 128)
    22. .childOption(ChannelOption.SO_KEEPALIVE, true);
    23. try {
    24. ChannelFuture f = bootstrap.bind(port).sync();
    25. f.channel().closeFuture().sync();
    26. } catch (InterruptedException e) {
    27. e.printStackTrace();
    28. } finally {
    29. workerGroup.shutdownGracefully();
    30. bossGroup.shutdownGracefully();
    31. }
    32. }
    33. }

    接下来编写服务端的消息处理类:

    1. public class ServerPoHandler extends ChannelInboundHandlerAdapter {
    2. @Override
    3. public void channelRead(ChannelHandlerContext ctx, Object msg) {
    4. Message message = (Message) msg;
    5. System.err.println("server:" + message.getId());
    6. ctx.writeAndFlush(message);
    7. }
    8. @Override
    9. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    10. cause.printStackTrace();
    11. ctx.close();
    12. }
    13. }

    服务端改造好了之后,就要改造客户端了,同样的道理,客户端和服务端的编解码器都要一致才行

    客户端连接时指定对象编解码器和对象消息处理类,代码如下:

    1. public class ImConnection {
    2. private Channel channel;
    3. public Channel connect(String host, int port) {
    4. doConnect(host, port);
    5. return this.channel;
    6. }
    7. private void doConnect(String host, int port) {
    8. EventLoopGroup workerGroup = new NioEventLoopGroup();
    9. try {
    10. Bootstrap b = new Bootstrap();
    11. b.group(workerGroup);
    12. b.channel(NioSocketChannel.class);
    13. b.option(ChannelOption.SO_KEEPALIVE, true);
    14. b.handler(new ChannelInitializer<SocketChannel>() {
    15. @Override
    16. public void initChannel(SocketChannel ch) throws Exception {
    17. //实体类传输数据,jdk序列化
    18. ch.pipeline().addLast("decoder", new MessageDecoder());
    19. ch.pipeline().addLast("encoder", new MessageEncoder());
    20. ch.pipeline().addLast(new ClientPoHandler());
    21. //字符串传输数据
    22. /*ch.pipeline().addLast("decoder", new StringDecoder());
    23. ch.pipeline().addLast("encoder", new StringEncoder());
    24. ch.pipeline().addLast(new ClientStringHandler());*/
    25. }
    26. });
    27. ChannelFuture f = b.connect(host, port).sync();
    28. channel = f.channel();
    29. } catch(Exception e) {
    30. e.printStackTrace();
    31. }
    32. }
    33. }

    客户端消息处理类:

    1. /**
    2. * 当编解码器为实体对象时时用来接收数据
    3. * @author yinjihuan
    4. *
    5. */
    6. public class ClientPoHandler extends ChannelInboundHandlerAdapter {
    7. @Override
    8. public void channelRead(ChannelHandlerContext ctx, Object msg) {
    9. Message message = (Message) msg;
    10. System.out.println("client:" + message.getContent());
    11. }
    12. @Override
    13. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    14. cause.printStackTrace();
    15. ctx.close();
    16. }
    17. }

    客户端启动类也需要改造,将发送字符串的消息变成对象消息

    1. public class ImClientApp {
    2. public static void main(String[] args) {
    3. String host = "127.0.0.1";
    4. int port = 2222;
    5. Channel channel = new ImConnection().connect(host, port);
    6. //对象传输数据
    7. Message message = new Message();
    8. message.setId(UUID.randomUUID().toString().replaceAll("-", ""));
    9. message.setContent("hello yinjihuan");
    10. channel.writeAndFlush(message);
    11. //字符串传输数据
    12. //channel.writeAndFlush("yinjihuan");
    13. }
    14. }

    源码参考:https://github.com/yinjihuan/netty-im

  • 相关阅读:
    基于jquery 的插件,让IE支持placeholder属性
    MongoDB入门_MongoDB安装与配置
    MongoDB入门_MongoDB特色
    MongoDB入门_相关网站
    MongoDB入门_学习目标
    Shell编程
    redis数据类型及基本命令
    redis配置文件详解
    redis命令
    安装运行redis
  • 原文地址:https://www.cnblogs.com/yaowen/p/9562806.html
Copyright © 2011-2022 走看看