zoukankan      html  css  js  c++  java
  • ucontext实现的用户级多线程框架3(实现echo服务器)

    前面一篇文章实现了一个抢先式的用户级多线程框架,现在用那个框架编写一个echo服务,

    因为只是个实验,所以代码写得比较杂乱,还有很多可能出错的情况也没有处理,这些在今后的进一

    步研究中都会慢慢修改,下面是代码:

    uthread.h

    /*
    * brief: 用ucontext实现的用户级线程框架
    * author: kenny huang
    * date: 2009/10/13
    * email: huangweilook@21cn.com
    */
    #ifndef _UTHREAD_H
    #define _UTHREAD_H
    #include <ucontext.h>
    #include <stdio.h>
    #include <string.h>
    #include <list>
    #include <pthread.h>
    #include <signal.h>
    #include "socketwrapper.h"

    #define MAX_UTHREAD 128

    void int_signal_handler(int sig);
    //用户态线程的当前状态
    enum thread_status
    {
    ACTIVED = 0,//可运行的
    BLOCKED,//被阻塞
    SLEEP,//主动休眠
    DIE,//死死亡
    };

    typedef int (*uthread_func)(void*);

    class Scheduler;
    class u_thread;

    typedef struct
    {
    int index;//在Scheduler::threads中的下标
    u_thread *p_uthread;
    ucontext_t *p_context;
    }uthread_id;

    /*
    * 用户态线程
    */
    class u_thread
    {
    friend class Scheduler;
    private:
    u_thread(unsigned int ssize,int index,uthread_id parent)
    :ssize(ssize),_status(BLOCKED),parent_context(parent.p_context)
    {
    stack = new char[ssize];
    ucontext.uc_stack.ss_sp = stack;
    ucontext.uc_stack.ss_size = ssize;
    getcontext(&ucontext);
    uid.index = index;
    uid.p_uthread = this;
    uid.p_context = &ucontext;

    }
    ~u_thread()
    {
    delete []stack;
    }
    static void star_routine(int uthread,int func,int arg);

    public:
    ucontext_t* GetParentContext()
    {
    return parent_context;
    }
    ucontext_t *GetContext()
    {
    return &ucontext;
    }
    void SetStatus(thread_status _status)
    {
    this->_status = _status;
    }
    thread_status GetStatus()
    {
    return _status;
    }
    uthread_id GetUid()
    {
    return uid;
    }

    private:
    ucontext_t ucontext;
    ucontext_t *parent_context;//父亲的context
    char *stack;//coroutine使用的栈
    unsigned int ssize;//栈的大小
    thread_status _status;
    uthread_id uid;
    };

    void BeginRun();
    bool cSpawn(uthread_func func,void *arg,unsigned int stacksize);
    int cRecv(int sock,char *buf,int len);
    int cSend(int sock,char *buf,int len);
    int cListen(int sock);
    void cSleep(int t);

    class beat;

    /*
    * 任务调度器
    */
    class Scheduler
    {
    friend class beat;
    friend class u_thread;
    friend void BeginRun();
    friend bool cSpawn(uthread_func func,void *arg,unsigned int stacksize);
    friend int cRecv(int sock,char *buf,int len);
    friend int cSend(int sock,char *buf,int len);
    friend int cListen(int sock);
    friend void cSleep(int t);
    public:
    static void scheduler_init();
    static void int_sig();



    private:

    //休眠time时间
    static void sleep(int t);
    static void check_Network();
    static void schedule();
    static bool spawn(uthread_func func,void *arg,unsigned int stacksize);
    static int recv(int sock,char *buf,int len);
    static int send(int sock,char *buf,int len);
    static int listen(int sock);

    private:
    static std::list<u_thread*> activeList;//可运行uthread列表

    static std::list<std::pair<u_thread*,time_t> > sleepList;//正在睡眠uthread列表
    static volatile bool block_signal;
    static char stack[4096];

    static ucontext_t ucontext;//Scheduler的context

    static uthread_id uid_current;//当前正获得运行权的context

    static uthread_id uid_self;

    static u_thread *threads[MAX_UTHREAD];
    static int total_count;
    static int epollfd;
    const static int maxsize = 10;
    };
    /*心跳发射器,发射器必须运行在一个独立的线程中,以固定的间隔
    * 往所有运行着coroutine的线程发送中断信号
    */
    class beat
    {
    public:
    beat(unsigned int interval):interval(interval)
    {}
    void setThread(pthread_t id)
    {
    thread_scheduler = id;
    }
    void loop()
    {
    while(true)
    {
    //每隔固定时间向所有线程发中断信号
    ::usleep(1000 * interval);
    while(1)
    {
    if(!Scheduler::block_signal)
    {
    pthread_kill(thread_scheduler,SIGUSR1);
    break;
    }
    }

    }
    }
    private:
    unsigned int interval;//发送中断的间隔(豪秒)
    pthread_t thread_scheduler;
    };


    bool initcoroutine(unsigned int interval);


    #endif

    uthread.cpp

    #include "uthread.h"
    #include <assert.h>
    #include <stdlib.h>
    #include <time.h>
    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <sys/time.h>
    #include <netdb.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <sys/epoll.h>
    #include "thread.h"

    ucontext_t Scheduler::ucontext;

    char Scheduler::stack[4096];

    uthread_id Scheduler::uid_current;

    uthread_id Scheduler::uid_self;

    u_thread *Scheduler::threads[MAX_UTHREAD];

    int Scheduler::total_count = 0;

    int Scheduler::epollfd = 0;

    volatile bool Scheduler::block_signal = true;

    std::list<u_thread*> Scheduler::activeList;

    std::list<std::pair<u_thread*,time_t> > Scheduler::sleepList;

    struct sock_struct
    {
    int sockfd;
    u_thread *uThread;
    };

    void int_signal_handler(int sig)
    {
    Scheduler::int_sig();
    }

    void u_thread::star_routine(int uthread,int func,int arg)
    {
    u_thread *self_uthread = (u_thread *)uthread;
    assert(self_uthread);

    self_uthread->SetStatus(ACTIVED);
    ucontext_t *self_context = self_uthread->GetContext();
    swapcontext(self_context,self_uthread->GetParentContext());
    Scheduler::block_signal = false;

    uthread_func _func = (uthread_func)func;
    void *_arg = (void*)arg;
    int ret = _func(_arg);
    self_uthread->SetStatus(DIE);
    }
    void Scheduler::scheduler_init()
    {
    for(int i = 0; i < MAX_UTHREAD; ++i)
    threads[i] = 0;
    getcontext(&ucontext);
    ucontext.uc_stack.ss_sp = stack;
    ucontext.uc_stack.ss_size = sizeof(stack);
    ucontext.uc_link = NULL;

    //scheduler占用下标0
    makecontext(&ucontext,schedule, 0);

    uid_self.index = 0;
    uid_self.p_uthread = 0;
    uid_self.p_context = &ucontext;
    uid_current = uid_self;

    }
    int Scheduler::listen(int sock)
    {
    u_thread *self_thread = uid_current.p_uthread;

    epoll_event ev;
    sock_struct *ss = new sock_struct;
    ss->uThread = self_thread;
    ss->sockfd = sock;
    ev.data.ptr = (void*)ss;
    ev.events = EPOLLIN;
    int ret;
    TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));
    if(ret != 0)
    {
    return -1;
    }

    self_thread->SetStatus(BLOCKED);
    Scheduler::block_signal = true;
    swapcontext(uid_current.p_context,&Scheduler::ucontext);
    Scheduler::block_signal = false;
    return 1;
    }

    int Scheduler::recv(int sock,char *buf,int len)
    {
    if(!buf || !(len > 0))
    return -1;
    u_thread *self_thread = uid_current.p_uthread;
    sock_struct *ss = new sock_struct;
    ss->uThread = self_thread;
    ss->sockfd = sock;
    epoll_event ev;
    ev.data.ptr = (void*)ss;
    ev.events = EPOLLIN;
    int ret;
    TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));
    if(ret != 0)
    return -1;

    self_thread->SetStatus(BLOCKED);
    Scheduler::block_signal = true;
    swapcontext(uid_current.p_context,&Scheduler::ucontext);
    printf("recv return/n");
    Scheduler::block_signal = false;
    ret = read(sock,buf,len);
    return ret;
    }
    int Scheduler::send(int sock,char *buf,int len)
    {
    if(!buf || !(len > 0))
    return -1;

    u_thread *self_thread = uid_current.p_uthread;

    sock_struct *ss = new sock_struct;
    ss->uThread = self_thread;
    ss->sockfd = sock;
    epoll_event ev;
    ev.data.ptr = (void*)ss;
    ev.events = EPOLLOUT;
    int ret;
    TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));
    if(ret != 0)
    return -1;
    self_thread->SetStatus(BLOCKED);
    Scheduler::block_signal = true;
    swapcontext(uid_current.p_context,&Scheduler::ucontext);
    Scheduler::block_signal = false;
    ret = write(sock,buf,len);
    return ret;
    }
    void Scheduler::check_Network()
    {
    epoll_event events[maxsize];
    sock_struct *ss;
    int nfds = TEMP_FAILURE_RETRY(epoll_wait(epollfd,events,maxsize,0));
    for( int i = 0 ; i < nfds ; ++i)
    {
    //套接口可读
    if(events[i].events & EPOLLIN)
    {
    ss = (sock_struct*)events[i].data.ptr;
    printf("a sock can read/n");
    ss->uThread->SetStatus(ACTIVED);
    epoll_event ev;
    ev.data.fd = ss->sockfd;
    if(0 != TEMP_FAILURE_RETRY(epoll_ctl(epollfd,EPOLL_CTL_DEL,ev.data.fd,&ev)))
    {
    printf("error here/n");
    exit(0);
    }
    delete ss;
    continue;
    }
    //套接口可写
    if(events[i].events & EPOLLOUT)
    {
    ss = (sock_struct*)events[i].data.ptr;
    printf("a sock can write/n");
    ss->uThread->SetStatus(ACTIVED);
    epoll_event ev;
    ev.data.fd = ss->sockfd;
    TEMP_FAILURE_RETRY(epoll_ctl(epollfd,EPOLL_CTL_DEL,ev.data.fd,&ev));
    delete ss;
    continue;
    }
    }
    }
    void Scheduler::schedule()
    {
    epollfd = TEMP_FAILURE_RETRY(epoll_create(maxsize));

    if(epollfd<= 0)
    {
    printf("epoll init error/n");
    return;
    }

    while(total_count > 0)
    {

    //首先执行active列表中的uthread
    std::list<u_thread*>::iterator it = activeList.begin();
    std::list<u_thread*>::iterator end = activeList.end();
    for( ; it != end; ++it)
    {
    if(*it && (*it)->GetStatus() == ACTIVED)
    {
    uid_current = (*it)->GetUid();
    swapcontext(&ucontext,uid_current.p_context);
    uid_current = uid_self;

    int index = (*it)->GetUid().index;
    if((*it)->GetStatus() == DIE)
    {
    printf("%d die/n",index);
    delete threads[index];
    threads[index] = 0;
    --total_count;
    activeList.erase(it);
    break;
    }
    else if((*it)->GetStatus() == SLEEP)
    {
    printf("%d sleep/n",index);
    activeList.erase(it);
    break;
    }
    }
    }
    //检查网络,看看是否有套接口可以操作
    check_Network();
    //看看Sleep列表中是否有uthread该醒来了
    std::list<std::pair<u_thread*,time_t> >::iterator its = sleepList.begin();
    std::list<std::pair<u_thread*,time_t> >::iterator ends = sleepList.end();
    time_t now = time(NULL);
    for( ; its != ends; ++its)
    {
    //可以醒来了
    if(now >= its->second)
    {
    u_thread *uthread = its->first;
    uthread->SetStatus(ACTIVED);
    activeList.push_back(uthread);
    sleepList.erase(its);
    break;
    }
    }
    }
    printf("scheduler end/n");
    }

    bool Scheduler::spawn(uthread_func func,void *arg,unsigned int stacksize)
    {
    printf("uthread_create/n");
    if(total_count >= MAX_UTHREAD)
    return false;
    int i = 1;
    for( ; i < MAX_UTHREAD; ++i)
    {
    if(threads[i] == 0)
    {
    threads[i] = new u_thread(stacksize,i,uid_current);
    ++total_count;
    ucontext_t *cur_context = threads[i]->GetContext();
    activeList.push_back(threads[i]);

    cur_context->uc_link = &ucontext;
    makecontext(cur_context,(void (*)())u_thread::star_routine, 3,(int)&(*threads[i]),(int)func,(int)arg);
    swapcontext(uid_current.p_context, cur_context);
    printf("return from parent/n");
    return true;
    }
    }

    return false;
    }
    void Scheduler::sleep(int t)
    {
    u_thread *self_thread = uid_current.p_uthread;

    time_t now = time(NULL);
    now += t;
    //插入到sleep列表中
    sleepList.push_back(std::make_pair(self_thread,now));

    //保存当前上下文切换回scheduler
    self_thread->SetStatus(SLEEP);

    ucontext_t *cur_context = self_thread->GetContext();
    Scheduler::block_signal = true;
    swapcontext(cur_context,&Scheduler::ucontext);
    Scheduler::block_signal = false;
    }
    void Scheduler::int_sig()
    {
    //printf("Scheduler::int_sig()%x/n",uid_current.p_context);
    Scheduler::block_signal = true;
    swapcontext(uid_current.p_context,&Scheduler::ucontext);
    Scheduler::block_signal = false;
    }

    class HeartBeat : public runnable
    {
    public:
    HeartBeat(unsigned int interval)
    {
    _beat = new beat(interval);
    _beat->setThread(pthread_self());
    }

    ~HeartBeat()
    {
    delete _beat;
    }

    bool run()
    {
    _beat->loop();
    return true;
    }

    private:
    beat *_beat;
    };

    bool initcoroutine(unsigned int interval)
    {
    //初始化信号
    struct sigaction sigusr1;
    sigusr1.sa_flags = 0;
    sigusr1.sa_handler = int_signal_handler;
    sigemptyset(&sigusr1.sa_mask);
    int status = sigaction(SIGUSR1,&sigusr1,NULL);
    if(status == -1)
    {
    printf("error sigaction/n");
    return false;
    }

    //首先初始化调度器
    Scheduler::scheduler_init();


    //启动心跳
    static HeartBeat hb(interval);
    static Thread c(&hb);
    c.start();
    return true;
    }

    void BeginRun()
    {
    Scheduler::schedule();
    }

    bool cSpawn(uthread_func func,void *arg,unsigned int stacksize)
    {
    return Scheduler::spawn(func,arg,stacksize);
    }

    int cRecv(int sock,char *buf,int len)
    {
    return Scheduler::recv(sock,buf,len);
    }

    int cSend(int sock,char *buf,int len)
    {
    return Scheduler::send(sock,buf,len);
    }

    int cListen(int sock)
    {
    return Scheduler::listen(sock);
    }

    void cSleep(int t)
    {
    return Scheduler::sleep(t);
    }

    echoserver.c

    // kcoroutine.cpp : 定义控制台应用程序的入口点。
    //
    #include "uthread.h"
    #include "thread.h"
    int port;
    int test(void *arg)
    {
    char *name = (char*)arg;
    unsigned long c = 0;
    while(1)
    {
    if(c % 10000 == 0)
    {
    printf("%d/n",c);
    cSleep(1);
    }
    ++c;
    }
    }
    int echo(void *arg)
    {
    int sock = *(int*)arg;
    while(1)
    {
    char buf[1024];
    int ret = cRecv(sock,buf,1024);
    if(ret > 0)
    {
    printf("%s/n",buf);
    ret = cSend(sock,buf,ret);
    }
    }
    }
    int listener(void *arg)
    {
    struct sockaddr_in servaddr;
    int listenfd;
    if(0 > (listenfd = Tcp_Listen("127.0.0.1",port,servaddr,5)))
    {
    printf("listen error/n");
    return 0;
    }
    while(1)
    {
    if(cListen(listenfd) > 0)
    {
    printf("a user comming/n");
    struct sockaddr_in cliaddr;
    socklen_t len;
    int sock = Accept(listenfd,(struct sockaddr*)NULL,NULL);
    if(sock >= 0)
    {
    cSpawn(echo,&sock,4096);
    }

    }
    }
    }
    int main(int argc, char **argv)
    {
    port = atoi(argv[1]);
    if(!initcoroutine(20))
    return 0;
    cSpawn(listener,0,4096);
    char name[10] = "test";
    cSpawn(test,name,4096);

    printf("create finish/n");

    //开始调度线程的运行
    BeginRun();
    return 0;
    }

    运行后会看到控制台中不断的输出1,那是runable_test在工作,

    telnet几个客户端上去,就可以看到echo的效果了,总体来看效果还不错,

    不过context的切换效率还没有测试过.





  • 相关阅读:
    蓝瓶的钙,好喝的钙——windows,我要蓝屏的
    gz文件最后四位检测
    中国Linux开源镜像站大全
    linux强制将数据写入磁盘,防止丢失内存的数据
    文件是否真的写入了磁盘?
    OpenStack之日志
    使用 Nmon 监控 Linux 的系统性能
    Android获取系统cpu信息,内存,版本,电量等信息
    Android——service重启
    Android——显示当前运行所有服务,判断服务是否运行
  • 原文地址:https://www.cnblogs.com/sniperHW/p/2429644.html
Copyright © 2011-2022 走看看