zoukankan      html  css  js  c++  java
  • netty的对象传输

    pom

    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>5.0.0.Alpha2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
            <dependency>
                <groupId>org.jboss.marshalling</groupId>
                <artifactId>jboss-marshalling-serial</artifactId>
                <version>2.0.0.Beta2</version>
            </dependency>

    处理对象的工具类   MarshallingCodeCFactory

    package com.jiagoushi.lzh.网络编程.netty1.serial;
    
    import io.netty.handler.codec.marshalling.*;
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
    /**
     * Factory for the JBoss Marshalling codecs that are added to the Netty pipeline.
     *
     * <p>Both codecs are built from the marshaller factory registered under the name
     * {@code "serial"} (per the original author's note, the Java-serialization factory)
     * and a {@link MarshallingConfiguration} whose version is set to 5.
     *
     * @author liuzhonghua
     * @since 2018-9-9
     */
    public final class MarshallingCodeCFactory {

        /** Name passed to {@link Marshalling#getProvidedMarshallerFactory(String)}. */
        private static final String MARSHALLER_NAME = "serial";

        /** Version set on the {@link MarshallingConfiguration} used by both codecs. */
        private static final int MARSHALLING_VERSION = 5;

        /** Maximum size of a single serialized message accepted by the decoder (1 MiB). */
        private static final int MAX_OBJECT_SIZE = 1024 * 1024;

        /**
         * Creates the JBoss Marshalling decoder.
         *
         * @return a new {@link MarshallingDecoder} limited to {@link #MAX_OBJECT_SIZE} bytes per message
         * @throws IllegalStateException if the {@code "serial"} marshaller factory cannot be found
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            // The provider is built from the marshaller factory and the configuration.
            UnmarshallerProvider provider =
                    new DefaultUnmarshallerProvider(lookupMarshallerFactory(), newConfiguration());
            // The two arguments are the provider and the maximum length of one serialized message.
            return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
        }

        /**
         * Creates the JBoss Marshalling encoder, which serializes POJOs implementing
         * {@link java.io.Serializable} into binary form.
         *
         * @return a new {@link MarshallingEncoder}
         * @throws IllegalStateException if the {@code "serial"} marshaller factory cannot be found
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            MarshallerProvider provider =
                    new DefaultMarshallerProvider(lookupMarshallerFactory(), newConfiguration());
            return new MarshallingEncoder(provider);
        }

        /**
         * Looks up the marshaller factory through the static helper on {@link Marshalling}.
         *
         * <p>The lookup returns {@code null} when no matching factory is available; failing here
         * gives a clear message instead of a later NullPointerException inside the codec.
         */
        private static MarshallerFactory lookupMarshallerFactory() {
            final MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(MARSHALLER_NAME);
            if (factory == null) {
                throw new IllegalStateException(
                        "No JBoss Marshalling factory named \"" + MARSHALLER_NAME
                                + "\" was found; is jboss-marshalling-serial on the classpath?");
            }
            return factory;
        }

        /** Creates a fresh configuration with the version shared by encoder and decoder. */
        private static MarshallingConfiguration newConfiguration() {
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(MARSHALLING_VERSION);
            return configuration;
        }

    }

    实体类

    req

    package com.jiagoushi.lzh.网络编程.netty1.serial;
    
    import lombok.Data;
    
    import java.io.Serializable;
    /**
     * Request message that the client writes to the channel and the server handler reads.
     *
     * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are generated by
     * Lombok's {@code @Data}.
     */
    @Data
    public class Req implements Serializable{

        // Java serialization only honors a field named exactly "serialVersionUID";
        // any other capitalization is ignored and a computed default UID is used instead.
        private static final long serialVersionUID = 1L;

        /** Request identifier; the server handler echoes it back in the response. */
        private String id ;
        /** Display name of the request. */
        private String name ;
        /** Text payload of the request. */
        private String requestMessage ;
        /** Binary attachment; in this example the client stores gzip-compressed file bytes here. */
        private byte[] attachment;
    }

    resp

    package com.jiagoushi.lzh.网络编程.netty1.serial;
    
    import lombok.Data;
    
    import java.io.Serializable;
    /**
     * Response message that the server handler writes back and the client handler prints.
     *
     * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are generated by
     * Lombok's {@code @Data}.
     */
    @Data
    public class Resp implements Serializable{
        
        private static final long serialVersionUID = 1L;
        
        // Identifier; the server handler copies it from the request's id.
        private String id;
        // Name; the server handler sets it to "resp" followed by the request id.
        private String name;
        // Text payload of the response.
        private String responseMessage;
    }

    server服务端

    package com.jiagoushi.lzh.网络编程.netty1.serial;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    public class Server {
    
        public static void main(String[] args) throws Exception{
            
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             //设置日志
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
            
            ChannelFuture cf = b.bind(8765).sync();
            
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
            
        }
    }

    服务端处理类

    package com.jiagoushi.lzh.网络编程.netty1.serial;
    
    import com.jiagoushi.lzh.网络编程.utils.GzipUtils;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.io.File;
    import java.io.FileOutputStream;
    
    /**
     * Server-side business handler: receives a {@code Req}, stores its decompressed
     * attachment as a file under the {@code receive} directory of the working directory,
     * and answers with a {@code Resp}.
     */
    public class ServerHandler extends ChannelHandlerAdapter{

        /**
         * Ids allowed to become part of a file name. The id arrives from the network, so
         * anything that could contain path separators or ".." must be rejected.
         */
        private static final java.util.regex.Pattern SAFE_ID =
                java.util.regex.Pattern.compile("[A-Za-z0-9_-]+");

        /** Intentionally does nothing when the channel becomes active. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

        }

        /**
         * Handles one decoded request.
         *
         * @param ctx the handler context used to write the response
         * @param msg the decoded message; cast to {@code Req}
         * @throws IllegalArgumentException if the request id is not safe to use in a file name
         * @throws Exception if decompressing or writing the attachment fails
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Req req = (Req)msg;
            System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());

            // Reject ids that could escape the target directory. A null id is left alone:
            // string concatenation turns it into the harmless literal "null".
            String id = req.getId();
            if (id != null && !SAFE_ID.matcher(id).matches()) {
                throw new IllegalArgumentException("Rejected request with unsafe id: " + id);
            }

            byte[] attachment = GzipUtils.ungzip(req.getAttachment());

            String path = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "00"+req.getId()+".jpg";
            // try-with-resources closes the stream even when write() throws.
            try (FileOutputStream fos = new FileOutputStream(path)) {
                fos.write(attachment);
            }

            Resp resp = new Resp();
            resp.setId(req.getId());
            resp.setName("resp" + req.getId());
            resp.setResponseMessage("响应内容" + req.getId());
            ctx.writeAndFlush(resp);
        }

        /** Intentionally does nothing when a read batch completes. */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        }

        /**
         * Reports the failure and closes the channel.
         *
         * @param ctx   the handler context whose channel is closed
         * @param cause the error that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Surface the cause before closing; dropping it makes failures invisible.
            cause.printStackTrace();
            ctx.close();
        }

    }

    client客户端

    package com.jiagoushi.lzh.网络编程.netty1.serial;
    
    import com.jiagoushi.lzh.网络编程.utils.GzipUtils;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    import java.io.File;
    import java.io.FileInputStream;
    
    /**
     * Netty client entry point: connects to the local server and sends five {@code Req}
     * objects, each carrying the gzip-compressed bytes of {@code sources/001.jpg}.
     */
    public class Client {

        /**
         * Connects, sends the requests and blocks until the channel is closed.
         *
         * @param args unused
         * @throws Exception if connecting, reading the source file or waiting on the channel fails
         */
        public static void main(String[] args) throws Exception{

            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        // Decoder and encoder first, then the business handler.
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });

                ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

                // Every request carries the same file, so read it once up front.
                // Files.readAllBytes reads the complete file and closes it; available() plus a
                // single read() does not guarantee that the whole file ends up in the array.
                String path = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "001.jpg";
                byte[] data = java.nio.file.Files.readAllBytes(new File(path).toPath());

                for(int i = 0; i < 5; i++ ){
                    Req req = new Req();
                    req.setId("" + i);
                    req.setName("pro" + i);
                    req.setRequestMessage("数据信息" + i);
                    req.setAttachment(GzipUtils.gzip(data));
                    cf.channel().writeAndFlush(req);
                }

                // Block until the channel is closed.
                cf.channel().closeFuture().sync();
            } finally {
                // Always release the event loop threads, even when connecting or reading fails.
                group.shutdownGracefully();
            }
        }
    }

    客户端处理类

    package com.jiagoushi.lzh.网络编程.netty1.serial;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    /**
     * Client-side business handler: prints every {@code Resp} received from the server.
     */
    public class ClientHandler extends ChannelHandlerAdapter{

        /** Intentionally does nothing when the channel becomes active. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

        }

        /**
         * Prints the fields of the received response.
         *
         * @param ctx the handler context (unused here)
         * @param msg the decoded message; cast to {@code Resp}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                Resp resp = (Resp)msg;
                System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
            } finally {
                // The message is consumed here and not passed on, so release it.
                ReferenceCountUtil.release(msg);
            }
        }

        /** Intentionally does nothing when a read batch completes. */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        }

        /**
         * Reports the failure and closes the channel.
         *
         * @param ctx   the handler context whose channel is closed
         * @param cause the error that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Surface the cause before closing; dropping it makes failures invisible.
            cause.printStackTrace();
            ctx.close();
        }

    }
  • 相关阅读:
    kafka 订单应用需求
    Centos7快速搭建博客
    Error: Cannot retrieve metalink for repository: epel. Please verify its path and try again
    5G关键技术评述
    ganglia安装
    zookeeper安装
    hadoop安装过程
    redis持久化的操作和介绍
    windows下的anacond使用pip安装组件操作
    Java客户端连接Hbase,并创建表(超详细)
  • 原文地址:https://www.cnblogs.com/coder-lzh/p/9611238.html
Copyright © 2011-2022 走看看