zoukankan      html  css  js  c++  java
  • Netty聊天室(2):从0开始实战100w级流量应用

    客户端 Client 登录和响应处理

    疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 17【 博客园 总入口



    源码IDEA工程获取链接Java 聊天室 实战 源码

    写在前面

    ​ 大家好,我是作者尼恩。目前和几个小伙伴一起,组织了一个高并发的实战社群【疯狂创客圈】。正在开始高并发、亿级流程的 IM 聊天程序 学习和实战

    ​ 前面,已经完成一个高性能的 Java 聊天程序的四件大事:

    1. 完成了协议选型,选择了性能更佳的 Protobuf协议。具体的文章为: Netty+Protobuf 整合一:实战案例,带源码

    2. 介绍了 通讯消息数据包的几条设计准则。具体的文章为: Netty +Protobuf 整合二:protobuf 消息通讯协议设计的几个准则

    3. 解决了一个非常基础的问题,这就是通讯的 粘包和半包问题。具体的文章为:Netty 粘包/半包 全解 | 史上最全解读

    4. 前一篇文件,已经完成了 系统三大组成模块的组成介绍。 具体的文章为:Netty聊天程序(实战一):从0开始实战100w级流量应用

      今天介绍非常重要的一个内容:

      客户端的通讯、登录请求和登录响应设计

      下面,开启今天的 惊险和刺激实战之旅

    客户端的会话管理

    ​ 什么是会话?

    ​ 为了方便客户端的开发,管理与服务器的连接,这里引入一个非常重要的中间角色——Session (会话)。有点儿像Web开发中的Tomcat的服务器 Session,但是又有很大的不同。

    ​ 客户端的会话概念图,如下图所示:

    在这里插入图片描述

    ​ 客户端会话有两个很重的成员,一个是user,代表了拥有会话的用户。一个是channel,代表了连接的通道。两个成员的作用是:

    • 通过user,可以获得当前的用户信息

    • 通过channel,可以向服务器发送消息

      所以,会话左拥右抱,左手用户资料,右手服务器的连接。在本例的开发中,会经常用到。

    客户端的逻辑构成

    从逻辑上来说,客户端有三个子的功能模块。

    在这里插入图片描述

    模块一:Handler

    入站处理器。

    在Netty 中非常重要,负责处理入站消息。比方,服务器发送过来登录响应,服务器发送过来的聊天消息。

    模块二:MsgBuilder

    消息组装器。

    将 Java 内部的 消息 Bean 对象,转成发送出去的 Protobuf 消息。

    模块三:Sender

    消息发送器。

    Handler 负责收的工作。Sender 则是负责将消息发送出去。

    三大子模块的类关系图:
    在这里插入图片描述

    介绍完成了主要的组成部分后,开始服务器的连接和Session 的创建。

    连接服务器与Session 的创建

    ​ 通过bootstrap 帮助类,设置完成线程组、通道类型,向管道流水线加入处理器Handler后,就可以开始连接服务器的工作。

    ​ 本小节需要重点介绍的,是连接成功之后,创建 Session,并且将 Session和 channel 相互绑定。

    ​ 代码如下:

    package com.crazymakercircle.chat.client;
    //...
    @Data
    @Service("EchoClient")
    public class ChatClient
    {
        static final Logger LOGGER =
                LoggerFactory.getLogger(ChatClient.class);
       //..
        private Channel channel;
        private ClientSender sender;
    
       public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup)
        {
            ChannelFuture f = null;
            try
            {
                if (bootstrap != null)
                {
                    bootstrap.group(eventLoopGroup);
                    bootstrap.channel(NioSocketChannel.class);
                    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                    bootstrap.remoteAddress(host, port);
    
                    // 设置通道初始化
                    bootstrap.handler(
                            new ChannelInitializer<SocketChannel>()
                            {
                                public void initChannel(SocketChannel ch) throws Exception
                                {
                                    ch.pipeline().addLast(new ProtobufDecoder());
                                    ch.pipeline().addLast(new ProtobufEncoder());
                                    ch.pipeline().addLast(chatClientHandler);
    
                                }
                            }
                    );
                    LOGGER.info(new Date() + "客户端开始登录[疯狂创客圈IM]");
    
                    f = bootstrap.connect().addListener((ChannelFuture futureListener) ->
                    {
                        final EventLoop eventLoop = futureListener.channel().eventLoop();
                        if (!futureListener.isSuccess())
                        {
                            LOGGER.info("与服务端断开连接!在10s之后准备尝试重连!");
                            eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
    
                            initFalg = false;
                        }
                        else
                        {
                            initFalg = true;
                        }
                        if (initFalg)
                        {
                            LOGGER.info("EchoClient客户端连接成功!");
    
                            LOGGER.info(new Date() + ": 连接成功,启动控制台线程……");
                            channel = futureListener.channel();
    
                            // 创建会话
                            ClientSession session = new ClientSession(channel);
                            channel.attr(ClientSession.SESSION).set(session);
                            session.setUser(ChatClient.this.getUser());
                            startConsoleThread();
                        }
    
                    });
    
                    // 阻塞
                    f.channel().closeFuture().sync();
                }
            } catch (Exception e)
            {
                LOGGER.info("客户端连接失败!" + e.getMessage());
            }
    
        }
    
     //...
    
    }
    

    Session和 channel 相互绑定

    Session和 channel 相互绑定,再截取出来,分析一下。

    ClientSession session = new ClientSession(channel);
    channel.attr(ClientSession.SESSION).set(session);
    session.setUser(ChatClient.this.getUser());
    
    

    ​ 为什么要Session和 channel 相互绑定呢?

    • 发的时候, 需要从Session 写入 Channel ,这相当于正向的绑定。
    • 收的时候,是从Channel 过来的,需要找到 Session ,这相当于反向的绑定。

    ​ Netty 中的 channel ,实现了AttributeMap接口 ,相当于一个 Map容器。 反向的绑定,利用了channel 的这个特点。

    ​ 看一下AttributeMap接口 如何使用的?

    AttributeMap接口的使用

    ​ AttributeMap 是一个接口,并且只有一个attr()方法,接收一个AttributeKey类型的key,返回一个Attribute类型的value。按照Javadoc,AttributeMap实现必须是线程安全的。

    ​ AttributeMap内部结构看起来像下面这样:

    img

    不要被吓着了,其实很简单。

    AttributeMap 的使用,主要是设置和取值。

    • 设值 Key-> Value

    AttributeMap 的设值的方法,举例如下:

    channel.attr(ClientSession.SESSION).set(session);
    

    这个是链式调用,attr() 方法中的是 Key, set()方法中的是Value。 这样就完成了 Key-> Value 的设置。

    • 取值

    AttributeMap 的取值的方法,举例如下:

    ClientSession session =
            ctx.channel().attr(ClientSession.SESSION).get();
    

    这个是链式调用,attr() 方法中的是 Key, get()方法返回 的是Value。 这样就完成了 取值。

    关键是,这个key比较特殊

    一般的Map,Key 的类型多半为字符串。但是这里的Key不行,有特殊的约定。

    Key的类型必须是 AttributeKey 类型,而且这是一个泛型类,它的优势是,不需要对值进行强制的类型转换。

    Key的例子如下:

    public static final AttributeKey<ClientSession> SESSION = AttributeKey.valueOf("session");
    

    客户端登录请求

    登录的请求,大致如下:
    在这里插入图片描述

    ClientSender的 代码如下:

    package com.crazymakercircle.chat.client;
    
    @Service("ClientSender")
    public class ClientSender
    {
        static final Logger LOGGER = LoggerFactory.getLogger(ClientSender.class);
    
    
        private User user;
        private ClientSession session;
    
        public void sendLoginMsg()
        {
            LOGGER.info("开始登陆");
            ProtoMsg.Message message = LoginMsgBuilder.buildLoginMsg(user);
            session.writeAndFlush(message);
        }
    
    //...
        public boolean isLogin()
        {
            return  session.isLogin();
        }
    }
    
    

    Sender 首先通过 LoginMsgBuilder,构造一个protobuf 消息。然后调用session发送消息。

    session 会通过绑定的channel ,将消息发送出去。

    session的代码,如下:

    public synchronized void writeAndFlush(Object pkg)
    {
        channel.writeAndFlush(pkg);
    }
    

    其他的客户端请求流程,大致也是类似的。

    一个客户端的请求大致的流程有三步,分别从Sender 到session到channel。

    在这里插入图片描述

    处理登录成功的响应

    ​ 这是从服务器过来的入站消息。 如果登录成功,服务器会发送一个登录成功的响应过来。 这个响应,会从channel 传递到Handler。

    在这里插入图片描述

    处理器 LoginResponceHandler 的代码如下:

    package com.crazymakercircle.chat.clientHandler;
    
    //...
    
    public class LoginResponceHandler extends ChannelInboundHandlerAdapter
    {
        static final Logger LOGGER = LoggerFactory.getLogger(LoginResponceHandler.class);
        /**
         * 业务逻辑处理
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
        {
            LOGGER.info("msg:{}", msg.toString());
            if (msg != null && msg instanceof ProtoMsg.Message)
            {
                ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
                ProtoMsg.LoginResponse info = pkg.getLoginResponse();
                ProtoInstant.ResultCodeEnum result =
                        ProtoInstant.ResultCodeEnum.values()[info.getCode()];
    
                if (result.equals(ProtoInstant.ResultCodeEnum.SUCCESS))
                {
                    ClientSession session =
                            ctx.channel().attr(ClientSession.SESSION).get();
    
                    session.setLogin(true);
                    LOGGER.info("登录成功");
                }
            }
        }
    
    }
    

    ​ LoginResponceHandler 对消息类型进行判断,如果是请求响应消息,并且登录成功。 则取出绑定的session,通过session,进一步完成登录成功后的业务处理。

    ​ 比如设置成功的状态,完成一些成功的善后处理操作等等。

    ​ 其他的客户端响应处理流程,大致也是类似的。

    在这里插入图片描述

    写在最后

    ​ 至此为止,可以看到,客户端登录的完整流程。

    ​ 下一篇:服务器的请求处理和通讯的全流程闭环介绍。


    疯狂创客圈 Java 死磕系列

    • Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战

  • 相关阅读:
    设计模式 学习笔记(8) 适配器模式、单例模式、享元模式
    设计模式 学习笔记(7) 抽象工厂模式、状态模式
    判断页面是否被嵌入iframe里面
    css3的transform:tanslateZ没有效果
    vuecli3多页应用配置与优化
    正则的test和exec方法与字符串的search,replace,match,split方法的使用
    实验 1:Mininet 源码安装和可视化拓扑工具
    2020软件工程第一次作业
    R12新功能
    FSG(1) FSG报表组件
  • 原文地址:https://www.cnblogs.com/crazymakercircle/p/9992394.html
Copyright © 2011-2022 走看看