zoukankan      html  css  js  c++  java
  • 进程间通信

    v2-3b32471e938dae81bfd4d93bf9add3d7_1200x500

    在本系统中发现了两个BUG:两个BUG · Issue #3 · bajdcc/MiniOS,限于自己水平比较渣,没法解决这两个BUG,那么OS系列就告一段落了,纵观整个过程,还是对IPC机制的理解帮助比较大,这种思想可以运用于实践中,如管道、套接字、流的实现。

    ---------------------------------------------------------------------------

    有童鞋问如何从零开始学习代码,我写了点总结:

    首先要了解操作系统的知识,从零开始指的是代码从零开始,所以相关知识的基础也是不可或缺的。要了解系统的启动过程,Zyw_OS启动流程跟实现~未完待续 - 知乎专栏 。从启动过程来分析代码,首先是BIOS。

    (1)然后是boot文件夹下的三个asm文件,它们完成读软盘、设置GDT、加载内核数据至0x100000,并跳转到0x100000处执行内核代码

    (2)随后根据main.c中初始化的顺序学习:(I)vga显示输出、(II)gdt分段管理、(III)idt中断向量设置、(IV)isr中断处理程序、(V)pmm获取可用内存并按4KB记录并保存至栈中,完成物理页框的申请与释放、(VI)vmm分页管理、(VII)sys初始化系统调用函数、(VIII)proc多进程管理,进程切换。

    (3)从零开始添加功能,如先写好boot.asm,能够开机并用中断打印一个A,没问题之后,再设置gdt,vga以及中断管理,这时运行一个int 3看看有没有打印结果出来;这样每次写的代码都能够独立运行。如果是把所有代码下载下来运行的话,效果很不错,但是会不知道从读起。现在要做的就是精简代码,只保留核心功能,摒弃一切杂念。

    (4)那么写代码过程中有几个情况要处理:(I)不明所以,这时可以看注释或是上网查资料(II)这里代码是不是有问题?先放着不管(III)运行出错了,那就debug吧。其中debug的时间比较久,我做到IPC这一部分起码编程时间100小时以上,不包括日常吃饭时想的时间,所以不要慌,操作系统代码需要细嚼慢咽,急也急不得。debug可能用时比较久,这时比较纠结、想放弃、头脑混乱,因为好多bug真不知道哪冒出来的,用qemu源码级调试好处也有限。其实这一关过不了,对操作系统的理解水平也就触到天花板了,也就是只是理解了书上的思想,而没有将代码和思想结合起来。写代码、山寨别人的代码、东拼西凑,不管用什么方式,只要把bug除了就皆大欢喜。

    (5)这样做有好处:代码、思路,所有的细节全部load到了脑子里,要啥有啥,也就是真正理解了内核,可以举一反三,并自己更改代码,添加功能

    (6)要有毅力、有恒心,能吃苦,成功不是一蹴而就,别看我写的代码运行效果挺好,我起码debug了100h以上,每天打底调试6小时,最后才能bug弄好。真正自己花时间花工夫写的代码,才会长久的留自己的脑海里。

    --------------------------------------------------------------------

    写在前面

    Release:bajdcc/MiniOS

    总结下当前的进度:

    1. 引导、GDT、中断、虚页:花了两三天
    2. 多进程:花了一周
    3. 进程间通信:花了两三天

    90%的时间花在了debug上,除完所有bug,已经对整个实现机制了如指掌。所以,debug的过程也是一个学习的过程,虽然效率不高,但是效果好(就是单词抄写百遍的效果啦)。

    90%操作系统的内容 = 枯燥乏味,而10%(余下的内容) = 精妙绝伦

    目前感受设计精妙的地方:

    1. 进程切换机制(时钟分派),进程的父子关系,进程的状态。代表:fork,wait,sleep,exit,kill,如wait和exit的搭配为例,进程的销毁是惰性的(销毁操作集中于父进程中的wait)。
    2. 进程间通信机制。通信分异步和同步,那么这里实现的是同步IPC,比较简单,用不着写队列。这是微内核的基础,它将某些同类别的系统调用转化为唯一一个系统调用0x80。如何区分不同功能的调用呢?就是在调用int 0x80前将参数入栈,参数有通信方式(SEND/RECEIVE),通信对象(正数表示pid,-1表示任意对象),消息结构

    这些精妙的地方只能通过代码去体会。

    用户代码

    static void halt() {
        while (1);
    }
    
    static int delay() {
        volatile int i;
    
        for (i = 0; i < 0x1000000; i++);
        return i;
    }
    
    void user_main() {
    
        int i;
    
        i = call(SYS_FORK);
    
        delay();
        if (i != 0) {
            sys_tasks0();
            delay();
            call(SYS_WAIT);
        } else {
            while (1) {
                delay();
                i = sys_ticks();
                delay();
                printk("!! proc#%d received tick '%d'
    ", proc2pid(proc), i);
                delay();
                delay();
                delay();
                delay();
            }
        }
    
        printk("halt...
    ");
    
        halt();
    }
    
    void sys_tasks0() {
        extern uint32_t tick;
    
        MESSAGE msg;
        while (1) {
            send_recv(RECEIVE, TASK_ANY, &msg);
            int src = msg.source;
            switch (msg.type) {
            case SYS_TICKS:
                msg.RETVAL = tick;
                printk("!! proc #%d sent tick '%d'
    ", proc2pid(proc), tick);
                send_recv(SEND, src, &msg);
                break;
            default:
                assert(!"unknown msg type");
                break;
            }
        }
    }
    
    static int sys_ipc_call(int type) {
        MESSAGE msg;
        reset_msg(&msg);
        msg.type = type;
        send_recv(BOTH, TASK_SYS, &msg);
        return msg.RETVAL;
    }
    
    int sys_ticks() {
        return sys_ipc_call(SYS_TICKS);
    }
    

    进程的结构

    说到IPC,可能会想到pipe、clipboard、windows message、socket、shared memory、file等方式,然而没法实现那么多(括弧笑,所以跟着书上走吧~

    目前只是抄了书上的代码(希望尽快看到结果),还没时间去分析IPC机制的代码。

    首先,我们创建的进程proc的结构有:

    struct proc {
        /* KERNEL */
        struct interrupt_frame *fi;     // 中断现场
        volatile uint8_t pid;           // 进程ID
        uint32_t size;                  // 用户空间大小
        uint8_t state;                  // 进程状态
        char name[PN_MAX_LEN];          // 进程名称
        pde_t *pgdir;                   // 虚页目录(一级页表)
        char *stack;                    // 进程内核堆栈
        struct proc *parent;            // 父进程
        int8_t ticks;                   // 时间片
        int8_t priority;                // 优先级
        /* IPC */
        int p_flags;                    // 标识
        MESSAGE *p_msg;                 // 消息
        int p_recvfrom;                 // 接收消息的进程ID
        int p_sendto;                   // 发送消息的进程ID
        int has_int_msg;                // nonzero if an INTERRUPT occurred when the task is not ready to deal with it.
        struct proc *q_sending;         // queue of procs sending messages to this proc
        struct proc *next_sending;      // next proc in the sending queue (q_sending)
    };
    

    结构很复杂吧?不过,如果是一步步实现功能,往里添加的话,其实也不算多。

    PCB结构:

    1. 进程相关信息:名称,ID,状态,父进程
    2. 调度信息:中断现场,时间片,优先级
    3. 内存信息:代码空间大小,页表,内核堆栈
    4. IPC:消息收发状态flags,消息msg,收发进程ID,消息队列(链表)

    进程的切换:

    1. 主进程死循环,通过时钟中断,进行调度
    2. 中断时保存现场(即proc->fi),如果是最外层中断,那么起调度作用(此时k_reenter=0),内层中断k_reenter>0,中断结束后iret返回,从proc->fi中恢复现场,此时修改相应特权级

    微内核架构

    原版的linux中有一堆的系统调用,那么微内核架构与此不同,它将系统调用按功能划分开来,如分成内存管理、文件管理等,建立专门的系统级进程来负责系统调用。

    那么,也就是 “ring1级系统服务进程” 与 系统 打交道(通过系统调用),而我们的“ring3级用户进程” 只要与 “ring1级系统服务进程” 通信就可以了。结论:ring3用户级 <=> ring1服务级 <=> ring0系统级,ring1就像中介一样,而ring0与ring3可以素不相识。这样,微内核架构(相当于微服务)抽象出一个服务层sys_task,降低了耦合度。

    • ring1与ring0打交道:通过系统调用即可
    • ring1与ring3打交道:维护一个消息等待队列

    进程间通信

    主要分两个函数msg_send和msg_receive。

    收/发消息有几种情况:

    1. 系统服务监听消息:没消息时休眠,来消息时唤醒
    2. 系统服务发送消息:仅当系统服务收到用户的SEND消息后,被唤醒,随后用户再发送RECV消息,系统服务收到后设置msg
    3. 用户进程发送消息:系统服务不可用,用户进程堵塞,直到系统服务处理完其他任务,从等待队列中取出用户进程,并唤醒用户进程
    4. 用户进程接收消息:当用户进程发送SEND消息收到回应后,会再发送一个RECV消息,等待系统服务响应并设置msg,最后用户进程拿到设置后的msg

    这里的操作挺像TCP的握手操作的,归纳起来的同步通信模型

    1. 系统服务:监听消息(就像web服务器一样),单线程: while(1){recv(), send()}
    2. 用户进程:像web客户端一样,单线程:{send() recv()}

    消息队列

    统一调用接口:

    int send_recv(int function, int src_dest, MESSAGE* msg)
    {
        int ret = 0, caller;
    
        caller = proc2pid(proc);
    
        if (function == RECEIVE)
            memset(msg, 0, sizeof(MESSAGE));
    
        switch (function) {
        case BOTH: // 先发送再接收
            ret = _sendrec(SEND, src_dest, msg, caller);
            if (ret == 0)
                ret = _sendrec(RECEIVE, src_dest, msg, caller);
            break;
        case SEND:
        case RECEIVE:
            ret = _sendrec(function, src_dest, msg, caller);
            break;
        default:
            assert((function == BOTH) ||
                   (function == SEND) || (function == RECEIVE));
            break;
        }
    
        return ret;
    }
    

    对于系统服务service:

    1. send_recv(RECV, TASK_ANY, msg) 监听消息
    2. 处理msg
    3. send_recv(RECV, msg.source, msg) 发送消息给客户端

    对于客户端程序client:

    1. 初始化msg
    2. send_recv(SEND, SYSTASK_ID, msg) 发送消息给系统服务
    3. send_recv(RECV, SYSTASK_ID, msg) 堵塞并接收消息
    4. 处理msg

    一、发送消息

    int msg_send(struct proc* current, int dest, MESSAGE* m)
    {
        struct proc* sender = current;
        struct proc* p_dest = npid(dest); /* proc dest */
    
        /* check for deadlock here */
        if (deadlock(proc2pid(sender), dest)) {
            printk("DEADLOCK! %d --> %d
    ", sender->pid, p_dest->pid);
            assert(!"DEADLOCK");
        }
    
        if ((p_dest->p_flags & RECEIVING) && /* dest is waiting for the msg */
            (p_dest->p_recvfrom == proc2pid(sender) ||
             p_dest->p_recvfrom == TASK_ANY)) {
    
            memcpy(va2la(dest, p_dest->p_msg),
                  va2la(proc2pid(sender), m),
                  sizeof(MESSAGE));
    
            p_dest->p_msg = 0;
            p_dest->p_flags &= ~RECEIVING; /* dest has received the msg */
            p_dest->p_recvfrom = TASK_NONE;
            unblock(p_dest);
        }
        else { /* dest is not waiting for the msg */
            sender->p_flags |= SENDING;
            sender->p_sendto = dest;
            sender->p_msg = m;
    
            /* append to the sending queue */
            struct proc * p;
            if (p_dest->q_sending) {
                p = p_dest->q_sending;
                while (p->next_sending)
                    p = p->next_sending;
                p->next_sending = sender;
            }
            else {
                p_dest->q_sending = sender;
            }
            sender->next_sending = 0;
    
            block(sender);
        }
    
        return 0;
    }
    

    解释:

    1. 判断是否死锁,即A->send->B,同时B->send->A
    2. 若对方正在监听消息,则将msg拷贝到对方的p_msg中,并消除对方的监听与堵塞状态
    3. 若对方不在监听消息(可能在处理其他事务),则将发送方PCB指针插入到对方的q_sending队列中,并将发送方堵塞以等待接收方的回应

    二、接收消息

    int msg_receive(struct proc* current, int src, MESSAGE* m)
    {
        struct proc* p_who_wanna_recv = current;
        struct proc* p_from = 0; /* from which the message will be fetched */
        struct proc* prev = 0;
        int copyok = 0;
    
        if ((p_who_wanna_recv->has_int_msg) &&
            ((src == TASK_ANY) || (src == INTERRUPT))) {
            /* There is an interrupt needs p_who_wanna_recv's handling and
             * p_who_wanna_recv is ready to handle it.
             */
    
            MESSAGE msg;
            reset_msg(&msg);
            msg.source = INTERRUPT;
            msg.type = HARD_INT;
            assert(m);
            memcpy(va2la(proc2pid(p_who_wanna_recv), m), &msg,
                  sizeof(MESSAGE));
    
            p_who_wanna_recv->has_int_msg = 0;
    
            return 0;
        }
    
    
        /* Arrives here if no interrupt for p_who_wanna_recv. */
        if (src == TASK_ANY) {
            /* p_who_wanna_recv is ready to receive messages from
             * TASK_ANY proc, we'll check the sending queue and pick the
             * first proc in it.
             */
            if (p_who_wanna_recv->q_sending) {
                p_from = p_who_wanna_recv->q_sending;
                copyok = 1;
            }
        }
        else {
            /* p_who_wanna_recv wants to receive a message from
             * a certain proc: src.
             */
            p_from = npid(src);
    
            if ((p_from->p_flags & SENDING) &&
                (p_from->p_sendto == proc2pid(p_who_wanna_recv))) {
                /* Perfect, src is sending a message to
                 * p_who_wanna_recv.
                 */
                copyok = 1;
    
                struct proc* p = p_who_wanna_recv->q_sending;
                while (p) {
                    assert(p_from->p_flags & SENDING);
                    if (proc2pid(p) == proc2pid(npid(src))) { /* if p is the one */
                        p_from = p;
                        break;
                    }
                    prev = p;
                    p = p->next_sending;
                }
    
            }
        }
    
        if (copyok) {
            /* It's determined from which proc the message will
             * be copied. Note that this proc must have been
             * waiting for this moment in the queue, so we should
             * remove it from the queue.
             */
            if (p_from == p_who_wanna_recv->q_sending) { /* the 1st one */
                assert(prev == 0);
                p_who_wanna_recv->q_sending = p_from->next_sending;
                p_from->next_sending = 0;
            }
            else {
                prev->next_sending = p_from->next_sending;
                p_from->next_sending = 0;
            }
    
            /* copy the message */
            memcpy(va2la(proc2pid(p_who_wanna_recv), m),
                  va2la(proc2pid(p_from), p_from->p_msg),
                  sizeof(MESSAGE));
    
            p_from->p_msg = 0;
            p_from->p_sendto = TASK_NONE;
            p_from->p_flags &= ~SENDING;
            unblock(p_from);
        }
        else {  /* nobody's sending TASK_ANY msg */
            /* Set p_flags so that p_who_wanna_recv will not
             * be scheduled until it is unblocked.
             */
            p_who_wanna_recv->p_flags |= RECEIVING;
    
            p_who_wanna_recv->p_msg = m;
    
            if (src == TASK_ANY)
                p_who_wanna_recv->p_recvfrom = TASK_ANY;
            else
                p_who_wanna_recv->p_recvfrom = proc2pid(p_from);
    
            block(p_who_wanna_recv);
    
        }
    
        return 0;
    }
    

    解释:

    1. 若接收方发生中断,则处理中断,函数立即返回
    2. 若接收方可以接收一切消息TASK_ANY,那么此时判断q_sending发送队列中是否有消息,是的话,则从队列中取消息,清除发送方的SENDING状态;如果此时q_sending中没有消息,则接收方堵塞,置RECEIVING状态
    3. 若接收方只接收某一种消息,则当消息不匹配时,接收方堵塞;若消息匹配,进行第2步中的取消息操作

    死锁的简单判断:

    由于q_sending队列表示等待队列,只要遍历它,看是否可以遍历到当前进程本身即可。

    堵塞的简单实现:

    堵塞意味着要暂停当前进程并切换到其他进程,然而本系统的实现有限,只能强行触发时钟中断进行进程切换,由此可能导致BUG。

    阶段性总结

    如果说debug是负反馈,那么proc和ipc的实现就是大大的正反馈,先前用java实现了解释器并构建操作系统(bajdcc/jMiniLang),提供lambda、coroutine、multi-process等机制,但效率极低,求个一百内素数都要半天,还是没法完成做一个操作系统的愿望。本来用C/C++/ java/C# 也造了好多好多轮子,那么这次实现操作系统只用到了ASM和C,但是!!!难度非同小可!因为:资料贫乏、机制复杂、陷阱众多、难以调试、理解困难等等……但我没有放弃!!但看来IPC运行良好没有panic的时候,我的内心是非常喜悦的!这大概就是编程的美吧!

    https://zhuanlan.zhihu.com/p/26054925备份。

  • 相关阅读:
    写日志
    读写excel
    python安装模块
    数据库
    日志和关键字查找
    时间戳
    os 模块
    图-最小生成树算法之Kruskal及其Java实现
    图-图的表示、搜索算法及其Java实现
    前端实现list排序
  • 原文地址:https://www.cnblogs.com/bajdcc/p/8972965.html
Copyright © 2011-2022 走看看