zoukankan      html  css  js  c++  java
  • 锋利的NodeJS之NodeJS多线程

    最近刚好有朋友在问Node.js多线程的问题,我总结了一下,可以考虑使用源码包里面的worker_threads或者第三方的模块来实现。
    首先明确一下多线程在Node.js中的概念,然后在聊聊worker_threads的用法。天生异步,真心强大。

    1. Node.js多线程概述
      有人可能会说,Node.js虽然是单线程的,但是可以利用循环事件(Event Loop)l来实现并发执行任务。追究其本质,NodeJs实际上使用了两种不同的线程,一个是用于处理循环事件的主线程一个是工作池(Worker pool)里面的一些辅助线程。关于这两种线程主要功能和关系如图1所示。

    图1 Node.js线程图
    所以从本质上来讲,NodeJs并不是真正的原生多线程,而是利用循环事件来实现高效并发执行任务。要做到真正的多线程,需要依赖其他模块或者第三方库。
    2. Worker_threads是Node.js官方推荐的实现真正多线程的模块,有官方技术团队进行长期维护。Worker_threads不需要单独安装,它位于Node.js源码中,具体位置是lib/worker_threads.js。worker_threads模块允许使用并行执行JavaScript的线程,使用也非常方便,只需要引入该模块即可,代码如下。
    const worker = require('worker_threads');
    与child_process或cluster不同,worker_threads可以共享内存。它们通过传输ArrayBuffer实例或共享SharedArrayBuffer实例来实现。
    官网上给了一个完整的例子,如下所示。

    const {
        Worker, isMainThread, parentPort, workerData
    } = require('worker_threads');
    
    if (isMainThread) {
        module.exports = function parseJSAsync(script) {
            return new Promise((resolve, reject) => {
                const worker = new Worker(__filename, {
                    workerData: script
                });
                worker.on('message', message => console.log(message));
                worker.on('error', reject);
                worker.on('exit', (code) => {
                    if (code !== 0)
                        reject(new Error(`Worker stopped with exit code ${code}`));
                });
            });
        };
        
    } else {
        const { parse } = require('som-parse-libary');
        const script = workerData;
        parentPort.postMessage(parse(script));
    }
    

    笔者对以上代码开始解析,重点概念如下所示:
    Worker该类代表一个独立的js执行线程。
    isMainThead一个布尔值,当前代码是否运行在Worker线程中。
    parentPortMessagePort对象,如果当前线程是个生成的Worker线程,则允许和父线程通信。
    workerData一个可以传递给线程构造函数的任何js数据的的复制数据。
    Worker_theads还提供了很多实用的API,整理如下所示。
    1.worker.getEnvironmentData(key)
    可以获取环境变量,先使用setEnvironmentData来设置环境变量,然后再使用g
    etEnvironmentData来获取。
    举一个简单的例子,代码如下所示。

    const {
      Worker,
      isMainThread,
      setEnvironmentData,
      getEnvironmentData,
    } = require('worker_threads');
    
    if (isMainThread) {
      setEnvironmentData('Hi', 'Node.js!');
      const worker = new Worker(__filename);
    } else {
      console.log(getEnvironmentData('Hi'));.
    }
    
    执行这段代码,可以在控制台打印出“Node.js”字符串。
    
    1. isMainThread
      isMainThread可以用来判断该进程是不是主线程,如果是主线程,则返回true,否则返回false。下面编写一个嵌套worker的代码,用于展示。
    const { Worker, isMainThread } = require('worker_threads');
    
    if (isMainThread) {
        console.log("This is  a main thread
    ");
        // This re-loads the current file inside a Worker instance.
        new Worker(__filename);
    } else {
        console.log('Inside Worker!');
        console.log(isMainThread);  // Prints 'false'.
    }
    
    1. MessageChannel和相关用法
      MessageChannel是worker_threads提供的一个双向异步的消息通信信道。下面这段代码就展示了两个MessagePort对象互相传递消息的过程,我们如果想主动结束某个Channel,那么可以使用close事件来完成。
    const {MessageChannel}  = require('worker_threads');
    
    const {port1, port2} = new MessageChannel();
    
    // port1给port2发送信息
    port1.postMessage({carName: 'BYD'});
    
    port2.on('message', (message) => {
        console.log("I receive message is ", message);
    })
    
    // port2给port1发送信息
    port2.postMessage({personality: "Brave"});
    port1.on('message', (message) => {
        console.log("I receive message is ", message);
    });
    

    运行上面的代码,可以在控制台看到如下输出:

    I receive message is  { personality: 'Brave' }
    I receive message is  { carName: 'BYD' }
    

    port.on(‘message’)方法是利用被动等待的方式接收事件,如果想手动接收信息可以使用receiveMessageOnPort方法,指定从某个port接收消息,如下所示。

    const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
    const {port1, port2} = new MessageChannel();
    port1.postMessage({Name: "freePHP"});
    
    let result = receiveMessageOnPort(port2);
    console.log(result);
    let result2 = receiveMessageOnPort(port2);
    console.log(result2);
    

    运行上面的代码,可以得到如下输出。

    { message: { Name: 'freePHP' } }
    undefined
    

    从结果可以看出,receiveMessageOnPort可以指定从另一个MessagePort对象获取消息,是一次消耗消息。
    实际工作中,我们不可能只使用单个线程来完成任务,所以需要创建线程池来维护和管理worker thread对象。为了简化线程池的实现,假设只会传递一个woker脚本作为参数,具体实现如下所示。需要单独安装async_hooks模块,它用于异步加载资源。

    const { AsyncResource } = require('async_hooks'); // 用于异步加载资源
    const { EventEmitter } = require('events');
    const path = require('path');
    const { Worker } = require('worker_threads');
    
    const kTaskInfo = Symbol('kTaskInfo');
    const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
    
    class WorkerPoolTaskInfo extends AsyncResource {
        constructor(callback) {
            super('WorkerPoolTaskInfo');
            this.callback = callback;
        }
    
        done(err, result) {
            this.runInAsyncScope(this.callback, null, err, result);
            this.emitDestroy();  // 只会被执行一次
        }
    }
    
    class WorkerPool extends EventEmitter {
        constructor(numThreads) {
            super();
            this.numThreads = numThreads;
            this.workers = [];
            this.freeWorkers = [];
    
            for (let i = 0; i < numThreads; i++)
                this.addNewWorker();
        }
    
        /**
         * 添加新的线程
         */
        addNewWorker() {
            const worker = new Worker(path.resolve(__dirname, 'task2.js'));
            worker.on('message', (result) => {
                // 如果成功状态,则将回调传给runTask方法,然后worker移除TaskInfo标记。
                worker[kTaskInfo].done(null, result);
                worker[kTaskInfo] = null;
                //
                this.freeWorkers.push(worker);
                this.emit(kWorkerFreedEvent);
            });
            worker.on('error', (err) => {
                // 报错后调用回调
                if (worker[kTaskInfo])
                    worker[kTaskInfo].done(err, null);
                else
                    this.emit('error', err);
                // 移除一个worker,然后启动一个新的worker来代替当前的worker
                this.workers.splice(this.workers.indexOf(worker), 1);
                this.addNewWorker();
            });
            this.workers.push(worker);
            this.freeWorkers.push(worker);
            this.emit(kWorkerFreedEvent);
        }
    
        /**
         * 执行任务
         * @param task
         * @param callback
         */
        runTask(task, callback) {
            if (this.freeWorkers.length === 0) {
                this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
                return;
            }
    
            const worker = this.freeWorkers.pop();
            worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
            worker.postMessage(task);
        }
    
        /**
         * 关闭线程
         */
        close() {
            for (const worker of this.workers) {
                worker.terminate();
            }
        }
    }
    
    module.exports = WorkerPool;
    

    其中task2.js是定义好的一个计算两个数字相加的脚本,内容如下。

    const { parentPort } = require('worker_threads');
    parentPort.on('message', (task) => {
      parentPort.postMessage(task.a + task.b);
    });
    

    调用这个线程池非常简单,用例如下所示。

    const WorkerPool = require('./worker_pool.js');
    const os = require('os');
    
    const pool = new WorkerPool(os.cpus().length);
    
    let finished = 0;
    for (let i = 0; i < 10; i++) {
      pool.runTask({ a: 42, b: 100 }, (err, result) => {
        console.log(i, err, result);
        if (++finished === 10)
          pool.close();
      });
    }
    
  • 相关阅读:
    STM32Cube Uart_DMA测试工程
    STM32CubeMX安装指南
    基于STM32Cube的ADC模数采样设计
    C++ this指针的用法
    用七段数码管显示26个字母的方案
    FPGA的引脚VCCINT 、VCCIO VCCA
    Keil环境中建立带FreeRTOS的STM32L项目
    STM32L时钟
    Mysql explain
    nginx屏蔽IP
  • 原文地址:https://www.cnblogs.com/freephp/p/14640156.html
Copyright © 2011-2022 走看看