zoukankan      html  css  js  c++  java
  • netty无缝切换rabbitmq、activemq、rocketmq实现聊天室单聊、群聊功能

    file

    file

    netty的pipeline处理链上的handler:需要IdleStateHandler心跳检测channel是否有效,以及处理登录认证的UserAuthHandler和消息处理MessageHandler

    protected void initChannel(SocketChannel ch) throws Exception {
    	ch.pipeline().addLast(defLoopGroup,
    		//编码解码器
    		new HttpServerCodec(),
    		//将多个消息转换成单一的消息对象
    		new HttpObjectAggregator(65536),
    		//支持异步发送大的码流,一般用于发送文件流
    		new ChunkedWriteHandler(),
    		//检测链路是否读空闲,配合心跳handler检测channel是否正常
    		new IdleStateHandler(60, 0, 0),
    		//处理握手和认证
    		new UserAuthHandler(),
    		//处理消息的发送
    		new MessageHandler()
    	);
    }
    

    对于所有连进来的channel,我们需要保存起来,往后的群发消息需要依靠这些channel

    public static void addChannel(Channel channel) {
            String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
            System.out.println("addChannel:" + remoteAddr);
            if (!channel.isActive()) {
                logger.error("channel is not active, address: {}", remoteAddr);
            }
            UserInfo userInfo = new UserInfo();
            userInfo.setAddr(remoteAddr);
            userInfo.setChannel(channel);
            userInfo.setTime(System.currentTimeMillis());
            userInfos.put(channel, userInfo);
        }
    

    登录后,channel就变成有效的channel,无效的channel之后将会丢弃

    public static boolean saveUser(Channel channel, String nick, String password) {
            UserInfo userInfo = userInfos.get(channel);
            if (userInfo == null) {
                return false;
            }
            if (!channel.isActive()) {
                logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);
                return false;
            }
            // 验证用户名和密码
            if (nick == null || password == null) {
                return false;
            }
            LambdaQueryWrapper<Account> lambdaQueryWrapper = new LambdaQueryWrapper<>();
            lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);
            Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);
            if (account == null) {
                return false;
            }
            // 增加一个认证用户
            userCount.incrementAndGet();
            userInfo.setNick(nick);
            userInfo.setAuth(true);
            userInfo.setId(account.getId());
            userInfo.setUsername(account.getUsername());
            userInfo.setGroupNumber(account.getGroupNumber());
            userInfo.setTime(System.currentTimeMillis());
    
            // 注册该用户推送消息的通道
            offlineInfoTransmitStatic.registerPull(channel);
            return true;
        }
    

    当channel关闭时,就不再接收消息。unregisterPull就是注销信息消费者,客户端不再接取聊天消息。此外,从下方有一个加写锁的操作,就是为了避免channel还在发送消息时,这边突然关闭channel,这样会导致报错。

    public static void removeChannel(Channel channel) {
            try {
                logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));
                //加上读写锁保证移除channel时,避免channel关闭时,还有别的线程对其操作,造成错误
                rwLock.writeLock().lock();
                channel.close();
                UserInfo userInfo = userInfos.get(channel);
                if (userInfo != null) {
                    if (userInfo.isAuth()) {
                        offlineInfoTransmitStatic.unregisterPull(channel);
                        // 减去一个认证用户
                        userCount.decrementAndGet();
                    }
                    userInfos.remove(channel);
                }
            } finally {
                rwLock.writeLock().unlock();
            }
    
        }
    

    为了无缝切换使用rabbitmq、rocketmq、activemq、不使用中间件存储和转发聊天消息这4种状态,定义如下4个接口。依次是发送单聊消息、群聊消息、客户端启动接收消息、客户端下线不接收消息。

    public interface OfflineInfoTransmit {
        void pushP2P(Integer userId, String message);
    
        void pushGroup(String groupNumber, String message);
    
        void registerPull(Channel channel);
    
        void unregisterPull(Channel channel);
    }
    

    其中,如何使用rabbitmq、rocketmq、activemq三种中间件中的一种来存储和转发聊天消息,它的处理流程如下:

    1. 单聊的模型参考线程池的模型,如果用户在线,直接通过channel发送给用户。如果用户离线,则发往中间件存储,下次用户上线时直接从中间件拉取消息。这样做对比所有消息的发送都通过中间件来转的好处是提升了性能
    2. 群聊则是完全通过中间件来转发消息,消息发送中间件,客户端从中间件接取消息。如果仍像单聊那样操作,在线用户直接通过channel发送,操作过于繁琐,要判断这个群组的哪些用户是否在线
    3. 如果用户在线就注册消费者,从中间件接取消息。否则,就断开消费者,消息保留在中间件中,以便客户端下次上线时拉取。这样就实现了离线消息的接收。
    4. 不管使用哪种中间件或使用不使用中间件,它的处理流程都遵循上面的3个要求,就能无缝切换上方的4种方法来存储和转发消息。需要哪种方法开启相应注解即可。

    file

    项目地址:https://github.com/shuangyueliao/netty-chat

  • 相关阅读:
    【xsy1230】 树(tree) 点分治+线段树
    【xsy1237】 字符转换 矩阵快速幂
    【xsy1232】Magic 最小割
    【xsy1144】选物品 主席树
    【xsy3423】党² 线段树+李超线段树or动态半平面交
    $Django python中使用redis, django中使用(封装了),redis开启事务(管道)
    $Django redis内存数据库 (知识回顾cmd切换目录)
    $Django 路飞之课程下的分类,用户登陆成功前端存cookie,
    $Django 路飞之小知识回顾,Vue之样式element-ui,Vue绑定图片--mounted页面挂载--路由携带参数
    $Django 路飞学城项目简介
  • 原文地址:https://www.cnblogs.com/shuangyueliao/p/11427410.html
Copyright © 2011-2022 走看看