zoukankan      html  css  js  c++  java
  • Gobelieve 架构(转载)

    Gobelieve 架构

    Gobelieve github地址

    im 客户连接服务器 (可分布式部署,暂无负载均衡模块)

    imr 路由查询服务器(主要解决im分布式部署的问题)

    ims 存储服务器 (主从部署)

    基础模块


    1.数据包协议

    包:header(12)|body
    
    header:len(4),seq(4),cmd(1),version(1),空(2)

    2.数据收发流程

    accept收到一个连接
    开启写线程和读线程
    
    写线程:监听client.wt阻塞队列,一有数据就写入conn
    
    读线程:按照数据包协议从conn读出数据包,由client.HandleMessage处理

    3.几个方法

    PushMessage 通过route_channel 发送 MSG_PUBLISH 给IMR
    PushGroupMessage 通过route_channel 发送 MSG_PUBLISH_GROUP 给IMR
    SaveMessage 通过IMS RPC服务 调用SavePeerMessage
    SaveGroupMessage 通过IMS RPC服务 调用SaveGroupMessage
    im_client.SendMessage
        1.PushMessage
        2.本地路由查询 并EnqueueMessage
    im_client.SendGroupMessage
        1.PushGroupMessage
        2.group_manager查询group
        3.由group得到所有menber,对每个menber查询路由表,并EnqueueMessage
    im_client.EnqueueMessage
        将数据写入client.wt,供发送出去

    IM 模块


    IM模块初始化

    1.redis_pool

    2.storage_pools 连接ims:3333

    http服务器读取最近消息时调用

    3.rpc_clients ims:13333

    SyncMessage
    SyncGroupMessage
    SavePeerMessage
    SaveGroupMessage

    4.group_rpc_clients (可选)

    5.route_channels 连接imr:4444

    开启读写线程
    写:从channel.wt管道取值并发送给imr
    读:从imr接受消息,并分发给当前im节点连接用户

    6.group_manager

    1.load: 从mysql加载group,保存至 group_manager.groups
    2.run: reids订阅 
    case group_create、
        group_disband、
        group_member_add、
        group_member_remove、
        group_upgrade、
        回调处理 增删改查group_manager.groups
    case ping
        脏数据检测
    3.ping:
       每个五分钟发送ping

    7.group_message_deliver:普通群消息分发

    1.init:创建本地存储文件
    2.run: 监听wt管道,有数据表示有新消息写入文件
            读取文件并发送

    8.ListenRedis 禁言

    redis订阅 speak_forbidden
    接受事件推送,从本地路由查询到对应client,修改forbidden字段。

    9.SyncKeyService

    从 group_sync_c 和 sync_c 管道取值,保存至redis
    ( 客户端发送的 MSG_GROUP_SYNC_KEY 和 MSG_SYNC_KEY 消息会将消息内同步key写入对应的管道 group_sync_c 和 sync_c)

    10.StartHttpServer :6666

    web服务器

    11.StartSocketIO :websocket

    12.ListenClient :23000 处理客户端连接

    ListenClient 处理流程


    1.登录认证 cmd:MSG_AUTH_TOKEN

    客户端将uid与token传给服务器,由redis_pool查询认证
    
    认证成功: 1.由EnqueueMessage发送消息{cmd:MSG_AUTH_STATUS,status:0}
            2.client.AddClient() 缓存本连接到本机路由表
            3.client.IMClient.Login() 缓存本链接到IMR路由表
    认证失败: 由EnqueueMessage发送消息{cmd:MSG_AUTH_STATUS,status:1}

    2.IMClient 处理消息类型

    MSG_IM: 处理IM 同步消息
    MSG_GROUP_IM: 处理Group 同步消息
    MSG_INPUTING: 处理Inputing消息
    MSG_RT: 处理实时消息
    MSG_UNREAD_COUNT: 设置未读消息数
    MSG_SYNC: 客户端请求同步最新消息
    MSG_SYNC_KEY: 客户端将SYNC_KEY 传至服务端
    MSG_SYNC_GROUP: 客户端请求同步最新群消息
    MSG_GROUP_SYNC_KEY: 客户端将GROUP_SYNC_KEY 传至服务端

    RoomClient 消息类型

    MSG_ENTER_ROOM:进入聊天室
    MSG_LEAVE_ROOM:离开聊天室
    MSG_ROOM_IM:聊天室IM消息

    VOIPClient消息类型

    MSG_VOIP_CONTROL: VOIP命令

    CustomerClient消息类型

    MSG_CUSTOMER: 顾客->客服
    MSG_CUSTOMER_SUPPORT:客服->顾客

    3.MSG_IM处理流程:
    用户A -> B

    1.SaveMessage:保存消息到目标用户B存储队列 (rpc->SavePeerMessage)
    2.SaveMessage:保存消息到发送用户A存储队列(供多点登录同步消息)
    3.PushMessage:外部推送消息给目标用户B(由IMR寻路由)MSG_IM
    4.SendMessage:发送同步消息给目标用户B (外部推送+本地寻址发送) MSG_SYNC_NOTIFY
    5.SendMessage:发送同步消息给发送用户A(多点登录)MSG_SYNC_NOTIFY
    6.EnqueueMessage:给本连接回复MSG_ACK消息

    4.MSG_GROUP_IM处理流程

    1.由group_manager查询到指定group
    2.根据Group类型:
        1.HandleSuperGroupMessage:
            SaveGroupMessage:保存MSG_GROUP_IM消息 (rpc->SaveGroupMessage)
            PushGroupMessage:外部推送群消息 MSG_GROUP_IM
            SendGroupMessage:发送群同步通知消息(外部推送+本地寻址推送) MSG_SYNC_GROUP_NOTIFY
        2.HandleGroupMessage:
            group_message_deliver:
                saveMessage:本地保存消息 MSG_PENDING_GROUP_MESSAGE
                ReadMessage:读取消息 MSG_PENDING_GROUP_MESSAGE
                对群每个成员:SaveMessage:保存MSG_GROUP_IM消息 
                            PushMessage:外部推送消息 MSG_GROUP_IM
                            SendMessage:发送同步消息 MSG_SYNC_NOTIFY
    3.EnqueueMessage:给本连接回复MSG_ACK消息

    5.MSG_INPUTING:

    SendMessage:发送给目标用户

    6.MSG_RT 实时消息处理流程

    SendMessage:发送消息给目标用户

    7.MSG_UNREAD_COUNT 设置用户未读数:

    由redis_pool操作 hashkey:users_$appid_$uid field:unread

    8.消息同步流程:

    服务端->客户端 MSG_SYNC_NOTIFY:
        客户端:
        1.isSyncing==false:sendSync 发送旧syncKey,请求消息MSG_SYNC,状态切换为同步状态
        2.isSyncing==true: 同步状态中,新的newSyncKey 保存在pendingSyncKey中,this.pendingSyncKey = newSyncKey;
    
    客户端->服务端:MSG_SYNC
        服务端:
        1.从客户端传来的sync_key得到last_id,(如果last_id==0,从redis取出最新sync_key)
        2.rpc->SyncMessage:根据last_id取出缓存的最近消息msgs
        3.EnqueueMessage:发送MSG_SYNC_BEGIN消息 (客户端不做处理)
        4.EnqueueMessage:循环发送msgs
        5.EnqueueMessage:发送MSG_SYNC_END消息 其中包含sync_key为最后一条msg的MsgID
    
    服务端->客户端:MSG_SYNC_END
        客户端: 
        1.取出newSyncKey(如果newSyncKey>this.syncKey,客户端保存newSyncKey,并发送给服务端MSG_SYNC_KEY)
        2.切换同步状态isSyncing = false; 
        3.如果this.pendingSyncKey > this.syncKey ,
            即在上次同步中有新的MSG_SYNC_NOTIFY消息传给客户端,则再次同步,sendSync发送syncKey,pendingSyncKey置零
    
    客户端->服务端: MSG_SYNC_KEY
        服务端:
        1.从sync_key得到last_id,
        2.包裹成SyncHistory,写入管道sync_c <- s
     
    消息传输流程

    9.超级群同步流程

    服务端->客户端 MSG_SYNC_GROUP_NOTIFY
        客户端: 
        1.isSyncing==false:sendSync 发送旧syncKey,请求消息MSG_SYNC_GROUP,状态切换为同步状态
        2.isSyncing==true: 同步状态中,新的newSyncKey 保存在pendingSyncKey中,this.pendingSyncKey = newSyncKey;
    
    客户端->服务端 MSG_SYNC_GROUP
        服务端: 
        1/从客户端传来的group_sync_key取出group_id,sync_key,last_id=sync_key,(如果last_id,从redis取出新的group_sync_key_$groupid)
        2.rpc->SyncGroupMessage:根据last_id取出最近的群消息 msgs
        3.EnqueueMessage:发送MSG_SYNC_GROUP_BEGIN 消息
        4.EnqueueMessage:循环发送msgs
        5.EnqueueMessage:发送MSG_SYNC_GROUP_END,其中sync_key为最后一条msg的MsgID
    
    服务端->客户端 MSG_SYNC_GROUP_END
        客户端:
        1.取出GroupSyncKey.syncKey和当前syncKey(如果GroupSyncKey.syncKey较大,客户端保存更新,并发送给服务端 MSG_GROUP_SYNC_KEY)
        
        2.切换同步状态isSyncing = false;
            如果this.pendingSyncKey > this.syncKey ,
                即在上次同步中有新的MSG_SYNC_NOTIFY消息传给客户端,则再次同步,sendSync发送syncKey,pendingSyncKey置零
    
    客户端->服务端 MSG_GROUP_SYNC_KEY
        服务端:
        1.取出group_id last_id
        2.包裹成SyncGroupHistory,写入管道group_sync_c <- s 

    IMR模块

    1.redis_pool

    2.group_manager

    3.ListenClient :4444

    1.MSG_SUBSCRIBE
    2.MSG_UNSUBSCRIBE
    3.MSG_SUBSCRIBE_ROOM
    4.MSG_UNSUBSCRIBE_ROOM
    (以上四个均是对imr维护的路由表增删改查)
    5.MSG_PUBLISH
        1.根据消息内容得到消息类型和目标用户
        2.查询路由表,用户如果离线则将消息放入第三方推送队列
        3.根据路由表得到用户连接所在im节点,并把消息推送至该节点
        
        第三部过滤条件:
            1.消息类型不为 MSG_IM、MSG_GROUP_IM、MSG_CUSTOMER、MSG_CUSTOMER_SUPPORT、MSG_SYSTEM
            2.目标IM节点和发送IM节点不是同一节点
    
    6.MSG_PUBLISH_GROUP
        1.根据消息内容得到消息类型和群
        2.对群内每个成员查询路由表,用户如果离线则将消息放入第三方推送队列
        3.群发给所有IM节点
        
        第三部过滤条件:
            1.消息类型为 MSG_PUBLISH_GROUP
    
    7.MSG_PUBLISH_ROOM
        1.根据消息内容得到roomid
        2.根据roomid查询路由表得到所有节点
        3.对每个节点发送消息
    
        第三部过滤条件:
            1.发送节点和目标节点不是同一节点
     
    IMR架构

    IMS模块 (主从)

    1.NewMaster

    1.init:创建容器存储 clients,创建队列ewt
    2.run:监听队列ewt,将ewt队列内消息添加到缓存cache数组
        cache每满1000或者每隔1分钟,执行SendBatch,
        SendBatch:
            封装消息 MSG_STORAGE_SYNC_MESSAGE_BATCH ,写入每个从节点连接的消息队列client.ewt

    2.NewSlaver 监听主节点(可选)

    run:连接至主节点,连接成功发送 MSG_STORAGE_SYNC_BEGIN
        循环读取消息:
            MSG_STORAGE_SYNC_MESSAGE storage.SaveSyncMessage(emsg)
            MSG_STORAGE_SYNC_MESSAGE_BATCH storage.SaveSyncMessageBatch(mb)

    3.waitSignal 处理中断 SIGINT SIGTERM

    storage.FlushPeerIndex() 将每个人最近消息的msgid写入文件
    storage.FlushGroupIndex()

    4.ListenSyncClient master监听 3334

    处理从节点连接 RunLoop:
        1.(初始化同步)接受 MSG_STORAGE_SYNC_BEGIN 消息,从中取得msgid,
        根据msgid LoadSyncMessagesInBackground 查询得到消息,并发送给从节点 
        2.将从节点连接添加至 clients
        3.进入for循环,监听消息队列client.ewt并发送给从节点
        4.循环break后 RemoveClient

    5.ListenRPCClient :13333

    SyncMessage
    SyncGroupMessage
    SavePeerMessage
    SaveGroupMessage
    GetNewCount
    由im调用

    6.ListenClient() 3333

    对于每个连接:
        1.init:创建写管道wt
        2.run :写线程,从wt管道取数据并发送
                读线程,HandleMessage

    7.HandleMessage

    1.MSG_LOAD_OFFLINE  
        对IM: storage_client.LoadOfflineMessage响应
    2.MSG_SAVE_AND_ENQUEUE 
        对IM:storage_client.SaveAndEnqueueMessage响应
    3.MSG_DEQUEUE 
        对IM: storage_client.DequeueMessage响应
    *4.MSG_LOAD_LATEST
        对IM-StartHttpServer-LoadLatestMessage响应
    *5.MSG_LOAD_HISTORY 
        对IM-StartHttpServer-LoadHistoryMessage响应
    6.MSG_SAVE_AND_ENQUEUE_GROUP  
        对IM: storage_client.SaveAndEnqueueGroupMessage响应
    7.MSG_DEQUEUE_GROUP 
        对IM :storage_client.DequeueGroupMessage响应
    8.MSG_LOAD_GROUP_OFFLINE 
        对IM: storage_client.LoadGroupOfflineMessage响应
    
    (上述除了4,5,暂无被IM模块调用)
    IMS架构



    作者:JackieF777
    链接:https://www.jianshu.com/p/8121d6e85282
    来源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

  • 相关阅读:
    20165212第八周学习总结
    20165212第八周课上测试补做
    20165212实验二面向对象程序设计
    Titanic生存预测
    聚类算法数据生成器make_blobs
    k-means
    监督学习、无监督学习与半监督学习
    在线Latex公式编辑器
    西瓜书课后习题——第四章
    ML经典数据集
  • 原文地址:https://www.cnblogs.com/nuanshou/p/10492674.html
Copyright © 2011-2022 走看看