简单的实现聊天,发送至服务器端之后由服务器转发给其他在线的用户。
1. pom
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>cn.qz</groupId> <artifactId>xm</artifactId> <version>0.0.1-SNAPSHOT</version> <name>blog</name> <description>blog-server</description> <packaging>jar</packaging> <properties> <java.version>1.8</java.version> <!--<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version> --> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> <scope>test</scope> </dependency> <!-- spring-boot整合mybatis-plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>2.3</version> </dependency> <dependency> <groupId>com.github.pagehelper</groupId> <artifactId>pagehelper</artifactId> <version>5.1.2</version> </dependency> <!-- spring-boot整合mysql --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.49</version> </dependency> <!-- 引入 redis 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- spring-boot整合druid --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.22</version> </dependency> <!-- 使用事务需要引入这个包 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <!-- 引入 spring aop 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <!-- commons工具包 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.4</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.4</version> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2</version> </dependency> <!-- 阿里的fastjson用于手动转JSON --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.56</version> </dependency> <!--httpclient相关包 --> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.3.1</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpmime</artifactId> <version>4.3.1</version> </dependency> <!--tika解析文本内容 --> <dependency> <groupId>org.apache.tika</groupId> <artifactId>tika-parsers</artifactId> <version>1.17</version> </dependency> <!--POI --> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi</artifactId> <version>3.16</version> </dependency> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-ooxml</artifactId> <version>3.16</version> </dependency> <!-- springdata jpa依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.jsoup/jsoup --> <dependency> <groupId>org.jsoup</groupId> <artifactId>jsoup</artifactId> <version>1.12.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.15.0</version> </dependency> <!--netty--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.11.Final</version> </dependency> <!-- poi依赖 <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-ooxml</artifactId> <version>RELEASE</version> </dependency> --> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <dependencies> <!-- spring热部署 --> <!-- 该依赖在此处下载不下来,可以放置在build标签外部下载完成后再粘贴进plugin中 --> <dependency> <groupId>org.springframework</groupId> <artifactId>springloaded</artifactId> <version>1.2.6.RELEASE</version> </dependency> </dependencies> <configuration> <fork>true</fork> </configuration> </plugin> <!-- 要将源码放上去,需要加入这个插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-source-plugin</artifactId> <executions> <execution> <id>attach-sources</id> <goals> <goal>jar</goal> </goals> </execution> </executions> </plugin> <!-- 执行Junit测试(测试所有类) --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.10</version> <configuration> <includes> <!--<include>****Test.java</include>--> <include>***</include> </includes> </configuration> </plugin> </plugins> </build> </project>
核心是netty-all, 其他依赖按需引入即可
2. 主要类信息
1. 服务端程序
package com.xm.ggn.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; @Slf4j public class NettyServer { private final int port; public NettyServer(int port) { this.port = port; } public void start() throws Exception { // 修改bossGroup的数量,2线程足够用 EventLoopGroup bossGroup = new NioEventLoopGroup(2); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); sb.group(workerGroup, bossGroup) // 绑定线程池 .channel(NioServerSocketChannel.class) // 指定使用的channel .localAddress(this.port)// 绑定监听端口 .childHandler(new MyChannelInitializer()); ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定 log.info(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress()); cf.channel().closeFuture().sync(); // 关闭服务器通道 } finally { workerGroup.shutdownGracefully().sync(); // 释放线程池资源 bossGroup.shutdownGracefully().sync(); } } }
2. Initializer
package com.xm.ggn.netty; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class MyChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { log.info("收到新的客户端连接: {}", socketChannel.toString()); // websocket协议本身是基于http协议的,所以这边也要使用http解编码器 socketChannel.pipeline().addLast(new HttpServerCodec()); // 以块的方式来写的处理器(添加对于读写大数据流的支持) socketChannel.pipeline().addLast(new ChunkedWriteHandler()); // 对httpMessage进行聚合 socketChannel.pipeline().addLast(new HttpObjectAggregator(8192)); // ================= 上述是用于支持http协议的 ============= // websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址 socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10)); // 添加自己的handler socketChannel.pipeline().addLast(new MyWebSocketHandler()); } }
3.handler
package com.xm.ggn.netty; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; /** * 自定义服务器端处理handler,继承SimpleChannelInboundHandler,处理WebSocket 连接数据 */ @Slf4j public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 用户id=>channel示例 // 可以通过用户的唯一标识保存用户的channel // 这样就可以发送给指定的用户 public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(); /** * 每当服务端收到新的客户端连接时,客户端的channel存入ChannelGroup列表中,并通知列表中其他客户端channel * * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 获取连接的channel Channel incomming = ctx.channel(); //通知所有已经连接到服务器的客户端,有一个新的通道加入 /*for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入 "); }*/ channelGroup.add(incomming); } /** * 每当服务端断开客户端连接时,客户端的channel从ChannelGroup中移除,并通知列表中其他客户端channel * * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //获取连接的channel /*Channel incomming = ctx.channel(); for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"离开 "); }*/ //从服务端的channelGroup中移除当前离开的客户端 channelGroup.remove(ctx.channel()); //从服务端的channelMap中移除当前离开的客户端 Collection<Channel> col = channelMap.values(); while (true == col.contains(ctx.channel())) { col.remove(ctx.channel()); log.info("netty客户端连接删除成功!"); } } /** * 每当从服务端读到客户端写入信息时,将信息转发给其他客户端的Channel. * * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { log.info("netty客户端收到服务器数据, 客户端地址: {}, msg: {}", ctx.channel().remoteAddress(), msg.text()); String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); //消息处理类 message(ctx, msg.text(), date); //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text())); } /** * 当服务端的IO 抛出异常时被调用 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); log.error("SimpleChatClient:" + incoming.remoteAddress() + "异常", cause); //异常出现就关闭连接 ctx.close(); } //消息处理类 public void message(ChannelHandlerContext ctx, String msg, String date) { try { // 消息转发给在线的其他用户 Channel channel = ctx.channel(); for (Channel tmpChannel : channelGroup) { if (!tmpChannel.equals(channel)) { String sendedMsg = date + ":" + msg; log.info("服务器转发消息,客户端地址: {}, msg: {}", ctx.channel().remoteAddress(), sendedMsg); tmpChannel.writeAndFlush(new TextWebSocketFrame(sendedMsg)); } } } catch (Exception e) { log.error("message 处理异常, msg: {}, date: {}", msg, date, e); } } }
4. Springboot主启动类: 也可以将启动nettyServer代码移动至监听Spring容器启动事件类中
package com.xm.ggn; import com.xm.ggn.netty.NettyServer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.domain.EntityScan; import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @ServletComponentScan("com") @EntityScan(basePackages = {"com"}) @EnableScheduling // 允许通过AopContext.currentProxy() 获取代理类 @EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true) @EnableAsync public class BlogApplication { public static void main(String[] args) { SpringApplication.run(BlogApplication.class, args); // 启动netty服务器 try { new NettyServer(8091).start(); } catch (Exception e) { System.out.println("NettyServerError:" + e.getMessage()); } } }
5. 前端就用HTML界面简单的测试
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <title>Netty-Websocket</title> <script type="text/javascript"> var socket; if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } if(window.WebSocket){ socket = new WebSocket("ws://127.0.0.1:8091/ws"); socket.onmessage = function(event){ var ta = document.getElementById('responseText'); ta.value += event.data+" "; }; socket.onopen = function(event){ var ta = document.getElementById('responseText'); ta.value = "Netty-WebSocket服务器。。。。。。连接 "; }; socket.onclose = function(event){ var ta = document.getElementById('responseText'); ta.value = "Netty-WebSocket服务器。。。。。。关闭 "; }; }else{ alert("您的浏览器不支持WebSocket协议!"); } function send(message){ if(!window.WebSocket){return;} if(socket.readyState == WebSocket.OPEN){ socket.send(message); }else{ alert("WebSocket 连接没有建立成功!"); } } </script> </head> <body> <form onSubmit="return false;"> <label>TEXT</label><input type="text" name="message" value="这里输入消息" style=" 1024px;height: 100px;"/> <br /> <br /> <input type="button" value="发送ws消息" onClick="send(this.form.message.value)" /> <hr color="black" /> <h3>服务端返回的应答消息</h3> <textarea id="responseText" style=" 1024px;height: 300px;"></textarea> </form> </body> </html>
3. 测试
1. 启动boot应用
2. 前端用两个浏览器打开
3. 查看服务器端控制台:
2021-03-02 18:14:00.644 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 启动正在监听: /0:0:0:0:0:0:0:0:8091 2021-03-02 18:14:07.304 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客户端连接: [id: 0x94e8a9ec, L:/127.0.0.1:8091 - R:/127.0.0.1:65288] 2021-03-02 18:14:18.861 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客户端连接: [id: 0xaf71586a, L:/127.0.0.1:8091 - R:/127.0.0.1:65369]
3. 两个控制台分别发几条信息
查看两个界面的服务器端消息:
(1) 第一个
(2) 第二个:
4. 查看服务器端日志
2021-03-02 18:14:00.644 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 启动正在监听: /0:0:0:0:0:0:0:0:8091 2021-03-02 18:14:07.304 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客户端连接: [id: 0x94e8a9ec, L:/127.0.0.1:8091 - R:/127.0.0.1:65288] 2021-03-02 18:14:18.861 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客户端连接: [id: 0xaf71586a, L:/127.0.0.1:8091 - R:/127.0.0.1:65369] 2021-03-02 18:15:20.947 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客户端收到服务器数据, 客户端地址: /127.0.0.1:65288, msg: 我说是什么 2021-03-02 18:15:20.948 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服务器转发消息,客户端地址: /127.0.0.1:65288, msg: 2021-03-02 18:15:20:我说是什么 2021-03-02 18:15:29.342 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客户端收到服务器数据, 客户端地址: /127.0.0.1:65369, msg: 我说不知道 2021-03-02 18:15:29.342 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服务器转发消息,客户端地址: /127.0.0.1:65369, msg: 2021-03-02 18:15:29:我说不知道 2021-03-02 18:15:34.745 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客户端收到服务器数据, 客户端地址: /127.0.0.1:65369, msg: 我说不知道个鬼 2021-03-02 18:15:34.746 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服务器转发消息,客户端地址: /127.0.0.1:65369, msg: 2021-03-02 18:15:34:我说不知道个鬼 2021-03-02 18:15:44.819 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客户端收到服务器数据, 客户端地址: /127.0.0.1:65288, msg: 你说身子 2021-03-02 18:15:44.820 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服务器转发消息,客户端地址: /127.0.0.1:65288, msg: 2021-03-02 18:15:44:你说身子
接下来就是基于上面的代码简单的实现基于vue的聊天设计。
补充: 关于WebSocketServerProtocolHandler 这个处理器用于处理WebSocket请求套路
验证URL是否是WebSocke的URL,主要就是判断创建时候传进去的这个"/ws"。默认是根据equals来匹配,也可以通过参数来设置进行startWith 匹配,如下方法:
io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandshakeHandler#isNotWebSocketPath
private boolean isNotWebSocketPath(FullHttpRequest req) { return checkStartsWith ? !req.uri().startsWith(websocketPath) : !req.uri().equals(websocketPath); }
(1) 第一种:
socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
上面这中实际调了重载构造方法传递的checkStartsWith 为false
(2) 第二种: 也可以直接调用参数设置checkStartsWith 为true
socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10, true, true));
对应的构造方法是:
public WebSocketServerProtocolHandler(String websocketPath, String subprotocols, boolean allowExtensions, int maxFrameSize, boolean allowMaskMismatch, boolean checkStartsWith) { this.websocketPath = websocketPath; this.subprotocols = subprotocols; this.allowExtensions = allowExtensions; maxFramePayloadLength = maxFrameSize; this.allowMaskMismatch = allowMaskMismatch; this.checkStartsWith = checkStartsWith; }
补充: socket建立连接的时候我们希望获取到用户的标识信息,然后将用户信息和channel维护起来
1. 调整MyChannelInitializer 中的handler
package com.xm.ggn.netty; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class MyChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { log.info("收到新的客户端连接: {}", socketChannel.toString()); // websocket协议本身是基于http协议的,所以这边也要使用http解编码器 socketChannel.pipeline().addLast(new HttpServerCodec()); // 以块的方式来写的处理器(添加对于读写大数据流的支持) socketChannel.pipeline().addLast(new ChunkedWriteHandler()); // 对httpMessage进行聚合 socketChannel.pipeline().addLast(new HttpObjectAggregator(8192)); // ================= 上述是用于支持http协议的 ============= // 添加自己的handler socketChannel.pipeline().addLast(new MyWebSocketHandler()); // websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址 // 这个主要就是验证URL是否是WebSocke的URL,主要就是判断创建时候传进去的这个"/ws"。 下面四个参数的是比较路径相等io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandshakeHandler.isNotWebSocketPath // socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10)); // 也可以用下面参数用于比较startWith socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10, true, true)); } }
2. 修改MyWebSocketHandler 重写channelRead 方法,注意不是channelRead0 方法
/** * 处理建立连接时候请求(用于拿参数) * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (null != msg && msg instanceof FullHttpRequest) { log.info("连接请求,准备提取参数"); //转化为http请求 FullHttpRequest request = (FullHttpRequest) msg; //拿到请求地址 String uri = request.uri(); log.info("uri: " + uri); if (StringUtils.isNotBlank(uri)) { String path = StringUtils.substringBefore(uri, "?"); log.info("path: {}", path); String username = StringUtils.substringAfterLast(path, "/"); log.info(username); channelMap.put(username, ctx.channel()); log.info("channelMap: {}", channelMap); } //重新设置请求地址为WebSocketServerProtocolHandler 匹配的地址(如果WebSocketServerProtocolHandler 的时候checkStartsWith 为true则不需要设置,会根据前缀匹配) // request.setUri("/ws"); } //接着建立请求 super.channelRead(ctx, msg); }
3. 调整前端请求连接地址增加用户姓名
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <title>Netty-Websocket</title> <script type="text/javascript"> var socket; if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } if(window.WebSocket){ socket = new WebSocket("ws://127.0.0.1:8091/ws/admin?username=admin"); socket.onmessage = function(event){ var ta = document.getElementById('responseText'); ta.value += event.data+" "; }; socket.onopen = function(event){ var ta = document.getElementById('responseText'); ta.value = "Netty-WebSocket服务器。。。。。。连接 "; }; socket.onclose = function(event){ var ta = document.getElementById('responseText'); ta.value = "Netty-WebSocket服务器。。。。。。关闭 "; }; }else{ alert("您的浏览器不支持WebSocket协议!"); } function send(message){ if(!window.WebSocket){return;} if(socket.readyState == WebSocket.OPEN){ socket.send(message); }else{ alert("WebSocket 连接没有建立成功!"); } } </script> </head> <body> <form onSubmit="return false;"> <label>TEXT</label><input type="text" name="message" value="这里输入消息" style=" 1024px;height: 100px;"/> <br /> <br /> <input type="button" value="发送ws消息" onClick="send(this.form.message.value)" /> <hr color="black" /> <h3>服务端返回的应答消息</h3> <textarea id="responseText" style=" 1024px;height: 300px;"></textarea> </form> </body> </html>
4. 测试服务器端日志:(可以看到正确的拿到参数信息并且建立连接,也可以通过?传递参数)
2021-03-02 22:43:07.196 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 启动正在监听: /0:0:0:0:0:0:0:0:8091 2021-03-02 22:43:11.026 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客户端连接: [id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269] 2021-03-02 22:43:12.702 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:47 - 添加新的channel, incomming: [id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269] 2021-03-02 22:43:12.928 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:85 - 连接请求,准备提取参数 2021-03-02 22:43:12.928 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:90 - uri: /ws/admin?username=admin 2021-03-02 22:43:12.934 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:93 - path: /ws/admin 2021-03-02 22:43:12.935 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:95 - admin 2021-03-02 22:43:12.935 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:97 - channelMap: {admin=[id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269]}
补充:整合Vue的jwchat实现聊天 ,jwchat是基于Elementui封装的聊天插件
jwchat官网: https://codegi.gitee.io/jwchatdoc/
这里用了jwchat的两个组件: JwChat-rightbox 展示在线用户、JwChat-index 展示聊天窗口
0. 界面截图如下:
1. 前端核心vue:
<template> <div class="dashboard-container"> <div class="dashboard-text"> <el-row> <el-col :span="6" ><div class="grid-content bg-purple"> <JwChat-rightbox :config="onlineUsers" @click="rightClick" /></div ></el-col> <!-- 如果选择了在线用户显示聊天窗口 --> <el-col :span="18" v-if="chatUserConfig.name != ''" ><div class="grid-content bg-purple-light"> <JwChat-index :config="chatUserConfig" :showRightBox="true" :taleList="chatlogTaleList" @enter="bindEnter" v-model="inputMsg" :toolConfig="toolConfig" scrollType="scroll" @clickTalk="clickTalk" > <!-- 右边插槽 --> <template> <h3>聊天愉快</h3> </template> </JwChat-index> </div></el-col > </el-row> </div> </div> </template> <script> // other.png 表示对方头像; myself.png 表示我自己 import { MessageBox } from "element-ui"; import {findCurrentUsername} from "@/utils/auth" export default { data() { return { // 在线用户相关信息 onlineUsers: { tip: "选择在线人开始聊天", listTip: "当前在线", list: [], }, // 输入框内默认的消息 inputMsg: "", // 聊天记录 chatlogTaleList: [ // { // date: "2020/04/25 21:19:07", // text: { text: "起床不" }, // mine: false, // name: "留恋人间不羡仙", // img: "/images/other.png", // } ], // 展示的工具栏配置 toolConfig: { // show: ['file', 'history', 'img', ['文件1', '', '美图']], show: null, // 关闭所有其他组件 showEmoji: true, callback: this.toolEvent, }, // 正在聊天的用户的信息 chatUserConfig: { img: "/images/other.png", name: "", username: "", fullname: "", dept: "大部门", callback: this.bindCover, historyConfig: { show: true, tip: "加载更多", callback: this.bindLoadHistory, }, }, // 当前用户信息 currentUser: { username: "", fullname: "", }, socket: new Object(), }; }, created() { this.listOnlineUsers(); this.findCurrentUserInfo(); this.webSocket(); }, methods: { webSocket() { // 先记录this对象 const that = this; if (typeof WebSocket == "undefined") { MessageBox.alert("浏览器暂不支持聊天", "提示信息"); } else { // 实例化socket,这里我把用户名传给了后台,使后台能判断要把消息发给哪个用户,其实也可以后台直接获取用户IP来判断并推送 const socketUrl = "ws://127.0.0.1:8091/ws/" + findCurrentUsername(); this.socket = new WebSocket(socketUrl); // 监听socket打开 this.socket.onopen = function () { console.log("浏览器WebSocket已打开"); }; // 监听socket消息接收 this.socket.onmessage = function (messageEvent) { // 转换为json对象然后添加到chatlogTaleList let receivedLog = JSON.parse(messageEvent.data); console.log(receivedLog); let receivedLogs = new Array(); receivedLogs.push(receivedLog); receivedLogs = that.rehandleChatLogs(receivedLogs); if (!that.chatlogTaleList) { that.chatlogTaleList = new Array(); } that.chatlogTaleList = that.chatlogTaleList.concat(receivedLogs); }; // 监听socket错误 this.socket.onerror = function () {}; // 监听socket关闭 this.socket.onclose = function () { MessageBox.alert("WebSocket已关闭"); }; } }, // 查询当前用户信息 findCurrentUserInfo() { let url = "/user/getInfo"; this.$http.post(url).then((res) => { this.currentUser = res.data; }); }, // 发送websocket 消息 send(message) { if (!window.WebSocket) { return; } // 封装消息,然后发送消息 const chatLog = { sendUsername: this.currentUser.username, sendFullname: this.currentUser.fullname, receiveUsername: this.chatUserConfig.username, receiveFullname: this.chatUserConfig.fullname, content: message, readed: false, }; let socket = this.socket; if (socket.readyState == WebSocket.OPEN) { socket.send(JSON.stringify(chatLog)); } else { MessageBox.alert("WebSocket 连接没有建立成功!"); } }, // 获取在线用户(有在线用户的情况下赋值到右边窗口) listOnlineUsers() { let url = "/user/listOnlineUser"; this.$http.get(url).then((res) => { var onlineUsers = res.data; if (!onlineUsers || onlineUsers.length < 1) { return; } onlineUsers.forEach((element) => { element.name = element.username; element.img = "/images/cover.png"; }); this.onlineUsers.list = onlineUsers; }); }, // 点击在线人事件 rightClick(type) { // 1.赋值给聊天人信息 let chatUser = type.value; this.chatUserConfig.name = chatUser.fullname; this.chatUserConfig.username = chatUser.username; this.chatUserConfig.fullname = chatUser.fullname; // 2. 查询聊天记录 let listChatlogurl = "/chat/log/list"; let requestVO = { sendUsername: this.currentUser.username, receiveUsername: this.chatUserConfig.username, queryChangeRole: true, }; this.$http.post(listChatlogurl, requestVO).then((res) => { this.chatlogTaleList = this.rehandleChatLogs(res.data); }); }, // 重新处理聊天记录, 主要是做特殊标记以及设置图像等操作 rehandleChatLogs(chatlogs) { if (!chatlogs || chatlogs.length < 1) { return new Array(); } chatlogs.forEach((element) => { element.date = element.createtimeStr; element.name = element.sendFullname; // 聊天内容(如下为设置文本,也可以设置其他video、图片等) element.text = new Object(); element.text.text = element.content; if (element.sendUsername == this.currentUser.username) { element.mine = true; element.img = "/images/myself.png"; } else { element.mine = false; element.img = "/images/other.png"; } }); return chatlogs; }, // 点击左上角用户名称事件 clickTalk(obj) { console.log(obj); }, // 点击发送或者回车事件 bindEnter(obj) { const msg = this.inputMsg; if (!msg) { MessageBox.alert("您不能发送空消息"); return; } // 发送消息 this.send(msg); }, /** * @description: * @param {*} type 当前点击的按钮 * @param {*} plyload 附加文件或者需要处理的数据 * @return {*} */ toolEvent(type, plyload) { console.log("tools", type, plyload); }, /** * @description: 点击加载更多的回调函数 * @param {*} * @return {*} */ bindLoadHistory() { const history = new Array(3).fill().map((i, j) => { return { date: "2020/05/20 23:19:07", text: { text: j + new Date() }, mine: false, name: "JwChat", img: "image/three.jpeg", }; }); let list = history.concat(this.list); this.list = list; }, bindCover(type) { console.log("header", type); }, }, }; </script>
涉及到主要逻辑:
(1) 点击页面进行如下操作:
1》创建WebSocket连接,创建WebSocket的时候将当前的用户名传到后端,后端记录当前用户名与连接到的netty的channel
2》 查询当前在线用户,并且展示到JwChat-rightbox 列表内。(也可以展示所有用户、如果有群聊体系展示所有的群)
(2) 点击在线用户的时候获取到在线用户的信息并记录下来,然后用ajax异步获取聊天记录(后台根据发布者和接收者按时间升序排序),然后前台根据聊天记录做对应的转换。这里是ajax获取,也可以用websocket拿,对发送的消息做处理,后端接收到消息处理对应的业务即可
(3) 聊天的时候判断是否输入有信息,有信息的时候将信息包装一下(增加发送者、接收者信息)发到后端,后端存入数据库之后再发送到对应的channel返回给前端,前端接收到后做处理完加入聊天记录数组展示在界面
2. 后端主要文件:
(1) 聊天记录表
package com.xm.ggn.bean.chat; import com.xm.ggn.bean.AbstractSequenceEntity; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.ToString; import javax.persistence.Entity; /** * 聊天记录 */ @Entity @Getter @Setter @EqualsAndHashCode(callSuper = true) @ToString(callSuper = true) public class ChatLog extends AbstractSequenceEntity { private String sendUsername; private String sendFullname; private String receiveUsername; private String receiveFullname; private String content; private String remark; /** * 是否已读 */ private boolean readed; }
包含继承的通用字段:
@Id @GeneratedValue(strategy = GenerationType.IDENTITY) @TableId(type = IdType.AUTO) // 增加该注解,mybatis plus insert之后会给bean设上Id protected long id; /** * 创建者 */ @Index(name = "creator") @TableField(update = "%s") protected String creator; /** * 唯一编号 */ @Index(name = "uniqueCode") @TableField(update = "%s") protected String uniqueCode; /** * 创建时间 */ @Index(name = "createtime") @TableField(update = "%s") protected Date createtime;
(2) MyWebSocketHandler消息处理者类:
package com.xm.ggn.netty; import com.alibaba.fastjson.JSONObject; import com.xm.ggn.bean.chat.ChatLog; import com.xm.ggn.service.chat.ChatLogService; import com.xm.ggn.utils.system.SpringBootUtils; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Date; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * 自定义服务器端处理handler,继承SimpleChannelInboundHandler,处理WebSocket 连接数据 */ @Slf4j public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 用户id=>channel示例 // 可以通过用户的唯一标识保存用户的channel // 这样就可以发送给指定的用户 public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(); /** * 每当服务端收到新的客户端连接时,客户端的channel存入ChannelGroup列表中,并通知列表中其他客户端channel * * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 获取连接的channel Channel incomming = ctx.channel(); //通知所有已经连接到服务器的客户端,有一个新的通道加入 /*for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入 "); }*/ channelGroup.add(incomming); log.info("添加新的channel, incomming: {}", incomming); } /** * 每当服务端断开客户端连接时,客户端的channel从ChannelGroup中移除,并通知列表中其他客户端channel * * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //获取连接的channel /*Channel incomming = ctx.channel(); for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"离开 "); }*/ //从服务端的channelGroup中移除当前离开的客户端 channelGroup.remove(ctx.channel()); //从服务端的channelMap中移除当前离开的客户端 Collection<Channel> col = channelMap.values(); while (true == col.contains(ctx.channel())) { col.remove(ctx.channel()); log.info("netty客户端连接删除成功!"); } } /** * 处理建立连接时候请求(用于拿参数) * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (null != msg && msg instanceof FullHttpRequest) { log.info("连接请求,准备提取参数"); //转化为http请求 FullHttpRequest request = (FullHttpRequest) msg; //拿到请求地址 String uri = request.uri(); log.info("uri: " + uri); if (StringUtils.isNotBlank(uri)) { String path = StringUtils.substringBefore(uri, "?"); log.info("path: {}", path); String username = StringUtils.substringAfterLast(path, "/"); log.info(username); channelMap.put(username, ctx.channel()); log.info("channelMap: {}", channelMap); } //重新设置请求地址为WebSocketServerProtocolHandler 匹配的地址(如果WebSocketServerProtocolHandler 的时候checkStartsWith 为true则不需要设置,会根据前缀匹配) // request.setUri("/ws"); } //接着建立请求 super.channelRead(ctx, msg); } /** * 每当从服务端读到客户端写入信息时,将信息转发给其他客户端的Channel. * * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { log.info("netty客户端收到服务器数据, 客户端地址: {}, msg: {}", ctx.channel().remoteAddress(), msg.text()); String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); //消息处理类 handleMessage(ctx, msg.text(), date); //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text())); } /** * 当服务端的IO 抛出异常时被调用 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); log.error("SimpleChatClient:" + incoming.remoteAddress() + "异常", cause); //异常出现就关闭连接 ctx.close(); } /** * 处理读取到的消息 * * @param ctx * @param msg * @param date */ private void handleMessage(ChannelHandlerContext ctx, String msg, String date) { try { // 消息入库 ChatLog chatLog = JSONObject.parseObject(msg, ChatLog.class); log.info("chatLog: {}", chatLog); ChatLogService chatLogService = SpringBootUtils.getBean(ChatLogService.class); chatLogService.insert(chatLog); // 消息转发给对应用户(发给发送者和接收者) String receiveUsername = chatLog.getReceiveUsername(); String sendUsername = chatLog.getSendUsername(); Set<Map.Entry<String, Channel>> entries = channelMap.entrySet(); String key = null; for (Map.Entry<String, Channel> entry : entries) { key = entry.getKey(); if (key.equals(receiveUsername) || key.equals(sendUsername)) { log.info("服务器转发消息, key: {}, msg: {}", key, JSONObject.toJSONString(chatLog)); entry.getValue().writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(chatLog))); } } } catch (Exception e) { log.error("message 处理异常, msg: {}, date: {}", msg, date, e); } } }
这个只是简单的实现了在线用户的单聊,如果要做的好可以添加通讯录功能、群聊,其实这个就是发送消息的时候接受者是群号等。待有这方面需求的时候会继续完善。