  • Node.js Addon: Communicating Across Threads

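    The Mac client for one of our products is planned to be built on Electron. The existing SDK already exposes a C API, so in principle wrapping it as a JS API in a Node.js addon should be enough. The addon, however, has to interact with other threads, and I could not find any material that explained how to do that, so I read the Node.js implementation to work out an approach.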

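    The idea is as follows. In Node.js, JavaScript (V8) runs on a single thread driven by the libuv event loop. The client's front-end page logic runs inside that event loop, while the back-end SDK runs on its own thread. The two sides need to talk to each other. A front-end call into the back end is a direct API call followed by an asynchronous callback; a notification from the back end has to wake the libuv thread before it can be delivered.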

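    Among libuv's loop and handle APIs, uv_async_send is the only one that is safe to call from another thread. It wakes the libuv thread so that the thread runs a callback you registered.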

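    Call flow from the back-end SDK to Node.js:

    0. When the SDK instance is initialized

       Initialize a uv_async_t:

       uv_async_init(uv_default_loop(), &uv_async, node_event_process);

       This must run on the libuv thread. node_event_process is the function that runs when the loop is woken.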

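    1. Store the arguments temporarily on the SDK instance

    2. Wake the libuv thread

      uv_async_send(&uv_async);

      On UNIX, libuv implements this with a wakeup file descriptor (a pipe, or an eventfd on Linux) that is set up by uv_async_init and polled by the event loop (epoll on Linux, kqueue on macOS). Sending writes to that fd, which makes the poll call return.

      Note that uv_async_send does not guarantee one callback per call. It only guarantees that the loop thread is woken, so it cannot serve as a call mechanism on its own.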

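    3. The SDK thread calls std::condition_variable::wait() and blocks until the libuv thread has produced a result

    4. The libuv thread wakes up and runs node_event_process, which reads the arguments stored on the SDK instance

        Through the V8 API, call JSON.parse to turn the argument string into a V8 Object

        Look up the callback Function for the transaction by call ID, or the callback Function registered through the bind method, and call it; control then passes into JavaScript

    5. When the call completes, store the return value temporarily on the SDK instance

        The returned v8::Local<v8::Value> must first be converted into a type of your own; handing the V8 value to the SDK thread to read causes a crash

    6. std::condition_variable::notify_one() wakes the SDK thread

    7. wait() returns on the SDK thread, which then reads the return value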
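
    The reference code declares waitForEventProcess and notifyEventComplete but does not show their bodies. As a rough illustration of how steps 1 to 7 could fit together, here is a minimal sketch that uses only the C++ standard library. SyncBridge, call, serveOne and roundTrip are hypothetical names invented for this sketch, and a second condition variable stands in for uv_async_send, so this is not libuv or NAN code and not necessarily how the original SDK does it.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical single-slot bridge between an SDK thread and a loop thread.
class SyncBridge {
public:
    // SDK thread: store the request, wake the loop thread, block for the result.
    std::string call(const std::string& request) {
        std::unique_lock<std::mutex> lock(mutex_);
        request_ = request;              // step 1: park the arguments
        hasRequest_ = true;
        done_ = false;
        wake_.notify_one();              // step 2: stands in for uv_async_send()
        finished_.wait(lock, [this] { return done_; });  // step 3
        return result_;                  // step 7: read the plain-string result
    }

    // Loop thread: wait for one request, run the handler, publish the result.
    void serveOne(const std::function<std::string(const std::string&)>& handler) {
        assert(static_cast<bool>(handler));
        std::unique_lock<std::mutex> lock(mutex_);
        wake_.wait(lock, [this] { return hasRequest_; });
        hasRequest_ = false;
        std::string request = request_;
        lock.unlock();                   // never hold the lock while user code runs
        std::string result = handler(request);  // step 4: would call into JavaScript
        lock.lock();
        result_ = result;                // step 5: a copy in our own type, not a V8 handle
        done_ = true;
        finished_.notify_one();          // step 6
    }

private:
    std::mutex mutex_;
    std::condition_variable wake_;
    std::condition_variable finished_;
    std::string request_;
    std::string result_;
    bool hasRequest_ = false;
    bool done_ = false;
};

// Runs one round trip: a worker thread plays the SDK, the caller plays the loop.
std::string roundTrip(const std::string& request) {
    SyncBridge bridge;
    std::string reply;
    std::thread sdk([&] { reply = bridge.call(request); });
    bridge.serveOne([](const std::string& r) { return "handled:" + r; });
    sdk.join();
    return reply;
}
```

    Both waits use a predicate, so a notification that arrives before the other side starts waiting is not lost. The single slot means only one call can be outstanding at a time, and calling the blocking side from the loop thread itself would deadlock.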

    Reference code:

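    #include <cassert>
    #include <condition_variable>
    #include <cstring>
    #include <map>
    #include <mutex>
    #include <queue>
    #include <string>

    #include <nan.h>
    #include <uv.h>
    // The SDK's own declarations (rcs_state, EventItem, LOG_DEBUG, RCS_MALLOC, ...) are not shown in this post.

    class RcsSdk : public Nan::ObjectWrap {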
    public:
        static NAN_MODULE_INIT(Init);
        void addCallback(int cid, Nan::Callback* callback);
        const Nan::Callback* removeCallback(int cid);
        const Nan::Callback* getBindMethod(const char* method);
        
        void notifyMainLoop();
    
    #ifdef CALL_NODE_USE_ASYNC_QUEUE
        std::queue<EventItem*> asyncEventQueue;
    #else
        void waitForEventProcess(EventItem* item);
        void notifyEventComplete();
        EventItem* syncEventItem;
        char* syncEventResult;
    #endif
        std::mutex eventSyncMutex;
        rcs_state *state;
        
    private:
        // Parameter order matches the definition further down and the call in New().
        explicit RcsSdk(const char* number, const char* imei,
                        const char* devicevendor, const char* devicemodel,
                        const char* deviceos, const char* deviceosversion,
                        const char* clientVendor, const char* clientVersion,
                        const char* storage, const char* appId,
                        const char* syspath, int clientType);
        ~RcsSdk();
        
        static NAN_METHOD(New);
        static NAN_METHOD(bind);
        
        static void addApiMethods(v8::Local<v8::FunctionTemplate> &tpl);
        static Nan::Persistent<v8::Function> constructor;
        
        std::condition_variable eventCondition;
        uv_async_t uv_async;
        std::map<int, const Nan::Callback*> callbacks;
        std::map<std::string, const Nan::Callback*> bindMethods;
    };
    
    
    
    
    static std::map<rcs_state*, RcsSdk*> sdkMap;
    static std::mutex globalMutex;
    
    static RcsSdk* node_get_sdk(rcs_state *R){
        RcsSdk *sdk = NULL;
        globalMutex.lock();
        std::map<rcs_state*, RcsSdk*>::iterator iterator = sdkMap.find(R);
        if(iterator != sdkMap.end()){
            sdk = iterator->second;
        }
        globalMutex.unlock();
        return sdk;
    }
    
    char* node_event_deliver(struct rcs_state *R, const char* event, const char* json){
        // Runs on the SDK (LuaSdk) background thread
        LOG_DEBUG("node_event_deliver begin, event:%s json:%s", event, json);
        RcsSdk* sdk = node_get_sdk(R);
        char* result = NULL;
        if(NULL == sdk){
            LOG_DEBUG("node_event_deliver sdk NULL");
            return NULL;
        }
    
        EventItem item;
        item.event = event;
        item.data = (char *)json;
        sdk->waitForEventProcess(&item);
        sdk->eventSyncMutex.lock();
        result = sdk->syncEventResult;
        sdk->eventSyncMutex.unlock();
        LOG_DEBUG("node_event_deliver end");
        return result;
    }
    static int node_get_cid(v8::Handle<v8::Value> json){
        int cid = -1;
        if(json->IsObject()){
            v8::Handle<v8::Object> ojson = json->ToObject();
            cid = ojson->Get(Nan::New("id").ToLocalChecked())->Int32Value();
        }
        return cid;
    }
    static v8::Handle<v8::Value> parseJson(v8::Handle<v8::Value> jsonString) {
        // Call JSON.parse.apply(JSON, jsonString)
        v8::Handle<v8::Context> context = Nan::GetCurrentContext();
        v8::Handle<v8::Object> global = context->Global();
        
        v8::Handle<v8::Object> JSON = global->Get(Nan::New("JSON").ToLocalChecked())->ToObject();
        v8::Handle<v8::Function> JSON_parse =v8::Handle<v8::Function>::Cast(JSON->Get(Nan::New("parse").ToLocalChecked()));
        
        return JSON_parse->Call(JSON, 1, &jsonString);
    }
    static v8::Local<v8::Value> node_do_event_process(RcsSdk* sdk, EventItem *item){
        v8::Handle<v8::Value> json = parseJson(Nan::New(item->data).ToLocalChecked());
        v8::Local<v8::Value> argv[] = {json};
        v8::Local<v8::Value> result;
        
        if(!std::strcmp(item->event, "callback")){
            int cid = node_get_cid(json);
            LOG_DEBUG("node api callback cid:%d", cid);
            const Nan::Callback * callback = sdk->removeCallback(cid);
            if(NULL == callback){
                LOG_ERROR("node api callback=NULL");
            }else{
                 result = callback->Call(1, argv);
                 delete callback;
            }
        }else{
            LOG_DEBUG("node_do_event_process:%s %s", item->event, item->data);
            const Nan::Callback * callback = sdk->getBindMethod(item->event);
            if(NULL == callback){
                LOG_DEBUG("node listener callback=NULL");
            }else{
                result = callback->Call(1, argv);
            }
        }
    
        return result;
    }
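    // Runs on the main thread after the SDK thread calls uv_async_send and the main thread wakes up.
    // uv_async_send is safe to call from any thread, but it does not guarantee one notification
    // per call; it only guarantees that the main thread is woken at least once.
    // See: http://docs.libuv.org/en/v1.x/async.html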
    // int uv_async_send(uv_async_t* async)
    // Wakeup the event loop and call the async handle’s callback.
    //
    // Note It’s safe to call this function from any thread. The callback will be
    // called on the loop thread.
    // Warning libuv will coalesce calls to uv_async_send(), that is, not every call to
    // it will yield an execution of the callback. For example: if uv_async_send() is
    // called 5 times in a row before the callback is called, the callback will only be
    // called once. If uv_async_send() is called again after the callback was called, it
    // will be called again.
    static void node_event_process(uv_async_t *handle){
        // Node.js main thread
        LOG_DEBUG("node_event_process begin");
        Nan::HandleScope scope;
        RcsSdk *sdk = (RcsSdk*)handle->data;
        
        sdk->eventSyncMutex.lock();
        EventItem *item = sdk->syncEventItem;
        sdk->syncEventItem = NULL;
        sdk->eventSyncMutex.unlock();
        if(NULL == item){
            LOG_ERROR("node_event_process sync syncEventItem=NULL");
        }else{
            v8::Local<v8::Value> ret = node_do_event_process(sdk, item);
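            char *result = NULL;
            // Only String return values are supported for now (V8 objects must be read on the main thread)
            if(!ret.IsEmpty() && ret->IsString()){
                // Keep the Utf8String alive while its buffer is copied; a pointer taken
                // from a temporary Utf8String dangles once the statement ends.
                Nan::Utf8String utf8(ret);
                if(*utf8 != NULL){
                    // Memory allocated here is released before the next call, so at most
                    // the previous result stays allocated
                    result = (char*)RCS_MALLOC(utf8.length()+1);
                    assert(result);
                    strcpy(result, *utf8);
                }
            }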
            sdk->eventSyncMutex.lock();
            sdk->syncEventResult = result;
            sdk->eventSyncMutex.unlock();
        }
        sdk->notifyEventComplete();
        LOG_DEBUG("node_event_process end");
    }
    
    // RcsSdk Imps
    Nan::Persistent<v8::Function> RcsSdk::constructor;
    NAN_MODULE_INIT(RcsSdk::Init) {
        v8::Local<v8::FunctionTemplate> tpl = Nan::New<v8::FunctionTemplate>(New);
        tpl->SetClassName(Nan::New("RcsSdk").ToLocalChecked());
        // Any value > 0 works here
        tpl->InstanceTemplate()->SetInternalFieldCount(1);
        // Build the JS prototype
        Nan::SetPrototypeMethod(tpl, "bind", bind);
        addApiMethods(tpl);
        
        constructor.Reset(Nan::GetFunction(tpl).ToLocalChecked());
        Nan::Set(target, Nan::New("RcsSdk").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked());
    }
    
    RcsSdk::RcsSdk(const char* number, const char* imei, const char* devicevendor,
                   const char* devicemodel, const char* deviceos, const char* deviceosversion,
                   const char* clientVendor, const char* clientVersion, const char* storage,
                   const char* appId, const char* syspath, int clientType)
    #ifndef CALL_NODE_USE_ASYNC_QUEUE
    :syncEventItem(NULL),
    syncEventResult(NULL)
    #endif
    {
        LOG_DEBUG("begin initSdk: %s %s %s %s %s", appId, number, clientVendor, clientVersion, storage);
    
        state = rcs_state_new();
    
        rcs_listeners listeners;
        node_get_listeners(listeners);
        rcs_set_listeners(state, &listeners);
        
        rcs_callbacks callbacks;
        node_get_callbacks(callbacks);
        rcs_set_callbacks(state, &callbacks);
        
        // Each state has exactly one uv_async_t, used to interrupt the main thread
        uv_async_init(uv_default_loop(), &uv_async, node_event_process);
        uv_async.data = this;
        
        rcs_init(state, number, imei, "node-imsi",
                 devicevendor, devicemodel, deviceos, deviceosversion,
                 clientVendor,clientVersion, storage, appId, syspath, clientType);
        rcs_start(state);
        
        globalMutex.lock();
        sdkMap[state] = this;
        globalMutex.unlock();
    }
    
    RcsSdk::~RcsSdk() {
        if(state){
            rcs_stop(state, 0);
            globalMutex.lock();
            sdkMap.erase(state);
            globalMutex.unlock();
            state = NULL;
        }
    }
    void RcsSdk::addCallback(int cid, Nan::Callback* callback){
        callbacks[cid] = callback;
    }
    const Nan::Callback* RcsSdk::removeCallback(int cid){
        std::map<int, const Nan::Callback*>::iterator iterator = callbacks.find(cid);
        const Nan::Callback* callback = NULL;
        if(iterator != callbacks.end()){
            callback = iterator->second;
            callbacks.erase(iterator);
        }
        return callback;
    }
    const Nan::Callback* RcsSdk::getBindMethod(const char* method){
        std::map<std::string, const Nan::Callback*>::iterator iterator = bindMethods.find(method);
        if(iterator != bindMethods.end()){
            return iterator->second;
        }
        return NULL;
    }
    void RcsSdk::notifyMainLoop(){
        uv_async_send(&uv_async);
    }
    
    NAN_METHOD(RcsSdk::New) {
        if (!info.IsConstructCall()) {
            // Invoked as plain function `RcsSdk(...)`, turn into construct call.
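            const int argc = info.Length();
            if(argc > 12){
                // argv below holds at most 12 handles
                Nan::ThrowError(Nan::Error("Wrong number of arguments"));
                return;
            }
            v8::Local<v8::Value> argv[12];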
            for(int i=0; i<argc; i++){
                argv[i] = info[i];
            }
            v8::Local<v8::Function> cons = Nan::New(constructor);
            info.GetReturnValue().Set(cons->NewInstance(argc, argv));
        } else {
            // Invoked as constructor: `new RcsSdk(...)`
            
            if(info.Length() != 12){
                Nan::ThrowError(Nan::Error("Wrong number of arguments"));
                return;
            }
    
            CHECK_ARGS_STRING(info, 0, "number");
            CHECK_ARGS_STRING(info, 1, "imei");
            CHECK_ARGS_STRING(info, 2, "devicevendor");
            CHECK_ARGS_STRING(info, 3, "devicemodel");
            CHECK_ARGS_STRING(info, 4, "deviceos");
            CHECK_ARGS_STRING(info, 5, "deviceosversion");
            CHECK_ARGS_STRING(info, 6, "clientVendor");
            CHECK_ARGS_STRING(info, 7, "clientVersion");
            CHECK_ARGS_STRING(info, 8, "storage");
            CHECK_ARGS_STRING(info, 9, "appId");
            CHECK_ARGS_STRING(info, 10, "sysPath");
            CHECK_ARGS_NUMBER(info, 11, "clientType");
            
            // Keep each Utf8String alive for the duration of the constructor call; a pointer
            // taken from a temporary Utf8String dangles as soon as the statement ends.
            Nan::Utf8String number(info[0]);
            Nan::Utf8String imei(info[1]);
            Nan::Utf8String devicevendor(info[2]);
            Nan::Utf8String devicemodel(info[3]);
            Nan::Utf8String deviceos(info[4]);
            Nan::Utf8String deviceosversion(info[5]);
            Nan::Utf8String clientVendor(info[6]);
            Nan::Utf8String clientVersion(info[7]);
            Nan::Utf8String storage(info[8]);
            Nan::Utf8String appId(info[9]);
            Nan::Utf8String sysPath(info[10]);
            int clientType = info[11]->Uint32Value();
            
            RcsSdk *sdk = new RcsSdk(*number, *imei, *devicevendor, *devicemodel, *deviceos, *deviceosversion,
                                     *clientVendor, *clientVersion, *storage, *appId, *sysPath, clientType);
            sdk->Wrap(info.This());
            info.GetReturnValue().Set(info.This());
        }
    }
    
    NAN_METHOD(RcsSdk::bind) {
        RcsSdk* sdk = Nan::ObjectWrap::Unwrap<RcsSdk>(info.This());
        if(info.Length() != 2 || ! info[0]->IsString() || !info[1]->IsFunction()){
            Nan::ThrowError(Nan::Error("Wrong arguments"));
            return;
        }
        Nan::Utf8String method(info[0]);
        Nan::Callback *callback = new Nan::Callback(info[1].As<v8::Function>());
        // Copy the name into a std::string key while the Utf8String is still alive
        sdk->bindMethods[std::string(*method)] = callback;
    }
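
    The class above also declares an asyncEventQueue under CALL_NODE_USE_ASYNC_QUEUE, but the post does not show how it is used. One plausible reading is that events are queued and the wakeup callback drains the whole queue, which copes with the coalescing of uv_async_send calls described in the libuv documentation. The sketch below simulates that on a single thread with the standard library only. CoalescingWakeup, EventQueue and postAndDrain are hypothetical names, and CoalescingWakeup merely imitates the documented behavior of uv_async_t; it is not libuv code.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for uv_async_t: send() may be called many times, but the
// callback runs at most once per poll(), imitating libuv's coalescing.
class CoalescingWakeup {
public:
    explicit CoalescingWakeup(std::function<void()> callback)
        : callback_(std::move(callback)) {}

    void send() { pending_.store(true); }   // safe from any thread

    bool poll() {                           // loop thread only
        if (!pending_.exchange(false)) return false;
        callback_();
        return true;
    }

private:
    std::atomic<bool> pending_{false};
    std::function<void()> callback_;
};

// Mutex-protected queue shared by the SDK thread and the loop thread.
class EventQueue {
public:
    void push(std::string event) {
        std::lock_guard<std::mutex> lock(mutex_);
        events_.push(std::move(event));
    }

    // Moves out everything queued so far. The wakeup callback must drain the
    // queue rather than pop a single item, because wakeups can be merged.
    std::vector<std::string> drain() {
        std::lock_guard<std::mutex> lock(mutex_);
        std::vector<std::string> out;
        while (!events_.empty()) {
            out.push_back(std::move(events_.front()));
            events_.pop();
        }
        return out;
    }

private:
    std::mutex mutex_;
    std::queue<std::string> events_;
};

// Posts `count` events with one send() each, then polls until idle.
// Returns {number of callback runs, number of events handled}.
std::pair<int, int> postAndDrain(int count) {
    assert(count >= 0);
    EventQueue queue;
    int callbackRuns = 0;
    int handled = 0;
    CoalescingWakeup wakeup([&] {
        ++callbackRuns;
        handled += static_cast<int>(queue.drain().size());
    });
    for (int i = 0; i < count; ++i) {
        queue.push("event-" + std::to_string(i));
        wakeup.send();
    }
    while (wakeup.poll()) {}
    return {callbackRuns, handled};
}
```

    With five events posted before the loop gets a chance to poll, the callback runs once and still handles all five. A callback that popped only one item per wakeup would leave the other four stuck in the queue.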
    

      

  • Original article: https://www.cnblogs.com/lulu/p/5532883.html