zoukankan      html  css  js  c++  java
  • netty实现消息中心(二)基于netty搭建一个聊天室

    前言

    上篇博文(netty实现消息中心(一)思路整理
    )大概说了下netty websocket消息中心的设计思路,这篇文章主要说说简化版的netty聊天室代码实现,支持群聊和点对点聊天。

    此demo主要说明netty实现消息推送的基本使用方法,如果需要扩充其它功能,可以基于此脚手架扩展。
    完整项目代码地址:netty聊天室github源码

    介绍

    1.登录页面
    login.png

    2.聊天页面
    index.png

    核心代码:

    启动netty服务,监听端口

        private static void startNettyMsgServer() {
            // 使用多Reactor多线程模型,EventLoopGroup相当于线程池,内部维护一个或多个线程(EventLoop),每个EventLoop可处理多个Channel(单线程处理多个IO任务)
        	// 创建主线程组EventLoopGroup,专门负责建立连接
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            // 创建子线程组,专门负责IO任务的处理
            EventLoopGroup workGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workGroup);
                b.channel(NioServerSocketChannel.class);
                b.childHandler(new WebSocketChanneInitializer());
                System.out.println("服务端开启等待客户端连接....");
                Channel ch = b.bind(WebSocketConstant.WEB_SOCKET_PORT).sync().channel();
    
                //创建一个定长线程池,支持定时及周期性任务执行
                ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
                WebSocketInfoService webSocketInfoService = new WebSocketInfoService();
                //定时任务:扫描所有的Channel,关闭失效的Channel
                executorService.scheduleAtFixedRate(webSocketInfoService::scanNotActiveChannel,
                        3, 60, TimeUnit.SECONDS);
    
                //定时任务:向所有客户端发送Ping消息
                executorService.scheduleAtFixedRate(webSocketInfoService::sendPing,
                        3, 50, TimeUnit.SECONDS);
    
                ch.closeFuture().sync();
    
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
    //            //退出程序
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    

    netty ChannelHandler,负责处理通道的生命周期事件

    package com.cola.chat_server.handler;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.alibaba.fastjson.JSONObject;
    import com.cola.chat_server.constant.MessageCodeConstant;
    import com.cola.chat_server.constant.MessageTypeConstant;
    import com.cola.chat_server.constant.WebSocketConstant;
    import com.cola.chat_server.model.WsMessage;
    import com.cola.chat_server.service.WebSocketInfoService;
    import com.cola.chat_server.util.DateUtils;
    import com.cola.chat_server.util.NettyAttrUtil;
    import com.cola.chat_server.util.RequestParamUtil;
    import com.cola.chat_server.util.SessionHolder;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpVersion;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
    import io.netty.util.CharsetUtil;
    
    
    
    /**
     * Netty ChannelHandler,用来处理客户端和服务端的会话生命周期事件(握手、建立连接、断开连接、收消息等)
     * @Author 
     * @Description 接收请求,接收 WebSocket 信息的控制类
     */
    public class WebSocketSimpleChannelInboundHandler extends SimpleChannelInboundHandler<Object> {
    
        private static final Logger logger = LoggerFactory.getLogger(WebSocketSimpleChannelInboundHandler.class);
        // WebSocket 握手工厂类
        private WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(WebSocketConstant.WEB_SOCKET_URL, null, false);
        private WebSocketServerHandshaker handshaker;
        private WebSocketInfoService websocketInfoService = new WebSocketInfoService();
    
        /**
         * 处理客户端与服务端之间的 websocket 业务
         */
        private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
            //判断是否是关闭 websocket 的指令
            if (frame instanceof CloseWebSocketFrame) {
                //关闭握手
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
                websocketInfoService.clearSession(ctx.channel());
                return;
            }
            //判断是否是ping消息
            if (frame instanceof PingWebSocketFrame) {
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            // 判断是否Pong消息
            if (frame instanceof PongWebSocketFrame) {
                ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            //判断是否是二进制消息,如果是二进制消息,抛出异常
            if (!(frame instanceof TextWebSocketFrame)) {
                System.out.println("目前我们不支持二进制消息");
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                throw new RuntimeException("【" + this.getClass().getName() + "】不支持消息");
            }
            // 获取并解析客户端向服务端发送的 json 消息
            String message = ((TextWebSocketFrame) frame).text();
            logger.info("消息:{}", message);
            JSONObject json = JSONObject.parseObject(message);
            try {
                String uuid = UUID.randomUUID().toString();
                String time = DateUtils.date2String(new Date(), "yyyy-MM-dd HH:mm:ss");
                json.put("id", uuid);
                json.put("sendTime", time);
                
                int code = json.getIntValue("code");
                switch (code) {
                    //群聊
                    case MessageCodeConstant.GROUP_CHAT_CODE:
                        //向连接上来的客户端广播消息
                    	SessionHolder.channelGroup.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(json)));
                        break;
                    //私聊
                    case MessageCodeConstant.PRIVATE_CHAT_CODE:
                        //接收人id
                        String receiveUserId = json.getString("receiverUserId");
                        String sendUserId = json.getString("sendUserId");
                        String msg = JSONObject.toJSONString(json);
                        // 点对点挨个给接收人发送消息
                        for (Map.Entry<String, Channel> entry : SessionHolder.channelMap.entrySet()) {
                        	String userId = entry.getKey();
                        	Channel channel = entry.getValue();
                    		if (receiveUserId.equals(userId)) {
                    			channel.writeAndFlush(new TextWebSocketFrame(msg));
                    		}
                        }
                        // 如果发给别人,给自己也发一条
                        if (!receiveUserId.equals(sendUserId)) {
                        	SessionHolder.channelMap.get(sendUserId).writeAndFlush(new TextWebSocketFrame(msg));
                        }
                        break;
                    case MessageCodeConstant.SYSTEM_MESSAGE_CODE:
                    	//向连接上来的客户端广播消息
                    	SessionHolder.channelGroup.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(json)));
                    	break;
                    //pong
                    case MessageCodeConstant.PONG_CHAT_CODE:
                        Channel channel = ctx.channel();
                        // 更新心跳时间
                        NettyAttrUtil.refreshLastHeartBeatTime(channel);
                    default:
                }
            } catch(Exception e) {
                logger.error("转发消息异常:", e);
                e.printStackTrace();
            }
        }
    
        /**
         * 客户端与服务端创建连接的时候调用
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //创建新的 WebSocket 连接,保存当前 channel
            logger.info("————客户端与服务端连接开启————");
    //        // 设置高水位
    //        ctx.channel().config().setWriteBufferHighWaterMark();
    //        // 设置低水位
    //        ctx.channel().config().setWriteBufferLowWaterMark();
        }
    
        /**
         * 客户端与服务端断开连接的时候调用
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            logger.info("————客户端与服务端连接断开————");
            websocketInfoService.clearSession(ctx.channel());
        }
    
        /**
         * 服务端接收客户端发送过来的数据结束之后调用
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        /**
         * 工程出现异常的时候调用
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.error("异常:", cause);
            ctx.close();
        }
    
        /**
         * 服务端处理客户端websocket请求的核心方法
         */
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
            if (o instanceof FullHttpRequest) {
                //处理客户端向服务端发起 http 请求的业务
                handHttpRequest(channelHandlerContext, (FullHttpRequest) o);
            } else if (o instanceof WebSocketFrame) {
                //处理客户端与服务端之间的 websocket 业务
                handWebsocketFrame(channelHandlerContext, (WebSocketFrame) o);
            }
        }
    
        /**
         * 处理客户端向服务端发起 http 握手请求的业务
         * WebSocket在建立握手时,数据是通过HTTP传输的。但是建立之后,在真正传输时候是不需要HTTP协议的。
         *
         * WebSocket 连接过程:
         * 首先,客户端发起http请求,经过3次握手后,建立起TCP连接;http请求里存放WebSocket支持的版本号等信息,如:Upgrade、Connection、WebSocket-Version等;
         * 然后,服务器收到客户端的握手请求后,同样采用HTTP协议回馈数据;
         * 最后,客户端收到连接成功的消息后,开始借助于TCP传输信道进行全双工通信。
         */
        private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
            // 如果请求失败或者该请求不是客户端向服务端发起的 http 请求,则响应错误信息
            if (!request.decoderResult().isSuccess()
                    || !("websocket".equals(request.headers().get("Upgrade")))) {
                // code :400
                sendHttpResponse(ctx, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
                return;
            }
            //新建一个握手
            handshaker = factory.newHandshaker(request);
            if (handshaker == null) {
                //如果为空,返回响应:不受支持的 websocket 版本
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                //否则,执行握手
                Map<String, String> params = RequestParamUtil.urlSplit(request.uri());
                String userId = params.get("userId");
                Channel channel = ctx.channel();
                NettyAttrUtil.setUserId(channel, userId);
                NettyAttrUtil.refreshLastHeartBeatTime(channel);
            	handshaker.handshake(ctx.channel(), request);
            	SessionHolder.channelGroup.add(ctx.channel());
            	SessionHolder.channelMap.put(userId, ctx.channel());
            	logger.info("握手成功,客户端请求uri:{}", request.uri());
            	
            	// 推送用户上线消息,更新客户端在线用户列表
            	Set<String> userList = SessionHolder.channelMap.keySet();
            	WsMessage msg = new WsMessage();
            	Map<String, Object> ext = new HashMap<String, Object>();
            	ext.put("userList", userList);
            	msg.setExt(ext);
            	msg.setCode(MessageCodeConstant.SYSTEM_MESSAGE_CODE);
            	msg.setType(MessageTypeConstant.UPDATE_USERLIST_SYSTEM_MESSGAE);
            	SessionHolder.channelGroup.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(msg)));
            	
            }
        }
    
    
        /**
         * 服务端向客户端响应消息
         */
        private void sendHttpResponse(ChannelHandlerContext ctx, DefaultFullHttpResponse response) {
            if (response.status().code() != 200) {
                //创建源缓冲区
                ByteBuf byteBuf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);
                //将源缓冲区的数据传送到此缓冲区
                response.content().writeBytes(byteBuf);
                //释放源缓冲区
                byteBuf.release();
            }
            //写入请求,服务端向客户端发送数据
            ChannelFuture channelFuture = ctx.channel().writeAndFlush(response);
            if (response.status().code() != 200) {
            	/**
            	 * 如果请求失败,关闭 ChannelFuture
            	 * ChannelFutureListener.CLOSE 源码:future.channel().close();
            	 */
                channelFuture.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }
    
    

    会话工具类,保存用户和通道的对应关系,用于广播和点对点聊天

    /**
     * netty会话管理
     * @author 
     *
     */
    public class SessionHolder {
    	
        /**
         * 存储每个客户端接入进来时的 channel 对象
         * 主要用于使用 writeAndFlush 方法广播信息
         */
        public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        /**
         * 用于客户端和服务端握手时存储用户id和netty Channel对应关系
         */
        public static Map<String, Channel> channelMap = new ConcurrentHashMap<String, Channel>(); 
    
    }
    

    主要代码就是以上部分,如果需要扩充其它功能,可以基于此脚手架扩展。
    完整项目代码地址:netty聊天室github源码

    此demo主要用于展示netty实现消息推送的基本使用方法,用于生产还存在以下单机问题:
    1.无法支撑过高连接数
    2.广播时带宽有限
    3.不能实现高可用
    4.无法横向扩展
    后期将集成zookeeper,做一版netty集群的聊天室。

  • 相关阅读:
    ORACLE CLIENT客户端安装步骤详解
    mkswap 把一个分区格式化成为swap交换区
    编译安装lnmp
    使用源代码安装lnmp
    查看nginx编译安装
    linux lnmp编译安装
    nginx编译安装
    lnmp脚本
    搭建LAMP测试环境
    绝路上的桥
  • 原文地址:https://www.cnblogs.com/powerjiajun/p/12680501.html
Copyright © 2011-2022 走看看