zoukankan      html  css  js  c++  java
  • Netty实现Tcp服务端

    请求报文:前四位(指定报文长度)+报文内容

    示例:0010aaooerudyh

    1.1NettyServer:启动TCP服务

    package com.bokeyuan.socket.server;
    
    import com.bokeyuan.socket.BeanUtil;
    import com.bokeyuan.socket.config.ConfigConstant;
    import com.bokeyuan.socket.server.codec.MyByteToMessageCodec;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;
    
    /**
     * @author: chenly
     * @date: 2021-07-05 10:42
     * @description:
     * @version: 1.0
     */
    @Component
    @Slf4j
    public class NettyServer extends Thread{
    
    
        @Override
        public void run() {
            startServer();
        }
    
        private void startServer(){
            EventLoopGroup bossGroup = null;
            EventLoopGroup workGroup = null;
            ServerBootstrap serverBootstrap = null;
            ChannelFuture future = null;
            try {
                //初始化线程组
                bossGroup= new NioEventLoopGroup();
                workGroup= new NioEventLoopGroup();
                //初始化服务端配置
                serverBootstrap= new ServerBootstrap();
                //绑定线程组
                serverBootstrap.group(bossGroup,workGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch)
                                    throws Exception {
                                ch.pipeline().addLast(BeanUtil.getBean(MyByteToMessageCodec.class));
                                ch.pipeline().addLast(BeanUtil.getBean(MtReadTimeoutHandler.class));
                                ch.pipeline().addLast(new StringDecoder(Charset.forName(ConfigConstant.MsgCode)));
                                ch.pipeline().addLast(new StringEncoder(Charset.forName(ConfigConstant.MsgCode)));
                                ch.pipeline().addLast(BeanUtil.getBean(NettyServerHandler.class));
    
                            }
                        }).option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
                future = serverBootstrap.bind(new InetSocketAddress(8090)).sync();
                log.info(" *************TCP服务端启动成功 Port:{}*********** " , 8090);
            } catch (Exception e) {
                log.error("TCP服务端启动异常",e);
            } finally {
                if(future!=null){
                    try {
                        future.channel().closeFuture().sync();
                    } catch (InterruptedException e) {
                        log.error("channel关闭异常:",e);
                    }
                }
                if(bossGroup!=null){
                    //线程组资源回收
                    bossGroup.shutdownGracefully();
                }
    
                if(workGroup!=null){
                    //线程组资源回收
                    workGroup.shutdownGracefully();
                }
    
    
            }
        }
    
    }
    View Code

    1.2NettyServerHandler 继承ChannelInboundHandlerAdapter 类

    package com.bokeyuan.socket.server;
    
    import com.bokeyuan.socket.MsgHandler;
    import com.bokeyuan.socket.StringUtil;
    import com.bokeyuan.socket.config.ConfigConstant;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Scope;
    import org.springframework.stereotype.Component;
    
    import java.net.InetSocketAddress;
    
    /**
     * @author: void
     * @date: 2021-07-05 10:43
     * @description:
     * @version: 1.0
     */
    @Component
    @Scope("prototype")
    @Slf4j
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        @Autowired
        private MsgHandler msgHandler;
        /**
         *
         * 业务数据处理
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            int len = 4;
            String response = "";
            String recieved = "";
            try{
                recieved  = (String) msg;
                log.info("服务端接收到的内容为:{}",msg);
                //小于4位不处理
                if(recieved.length()< len){
                    log.info("报文小于4位");
                    ctx.close();
                    return;
                }
                String length = recieved.substring(0,4);
                //非数字不处理
                if(!StringUtil.isNumeric(length)){
                    log.info("报文前四位不是数字");
                    ctx.close();
                    return;
                }
                //报文长度小于前四位指定的长度 不处理
                System.out.println(recieved.getBytes(ConfigConstant.MsgCode).length);
                if(recieved.getBytes(ConfigConstant.MsgCode).length-len < Integer.parseInt(length)){
                    log.info("报文长度不够");
                    ctx.close();
                    return;
                }
                //去掉前四位表示长度的内容,截取指定长度的内容
                System.out.println(recieved);
                byte[] msgbytes = recieved.getBytes(ConfigConstant.MsgCode);
                //截取报文前四位长度的报文
                byte[] tempMsg = new byte[Integer.parseInt(length)];
                System.arraycopy(msgbytes, 4, tempMsg, 0, tempMsg.length);
                response =msgHandler.handler(new String(tempMsg, ConfigConstant.MsgCode));
                log.info("服务端响应的的内容为:{}",response);
            }catch (Exception e) {
                log.error("报文解析异常,报文内容为:"+msg, e);
            }finally {
                ctx.writeAndFlush(response.getBytes(ConfigConstant.MsgCode)).addListener(ChannelFutureListener.CLOSE);
            }
        }
    
        /**
         *从客户端收到新的数据、读取完成---调用
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            log.info("从客户端收到新的数据读取完成********");
            if(ctx!=null){
                ctx.flush();
            }
        }
    
        /**
         * 客户端与服务端建立连接--执行
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            ctx.channel().read();
            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = insocket.getAddress().getHostAddress();
            //此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
            log.info("客户端与服务端建立连接:{}", clientIp);
        }
    
    
        /**
         * 客户端与服务端断连-调用
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = insocket.getAddress().getHostAddress();
            if(ctx!=null){
                //断开连接时,服务端关闭连接,避免造成资源浪费
                ctx.close();
            }
            log.info("客户端与服务端断连:{}", clientIp);
        }
    
    
        /**
         * 当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if(ctx!=null){
                //抛出异常,断开与客户端的连接
                ctx.close();
                log.error("连接异常,服务端主动断开连接{}",cause);
            }
    
        }
    
        /**
         * 服务端read超时-调用
         * @param ctx
         * @param evt
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    
            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = insocket.getAddress().getHostAddress();
            if(ctx!=null){
                //超时时断开连接
                ctx.close();
            }
            log.error("服务端read超时:{}", clientIp);
        }
    
    
    
        private String  getClientIp(ChannelHandlerContext ctx){
            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIP = insocket.getAddress().getHostAddress();
            return clientIP;
        }
    }
    View Code

    1.3MsgHandler类 :业务处理

    package com.bokeyuan.socket;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Scope;
    import org.springframework.stereotype.Component;
    
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * @author: void
     * @date: 2021-07-05 12:37
     * @description:
     * @version: 1.0
     */
    @Component
    @Scope("prototype")
    @Slf4j
    public class MsgHandler {
    
    
        /**
         * 业务出路
         * @param msg 请求报文
         * @return
         */
        public String handler(String msg){
            log.info("收到的报文内容为"+msg);
            //业务处理
            //.....
            return "success";
    
        }
    
    
    }
    View Code

    1.4MyByteToMessageCodec类 处理拆包粘包 继承ByteToMessageCodec类

    package com.bokeyuan.socket.server.codec;
    import com.bokeyuan.socket.config.ConfigConstant;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageCodec;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Scope;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    
    @Slf4j
    @Component
    @Scope("prototype")
    public class MyByteToMessageCodec extends ByteToMessageCodec<Object>
    {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception
        {
            int msgLength = 4;
            if(buf.readableBytes() < msgLength)
            {// 不足长度4(开始4位代表整个报文长度位),无法获取长度
                return;
            }
            //记下readerIndex
            buf.markReaderIndex();
    
            //获取前4位表示报文长度的字符串
            String lenstr = getMsgLength(buf,msgLength);
            //转为整型
            int length = toInt(lenstr);
            if(length <= 0)
            {
               ctx.close();
                return;
            }
    
            //判断报文长度是否到达报文前四位指定的长度
            if(buf.readableBytes() < length)
            {
                //重置到上一次调用markReaderIndex()的readerIndex位置
                buf.resetReaderIndex();
                return;
            }
    
            //报文内容(不包含前四位,前四位在前面已经被读取)
            byte[] data = getBytes(buf);
            String content = new String(data, ConfigConstant.MsgCode);
    
            //msg=报文前四位长度+报文内容
            String msg = lenstr+content;
            out.add(msg);
            ctx.writeAndFlush(out);
        }
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception
        {
            byte[] body = (byte[]) msg;
            out.writeBytes(body);
        }
    
    
        /**
         * 读取ByteBuf字节内容
         * @param buf
         * @return
         */
        private byte[] getBytes(ByteBuf buf){
            int readablebytes = buf.readableBytes();
            ByteBuf tempBuf = buf.readBytes(readablebytes);
            byte[] data = new byte[readablebytes];//数据大小
            tempBuf.getBytes(0, data);
            return data;
        }
    
        /**
         * 读取ByteBuf字节内容
         * @param buf
         * @return
         */
        private byte[] getBytes2(ByteBuf buf){
            byte[] data = new byte[buf.readableBytes()];//数据大小
            buf.getBytes(0, data);
            return data;
        }
    
        /**
         * 读取ByteBuf中指定长度内容
         * @param buf ByteBuf
         * @param len 读取字节长度
         * @return
         */
        private String getMsgLength(ByteBuf buf,int len){
            byte[] bytes = new byte[len];
            ByteBuf lengBuf = buf.readBytes(len);
            lengBuf.getBytes(0, bytes);
            return  new String(bytes);
        }
    
    
        private int toInt(String lenstr){
            try {
                return Integer.parseInt(lenstr);
            } catch (NumberFormatException e) {
                log.info("报文前四位:{}不是有效数字",lenstr);
                return 0;
            }
    
        }
    
    }
    View Code

    1.5MyReadTimeoutHandler 处理客户端长时间未发数据给服务端情况 继承ReadTimeoutHandler类

    package com.bokeyuan.socket.server;
    
    import com.bokeyuan.socket.config.ConfigConstant;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Scope;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    @Component
    @Scope("prototype")
    public class MyReadTimeoutHandler extends ReadTimeoutHandler {
    
        public MyReadTimeoutHandler()
        {
            //客户端长时间没有发送数据给服务端,socket服务端主动断开
            super(ConfigConstant.READ_TIMEOUT,TimeUnit.SECONDS);
        }
    
       
    }
    View Code

    1.6、BeanUtil类

    package com.bokeyuan.socket;
    
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.stereotype.Component;
    
    /**
     * @description:
     * @author: void
     * @create: 2019-11-07
     **/
    @Component
    public class BeanUtil implements ApplicationContextAware {
    
        private static ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            if (BeanUtil.applicationContext == null) {
                BeanUtil.applicationContext = applicationContext;
            }
        }
    
        /**
         * Get Bean by clazz.
         *
         * @param clazz Class
         * @param <T>   class type
         * @return Bean instance
         */
        public static <T> T getBean(Class<T> clazz) {
            if (applicationContext == null) {
                return null;
            }
            return applicationContext.getBean(clazz);
        }
    
        @SuppressWarnings("unchecked")
        public static <T> T getBean(String beanId) {
            if (applicationContext == null) {
                return null;
            }
            return (T) applicationContext.getBean(beanId);
        }
    
    }
    View Code

    1.7、常量类

    package com.bokeyuan.socket.config;
    
    /**
     * @author: void
     * @date: 2021-09-03 14:06
     * @description:
     * @version: 1.0
     */
    public class ConfigConstant {
    
        public static String MsgCode = "GBK";
        /**服务端读取超时时间*/
        public static long READ_TIMEOUT;
    }
    View Code

    1.8、启动类

    package com;
    import com.bokeyuan.socket.server.NettyServer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * @author : void
     * @version : 2.0
     * @date : 2020/6/11 10:08
     */
    @SpringBootApplication
    public class Application implements CommandLineRunner {
    
        @Autowired
        private NettyServer nettyServer;
    
    
        @Override
        public void run(String... args) throws Exception {
            nettyServer.start();
        }
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class,args);
        }
    }
    View Code

    出现过的问题及解决方法

    1MyByteToMessageCodec is not a @Sharable handler, so can't be added or removed multiple times.

    解决方法

    在MyByteToMessageCodec类上添加@Scope("prototype")设置为多例模式

    2、报错 java.lang.UnsupportedOperationException: direct buffer

    使用以下代码读取Bytebuf报了下图错误

     

     

     这个写法jdk1.6支持,jdk1.8不支持

     改用如下方法读取ByteBuf字节

     

    3ByteToMessageDecoder中的decode方法执行多次的问题

     

     参考地址:https://blog.csdn.net/u011412234/article/details/54929360?locationNum=1&fps=1

     解决方法:

    因为使用了错误的方法(下图getBytes2(ByteBuf))读取ByteBuf中的字节,导致readerIndex没有变化,一直没有读完,所有decode方法一直调用。

    改为下图getBytes(ByteBuf)方法读取

     

    作者:小念
    本文版权归作者和博客园共有,欢迎转载,但必须给出原文链接,并保留此段声明,否则保留追究法律责任的权利。
  • 相关阅读:
    springboot整合Quartz框架
    安装 和 配置 HBase
    HBase 安装之后版本的验证的bug:(错误的替换、找不到或无法加载主类、SLF4J)
    HBase基本知识和应用场景
    修改idea的临时数据存放目录(默认保存在C盘用户目录下的.IntelliJIdea2020.3)
    Eclipse中格式化代码快捷键Ctrl+Shift+F失效的解决办法(关于快捷键失效原因可能是与输入法的快捷键冲突)
    参考大数据厦门大学林子雨编著的《大数据技术原理与应用(第3版)》中第三课《HDFS编程实践(Hadoop3.1.3)》遇到的bug
    框架设计思维符合语义即可使用,而不用关心底层的实现
    Ubuntu下无法输入中文问题解决
    HDFS编程实践(Hadoop3.1.3)
  • 原文地址:https://www.cnblogs.com/kiko2014551511/p/15238829.html
Copyright © 2011-2022 走看看