zoukankan      html  css  js  c++  java
  • 网游内存数据库的设计(2)

    续第一篇,前两天对核心存储做了些修改,以前只打算与关系数据库的行与表做对应,value类型只能使array或list,

    现在把7种基本类型也加入到value支持的类型中,以使得数据库更通用.

    当然,这都不是本文的核心,本篇主要介绍一个测试前端,以及测试的远程调用协议.

    先贴出测试前端的服务器代码:

    #include "netservice.h"
    #include "msg_loop.h"
    #include "datasocket.h"
    #include "SysTime.h"
    #include "db_protocal.h"
    
    atomic_32_t wpacket_count = 0;
    atomic_32_t rpacket_count = 0;
    atomic_32_t buf_count = 0;
    
    global_table_t gtb;
    void server_process_packet(datasocket_t s,rpacket_t r)
    {
        //执行操作并返回结果
        cache_protocal_t p;
        uint32_t coro_id = rpacket_read_uint32(r);
        uint8_t type = rpacket_read_uint8(r);
        switch(type)
        {
            case CACHE_GET:
                p = create_get();
                break;
            case CACHE_SET:
                p = create_set();
                break;
            case CACHE_DEL:
                p = create_del();
                break;            
        }
        wpacket_t ret = p->execute(gtb,r,coro_id);
        if(NULL != ret)
            data_send(s,ret);
        destroy_protocal(&p);
    }
    
    void process_new_connection(datasocket_t s)
    {
        printf("w:%u,r:%u,b:%u\n",wpacket_count,rpacket_count,buf_count);
    }
    
    void process_connection_disconnect(datasocket_t s,int32_t reason)
    {
        release_datasocket(&s);
        printf("w:%u,r:%u,b:%u\n",wpacket_count,rpacket_count,buf_count);
    }
    
    void process_send_block(datasocket_t s)
    {
        //发送阻塞,直接关闭
        close_datasocket(s);
    }
    
    
    const char *ip;
    uint32_t port;
    int main(int argc,char **argv)
    {
        init_net_service();
        ip = argv[1];
        port = atoi(argv[2]);
        netservice_t n = create_net_service(1);
        gtb = global_table_create(65536);
        
        int32_t i = 0;
        char key[64];
        for( ; i < 1000000; ++i)
        {
            basetype_t a = basetype_create_int32(i);
            snprintf(key,64,"test%d",i);
            a = global_table_insert(gtb,key,a,global_hash(key));
            if(!a)
                printf("error 1\n");
            basetype_release(&a);        
        }
        
        net_add_listener(n,ip,port);
        msg_loop_t m = create_msg_loop(server_process_packet,process_new_connection,process_connection_disconnect,process_send_block);
        while(1)
        {
            msg_loop_once(m,n,100);    
        }
    
        return 0;
    }

    前端的网络模块使用了在上一篇中介绍的网络框架,启动时先插入100W条32位整型的记录,然后进入消息循环,不断的处理从客户端发过来的操作请求.

    目前只添加了三个协议,分别是获取:CACHE_GET;添加/修改:CACHE_SET;删除:CACHE_DEL.

    服务器处理协议并将结果返回给客户端.

    然后是测试客户端:

    #include "db_protocal.h"
    #include "dbtype.h"
    #include <stdio.h>
    #include "SocketWrapper.h"
    #include "SysTime.h"
    #include "KendyNet.h"
    #include "Connector.h"
    #include "Connection.h"
    #include "common_define.h"
    #include "netservice.h"
    #include "msg_loop.h"
    #include "co_sche.h"
    
    sche_t g_sche = NULL;
    uint32_t call_count = 0;
    
    atomic_32_t wpacket_count = 0;
    atomic_32_t rpacket_count = 0;
    atomic_32_t buf_count = 0;
    datasocket_t db_s;
    
    int8_t test_select(const char *key,int32_t i)
    {
        coro_t co = get_current_coro();
        wpacket_t wpk = get_wpacket(64);
        wpacket_write_uint32(wpk,(int32_t)co);
        wpacket_write_uint8(wpk,CACHE_GET);//ÉèÖÃ
        wpacket_write_string(wpk,key);
        data_send(db_s,wpk);
        coro_block(co);
        int8_t ret = rpacket_read_uint8(co->rpc_response);
        rpacket_read_uint8(co->rpc_response);
        int32_t val = rpacket_read_uint32(co->rpc_response);
        if(val != i)
            printf("error\n");
        //printf("begin\n");
        rpacket_destroy(&co->rpc_response);
        //printf("end\n");
        return ret;
    }
    
    void *test_coro_fun2(void *arg)
    {
        coro_t co = get_current_coro();
        while(1)
        {
            char key[64];
            int32_t i = rand()%1000000;
            snprintf(key,64,"test%d",100);        
            if(0 == test_select(key,100))
                ++call_count;
        }
    }
    
    
    void server_process_packet(datasocket_t s,rpacket_t r)
    {
        coro_t co = (coro_t)rpacket_read_uint32(r);
        co->rpc_response = rpacket_create_by_rpacket(r);
        coro_wakeup(co);
    }
    
    void process_new_connection(datasocket_t s)
    {
        printf("connect server\n");
        db_s = s;
        g_sche = sche_create(20000,65536,NULL,NULL);
        int i = 0;
        for(; i < 20000; ++i)
        {
            sche_spawn(g_sche,test_coro_fun2,NULL);
        }
    }
    
    void process_connection_disconnect(datasocket_t s,int32_t reason)
    {
        release_datasocket(&s);
    }
    
    void process_send_block(datasocket_t s)
    {
        //·¢ËÍ×èÈû,Ö±½Ó¹Ø±Õ
        close_datasocket(s);
    }
    
    int main(int argc,char **argv)
    {
        init_net_service();
        const char *ip = argv[1];
        uint32_t port = atoi(argv[2]);
        netservice_t n = create_net_service(1);
        net_connect(n,ip,port);
        msg_loop_t m = create_msg_loop(server_process_packet,process_new_connection,process_connection_disconnect,process_send_block);
        uint32_t tick = GetSystemMs();
        while(1)
        {
            msg_loop_once(m,n,1);
            uint32_t now = GetSystemMs();
            if(now - tick > 1000)
            {
                printf("call_count:%u\n",(call_count*1000)/(now-tick));
                tick = now;
                call_count = 0;
            }
            if(g_sche)
                sche_schedule(g_sche);            
        }
        return 0;
    }

    操作接口使用用户级线程实现,以支持同步调用接口,用户级线程发出请求后就阻塞自己,直到结果返回时才被唤醒:

    关键部分在test_select,把自己的coro地址作为id打包到协议中,发往服务器,然后调用coro_block阻塞。服务器返回的数据包

    中也带了对应的coro_id,以通知客户端的调度系统该唤醒哪个coro.coro被唤醒后从结果包中读取操作结果和数据,返回给上层调用者.

    从测试结果来看,启动1W个coro的客户端,每秒平均能执行50W次的操作。对于一个万人在线的MMORPG游戏来说应该已经是够用的了。

    如果还是不够,可以通过表空间的划分,启动多个内存数据库进程来服务请求。

     项目地址:https://github.com/sniperHW/kendylib/tree/master/dbcache

  • 相关阅读:
    Java实现多线程的四种实现方式
    电梯调度算法[转]
    带黑洞的随机游走问题
    深度学习印象
    使用jupyterthemes插件定制jupyter notebook界面
    tf.gfile
    中国象棋残局库构建[抄]
    Android(Linux)线路规程的使用
    Remote Displayer for Android V1.2
    Android开发资源汇总
  • 原文地址:https://www.cnblogs.com/sniperHW/p/2759912.html
Copyright © 2011-2022 走看看