zoukankan      html  css  js  c++  java
  • 网络编程 -- RPC实现原理 -- Netty -- 迭代版本V4 -- 粘包拆包

     网络编程 -- RPC实现原理 -- 目录

      啦啦啦

    V2——Netty -- 

      new LengthFieldPrepender(2) : 在每个出站数据包前添加 2 字节的长度字段(记录消息体的字节数)

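      new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2) :  65535:单帧最大长度(maxFrameLength)、0:长度字段的偏移量(lengthFieldOffset)、2:长度字段占用的字节数(lengthFieldLength)、0:长度调整值(lengthAdjustment)、2:解码后从帧头部剥离的字节数(initialBytesToStrip,即去掉 2 字节的长度字段)。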

      Class : Server

    package lime.pri.limeNio.netty.netty04;
    
    import java.net.InetSocketAddress;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.List;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.serializer.SerializerFeature;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.codec.MessageToMessageCodec;
    import io.netty.util.CharsetUtil;
    import lime.pri.limeNio.netty.netty03.entity.User;
    
    /**
     * Demo server that answers simple text queries over a length-prefixed JSON protocol.
     *
     * <p>Each child channel gets the following pipeline:
     * <ol>
     *   <li>{@code LengthFieldPrepender(2)} - outbound: prefixes every frame with a 2-byte length.</li>
     *   <li>{@code LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)} - inbound: reassembles frames
     *       using that 2-byte length and strips it.</li>
     *   <li>A JSON codec converting between {@code ByteBuf} and Java objects (fastjson).</li>
     *   <li>The request handler, which writes the responses and then closes the connection.</li>
     * </ol>
     */
    public class Server {

        /**
         * Binds to port 9999 and blocks until the server channel is closed.
         *
         * @param args unused
         * @throws Exception if binding fails or the waiting thread is interrupted
         */
        public static void main(String[] args) throws Exception {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                serverBootstrap.group(boss, worker);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new LengthFieldPrepender(2))
                                .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
                                .addLast(new MessageToMessageCodec<ByteBuf, Object>() {
                                    @Override
                                    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out)
                                            throws Exception {
                                        // Serialize with class-name hints so the peer can rebuild typed objects.
                                        byte[] payload = JSON.toJSONString(msg, SerializerFeature.WriteClassName)
                                                .getBytes(CharsetUtil.UTF_8);
                                        out.add(Unpooled.buffer().writeBytes(payload));
                                    }

                                    @Override
                                    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
                                            throws Exception {
                                        // NOTE(review): this parses peer-supplied JSON that may carry class-name
                                        // hints; confirm fastjson autoType is restricted before exposing this
                                        // to untrusted clients.
                                        out.add(JSON.parse(msg.toString(CharsetUtil.UTF_8)));
                                    }
                                }).addLast(new ChannelHandlerAdapter() {

                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        System.out.println("客户端(" + ctx.channel().remoteAddress() + ") 请求数据:" + msg);
                                        ChannelFuture channelFuture = null;

                                        // The decoder may yield any JSON value; anything that is not a string
                                        // falls through to the "invalid request" branch instead of failing
                                        // with a ClassCastException.
                                        String request = (msg instanceof String) ? (String) msg : null;
                                        if ("Query Date".equalsIgnoreCase(request)) {
                                            Calendar calendar = Calendar.getInstance();
                                            // Created per request (SimpleDateFormat is not thread-safe),
                                            // but only once rather than once per response.
                                            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                            for (int i = 1; i < 10; i++) {
                                                calendar.set(Calendar.DAY_OF_MONTH, i);
                                                channelFuture = ctx.writeAndFlush("当前系统时间:" + format.format(calendar.getTime()));
                                            }
                                        } else if ("Query User".equalsIgnoreCase(request)) {
                                            for (int i = 1; i < 10; i++) {
                                                channelFuture = ctx.writeAndFlush(new User(i, "lime_" + i, new Date()));
                                            }
                                        } else {
                                            channelFuture = ctx.writeAndFlush("请求参数不正确");
                                        }

                                        // Only the last write is tracked: once it completes (or fails)
                                        // the connection is closed.
                                        channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
                                        channelFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                                        channelFuture.addListener(ChannelFutureListener.CLOSE);
                                    }

                                    @Override
                                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                        System.out.println(cause);
                                        // Close the connection so the client is not left waiting for a
                                        // response that will never be sent.
                                        ctx.close();
                                    }

                                });
                    }

                });
                ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(9999)).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                // Release the event loop threads even when bind() fails or the wait is interrupted.
                try {
                    boss.close();
                } finally {
                    worker.close();
                }
            }
        }
    }

      Class : Client

    package lime.pri.limeNio.netty.netty04;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.serializer.SerializerFeature;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.codec.MessageToMessageCodec;
    import io.netty.util.CharsetUtil;
    
    /**
     * Demo client for the length-prefixed JSON protocol.
     *
     * <p>It connects to port 9999, sends one text query when the connection becomes active and
     * prints every response until the channel is closed.
     */
    public class Client {

        /**
         * Starts one non-daemon thread that runs a single client session.
         *
         * @param args unused
         * @throws Exception declared for compatibility; failures inside the client thread are printed there
         */
        public static void main(String[] args) throws Exception {
            for (int i = 0; i < 1; i++) {
                new Thread() {
                    {
                        setDaemon(false);
                    }

                    @Override
                    public void run() {
                        try {
                            client();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }.start();
            }
        }

        /**
         * Runs one client session: connects, sends the request and blocks until the channel closes.
         *
         * @throws Exception if the connection cannot be established or the waiting thread is interrupted
         */
        private static void client() throws Exception {
            Bootstrap bootstrap = new Bootstrap();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                bootstrap.group(worker);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Outbound frames get a 2-byte length prefix; inbound frames are split on that
                        // prefix and the prefix is stripped before JSON decoding.
                        pipeline.addLast(new LengthFieldPrepender(2))
                                .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
                                .addLast(new MessageToMessageCodec<ByteBuf, Object>() {
                                    @Override
                                    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out)
                                            throws Exception {
                                        byte[] payload = JSON.toJSONString(msg, SerializerFeature.WriteClassName)
                                                .getBytes(CharsetUtil.UTF_8);
                                        out.add(Unpooled.buffer().writeBytes(payload));
                                    }

                                    @Override
                                    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
                                            throws Exception {
                                        // NOTE(review): parses JSON received from the network, possibly with
                                        // class-name hints; confirm fastjson autoType is restricted.
                                        out.add(JSON.parse(msg.toString(CharsetUtil.UTF_8)));
                                    }
                                }).addLast(new ChannelHandlerAdapter() {

                                    /**
                                     * Prints the exception and closes the channel so that the blocked
                                     * {@code closeFuture().sync()} in {@code client()} can return.
                                     */
                                    @Override
                                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                        System.out.println(cause);
                                        ctx.close();
                                    }

                                    /**
                                     * Sends the request once the connection is active; the codec earlier in
                                     * the pipeline JSON-encodes it.
                                     */
                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        String request = null;
                                        // The selector is hard-coded to 0, so "Query Date" is always sent;
                                        // change the constant to exercise the other requests.
                                        switch (0) {
                                        case 0:
                                            request = "Query Date";
                                            break;
                                        case 1:
                                            request = "Query User";
                                            break;
                                        default:
                                            request = "Query What?";
                                            break;
                                        }
                                        ctx.writeAndFlush(request);
                                    }

                                    /** Prints each decoded response together with the current thread. */
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        System.out.println("当前线程:" + Thread.currentThread() + " 服务端响应数据 --> " + msg);
                                    }

                                });
                    }
                });

                ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(9999)).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                // Always release the event loop threads; otherwise a failed connect leaves
                // non-daemon threads running and the JVM never exits.
                worker.close();
            }
        }
    }

      Console: Server

    客户端(/192.168.229.1:6280) 请求数据:Query Date

      Console : Client

    当前线程:Thread[nioEventLoopGroup-0-1,10,main] 服务端响应数据 --> 当前系统时间:2017-06-01 21:42:08
    当前线程:Thread[nioEventLoopGroup-0-1,10,main] 服务端响应数据 --> 当前系统时间:2017-06-02 21:42:08
    当前线程:Thread[nioEventLoopGroup-0-1,10,main] 服务端响应数据 --> 当前系统时间:2017-06-03 21:42:08
    当前线程:Thread[nioEventLoopGroup-0-1,10,main] 服务端响应数据 --> 当前系统时间:2017-06-04 21:42:08
    当前线程:Thread[nioEventLoopGroup-0-1,10,main] 服务端响应数据 --> 当前系统时间:2017-06-05 21:42:08
    当前线程:Thread[nioEventLoopGroup-0-1,10,main] 服务端响应数据 --> 当前系统时间:2017-06-06 21:42:08
    当前线程:Thread[nioEventLoopGroup-0-1,10,main] 服务端响应数据 --> 当前系统时间:2017-06-07 21:42:08
    当前线程:Thread[nioEventLoopGroup-0-1,10,main] 服务端响应数据 --> 当前系统时间:2017-06-08 21:42:08
    当前线程:Thread[nioEventLoopGroup-0-1,10,main] 服务端响应数据 --> 当前系统时间:2017-06-09 21:42:08

    啦啦啦

  • 相关阅读:
    数据库实例: STOREBOOK > 用户 > 编辑 用户: SYSTEM
    数据库实例: STOREBOOK > 用户 > 编辑 用户: SYSMAN
    数据库实例: STOREBOOK > 用户 > 编辑 用户: SYS
    [慢查优化]建索引时注意字段选择性 & 范围查询注意组合索引的字段顺序(转)
    面试常考知识
    TCP、UDP和HTTP详解
    TCP流量控制与拥塞控制
    主键与唯一性索引
    进程与线程的区别
    WEB中会话跟踪
  • 原文地址:https://www.cnblogs.com/ClassNotFoundException/p/7074572.html
Copyright © 2011-2022 走看看