请求报文:前四位(指定报文长度)+报文内容
示例:0010aaooerudyh
1.1、NettyServer类 :启动TCP服务
package com.bokeyuan.socket.server; import com.bokeyuan.socket.BeanUtil; import com.bokeyuan.socket.config.ConfigConstant; import com.bokeyuan.socket.server.codec.MyByteToMessageCodec; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.nio.charset.Charset; /** * @author: chenly * @date: 2021-07-05 10:42 * @description: * @version: 1.0 */ @Component @Slf4j public class NettyServer extends Thread{ @Override public void run() { startServer(); } private void startServer(){ EventLoopGroup bossGroup = null; EventLoopGroup workGroup = null; ServerBootstrap serverBootstrap = null; ChannelFuture future = null; try { //初始化线程组 bossGroup= new NioEventLoopGroup(); workGroup= new NioEventLoopGroup(); //初始化服务端配置 serverBootstrap= new ServerBootstrap(); //绑定线程组 serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(BeanUtil.getBean(MyByteToMessageCodec.class)); ch.pipeline().addLast(BeanUtil.getBean(MtReadTimeoutHandler.class)); ch.pipeline().addLast(new StringDecoder(Charset.forName(ConfigConstant.MsgCode))); ch.pipeline().addLast(new StringEncoder(Charset.forName(ConfigConstant.MsgCode))); ch.pipeline().addLast(BeanUtil.getBean(NettyServerHandler.class)); } }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); future = serverBootstrap.bind(new InetSocketAddress(8090)).sync(); log.info(" *************TCP服务端启动成功 Port:{}*********** " , 8090); } catch (Exception e) { log.error("TCP服务端启动异常",e); } finally { if(future!=null){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("channel关闭异常:",e); } } if(bossGroup!=null){ //线程组资源回收 bossGroup.shutdownGracefully(); } if(workGroup!=null){ //线程组资源回收 workGroup.shutdownGracefully(); } } } }
1.2、NettyServerHandler类 继承ChannelInboundHandlerAdapter 类
package com.bokeyuan.socket.server; import com.bokeyuan.socket.MsgHandler; import com.bokeyuan.socket.StringUtil; import com.bokeyuan.socket.config.ConfigConstant; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; /** * @author: void * @date: 2021-07-05 10:43 * @description: * @version: 1.0 */ @Component @Scope("prototype") @Slf4j public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Autowired private MsgHandler msgHandler; /** * * 业务数据处理 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { int len = 4; String response = ""; String recieved = ""; try{ recieved = (String) msg; log.info("服务端接收到的内容为:{}",msg); //小于4位不处理 if(recieved.length()< len){ log.info("报文小于4位"); ctx.close(); return; } String length = recieved.substring(0,4); //非数字不处理 if(!StringUtil.isNumeric(length)){ log.info("报文前四位不是数字"); ctx.close(); return; } //报文长度小于前四位指定的长度 不处理 System.out.println(recieved.getBytes(ConfigConstant.MsgCode).length); if(recieved.getBytes(ConfigConstant.MsgCode).length-len < Integer.parseInt(length)){ log.info("报文长度不够"); ctx.close(); return; } //去掉前四位表示长度的内容,截取指定长度的内容 System.out.println(recieved); byte[] msgbytes = recieved.getBytes(ConfigConstant.MsgCode); //截取报文前四位长度的报文 byte[] tempMsg = new byte[Integer.parseInt(length)]; System.arraycopy(msgbytes, 4, tempMsg, 0, tempMsg.length); response =msgHandler.handler(new String(tempMsg, ConfigConstant.MsgCode)); log.info("服务端响应的的内容为:{}",response); }catch (Exception e) { log.error("报文解析异常,报文内容为:"+msg, e); }finally { ctx.writeAndFlush(response.getBytes(ConfigConstant.MsgCode)).addListener(ChannelFutureListener.CLOSE); } } /** *从客户端收到新的数据、读取完成---调用 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("从客户端收到新的数据读取完成********"); if(ctx!=null){ ctx.flush(); } } /** * 客户端与服务端建立连接--执行 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); ctx.channel().read(); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); //此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接 log.info("客户端与服务端建立连接:{}", clientIp); } /** * 客户端与服务端断连-调用 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); if(ctx!=null){ //断开连接时,服务端关闭连接,避免造成资源浪费 ctx.close(); } log.info("客户端与服务端断连:{}", clientIp); } /** * 当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if(ctx!=null){ //抛出异常,断开与客户端的连接 ctx.close(); log.error("连接异常,服务端主动断开连接{}",cause); } } /** * 服务端read超时-调用 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); if(ctx!=null){ //超时时断开连接 ctx.close(); } log.error("服务端read超时:{}", clientIp); } private String getClientIp(ChannelHandlerContext ctx){ InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIP = insocket.getAddress().getHostAddress(); return clientIP; } }
1.3、MsgHandler类 :业务处理
package com.bokeyuan.socket; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.List; /** * @author: void * @date: 2021-07-05 12:37 * @description: * @version: 1.0 */ @Component @Scope("prototype") @Slf4j public class MsgHandler { /** * 业务出路 * @param msg 请求报文 * @return */ public String handler(String msg){ log.info("收到的报文内容为"+msg); //业务处理 //..... return "success"; } }
1.4、MyByteToMessageCodec类 处理拆包粘包 继承ByteToMessageCodec类
package com.bokeyuan.socket.server.codec; import com.bokeyuan.socket.config.ConfigConstant; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.List; @Slf4j @Component @Scope("prototype") public class MyByteToMessageCodec extends ByteToMessageCodec<Object> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { int msgLength = 4; if(buf.readableBytes() < msgLength) {// 不足长度4(开始4位代表整个报文长度位),无法获取长度 return; } //记下readerIndex buf.markReaderIndex(); //获取前4位表示报文长度的字符串 String lenstr = getMsgLength(buf,msgLength); //转为整型 int length = toInt(lenstr); if(length <= 0) { ctx.close(); return; } //判断报文长度是否到达报文前四位指定的长度 if(buf.readableBytes() < length) { //重置到上一次调用markReaderIndex()的readerIndex位置 buf.resetReaderIndex(); return; } //报文内容(不包含前四位,前四位在前面已经被读取) byte[] data = getBytes(buf); String content = new String(data, ConfigConstant.MsgCode); //msg=报文前四位长度+报文内容 String msg = lenstr+content; out.add(msg); ctx.writeAndFlush(out); } @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { byte[] body = (byte[]) msg; out.writeBytes(body); } /** * 读取ByteBuf字节内容 * @param buf * @return */ private byte[] getBytes(ByteBuf buf){ int readablebytes = buf.readableBytes(); ByteBuf tempBuf = buf.readBytes(readablebytes); byte[] data = new byte[readablebytes];//数据大小 tempBuf.getBytes(0, data); return data; } /** * 读取ByteBuf字节内容 * @param buf * @return */ private byte[] getBytes2(ByteBuf buf){ byte[] data = new byte[buf.readableBytes()];//数据大小 buf.getBytes(0, data); return data; } /** * 读取ByteBuf中指定长度内容 * @param buf ByteBuf * @param len 读取字节长度 * @return */ private String getMsgLength(ByteBuf buf,int len){ byte[] bytes = new byte[len]; ByteBuf lengBuf = buf.readBytes(len); lengBuf.getBytes(0, bytes); return new String(bytes); } private int toInt(String lenstr){ try { return Integer.parseInt(lenstr); } catch (NumberFormatException e) { log.info("报文前四位:{}不是有效数字",lenstr); return 0; } } }
1.5、MyReadTimeoutHandler类 处理客户端长时间未发数据给服务端情况 继承ReadTimeoutHandler类
package com.bokeyuan.socket.server; import com.bokeyuan.socket.config.ConfigConstant; import io.netty.handler.timeout.ReadTimeoutHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Slf4j @Component @Scope("prototype") public class MyReadTimeoutHandler extends ReadTimeoutHandler { public MyReadTimeoutHandler() { //客户端长时间没有发送数据给服务端,socket服务端主动断开 super(ConfigConstant.READ_TIMEOUT,TimeUnit.SECONDS); } }
1.6、BeanUtil类
package com.bokeyuan.socket; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @description: * @author: void * @create: 2019-11-07 **/ @Component public class BeanUtil implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (BeanUtil.applicationContext == null) { BeanUtil.applicationContext = applicationContext; } } /** * Get Bean by clazz. * * @param clazz Class * @param <T> class type * @return Bean instance */ public static <T> T getBean(Class<T> clazz) { if (applicationContext == null) { return null; } return applicationContext.getBean(clazz); } @SuppressWarnings("unchecked") public static <T> T getBean(String beanId) { if (applicationContext == null) { return null; } return (T) applicationContext.getBean(beanId); } }
1.7、常量类
package com.bokeyuan.socket.config; /** * @author: void * @date: 2021-09-03 14:06 * @description: * @version: 1.0 */ public class ConfigConstant { public static String MsgCode = "GBK"; /**服务端读取超时时间*/ public static long READ_TIMEOUT; }
1.8、启动类
package com; import com.bokeyuan.socket.server.NettyServer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author : void * @version : 2.0 * @date : 2020/6/11 10:08 */ @SpringBootApplication public class Application implements CommandLineRunner { @Autowired private NettyServer nettyServer; @Override public void run(String... args) throws Exception { nettyServer.start(); } public static void main(String[] args) { SpringApplication.run(Application.class,args); } }
出现过的问题及解决方法
1、MyByteToMessageCodec is not a @Sharable handler, so can't be added or removed multiple times.
解决方法
在MyByteToMessageCodec类上添加@Scope("prototype")设置为多例模式
2、报错 java.lang.UnsupportedOperationException: direct buffer
使用以下代码读取Bytebuf报了下图错误
这个写法jdk1.6支持,jdk1.8不支持
改用如下方法读取ByteBuf字节
3、ByteToMessageDecoder中的decode方法执行多次的问题
参考地址:https://blog.csdn.net/u011412234/article/details/54929360?locationNum=1&fps=1
解决方法:
因为使用了错误的方法(下图getBytes2(ByteBuf))读取ByteBuf中的字节,导致readerIndex没有变化,一直没有读完,所有decode方法一直调用。
改为下图getBytes(ByteBuf)方法读取