zoukankan      html  css  js  c++  java
  • Java Netty 服务端向客户端发送消息

    说到netty通讯,回忆了下,还是18年的时候,学了了netty,至今就学习的时候写过一个项目。最近闲生,被要求做一个netty通讯的项目,顺手,总结一下,之前写的项目。

    当时是写了一款访微信聊天的软件,所以用到了netty通讯,废话不过说,我们来直接上代码吧。

    import org.springframework.stereotype.Component;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j	
    @Component
    public class WSServer {
    
    	private static class SingletionWSServer {
    		static final WSServer instance = new WSServer();
    	}
    
    	public static WSServer getInstance() {
    		return SingletionWSServer.instance;
    	}
    
    	private EventLoopGroup mainGroup;
    	private EventLoopGroup subGroup;
    	private ServerBootstrap server;
    	private ChannelFuture future;
    
    	public WSServer() {
    		// 主线程组
    		mainGroup = new NioEventLoopGroup();
    
    		// 子线程组
    		subGroup = new NioEventLoopGroup();
    		// netty服务器的创建,ServerBootstrap是一个启动类
    		server = new ServerBootstrap();
    		server.group(mainGroup, subGroup)// 设置主从线程组
    				.channel(NioServerSocketChannel.class)// 设置nio双向通道
    				.childHandler(new WSServerInitializer());// 子处理器,用于处理subGroup
    	}
    
    	/**
    	 * 启动
    	 */
    	public void start() {
    		this.future = server.bind(9700);
    		System.err.println("netty websocket server 启动完毕...");
    		log.info("netty websocket server 启动完毕...");
    	}
    
    }
    

    这个类用于创建netty的服务端链接,下面我们来配置启动的一些配置。

    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    import lombok.extern.slf4j.Slf4j;
    @Slf4j
    public class WSServerInitializer extends ChannelInitializer<SocketChannel> {
    
    	@Override
    	protected void initChannel(SocketChannel ch) throws Exception {
    		System.out.println("初始化 SocketChannel");
    		log.info("初始化 SocketChannel");
    		ChannelPipeline pipeline = ch.pipeline();
    
    		//websocket 基于http协议,所以要http编解码器
    		pipeline.addLast(new HttpServerCodec());
    		
    		//对写大数据流的支持
    		pipeline.addLast(new ChunkedWriteHandler());
    		
    		//对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
    		//几乎在netty中的编程,都会使用到此hanler
    		pipeline.addLast(new HttpObjectAggregator(1024*64));
    		
    		//========================以上是用于支持http协议========================
    		
    		
    		//========================增加心跳支持 start    ========================
    		
    		//针对客户端,如果在1分钟时没有想服务端发送写心跳(ALL),则主动断开
    		//如果是读空闲或者写空闲,不处理
    		pipeline.addLast(new IdleStateHandler(8, 10, 12));
    		
    		//自定义的空闲检测
    		pipeline.addLast(new HeartBeatHandler());
    		//========================增加心跳支持 end      ========================
    		
    		
    		/**
    		 * websocket服务器处理的协议,用于指定给客户端连接访问的路由: /ws
    		 * 本Handler会帮助你处理一些繁重的复杂的事
    		 * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
    		 * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
    		 */
    		pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
    		
    		//自定义hanler
    		pipeline.addLast(new ChatHandler());
    	}
    
    }
    

    自定义空闲检测

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    
    /**
     * 用于检测channel的心跳的handler 继承 ChannelInboundHandlerAdapter 从而不需要实现 channelRead0方法
     * 
     * @author wb0024
     *
     */
    public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    
    	@Override
    	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    
    		// 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲)
    		if (evt instanceof IdleStateEvent) {
    			IdleStateEvent event = (IdleStateEvent) evt; // 强制类型转换
    
    			if (event.state() == IdleState.READER_IDLE) {
    				System.out.println("进入读空闲...");
    			} else if (event.state() == IdleState.WRITER_IDLE) {
    				System.out.println("进入写空闲...");
    			} else if (event.state() == IdleState.ALL_IDLE) {
    				System.out.println("channel关闭前users数量为:"+ChatHandler.users.size());
    				System.out.println("进入读写空闲...");
    				Channel channel = ctx.channel();
    				//关闭无用的channel,以防资源浪费
    				channel.close();
    				System.out.println("channel关闭后users数量为:"+ChatHandler.users.size());
    			}
    		}
    	}
    
    }
    

    自定义hanler

    import com.imooc.enums.MsgActionEnum;
    import com.imooc.utils.JsonUtils;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    /**
     * 处理消息的handler
     * 
     * @author wb0024 TextWebSocketFrame:在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体
     */
    public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    
    	// 用于记录和管理所有客户端的channel
    	public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    	@Override
    	protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    
    		// 获取客户端传输过来的消息
    		String content = msg.text();
    
    		System.out.println("处理消息的handler:" + content);
    		Channel currentChannel = ctx.channel();
    
    		// 1. 获取客户端发送的消息
    		DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
    
    		Integer action = dataContent.getAction();
    		System.out.println("action:" + action);
    		// 2. 判断消息类型,根据不同的类型来处理不同的业务
    		if (action == MsgActionEnum.CONNECT.type) {
    			// 2.1 当websocket 第一次open的时候,初始化channel,把用户的channel和userId关联起来
    			String senderId = dataContent.getChatMsg().getSenderId();
    			UserChannelRel.put(senderId, currentChannel);
    
    			// 测试
    			for (Channel channel : users) {
    				System.out.println(channel.id().asLongText());
    			}
    			UserChannelRel.output();
    
    		} else if (action == MsgActionEnum.CHAT.type) {
    			// 2.2 聊天类型的消息,把聊天记录保存到数据库,同时标记消息的签收状态[未签收]
    			ChatMsg chatMsg = dataContent.getChatMsg();
    			String receiverId = chatMsg.getReceiverId();
    
    			DataContent dataContentMsg = new DataContent();
    			dataContentMsg.setChatMsg(chatMsg);
    
    			// 发送消息
    			// 从全局用户channel关系中获取接受方的channel
    			Channel receiverChannel = UserChannelRel.get(receiverId);
    			if (receiverChannel != null) {
    				// 当receiverChannel不为空的时候,从 ChannelGroup 去查找对应的channel是否存在
    				Channel findChannel = users.find(receiverChannel.id());
    				if (findChannel != null) {
    					// 用户在线
    					receiverChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.objectToJson(dataContentMsg)));
    				}
    			}
    		} else if (action == MsgActionEnum.KEEPALIVE.type) {
    			// 2.4 心跳类型的消息
    			System.out.println("收到来自channel为[" + currentChannel + "]的心跳包");
    		}
    
    	}
    
    	/**
    	 * 当客户连接服务端之后(打开链接) 获取客户端的channel,并且放到ChannelGroup中去进行管理
    	 */
    	@Override
    	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    		users.add(ctx.channel());
    	}
    
    	@Override
    	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    
    		String channelId = ctx.channel().id().asLongText();
    		System.out.println("客户端被移除,channelId为:" + channelId);
    
    		// 当触发handlerRemoved,ChannelGroup会自动移除对应的客户端channel
    		users.remove(ctx.channel());
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		cause.printStackTrace();
    		// 发生异常之后关键channel。随后从ChannelGroup 中移除
    		ctx.channel().close();
    		users.remove(ctx.channel());
    	}
    
    }
    

      

    定义channel管理

    import java.util.HashMap;
    
    import io.netty.channel.Channel;
    
    /**
     * 用户id和channel的关联关系处理
     * 
     * @author wb0024
     *
     */
    public class UserChannelRel {
    
    	private static HashMap<String, Channel> manager = new HashMap<>();
    
    	public static void put(String senderId, Channel channel) {
    		manager.put(senderId, channel);
    	}
    
    	public static Channel get(String senderId) {
    		return manager.get(senderId);
    	}
    	
    	
    	public static void output() {
    		for (HashMap.Entry<String, Channel> entry : manager.entrySet()) {
    			System.out.println("UserId:" + entry.getKey() + 
    					",ChannelId:" + entry.getValue().id().asLongText());
    		}
    	}
    }
    

    其他类

    import java.io.Serializable;
    
    import lombok.Data;
    
    @Data
    public class ChatMsg implements Serializable {
    
    	/**
    	 * 
    	 */
    	private static final long serialVersionUID = 1L;
    
    	private String senderId; // 发送者的用户id
    	private String receiverId; // 接受者的用户id
    	private String msg; // 聊天内容
    	private String msgId; // 用于消息的签收
    
    }
    

      

    import java.io.Serializable;
    
    import lombok.Data;
    
    @Data
    public class DataContent implements Serializable {
    
    	/**
    	 * 
    	 */
    	private static final long serialVersionUID = 1L;
    
    	private Integer action; // 动作类型
    	private ChatMsg chatMsg; // 用户的聊天内容对象
    	private String extand; // 扩展字段
    
    }
    

     好了,到了这里一个简单的netty通讯就做好了

  • 相关阅读:
    【读书笔记】深入理解计算机系统
    快速排序
    列表查找的两种方法
    冒泡排序、选择排序、插入排序
    堆排序
    supervisor进程管理
    redis-主从复制
    redis-淘汰策略
    URI和URL
    python爬虫之xpath的基本使用
  • 原文地址:https://www.cnblogs.com/haoliyou/p/13754375.html
Copyright © 2011-2022 走看看