zoukankan      html  css  js  c++  java
  • Java Netty 服务端向客户端发送16进制数据

    放假前夕,接手一个不太熟悉的任务,不过好在用的东西,比较熟,就是netty通讯。具体遇到什么问题嘞,我们来看一下。

    netty服务端可以接收消息,但是不能正确的发送消息给客户端,最开始看到的时候,没有注意到,会是编码问题,具体我们来看一下吧。

    在写的过程中,看到这篇文章,我才意识到,我可能被同事已有的代码误导了:

     这里比较郁闷的是,人家没有加编码,而我这边是,加了编码,初看 没意识到。然后在解码器里边去处理了接收的消息,还想发送消息出去。

    这里就不给大家看没改之前的,直接上正确代码。

    启动类:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class NettyServer {
    
    	private static class SingletionWSServer {
    		static final NettyServer instance = new NettyServer();
    	}
    
    	public static NettyServer getInstance() {
    		return SingletionWSServer.instance;
    	}
    
    	private EventLoopGroup mainGroup;
    	private EventLoopGroup subGroup;
    	private ServerBootstrap server;
    	private ChannelFuture future;
    
    	public NettyServer() {
    		// 主线程组
    		mainGroup = new NioEventLoopGroup();
    
    		// 子线程组
    		subGroup = new NioEventLoopGroup();
    		// netty服务器的创建,ServerBootstrap是一个启动类
    		server = new ServerBootstrap();
    		server.group(mainGroup, subGroup)// 设置主从线程组
    				.option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO))
    				.channel(NioServerSocketChannel.class)// 设置nio双向通道
    				.childHandler(new NettyServerInitializer());// 子处理器,用于处理subGroup
    	}
    
    	/**
    	 * 启动
    	 */
    	public void bind(int port) {
    		try {
    			this.future = server.bind(port).sync();
    			System.err.println("netty websocket server 启动完毕...");
    			log.info("netty websocket server 启动完毕...");
    			this.future.channel().closeFuture().sync();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    			System.err.println("netty websocket server 启动异常..." + e.getMessage());
    			log.debug("netty websocket server 启动异常..." + e.getMessage());
    		}
    	}
    }
    

    NettyServerInitializer

    import com.slife.netty.coder.NettyMessageDecoder;
    import com.slife.netty.coder.NettyMessageEncoder;
    import com.slife.netty.handler.HeartBeatHandler;
    import com.slife.netty.handler.NettyServerHandler;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    
    	@Override
    	protected void initChannel(SocketChannel ch) throws Exception {
    		log.info("初始化 SocketChannel");
    
    		ChannelPipeline pipeline = ch.pipeline();
    
    		// 自定义解码器
    		pipeline.addLast(new NettyMessageDecoder());
    
    		// 自定义编码器
    		pipeline.addLast(new NettyMessageEncoder());
    
    		// 自定义的空闲检测
    		pipeline.addLast(new HeartBeatHandler());
    		// ========================增加心跳支持 end ========================
    
    		/**
    		 *
    		 * @param maxFrameLength
    		 *            帧的最大长度
    		 * @param lengthFieldOffset
    		 *            length字段偏移的地址
    		 * @param lengthFieldLength
    		 *            length字段所占的字节长
    		 * @param lengthAdjustment
    		 *            修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
    		 * @param initialBytesToStrip
    		 *            解析时候跳过多少个长度
    		 * @param failFast
    		 *            为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异
    		 */
    		pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 4, 4, 2, 0));
    
    		// 自定义hanler 处理解码消息并回复信息
    		pipeline.addLast(new NettyServerHandler());
    	}
    
    }
    

    这里需要解析一下的是这个类,LengthFieldBasedFrameDecoder,上述代码的注解是翻译过来的,定义的参数值,大家要依据自己的实际情况去设置。

    监控:HeartBeatHandler

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    
    public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    
    	@Override
    	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    
    		// 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲)
    		if (evt instanceof IdleStateEvent) {
    			IdleStateEvent event = (IdleStateEvent) evt; // 强制类型转换
    
    			if (event.state() == IdleState.READER_IDLE) {
    				System.out.println("进入读空闲...");
    			} else if (event.state() == IdleState.WRITER_IDLE) {
    				System.out.println("进入写空闲...");
    			} else if (event.state() == IdleState.ALL_IDLE) {
    				System.out.println("channel关闭前channelGroup数量为:"+ NettyServerHandler.channelGroup.size());
    				System.out.println("进入读写空闲...");
    				Channel channel = ctx.channel();
    				//关闭无用的channel,以防资源浪费
    				channel.close();
    				System.out.println("channel关闭后channelGroup数量为:"+ NettyServerHandler.channelGroup.size());
    			}
    		}
    	}
    }
    

    解码器:NettyMessageDecoder

    import java.util.List;
    
    import com.netty.constant.Delimiter;
    import com.netty.pojo.GpsMessage;
    import com.netty.pojo.LoginMsg;
    import com.utils.CrcUtils;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    public class NettyMessageDecoder extends ByteToMessageDecoder {
    
    	@Override
    	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    		System.out.println("开始解码:");
    		int length = in.readableBytes();
    		if (length < Delimiter.MINIMUM_LENGTH)
    			return;
    		in.markReaderIndex(); // 我们标记一下当前的readIndex的位置
    
    		// 解码后消息对象
    		GpsMessage gpsMessage = new GpsMessage();
    		byte packetLen = in.readByte();
    		int nPacketLen = packetLen & 0xff;
    		gpsMessage.setPacketLen(nPacketLen);
    		/**
    		 * 协议
    		 */
    		byte agreement = in.readByte();
    		gpsMessage.setAgreement(agreement);
    		ByteBuf frame = null;
    		if (agreement == Delimiter.LOGIN_PACKET) { // 登录包
    			LoginMsg loginMsg = new LoginMsg();
    			frame = CrcUtils.decodeCodeIDFrame(ctx, in);
    			String sCode = CrcUtils.bytesToHexString(frame);
    			System.out.println("编号:" + sCode);
    			loginMsg.setCardId(sCode);
    			gpsMessage.setContent(loginMsg);
    		} else if (agreement == Delimiter.STATUS_PACKET) {// 心跳包
    			System.out.println(" 心跳包:");
    			frame = CrcUtils.decodeCodeIDFrame(ctx, in);
    			String sContent = CrcUtils.bytesToHexString(frame);
    			System.out.println("心跳包内容:" + sContent);
    			gpsMessage.setContent(sContent);
    		}
    		out.add(gpsMessage);
    		System.out.println("解码结束!");
    	}
    
    }
    

    编码器:NettyMessageEncoder

    import com.netty.pojo.GpsMessage;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    public class NettyMessageEncoder extends MessageToByteEncoder<GpsMessage> {
    
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, GpsMessage gpsMessage, ByteBuf byteBuf) throws Exception {
            // 2、写入数据包长度
            byteBuf.writeInt(gpsMessage.getPacketLen());
    
            // 3、写入请求类型
            byteBuf.writeByte(gpsMessage.getAgreement());
    
            // 4、写入预留字段
            //byteBuf.writeByte(nettyMessage.getHeader().getReserved());
    
            // 5、写入数据
            byteBuf.writeBytes(gpsMessage.getContent().toString().getBytes());
        }
    }
    

    处理消息的handler:

    import org.springframework.util.StringUtils;
    
    import com.netty.channel.CardChannelRel;
    import com.netty.constant.Delimiter;
    import com.netty.pojo.GpsMessage;
    import com.netty.pojo.LoginMsg;
    import com.utils.ConvertCode;
    import com.utils.CrcUtils;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
    	// 用于记录和管理所有客户端的channel
    	public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
    		System.out.println("处理消息的handler:" + msg);
    		Channel currentChannel = ctx.channel();
    		// 1. 获取客户端发送的消息
    		GpsMessage gpsMessage = (GpsMessage) msg;
    		if (gpsMessage != null) {
    			
    			// 协议
    			byte agreement = gpsMessage.getAgreement();
    
    			String cardId = "";
    			if (agreement == Delimiter.LOGIN_PACKET) { // 登录包
    
    				LoginMsg loginMsg = (LoginMsg) gpsMessage.getContent();
    				cardId = loginMsg.getCardId();
    				CardChannelRel.put(cardId, currentChannel);
    				String sReply = "回复";
    				System.out.println(" 回复包:" + sReply);
    				CardChannelRel.output();
    				// 发送消息
    				writeToClient(sReply, currentChannel, "登录回复");
    			} else if (agreement == Delimiter.STATUS_PACKET) {// 心跳包
    				System.out.print("心跳包:");
    				String receiveStr = (String) gpsMessage.getContent();
    				System.out.println("心跳包内容:" + receiveStr);
    				writeToClient(receiveStr, currentChannel, "心跳包回复");
    			} else {
    				// 发送消息
    				// 从全局用户channel关系中获取接受方的channel
    				Channel receiverChannel = CardChannelRel.get(cardId);
    				if (receiverChannel != null) {
    					// 当receiverChannel不为空的时候,从 ChannelGroup 去查找对应的channel是否存在
    					Channel findChannel = channelGroup.find(receiverChannel.id());
    					if (findChannel != null) {
    						// 用户在线
    						writeToClient("其他消息", currentChannel, "其他消息回复");
    					}
    				}
    			}
    		
    		}
    	}
    
    	/**
    	 * 公用回写数据到客户端的方法
    	 * 
    	 * @param 需要回写的字符串
    	 * @param receiverChannel
    	 * @param mark
    	 *            用于打印/log的输出 <br>
    	 *            //channel.writeAndFlush(msg);//不行 <br>
    	 *            //channel.writeAndFlush(receiveStr.getBytes());//不行 <br>
    	 *            在netty里,进出的都是ByteBuf,楼主应确定服务端是否有对应的编码器,将字符串转化为ByteBuf
    	 */
    	public void writeToClient(final String receiveStr, Channel receiverChannel, final String mark) {
    		try {
    			ByteBuf byteValue = Unpooled.buffer();// netty需要用ByteBuf传输
    			byteValue.writeBytes(ConvertCode.hexString2Bytes(receiveStr));// 对接需要16进制
    			receiverChannel.writeAndFlush(byteValue).addListener(new ChannelFutureListener() {
    				@Override
    				public void operationComplete(ChannelFuture future) throws Exception {
    					StringBuilder sb = new StringBuilder("");
    					if (!StringUtils.isEmpty(mark)) {
    						sb.append("【").append(mark).append("】");
    					}
    					if (future.isSuccess()) {
    						System.out.println(sb.toString() + "回写成功" + byteValue);
    						log.info(sb.toString() + "回写成功" + byteValue);
    					} else {
    						System.out.println(sb.toString() + "回写失败" + byteValue);
    						log.error(sb.toString() + "回写失败" + byteValue);
    					}
    				}
    			});
    		} catch (Exception e) {
    			e.printStackTrace();
    			System.out.println("调用通用writeToClient()异常" + e.getMessage());
    			log.error("调用通用writeToClient()异常:", e);
    		}
    	}
    
    	/**
    	 * 当客户连接服务端之后(打开链接) 获取客户端的channel,并且放到ChannelGroup中去进行管理
    	 */
    	@Override
    	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    		channelGroup.add(ctx.channel());
    	}
    
    	@Override
    	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    		super.handlerRemoved(ctx);
    		String channelId = ctx.channel().id().asLongText();
    		System.out.println("客户端被移除,channelId为:" + channelId);
    		// 当触发handlerRemoved,ChannelGroup会自动移除对应的客户端channel
    		channelGroup.remove(ctx.channel());
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		cause.printStackTrace();
    		// 发生异常之后关键channel。随后从ChannelGroup 中移除
    		ctx.channel().close();
    		channelGroup.remove(ctx.channel());
    	}
    
    }

    上述类中:Delimiter为自定义的消息类型,大家可根据自己十六进制去定义响应不用的消息类型

    CardChannelRel:

    import java.util.HashMap;
    
    import io.netty.channel.Channel;
    
    /**
     * 用户id和channel的关联关系处理
     */
    public class CardChannelRel {
    
    	private static HashMap<String, Channel> manager = new HashMap<>();
    
    	public static void put(String senderId, Channel channel) {
    		manager.put(senderId, channel);
    	}
    
    	public static Channel get(String senderId) {
    		return manager.get(senderId);
    	}
    	
    	
    	public static void output() {
    		for (HashMap.Entry<String, Channel> entry : manager.entrySet()) {
    			System.out.println("CredId:" + entry.getKey() + 
    					",ChannelId:" + entry.getValue().id().asLongText());
    		}
    	}
    } 

    效果:

    工具类

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    
    public class CrcUtils {
    	public static String CRC_16(byte[] bytes) {
    		int i, j, lsb;
    		int h = 0xffff;
    		for (i = 0; i < bytes.length; i++) {
    			h ^= bytes[i];
    			for (j = 0; j < 8; j++) {
    				lsb = h & 0x0001; // 取 CRC 的移出位
    				h >>= 1;
    				if (lsb == 1) {
    					h ^= 0x8408;
    				}
    			}
    		}
    		h ^= 0xffff;
    		return Integer.toHexString(h).toUpperCase();
    	}
    
    	public static byte[] hexStringToByte(String hex) {
    		int len = (hex.length() / 2);
    		byte[] result = new byte[len];
    		char[] achar = hex.toCharArray();
    		for (int i = 0; i < len; i++) {
    			int pos = i * 2;
    			result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1]));
    		}
    		return result;
    	}
    
    	private static byte toByte(char c) {
    		byte b = (byte) "0123456789ABCDEF".indexOf(c);
    		return b;
    	}
    
    	public static String bytesToHexString(ByteBuf buffer) {
    		final int length = buffer.readableBytes();
    		StringBuffer sb = new StringBuffer(length);
    		String sTmp;
    
    		for (int i = 0; i < length; i++) {
    			byte b = buffer.readByte();
    			sTmp = Integer.toHexString(0xFF & b);
    			if (sTmp.length() < 2)
    				sb.append(0);
    			sb.append(sTmp.toUpperCase());
    		}
    		return sb.toString();
    	}
    
    }

    参考文章:

    1.https://blog.csdn.net/qq_42599616/article/details/105459117?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~all~sobaiduend~default-3-105459117.nonecase&utm_term=netty%E8%BF%9E%E6%8E%A5%E6%88%90%E5%8A%9F%E4%B8%8D%E8%83%BD%E5%8F%91%E9%80%81%E6%95%B0%E6%8D%AE&spm=1000.2123.3001.4430

    2.https://github.com/bjmashibing/tank/commit/1121deccf76786b634389629454a0ec0af80765f

    3.https://blog.csdn.net/linsongbin1/article/details/77915686?utm_source=blogxgwz2

    4.https://blog.csdn.net/yqwang75457/article/details/73913572

  • 相关阅读:
    浅谈页面的瀑布流布局
    前端常用动画库
    JavaScript七宗罪和一些槽点
    prototype与 _proto__的关系
    Javascript之傻傻理不清的原型链、prototype、__proto__
    C#开发微信门户及应用(26)-公众号微信素材管理
    C#开发微信门户及应用(25)-微信企业号的客户端管理功能
    基于InstallShield2013LimitedEdition的安装包制作
    Entity Framework 实体框架的形成之旅--Code First模式中使用 Fluent API 配置(6)
    Entity Framework 实体框架的形成之旅--Code First的框架设计(5)
  • 原文地址:https://www.cnblogs.com/haoliyou/p/13754863.html
Copyright © 2011-2022 走看看