zoukankan      html  css  js  c++  java
  • 网络编程 -- RPC实现原理 -- Netty -- 迭代版本V3 -- 编码解码

     网络编程 -- RPC实现原理 -- 目录

      啦啦啦

    V3——Netty -- 通过 pipeline.addLast(io.netty.handler.codec.MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>) 添加编解码器,并覆写其编码(encode)、解码(decode)方法。

     pipeline相当于一条拦截器链。在pipeline中添加抽象类MessageToMessageCodec的子类:覆写的encode()方法负责把待发送的Object对象转换为ByteBuf,覆写的decode()方法负责把接收到的ByteBuf转换为Object对象;Netty在写出和读入数据时会自动调用这两个方法。

     

      Class : Server

    package lime.pri.limeNio.netty.netty03;
    
    import java.net.InetSocketAddress;
    import java.util.Date;
    import java.util.List;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.serializer.SerializerFeature;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.MessageToMessageCodec;
    import io.netty.util.CharsetUtil;
    import lime.pri.limeNio.netty.netty03.entity.User;
    
    /**
     * Demo RPC server (iteration V3) built on Netty.
     *
     * <p>Each accepted connection gets a two-stage pipeline:
     * <ol>
     *   <li>a {@link MessageToMessageCodec} that serializes outbound objects to
     *       UTF-8 JSON (with fastjson class-name metadata) and parses inbound
     *       {@link ByteBuf}s back into objects;</li>
     *   <li>a business handler that answers {@code "Query Date"} and
     *       {@code "Query User"} requests and then closes the connection.</li>
     * </ol>
     */
    public class Server {

        /** TCP port the server listens on. */
        private static final int PORT = 9999;

        /**
         * Starts the server and blocks until the listening channel is closed.
         *
         * @param args ignored
         * @throws Exception if binding fails or the waiting thread is interrupted
         */
        public static void main(String[] args) throws Exception {
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(boss, worker);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // NOTE(review): no frame decoder precedes the codec, so decode() assumes
                        // every inbound ByteBuf holds exactly one complete JSON document.
                        // TODO confirm this is acceptable outside of a demo.
                        pipeline.addLast(new MessageToMessageCodec<ByteBuf, Object>() {
                            @Override
                            protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
                                System.out.println("-- -- 服务端编码");
                                byte[] json = JSON.toJSONString(msg, SerializerFeature.WriteClassName)
                                        .getBytes(CharsetUtil.UTF_8);
                                // Wrap the freshly created array instead of copying it into a new buffer.
                                out.add(Unpooled.wrappedBuffer(json));
                            }

                            @Override
                            protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
                                System.out.println("-- -- 服务端解码");
                                // NOTE(review): JSON.parse honours "@type" hints depending on the fastjson
                                // version/configuration; parsing untrusted input this way has a history of
                                // remote-code-execution issues. Verify the fastjson version and autoType settings.
                                out.add(JSON.parse(msg.toString(CharsetUtil.UTF_8)));
                            }
                        }).addLast(new ChannelHandlerAdapter() {

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("客户端请求数据:" + msg);
                                Object response = "请求参数不正确";
                                // The decoder may yield any JSON value; only strings are valid requests.
                                // Anything else gets the default "invalid parameter" reply instead of a
                                // ClassCastException that would leave the client waiting forever.
                                if (msg instanceof String) {
                                    String request = (String) msg;
                                    if ("Query Date".equalsIgnoreCase(request)) {
                                        response = "当前系统时间:" + new Date().toString();
                                    } else if ("Query User".equalsIgnoreCase(request)) {
                                        response = new User(1, "lime", new Date());
                                    }
                                }
                                ChannelFuture channelFuture = ctx.writeAndFlush(response);
                                // Report a failed write through the pipeline ...
                                channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
                                // ... and close the connection whether the write succeeded or not.
                                channelFuture.addListener(ChannelFutureListener.CLOSE);
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                cause.printStackTrace();
                                // Close the connection so the peer is not left waiting on a broken channel.
                                ctx.close();
                            }

                        });
                    }

                });
                ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(PORT)).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                // Always release the event loop threads, even when bind() fails or sync() is interrupted.
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }

      Class : Client

    package lime.pri.limeNio.netty.netty03;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.serializer.SerializerFeature;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.MessageToMessageCodec;
    import io.netty.util.CharsetUtil;
    
    /**
     * Demo RPC client (iteration V3) built on Netty.
     *
     * <p>{@link #main(String[])} starts ten client threads, one per second. Each
     * thread opens its own connection to port 9999, sends one randomly chosen
     * request string encoded as JSON, prints whatever the peer answers, and
     * returns once the connection has been closed.
     */
    public class Client {

        /** TCP port of the server to connect to. */
        private static final int PORT = 9999;

        /** Number of client threads started by {@link #main(String[])}. */
        private static final int CLIENT_COUNT = 10;

        /**
         * Launches the client threads, pausing one second between launches.
         *
         * @param args ignored
         * @throws Exception if the launching thread is interrupted while sleeping
         */
        public static void main(String[] args) throws Exception {
            for (int i = 0; i < CLIENT_COUNT; i++) {
                Thread runner = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            client();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                runner.setDaemon(false);
                runner.start();
                Thread.sleep(1000);
            }
        }

        /**
         * Runs a single request/response exchange and blocks until the channel is closed.
         *
         * @throws Exception if the connection cannot be established or the wait is interrupted
         */
        private static void client() throws Exception {
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(worker);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // NOTE(review): no frame decoder precedes the codec, so decode() assumes
                        // every inbound ByteBuf holds exactly one complete JSON document.
                        // TODO confirm this is acceptable outside of a demo.
                        pipeline.addLast(new MessageToMessageCodec<ByteBuf, Object>() {

                            @Override
                            protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
                                System.out.println("-- -- 客户端编码");
                                byte[] json = JSON.toJSONString(msg, SerializerFeature.WriteClassName)
                                        .getBytes(CharsetUtil.UTF_8);
                                // Wrap the freshly created array instead of copying it into a new buffer.
                                out.add(Unpooled.wrappedBuffer(json));
                            }

                            @Override
                            protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
                                System.out.println("-- -- 客户端解码");
                                // NOTE(review): JSON.parse honours "@type" hints depending on the fastjson
                                // version/configuration; only use it on data from a trusted peer.
                                out.add(JSON.parse(msg.toString(CharsetUtil.UTF_8)));
                            }
                        }).addLast(new ChannelHandlerAdapter() {
                            /**
                             * Prints the failure and closes the channel so that the
                             * thread blocked on {@code closeFuture().sync()} can finish.
                             */
                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                System.out.println(cause);
                                ctx.close();
                            }

                            /**
                             * Sends one randomly selected request as soon as the connection
                             * is active; the codec in front of this handler encodes it as JSON.
                             */
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                String request = null;
                                switch ((int) (Math.random() * 10) % 3) {
                                case 0:
                                    request = "Query Date";
                                    break;
                                case 1:
                                    request = "Query User";
                                    break;

                                default:
                                    request = "Query What?";
                                    break;
                                }
                                ChannelFuture channelFuture = ctx.writeAndFlush(request);
                                channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
                                channelFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                            }

                            /** Prints the decoded response. */
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("服务端响应数据 --> " + msg);
                            }

                        });
                    }
                });

                ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(PORT)).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                // Release the event loop threads even when connect() fails (e.g. server not running);
                // otherwise the non-daemon I/O threads would keep the JVM alive.
                worker.shutdownGracefully();
            }
        }
    }

    啦啦啦

  • 相关阅读:
    Func<T, TResult>的一个使用场合
    MongoDB 服务启动时指定dbpath
    IM平台即时聊天功能及服务介绍
    线程池参数设置
    List 去重的 6 种方法
    Java中List排序的3种方法!
    Redis 的缓存异常处理 —— 缓存雪崩、缓存击穿、缓存穿透
    吞吐量(TPS)、QPS、并发数、响应时间(RT)概念
    Java中try()catch{}的使用方法
    HashMap七种遍历的方法
  • 原文地址:https://www.cnblogs.com/ClassNotFoundException/p/7074482.html
Copyright © 2011-2022 走看看