zoukankan      html  css  js  c++  java
  • SRS之SrsRtmpConn::publishing详解

    1. SrsRtmpConn::publishing

    int SrsRtmpConn::publishing(SrsSource* source)
    {
        int ret = ERROR_SUCCESS;
        
        /* 在配置文件中配置了 refer_publish 配置项才会进行 refer check */
        if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_pubish(req->vhost))) 
            != ERROR_SUCCESS) {
            srs_error("check publish_refer failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("check publish_refer success.");
        
        /* 若配置文件中使能了 http_hooks 配置项,则调用该函数,否则忽略 */
        if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {
            srs_error("http hook on_publish failed. ret=%d", ret);
            return ret;
        }
        
        /* 若配置文件中没有配置 mode 配置项,则返回 false */
        bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
        if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) {
            // use isolate thread to recv,
            // @see: https://github.com/ossrs/srs/issues/237
            SrsPublishRecvThread trd(rtmp, req, 
                st_netfd_fileno(stfd), 0, this, source, 
                client_type != SrsRtmpConnFlashPublish,
                vhost_is_edge);
            
            srs_info("start to publish stream %s success", req->stream.c_str());
            /* 该函数会启动接收推流数据的线程 recv,然后当前线程会循环进入休眠,并在醒来
             * 后更新状态信息 */
            ret = do_publishing(source, &trd);
            
            // stop isolate recv thread
            trd.stop();
        }
        
        if (ret != ERROR_SYSTEM_STREAM_BUSY) {
            release_publish(source, vhost_is_edge);
        }
        
        /* whatever the acquire publish, always release publish.
         * when the acquire error in the middle-way, the publish state changed,
         * but failed, so we must cleanup it.
         * @see https://github.com/ossrs/srs/issues/474: srs_librtmp推送后返回错误码1009
         * @remark when stream is busy, should never release it. */
        http_hooks_on_unpublish();
        
        return ret;
    }
    

    2. SrsRtmpConn::acquire_publish

    int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge)
    {
        int ret = ERROR_SUCCESS;
        
        /* 检测是否可以进行 publish */
        if (!source->can_publish(is_edge)) {
            ret = ERROR_SYSTEM_STREAM_BUSY;
            srs_warn("stream %s is already publishing. ret=%d", 
                req->get_stream_url().c_str(), ret);
            return ret;
        }
        
        // when edge, ignore the publish event, directly proxy it.
        if (is_edge) {
            if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
                srs_error("notice edge start publish stream failed. ret=%d", ret);
                return ret;
            }
        } else {
            /* 通知可以进行 publish */
            if ((ret = source->on_publish()) != ERROR_SUCCESS) {
                srs_error("notify publish failed. ret=%d", ret);
                return ret;
            }
        }
        
        return ret;
    }
    

    2.1 SrsSource::can_publish

    bool SrsSource::can_publish(bool is_edge)
    {
        /* 当为 edge 时,检测当前 edge 的状态,只要不为 SrsEdgeStatePublish,即可进行 publish */
        if (is_edge) {
            return publish_edge->can_publish();
        }
        
        /* 默认初始化时为 true */
        return _can_publish;
    }
    

    2.2 SrsSource::on_publish

    /**
     * publish stream event notify.
     * @param _req the request from client, the source will deep copy it,
     *        for when reload the request of client maybe invalid.
     */
    int SrsSource::on_publish()
    {
        int ret = ERROR_SUCCESS;
        
        // update the request object.
        srs_assert(_req);
        
        /* 若当前线程上下文的id与_source_id不一致,表明id改变了,需要更新_source_id,
         * 并通知所有的 consumers */
        /* whatever, the publish thread is the source or edge source, 
         * save its id to source id */
        on_source_id_changed(_srs_context->get_id);
        
        /* the mix queue to correct the timestamp for mix_correct algorithm */
        /* reset the mix queue. */
        mix_queue->clear();
        
        /* detect the monotonically again. */
        is_monotonically_increase = true;
        last_packet_time = 0;
        
        /* create forwarders: 需要配置文件中配置 forward 配置项,否则忽略该函数 */
        if ((ret = create_forwarders()) != ERROR_SUCCESS) {
            srs_error("create forwarders failed. ret=%d", ret);
            return ret;
        }
        
        // TODO: FIXME: use initialize to set req.
    #ifdef SRS_AUTO_TRANSCODE
        if ((ret = encoder->on_publish(_req)) != ERROR_SUCCESS) {
            srs_error("start encoder failed. ret=%d", ret);
            return ret;
        }
    #endif
    
        // TODO: FIXME: use initialize to set req.
    #ifdef SRS_AUTO_HLS
        if ((ret = hls->on_publish(_req, false)) != ERROR_SUCCESS) {
            srs_error("start hls failed. ret=%d", ret);
            return ret;
        }
    #endif
        
        // TODO: FIXME: use initialize to set req.
    #ifdef SRS_AUTO_DVR
        if ((ret = dvr->on_publish(_req)) != ERROR_SUCCESS) {
            srs_error("start dvr failed. ret=%d", ret);
            return ret;
        }
    #endif
    
    #ifdef SRS_AUTO_HDS
        if ((ret = hds->on_publish(_req)) != ERROR_SUCCESS) {
            srs_error("start hds failed. ret=%d", ret);
            return ret;
        }
    #endif
    
        // notify the handler.
        srs_assert(handler);
        /* handler 指向子类 SrsServer,因此调用子类实现的 on_publish */
        if ((ret = handler->on_publish(this, _req)) != ERROR_SUCCESS) {
            srs_error("handle on publish failed. ret=%d", ret);
            return ret;
        }
        /* 更新 stat 中的值 */
        SrsStatistic* stat = SrsStatistic::instance();
        stat->on_stream_publish(_req, _source_id);
        
        return ret;
    }
    

    2.2.1 SrsSource::on_source_id_changed

    /**
     * source id changed
     * @id: 当前线程的 id
     */
    int SrsSource::on_source_id_changed(int id)
    {
        int ret = ERROR_SUCCESS;
        
        if (_source_id == id) {
            return ret;
        }
        
        if (_pre_source_id == -1) {
            _pre_source_id = id;
        } else if (_pre_source_id != _source_id) {
            _pre_source_id = _source_id;
        }
        
        /* source id, 
         * for publish, it's the publish client id.
         * for edge, it's the edge ingest id.
         * when source id changed, for example, the edge reconnect,
         * invoke the on_source_id_changed() to let all clients know.
         */
        _source_id = id;
        
        /* SrsConsumer: the consumer for SrsSource, that is a play client. */
        /* notice all consumer */
        std::vector<SrsConsumer*>::iterator it;
        for (it = consumers.begin(); it != consumers.end(); ++it) {
            SrsConsumer* consumer = *it;
            consumer->update_source_id();
        }
        
        retrn ret;
    }
    

    2.2.2 SrsServer::on_publish

    int SrsServer::on_publish(SrsSource* s, SrsRequest* r)
    {
        int ret = ERROR_SUCCESS;
        
        /* 暂时忽略,没有启用 */
    #ifdef SRS_AUTO_HTTP_SERVER
        if ((ret = http_server->http_mount(s, r)) != ERROR_SUCCESS) {
            return ret;
        }
    #endif
    
        return ret;
    }
    

    3. SrsPublishRecvThread 构造

    /**
     * the publish recv thread got message and callback the souorce method 
     * to process message.
     */
    SrsPublishRecvThread::SrsPublishRecvThread(
        SrsRtmpServer* rtmp_sdk, 
        SrsRequest* _req, int mr_sock_fd, int timeout_ms, 
        SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge
    ): trd(this, rtmp_sdk, timeout_ms)
    {
        rtmp = rtmp_sdk;
        
        _conn = conn;
        /* the params for conn callback. */
        _source = source;
        _is_fmle = is_fmle;
        _is_edge = is_edge;
        
        /* the recv thread error code. */
        recv_error_code = ERROR_SUCCESS;
        /* the msgs already got. */
        _nb_msgs = 0;
        /* The video frames we got. */
        video_frames = 0;
        /* the error timeout cond
         * @see https://github.com/ossrs/srs/issues/244 */
        error = st_cond_new();
        /* merged context id. */
        ncid = cid = 0;
        
        req = _req;
        mr_fd = mr_sock_fd;
        
        // the mr settings,
        // @see https://github.com/ossrs/srs/issues/241
        mr = _srs_config->get_mr_enabled(req->vhost);
        mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
        
        realtime = _srs_config->get_realtime_enabled(req->vhost);
        
        /* for reload handler to register itself,
         * when config service do the reload, callback the handler. */
        _srs_config->subscribe(this);
    }
    

    该 SrsPublishRecvThread 类中同时有一个 SrsRecvThread 类的成员 trd,因此会同时构造 SrsRecvThread 类。

    3.1 SrsRecvThread 构造函数

    /**
     * the recv thread, use message handler to handle each received message.
     */
    SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, 
        SrsRtmpServer* rtmp_sdk, int timeout_ms)
    {
        
        timeout = timeout_ms;
        /* handler 指向子类 SrsPublishRecvThread 的指针对象 */
        handler = msg_handler;
        rtmp = rtmp_sdk;
        /* 构造一个可重复使用的线程:recv */
        trd = new SrsReusableThread2("recv", this);
    }
    

    3.2 SrsReusableThread2 构造函数

    SrsReusableThread2::SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, 
        int64_t interval_us)
    {
        /* 指向子类 SrsRecvThread 的指针对象 */
        handler = h;
        /* 构造一个 SrsThread 类,该类代表一个线程 */
        pthread = new internal::SrsThread(n, this, interval_us, true);
    }
    

    4. SrsRtmpConn::do_publishing

    int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
    {
        int ret = ERROR_SUCCESS;
        
        /**
         * SrsPithyPrint:
         * the stage is used for a collection of object to do print,
         * the print time in a stage is constant and not changed,
         * that is, we always got one message to print every specified time.
         *
         * for example, stage #1 for all play clients, print time is 3s,
         * if there is 1client, it will print every 3s.
         * if there is 10clients, random select one to print every 3s.
         * Usage:
         *     SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
         *     SrsAutoFree(SrsPithyPrint, pprint);
         *     while (true) {
         *         pprint->elapse();
         *         if (pprint->can_print()) {
         *             // print pithy message.
         *             // user can get the elapse time by: pprint->age()
         *         }
         *         // read and write RTMP messages.
         *     }
         */
        SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
        SrsAutoFree(SrsPithyPrint, pprint);
        
        /* 构建并启动一个专门用于接收客户端推流数据的 recv 线程 */
        // start isolate recv thread.
        if ((ret = trd->start()) != ERROR_SUCCESS) {
            srs_error("start isolate recv thread failed. ret=%d", ret);
            return ret;
        }
        
        // change the isolate recv thread context id,
        // merge its log to current thread.
        int receive_thread_cid = trd->get_cid();
        trd->set_cid(_srs_context->get_cid());
        
        // initialize the publish timeout.
        /* 若没有配置 publish_1stpkt_timeout,则当没有收到消息时使用默认超时时间 20000ms */
        publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
        /* 若没有配置 publish_normal_timeout,则当已经收到过消息时使用默认超时时间 5000ms */
        publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
        
        // set the sock options.
        set_sock_options();
        
        if (true) {
            /* vhost{} 中是否启用了 mr 功能,否返回 false */
            bool mr = _srs_config->get_mr_enabled(req->vhost);
            int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
            srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d, rtcid=%d",
                      mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout, 
                      tcp_nodelay, receive_thread_cid);
        }
        
        int64_t nb_msgd = 0;
        uint64_t nb_frames = 0;
        while (!disposed) {
            /* 自动计算经过的时间 */
            pprint->elapse();
            
            // when source is set to expired, disconnect it.
            if (expired) {
                ret = ERROR_USER_DISCONNECT;
                srs_error("connection expired. ret=%d", ret);
                return ret;
            }
            
            // cond wait for timeout.
            if (nb_msgs == 0) {
                // when not got msgs, wait for a larger timeout.
                // @see https://github.com/ossrs/srs/issues/441
                trd->wait(publish_1stpkt_timeout);
            } else {
                trd->wait(publish_normal_timeout);
            }
            
            // check the thread error code.
            if ((ret = trd->error_code()) != ERROR_SUCCESS) {
                if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) 
                {
                    srs_error("recv thread failed. ret=%d", ret);
                }
                return ret;
            }
            
            // when not got any messages, timeout.
            if (trd->nb_msgs() <= nb_msgs) {
                ret = ERROR_SOCKET_TIMEOUT;
                srs_warn("publish timeout %dms, nb_msgs=%"PRId64", ret=%d",
                    nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, nb_msgs, ret);
                break;
            }
            nb_msgs = trd->nb_msgs();
            
            // Update the stat for video fps.
            // @remark https://github.com/ossrs/srs/issues/851
            SrsStatistic* stat = SrsStatistic::instance();
            if ((ret = stat->on_video_frames(req, (int)(trd->nb_video_frames() - nb_frames))) 
                != ERROR_SUCCESS) {
                return ret;
            }
            nb_frames = trd->nb_video_frames();
            
            // reportable
            if (pprint->can_print()) {
                kbps->sample();
                bool mr = _srs_config->get_mr_enabled(req->vhost);
                int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
                srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH
                    " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, "
                    "mr=%d/%d, p1stpt=%d, pnt=%d", 
                    pprint->age(),
                    kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                    kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(),
                    mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout
                );
            }
        }
        
        return ret;
    }
    

    4.1 SrsPithyPrint::create_rtmp_publish

    为 rtmp publish 构造一个 SrsPithyPrint 类。

    SrsPithyPrint* SrsPithyPrint::create_rtmp_publish()
    {
        return new SrsPithyPrint(SRS_CONSTS_STAGE_PUBLISH_USER);
    }
    

    4.1.1 SrsPithyPrint 构造函数

    SrsPithyPrint::SrsPithyPrint(int _stage_id)
    {
        stage_id = _stage_id;
        client_id = enter_stage();
        previous_tick = srs_get_system_time_ms();
        _age = 0;
    }
    

    4.1.2 SrsPithyPrint::enter_stage

    int SrsPithyPrint::enter_stage()
    {
        SrsStageInfo* stage = NULL;
        
        std::map<int, SrsStageInfo*>::iterator it = _srs_stages.find(stage_id);
        if (it == _srs_stages.end()) {
            stage = new SrsStageInfo(stage_id);
            _srs_stages[stage_id] = stage;
        } else {
            stage = it->second;
        }
        
        srs_assert(stage != NULL);
        client_id = stage->nb_clients++;
        
        srs_verbose("enter stage, stage_id=%d, client_id=%d, nb_clients=%d, time_ms=%d",
            stage->stage_id, client_id, stage->nb_clients, stage->pithy_print_time_ms);
            
        return client_id;
    }
    

    4.2 SrsPublishRecvThread::start: 启动线程

    int SrsPublishRecvThread::start()
    {
        /* 调用 SrsRecvThread::start()  */
        int ret = trd.start();
        ncid = cid = trd.cid();
        return ret;
    }
    

    4.2.1 SrsRecvThread::start

    int SrsRecvThread::start()
    {
        /* 接着调用 SrsReusableThread2::start() */
        return trd->start();
    }
    

    4.2.2 SrsReusableThread2::start

    /* for the reusable thread, start and stop by user. */
    int SrsReusableThread2::start()
    {
        /* 调用 SrsThread::start() */
        return pthread->start();
    }
    

    4.2.3 SrsThread::start

    int SrsThread::start()
    {
        int ret = ERROR_SUCCESS;
        
        if (tid) {
            srs_info("thread %s already running.", _name);
            return ret;
        }
        
        if ((tid = st_thread_create(thread_fun, this, (joinable? 1:0), 0)) == NULL) {
            ret = ERROR_ST_CREATE_CYCLE_THREAD;
            srs_error("st_thread_create failed. ret=%d", ret);
            return ret;
        }
        
        disposed = false;
        // we set to loop to true for thread to run.
        loop = true;
        
        // wait for cid to ready, for parent thread to get the cid.
        while (_cid < 0) {
            st_usleep(10 * 1000);
        }
        
        // now, cycle thread can run.
        can_run = true;
        
        return ret;
    }
    

    以上 Thread 类之间的关系图

    image

    4.3 SrsRtmpConn::set_sock_options

    void SrsRtmpConn::set_sock_options()
    {
        /* 若配置文件中的 vhost{} 没有配置 tcp_nodelay 配置项,默认返回 false */
        bool nvalue = _srs_config->get_tcp_nodelay(req->vhost);
        if (nvalue != tcp_nodelay) {
            tcp_nodelay = nvalue;
    #ifdef SRS_PERF_TCP_NODELAY
            int fd = st_netfd_fileno(stfd);
            
            socklen_t nb_v = sizeof(int);
            
            int ov = 0;
            getsocket(fd, IPPROTO_TCP, TCP_NODELAY, &ov, &nb_v);
            
            int v = tcp_nodelay;
            // set the socket send buffer when required larger buffer
            if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, nb_v) < 0) {
                srs_warn("set sock TCP_NODELAY=%d failed.", v);
            }
            getsocket(fd, IPPROTO_TCP, TCP_NODELAY, &v, &nb_v);
            
            srs_trace("set TCP_NODELAY %d=>%d", ov, v);
    #else
            srs_warn("SRS_PERF_TCP_NODELAY is disabled but tcp_nodelay configed.");
    #endif
        }
    }
    

    4.4 SrsPithyPrint::elapse

    /* auto calc the elapse time */
    void SrsPithyPrint::elapse()
    {
        SrsStageInfo* stage = _srs_stages[stage_id];
        srs_assert(stage != NULL);
        
        int64_t diff = srs_get_system_time_ms() - previous_tick;
        diff = srs_max(0, diff);
        
        stage->elapse(diff);
        _age += diff;
        previous_tick = srs_get_system_time_ms();
    }
    
    void SrsStageInfo::elapse(int64_t diff)
    {
        age += diff;
    }
    

    4.5 SrsPublishRecvThread::wait

    int SrsPublishRecvThread::wait(int timeout_ms)
    {
        /* 若已经发生错误,则直接返回错误码 */
        if (recv_error_code != ERROR_SUCCESS) {
            return recv_error_code;
        }
        
        // ignore any return of cond wait.
        st_cond_timedwait(error, timeout_ms * 1000);
        
        return ERROR_SUCCESS;
    }
    

    4.5.1 st_cond_timedwait

    int st_cond_timedwait(_st_cond_t* cvar, st_utime_t timeout)
    {
        _st_thread_t *me = _ST_CURRENT_THREAD();
        int rv;
        
        if (me->flags & _ST_FL_INTERRUPT) {
            me->flags &= ~_ST_FL_INTERRUPT;
            errno = EINTR;
            return -1;
        }
        
        /* Put caller thread on the condition variable's wait queue */
        me->state = _ST_ST_COND_WAIT;
        ST_APPEND_LINK(&me->wait_links, &cvar->wait_q);
        
        /* 当超时时间不为 -1 时,将当前线程添加到 sleep 队列中 */
        if (timeout != ST_UTIME_NO_TIMEOUT) 
            _ST_ADD_SLEEPQ(me, timeout);
        
        /* 然后切换上下文环境,直到被条件变量唤醒或是超时事件到了而醒来 */
        _ST_SWITCH_CONTEXT(me);
        
        ST_REMOVE_LINK(&me->wait_links);
        rv = 0;
        
        if (me->flags & _ST_FL_TIMEDOUT) {
            me->flags &= ~_ST_FL_TIMEDOUT;
            errno = ETIME;
            rv = -1;
        }
        if (me->flags & _ST_FL_INTERRUPT) {
            me->flags &= ~_ST_FL_INTERRUPT;
            errno = EINTR;
            rv = -1;
        }
        
        return rv;
    }
    
    /* Insert an element "_e" at the end of the list "_l" */
    #define ST_APPEND_LINK(_e, _l) ST_INSERT_BEFORE(_e, _l)
    
    /* Insert element "_e" into the list, before "_l" */
    #define ST_INSERT_BEFORE(_e, _l) 
        ST_BEGIN_MACRO  
        (_e)->next = (_l); 
        (_e)->prev = (_l)->prev; 
        (-l)->prev->next = (_e); 
        (-l)->prev = (_e); 
        ST_END_MACRO
    

    4.6 SrsStatistic::on_video_frames

    /* When got videos, update the frames.
     * We only stat the total number of video frames. */
    int SrsStatistic::on_video_frames(SrsRequest* req, int nb_frames)
    {
        int ret = ERROR_SUCCESS;
        
        SrsStatisticVhost* vhost = create_vhost(req);
        SrsStatisticStream* stream = create_stream(vhost, req);
        
        stream->nb_frames += nb_frames;
        
        return ret;
    }
    

    5. SrsPublishRecvThread::stop

    当从 do_publishing 函数返回时,终止接收推流数据的 recv 线程。

    void SrsPublishRecvThread::stop()
    {
        trd.stop();
    }
    

    5.1 SrsRecvThread::stop

    void SrsRecvThread::stop()
    {
        trd->stop();
    }
    

    5.2 SrsReusableThread2::stop

    void SrsReusableThread2::stop()
    {
        pthread->stop();
    }
    

    5.3 SrsThread::stop

    void SrsThread::stop()
    {
        if (!tid) {
            return;
        }
        
        /* 设置线程循环标志,退出循环 */
        loop = false;
        
        dispose();
        
        _cid = -1;
        can_run = false;
        tid = NULL; 
    }
    

    5.4 SrsThread::dispose

    void SrsThread::dispose()
    {
        if (disposed) {
            return;
        }
        
        // the interrupt will cause the socket to read/write error,
        // which will terminate the cycle thread.
        st_thread_interrupt(tid);
        
        // when joinable, wait util quit.
        if (_joinable) {
            // wait the thread to exit.
            int ret = st_thread_join(tid, NULL);
            if (ret) {
                srs_warn("core: ignore join thread failed.");
            }
        }
        
        // wait the thread actually terminated.
        // sometimes the thread join return -1, for example,
        // when thread use st_recvfrom, the thread join return -1.
        // so here, we use a variable to ensure the thread stopped.
        // @remark event the thread not joinable, we must ensure the thread stopped when stop.
        while (!really_terminated) {
            st_usleep(10 * 1000);
            
            if (really_terminated) {
                break;
            }
            srs_warn("core: wait thread to actually terminated");
        }
        
        disposed = true;
    }
    

    5.4.1 st_thread_interrupt

    void st_thread_interrupt(_st_thread_t *thread)
    {
        /* If thread is already dead */
        if (thread->state == _ST_ST_ZOMEBIE) 
            return;
        
        /* 设置线程标志为 interrupt */
        thread->flags |= _ST_FL_INTERRUPT;
        
        /* 若线程当前正在运行或已在 run 队列中,则直接返回,在线程函数会检测
         * 线程的 flags,若发现是 interrput,则会终止线程函数的执行 */
        if (thread->state == _ST_ST_RUNNING || thread->state == _ST_ST_RUNNABLE) 
            return;
        
        /* 否则,若线程当前处于 sleep 队列中,即休眠状态,则将该线程从
         * 休眠队列中移除 */
        if (thread->flags & _ST_FL_ON_SLEEPQ) 
            _ST_DEL_SLEEPQ(thread);
        
        /* Make thread runnable */
        thread->state = _ST_ST_RUNNABLE;
        _ST_ADD_RUNQ(thread);
    }
    

    5.4.2 st_thread_join

    int st_thread_join(_st_thread_t *thread, void **retvalp)
    {
        _st_cond_t *term = thread_term;
        
        /* Can't join a non-joinable thread */
        if (term == NULL) {
            errno = EINVAL;
            return -1;
        }
        if (_ST_CURRENT_THREAD() == thread) {
            errno = EDEADLK;
            return -1;
        }
        
        /* Multiple threads can't wait on the same joinable thread */
        if (term->wait_q.next != &term->wait_q) {
            errno = EINVAL;
            return -1;
        }
        
        while (thread->state != _ST_ST_ZOMBIE) {
            if (st_cond_timedwait(term, ST_UTIME_NO_TIMEOUT) != 0) 
                return -1;
        }
        
        if (retvalp) 
            *retvalp = thread->retval;
            
        /*
         * Remove target thread from the zombie queue and make it runnable.
         * When it gets scheduled later, it will do the clean up.
         */
        thread->state = _ST_ST_RUNNABLE;
        _ST_DEL_ZOMBIEQ(thread);
        _ST_ADD_RUNQ(thread);
        
        return 0;
    }
    
  • 相关阅读:
    jmeter jsr223脚本引用变量的问题
    apk反编译
    JS时间对像
    无缝滚动
    带详情图片轮播
    飘动文字2
    飘动的文字1
    抖动图片2
    点击图片图片上下抖动
    点击div掉下和上升
  • 原文地址:https://www.cnblogs.com/jimodetiantang/p/9087775.html
Copyright © 2011-2022 走看看