zoukankan      html  css  js  c++  java
  • ucontext实现的用户级多线程框架2(抢先式多线程)

    以前曾经写过一篇blog,介绍使用linux ucontext实现的用户级多线程框架.那个框架实现的是协作式多线程,也就是只有当正在执行的coroutine
    主动放弃处理器时,其它coroutine才有机会得以执行.
     
    今天用ucontext实现了一个抢先式的用户级多线程框架,其主要思想是,用一个物理线程作为中断发生器,以固定的时间间隔发送SIGUSR1信号.
    另一个物理线程运行Scheduler和用户级线程。每当这个物理线程收到信号的时候,就会将执行权切换到Scheduler,由Scheduler挑选一个
    用户线程执行.
     
    thread.h
    // C/C++ header file
    // Author: root
    // File: /mnt/win-c/project/include/thread.h
    // Created: 16:18:04 2008-05-04
    // Modified: 08:39:43 2008-05-11
    // Brief:
    #ifndef _THREAD_H
    #define _THREAD_H
    #include <pthread.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/time.h>
    #include <time.h>
    #include <unistd.h>
    #include <deque>
    #include <iostream>
    #include <list>
    #include <string>
    #include <utility>
    #include <vector>
    /*
     * Executable-task interface.
     *
     * A concrete task implements run(); the termination flag lets another party
     * ask the task to stop, and the task is expected to poll isTerminate().
     *
     * NOTE(review): the flag is a plain volatile bool, which is not a
     * synchronisation primitive; confirm whether cross-thread use needs more.
     */
    struct runnable
    {
    public:
        runnable() : terminated(false) {}

        // Virtual so that destroying a derived task through a runnable* is well defined.
        virtual ~runnable() {}

        // Task body. The meaning of the returned bool is defined by the implementer.
        virtual bool run() = 0;

        // Requests termination: logs a message and raises the flag.
        virtual void setTerminate()
        {
            printf("runnable terminated\n");
            terminated = true;
        }

        // Returns true once setTerminate() has been called and not cleared since.
        bool isTerminate() const
        {
            return terminated;
        }

        // Resets the termination flag.
        void clearTerminate()
        {
            terminated = false;
        }

    protected:
        volatile bool terminated;
    };

    class Thread
    {
    public:
    Thread(runnable *_runnable,std::string name="kenny",bool _isjoinable=true)
    :name(name),_runnable(_runnable),isjoinable(_isjoinable)
    {
    }
    Thread(std::string name="kenny",bool _isjoinable = true):name(name),isjoinable(_isjoinable){}
    void setRunnable(runnable *_runnable)
    {
    this->_runnable = _runnable;
    }

    ~Thread()
    {
    /*if(_runnable)
    delete _runnable;
    _runnable = NULL;
    */
    }
    bool start();
    static void *threadFun(void* arg)
    {
    runnable *r = (runnable*)arg;
    r->run();
    }
    void join()
    {
    if(isjoinable)
    ::pthread_join(id, NULL);
    }
    bool isJoinable()
    {
    return isjoinable;
    }
    void setTerminate()
    {
    printf("thread setterminate %s/n",name.c_str());
    _runnable->setTerminate();
    }
    bool isTerminate()
    {
    return _runnable->isTerminate();
    }
    void clearTerminate()
    {
    _runnable->clearTerminate();
    }
    /**
    * /brief 使当前线程睡眠指定的时间,秒
    *
    *
    * /param sec 指定的时间,秒
    */
    static void sleep(const long sec)
    {
    ::sleep(sec);
    }
    /**
    * /brief 使当前线程睡眠指定的时间,毫秒
    *
    *
    * /param msec 指定的时间,毫秒
    */
    static void msleep(const long msec)
    {
    ::usleep(1000 * msec);
    }
    /**
    * /brief 使当前线程睡眠指定的时间,微秒
    *
    *
    * /param usec 指定的时间,微秒
    */
    static void usleep(const long usec)
    {
    ::usleep(usec);
    }

    /**
    * /brief 使当前线程睡眠指定的时间,纳秒
    *
    *
    * /param nsec 指定的时间,纳秒
    */
    static void nanosleep(const long nsec)
    {
    struct timespec req;
    req.tv_sec = nsec / 1000000000;
    req.tv_nsec = nsec;
    ::nanosleep(&req, NULL);
    }
    pthread_t GetId()
    {
    return id;
    }
    protected:
    runnable *_runnable;
    pthread_t id;
    bool isjoinable;
    std::string name;

    };
    #endif

    thread.cpp
    // C++ source file
    // Author: root
    // File: /mnt/win-c/IOCP/thread.cpp
    // Created: 16:08:50 2008-05-10
    // Modified: 08:34:42 2008-05-11
    // Brief:

    #include "thread.h"

    bool Thread::start()
    {
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    if(isjoinable)
    pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
    else
    pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
    pthread_create(&id,&attr,threadFun,(void*)this->_runnable);
    pthread_attr_destroy(&attr);
    }
    uthread.h
    /*
    * brief: 用ucontext实现的用户级线程框架
    * author: kenny huang
    * date: 2009/10/13
    * email: huangweilook@21cn.com
    */
    #ifndef _UTHREAD_H
    #define _UTHREAD_H
    #include <ucontext.h>
    #include <stdio.h>
    #include <string.h>
    #include <list>
    #include <pthread.h>
    #include <signal.h>
    // Capacity of the Scheduler's thread table: at most this many user-level threads at once.
    #define MAX_UTHREAD 128
    // Identifier of a user-level thread; Scheduler::uthread_create() returns the thread-table index.
    typedef int uthread_id;
    // Sentinel id returned by Scheduler::uthread_create() when no thread could be created.
    #define INVAID_ID -1
    // Signal handler that forwards to Scheduler::int_sig(); defined in uthread.cpp.
    void int_signal_handler(int sig);
    // Current state of a user-level thread.
    enum thread_status
    {
    ACTIVED = 0,// runnable
    BLOCKED,// blocked (the state a u_thread is constructed in)
    SLEEP,// sleeping voluntarily
    DIE,// finished
    };

    /*
     * Interface implemented by the body of a user-level thread.
     * main_routine() is called once by u_thread::star_routine(); when it
     * returns, the thread is marked DIE.
     */
    class uthread_runnable
    {
    public:
        // Virtual so that destroying an implementation through this interface is well defined.
        virtual ~uthread_runnable() {}

        // Body of the user-level thread.
        virtual void main_routine() = 0;
    };
    class Scheduler;
    /*
     * User-level thread: an execution context (ucontext_t) plus the private
     * stack it runs on. Instances are created and destroyed only by Scheduler
     * (constructor and destructor are private, Scheduler is a friend).
     */
    class u_thread
    {
        friend class Scheduler;
    private:
        /*
         * rable - task whose main_routine() forms the thread body (not owned)
         * ssize - size in bytes of the stack to allocate
         * uid   - id assigned by the Scheduler
         */
        u_thread(uthread_runnable *rable,unsigned int ssize,uthread_id uid)
            : stack(new char[ssize]),ssize(ssize),_status(BLOCKED),rable(rable),uid(uid)
        {
            // Initialise the context first, then describe the stack: this is the
            // order documented for makecontext(), and it guarantees the stack
            // fields cannot be overwritten by getcontext().
            getcontext(&ucontext);
            ucontext.uc_stack.ss_sp = stack;
            ucontext.uc_stack.ss_size = ssize;
            ucontext.uc_stack.ss_flags = 0;
            ucontext.uc_link = NULL;
        }
        ~u_thread()
        {
            delete []stack;
        }

        // The object owns its stack buffer, so copying is disabled
        // (declared but intentionally never defined).
        u_thread(const u_thread&);
        u_thread &operator=(const u_thread&);

        // Entry function given to makecontext(); defined in uthread.cpp.
        static void star_routine();

    public:
        // Returns the thread's saved execution context.
        ucontext_t &GetContext()
        {
            return ucontext;
        }
        void SetStatus(thread_status _status)
        {
            this->_status = _status;
        }
        thread_status GetStatus() const
        {
            return _status;
        }
        uthread_id GetUid() const
        {
            return uid;
        }
        // Puts the thread identified by utid to sleep for t (forwards to Scheduler::sleep).
        static void sleep(uthread_id utid,int t);

    private:
        ucontext_t ucontext;       // saved execution context
        char *stack;               // stack used by this coroutine (owned)
        unsigned int ssize;        // size of the stack in bytes
        thread_status _status;     // current state
        uthread_runnable *rable;   // thread body (not owned)
        uthread_id uid;            // id assigned by the Scheduler
    };
    /*
     * Task scheduler. Everything is static: one scheduler serves the whole
     * process and keeps the table of user-level threads.
     */
    class Scheduler
    {
        // u_thread needs the private helpers below (GetCurrentUThread, sleep, ucontext).
        friend class u_thread;

    public:
        static void scheduler_init();
        static void schedule();
        static uthread_id uthread_create(uthread_runnable *rable,unsigned int stacksize);
        static void int_sig();

    private:
        // Thread currently being created, or NULL when `current` is -1.
        static u_thread *GetCurrentUThread()
        {
            u_thread *creating = NULL;
            if(current != -1)
                creating = threads[current];
            return creating;
        }

        // Puts thread utid to sleep for t.
        static void sleep(uthread_id utid,int t);

    private:
        // uthreads that are candidates to run
        static std::list<u_thread*> activeList;

        // sleeping uthreads, each paired with its wake-up time
        static std::list<std::pair<u_thread*,time_t> > sleepList;

        static char stack[4096];

        // the scheduler's own context
        static ucontext_t ucontext;
        // context currently in use
        static ucontext_t* p_curcontext;

        static u_thread *threads[MAX_UTHREAD];
        static int total_count;
        // index of the uthread being created; -1 otherwise
        static int current;
    };
    /*
     * Heartbeat generator. It has to run on its own thread: at a fixed
     * interval it sends the interrupt signal (SIGUSR1) to every registered
     * thread that is running coroutines.
     */
    class beat
    {
    public:
        // interval: time between two rounds of signals, in milliseconds
        beat(unsigned int interval):interval(interval)
        {}

        // Registers a thread that should receive the heartbeat signal.
        void addTread(pthread_t id) { allthread.push_back(id); }

        // Never returns: sleeps for one interval, signals every registered thread, repeats.
        void loop()
        {
            typedef std::list<pthread_t>::iterator target_iter;
            for(;;)
            {
                ::usleep(1000 * interval);
                target_iter target = allthread.begin();
                const target_iter last = allthread.end();
                while(target != last)
                {
                    pthread_kill(*target,SIGUSR1);
                    ++target;
                }
            }
        }

    private:
        unsigned int interval;            // interval between interrupts (milliseconds)
        std::list<pthread_t> allthread;   // threads to signal
    };
    #endif
    uthread.cpp
    #include "uthread.h"
    #include <assert.h>
    #include <stdlib.h>
    #include <time.h>
    // Definitions of Scheduler's static data members.
    ucontext_t Scheduler::ucontext;
    char Scheduler::stack[4096];
    ucontext_t* Scheduler::p_curcontext = NULL;
    // Sized with MAX_UTHREAD so the definition cannot drift from the declaration in uthread.h.
    u_thread *Scheduler::threads[MAX_UTHREAD];
    int Scheduler::total_count = 0;
    int Scheduler::current = -1;
    std::list<u_thread*> Scheduler::activeList;
    std::list<std::pair<u_thread*,time_t> > Scheduler::sleepList;
    // Signal set used by block_sigusr1()/unblock_sigusr1(); defined by the program (test.cpp).
    extern sigset_t globalset;
    // Adds the signals in globalset to the calling thread's blocked set.
    // pthread_sigmask() is used because the process is multithreaded, where
    // the behaviour of sigprocmask() is unspecified by POSIX.
    void block_sigusr1()
    {
        pthread_sigmask(SIG_BLOCK,&globalset,NULL);
    }
    // Removes the signals in globalset from the calling thread's blocked set.
    // pthread_sigmask() is used because the process is multithreaded, where
    // the behaviour of sigprocmask() is unspecified by POSIX.
    void unblock_sigusr1()
    {
        pthread_sigmask(SIG_UNBLOCK,&globalset,NULL);
    }
    // Signal handler: prints a trace message and hands control to Scheduler::int_sig().
    // The sig argument is not inspected.
    // NOTE(review): printf is not on the POSIX list of async-signal-safe functions —
    // confirm that this debug output is acceptable inside a handler.
    void int_signal_handler(int sig)
    {
    printf("recv int ");
    Scheduler::int_sig();
    }
    // Public entry point for putting a user-level thread to sleep.
    // Simply forwards utid and t to the private Scheduler::sleep().
    void u_thread::sleep(uthread_id utid,int t)
    {
    Scheduler::sleep(utid,t);
    }
    // Entry function of every user-level thread (Scheduler::uthread_create passes it to makecontext()).
    void u_thread::star_routine()
    {
    // The scheduler publishes the thread being created through `current`.
    u_thread *current_uthread = Scheduler::GetCurrentUThread();
    assert(current_uthread);

    // Mark the thread runnable, save this context and switch to Scheduler::ucontext,
    // which at this point holds the context saved by swapcontext() in Scheduler::uthread_create.
    current_uthread->SetStatus(ACTIVED);
    ucontext_t &cur_context = current_uthread->GetContext();
    swapcontext(&cur_context,&Scheduler::ucontext);

    // Execution continues here once the saved context is resumed: run the thread body.
    current_uthread->rable->main_routine();
    // The body returned: flag the thread as finished (Scheduler::schedule() checks for DIE).
    current_uthread->SetStatus(DIE);
    }
    // Prepares the scheduler: empties the thread table and builds the scheduler's
    // own context, which runs schedule() on the static Scheduler::stack.
    void Scheduler::scheduler_init()
    {
        // Every slot of the thread table starts out free.
        int slot = 0;
        while(slot < MAX_UTHREAD)
        {
            threads[slot] = 0;
            ++slot;
        }

        // Capture a context, then redirect it to the private stack and to schedule().
        getcontext(&ucontext);
        ucontext.uc_link = NULL;
        ucontext.uc_stack.ss_size = sizeof(stack);
        ucontext.uc_stack.ss_sp = stack;
        makecontext(&ucontext,schedule, 0);

        // The scheduler context counts as the one currently in use.
        p_curcontext = &ucontext;
    }

    // Scheduler main loop: keeps running until no user-level thread is left.
    // Each pass gives the runnable threads in activeList a turn, then moves at
    // most one thread whose wake-up time has passed from sleepList back to
    // activeList.
    void Scheduler::schedule()
    {
        while(total_count > 0)
        {
            // First run the uthreads in the active list.
            std::list<u_thread*>::iterator it = activeList.begin();
            std::list<u_thread*>::iterator end = activeList.end();
            for( ; it != end; ++it)
            {
                if(*it && (*it)->GetStatus() == ACTIVED)
                {
                    // Switch to the uthread; control comes back here when it is
                    // preempted (int_sig), goes to sleep, or finishes.
                    ucontext_t &cur_context = (*it)->GetContext();
                    p_curcontext = &cur_context;
                    swapcontext(&ucontext,&cur_context);
                    // Back in the scheduler: block SIGUSR1.
                    block_sigusr1();
                    p_curcontext = &ucontext;

                    uthread_id uid = (*it)->GetUid();
                    if((*it)->GetStatus() == DIE)
                    {
                        // The thread finished: release it and free its table slot.
                        printf("%d die\n",uid);
                        delete threads[uid];
                        threads[uid] = 0;
                        --total_count;
                        activeList.erase(it);
                        // `it` is invalid after erase, so leave the loop.
                        break;
                    }
                    else if((*it)->GetStatus() == SLEEP)
                    {
                        // The thread is already in sleepList; drop it from the active list.
                        printf("%d sleep\n",uid);
                        activeList.erase(it);
                        // `it` is invalid after erase, so leave the loop.
                        break;
                    }
                }
            }

            // Check whether a uthread in the sleep list is due to wake up.
            std::list<std::pair<u_thread*,time_t> >::iterator its = sleepList.begin();
            std::list<std::pair<u_thread*,time_t> >::iterator ends = sleepList.end();
            time_t now = time(NULL);
            for( ; its != ends; ++its)
            {
                // Wake-up time reached.
                if(now >= its->second)
                {
                    u_thread *uthread = its->first;
                    uthread->SetStatus(ACTIVED);
                    activeList.push_back(uthread);
                    sleepList.erase(its);
                    // `its` is invalid after erase; remaining sleepers wait for the next pass.
                    break;
                }
            }
        }

        printf("scheduler end\n");
    }
    // Creates a user-level thread that will execute rable->main_routine() on a
    // freshly allocated stack of stacksize bytes, and queues it in activeList.
    //
    // Returns the new thread's id (its index in the thread table), or INVAID_ID
    // when rable is NULL or the table has no free slot.
    uthread_id Scheduler::uthread_create(uthread_runnable *rable,unsigned int stacksize)
    {
        if(rable == NULL || total_count >= MAX_UTHREAD)
            return INVAID_ID;

        for(int i = 0; i < MAX_UTHREAD; ++i)
        {
            if(threads[i] != 0)
                continue;

            threads[i] = new u_thread(rable,stacksize,i);
            ++total_count;
            // Publish the new thread so u_thread::star_routine can find itself.
            current = i;
            ucontext_t &cur_context = threads[i]->GetContext();
            // When the thread body returns, execution resumes in Scheduler::ucontext.
            cur_context.uc_link = &ucontext;
            makecontext(&cur_context,u_thread::star_routine, 0);
            // Run the thread up to its first swapcontext(), which switches back here.
            swapcontext(&ucontext, &cur_context);
            current = -1;
            activeList.push_back(threads[i]);
            return i;
        }

        // No free slot was found: report failure instead of falling off the
        // end of a non-void function.
        return INVAID_ID;
    }
    // Puts user-level thread utid to sleep: records its wake-up time
    // (time(NULL) + t, so t is in seconds) in sleepList, marks it SLEEP and
    // switches to the scheduler. Returns once the scheduler resumes the thread.
    //
    // Ids outside the thread table (including INVAID_ID) are ignored.
    //
    // NOTE(review): sleepList is modified here while SIGUSR1 may still be
    // delivered to this thread — confirm whether the signal should be blocked first.
    void Scheduler::sleep(uthread_id utid,int t)
    {
        if(utid == INVAID_ID || utid < 0 || utid >= MAX_UTHREAD)
            return;
        assert(threads[utid]);

        time_t now = time(NULL);
        now += t;
        // time_t is not an unsigned int; print it through a type with a matching format.
        printf("wake up time %ld\n",static_cast<long>(now));
        // Insert into the sleep list.
        sleepList.push_back(std::make_pair(threads[utid],now));

        // Save the current context and switch back to the scheduler.
        threads[utid]->SetStatus(SLEEP);
        ucontext_t &cur_context = threads[utid]->GetContext();
        swapcontext(&cur_context,&Scheduler::ucontext);
        // Back in the coroutine: re-enable SIGUSR1.
        unblock_sigusr1();
    }

    // Reaction to the interrupt signal (called from int_signal_handler):
    // preempts the running user-level thread by switching to the scheduler.
    //
    // NOTE(review): printf is not on the POSIX list of async-signal-safe
    // functions — confirm that the trace output below is acceptable here.
    void Scheduler::int_sig()
    {
        // Nothing to preempt when the scheduler itself is the current context.
        if(p_curcontext == NULL || p_curcontext == &ucontext)
            return;
        // A pointer has to be printed with %p; %d is undefined for it.
        printf("%p\n",static_cast<void*>(p_curcontext));
        // Otherwise save the interrupted context and switch back to the scheduler.
        swapcontext(p_curcontext,&Scheduler::ucontext);
        // Back in the coroutine: re-enable SIGUSR1.
        unblock_sigusr1();
        printf("i'm come back %p\n",static_cast<void*>(p_curcontext));
    }
    test.cpp
    // kcoroutine.cpp : 定义控制台应用程序的入口点。
    //
    #include "uthread.h"
    #include "thread.h"
    // Signal set shared with uthread.cpp (declared extern there); main() fills it with SIGUSR1.
    sigset_t globalset;
    class runable_test : public uthread_runnable
    {
    public:
    runable_test(const char *name):name(name){}
    void main_routine()
    {
    unsigned long c = 0;
    while(1)
    {
    //if(c % 10000 == 0)
    // printf("%s/n",name);
    // ++c;
    //u_thread::sleep(uid,1);
    //printf("%s wake up/n",name);
    }
    }
    const char *name;
    uthread_id uid;
    };
    /*
     * Task executed on the worker pthread: installs the SIGUSR1 handler,
     * creates four user-level threads and runs the scheduler.
     */
    class uthreadRunner : public runnable
    {
    public:
        // Returns false when the signal handler cannot be installed,
        // true once the scheduler loop has ended.
        bool run()
        {
            // Set up the signal handler; zero the struct so no field is left indeterminate.
            struct sigaction sigusr1;
            memset(&sigusr1,0,sizeof(sigusr1));
            sigusr1.sa_flags = 0;
            sigusr1.sa_handler = int_signal_handler;
            sigemptyset(&sigusr1.sa_mask);
            int status = sigaction(SIGUSR1,&sigusr1,NULL);
            if(status == -1)
            {
                printf("error sigaction\n");
                return false;
            }

            // The scheduler has to be initialised before any thread is created.
            Scheduler::scheduler_init();

            runable_test t1("0");
            runable_test t2("1");
            runable_test t3("2");
            runable_test t4("3");

            // Create four user-level threads.
            t1.uid = Scheduler::uthread_create(&t1,4096);
            t2.uid = Scheduler::uthread_create(&t2,4096);
            t3.uid = Scheduler::uthread_create(&t3,4096);
            t4.uid = Scheduler::uthread_create(&t4,4096);

            printf("create finish\n");
            // Start scheduling the threads.
            Scheduler::schedule();
            return true;
        }
    };

    int main()
    {
    sigemptyset(&globalset);
    sigaddset(&globalset,SIGUSR1);

    //首先创建运行coroutine的线程
    uthreadRunner ur;
    Thread c(&ur);
    c.start();
    //创建心跳中断线程
    beat b(200);
    b.addTread(c.GetId());
    b.loop();
    return 0;
    }





  • 相关阅读:
    aaa
    记一次Vue实战总结
    Data too long for column 'xxx' at row 1MySql.Data.MySqlClient.MySqlPacket ReadPacket() 报错解决
    uni-app 监听返回按钮
    微信H5分享外部链接,缩略图不显示
    uni-app 动态控制下拉刷新
    vueX 的使用
    uni-app H5 腾讯地图无法导航
    uni-app支付功能
    hooks 与 animejs
  • 原文地址:https://www.cnblogs.com/sniperHW/p/2429642.html
Copyright © 2011-2022 走看看