zoukankan      html  css  js  c++  java
  • Node.js 多线程——worker_threads

    Node.js 是如何工作的

    Node.js 使用两种线程:event loop 处理的主线程和 worker pool 中的几个辅助线程。

    事件循环是一种机制,它采用回调(函数)并注册它们,准备在将来的某个时刻执行。它与相关的 JavaScript 代码在同一个线程中运行。当 JavaScript 操作阻塞线程时,事件循环也会被阻止。

    工作池是一种执行模型,它产生并处理单独的线程,然后同步执行任务,并将结果返回到事件循环。事件循环使用返回的结果执行提供的回调。

    简而言之,它负责异步 I/O操作 —— 主要是与系统磁盘和网络的交互。它主要由诸如 fs(I/O 密集)或 crypto(CPU 密集)等模块使用。工作池用 libuv 实现,当 Node 需要在 JavaScript 和 C++ 之间进行内部通信时,会导致轻微的延迟,但这几乎不可察觉。

    基于这两种机制,我们可以编写如下代码:

    fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
     if (err) {
       return null;
     }
    
     console.log(content.toString());
    });

    前面提到的 fs 模块告诉工作池使用其中一个线程来读取文件的内容,并在完成后通知事件循环。然后事件循环获取提供的回调函数,并用文件的内容执行它。

    以上是非阻塞代码的示例,我们不必同步等待某事的发生。只需告诉工作池去读取文件,并用结果去调用提供的函数即可。由于工作池有自己的线程,因此事件循环可以在读取文件时继续正常执行。

    在不需要同步执行某些复杂操作时,这一切都相安无事:任何运行时间太长的函数都会阻塞线程。如果应用程序中有大量这类功能,就可能会明显降低服务器的吞吐量,甚至完全冻结它。在这种情况下,无法继续将工作委派给工作池。

    在需要对数据进行复杂的计算时(如AI、机器学习或大数据)无法真正有效地使用 Node.js,因为操作阻塞了主(且唯一)线程,使服务器无响应。在 Node.js v10.5.0 发布之前就是这种情况,在这一版本增加了对多线程的支持。

    worker_threads

    worker_threads 模块允许我们创建功能齐全的多线程 Node.js 程序。

    thread worker 是在单独的线程中生成的一段代码(通常从文件中取出)。

    注意,术语 thread workerworker 和 thread 经常互换使用,他们都指的是同一件事。

    要想使用 thread worker,必须导入 worker_threads 模块。让我们先写一个函数来帮助我们生成这些thread worker,然后再讨论它们的属性。

    type WorkerCallback = (err: any, result?: any) => any;
    
    export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
     const worker = new Worker(path, { workerData });
    
     worker.on('message', cb.bind(null, null));
     worker.on('error', cb);
    
     worker.on('exit', (exitCode) => {
       if (exitCode === 0) {
         return null;
       }
    
       return cb(new Error(`Worker has stopped with code ${exitCode}`));
     });
    
     return worker;
    }

    要创建一个 worker,首先必须创建一个 Worker 类的实例。它的第一个参数提供了包含 worker 的代码的文件的路径;第二个参数提供了一个名为 workerData 的包含一个属性的对象。这是我们希望线程在开始运行时可以访问的数据。

    请注意:不管你是用的是 JavaScript, 还是最终要转换为 JavaScript 的语言(例如,TypeScript),路径应该始终引用带有 .js 或 .mjs 扩展名的文件。

    我还想指出为什么使用回调方法,而不是返回在触发 message 事件时将解决的 promise。这是因为 worker 可以发送许多 message 事件,而不是一个。

    正如你在上面的例子中所看到的,线程间的通信是基于事件的,这意味着我们设置了 worker 在发送给定事件后调用的侦听器。

    以下是最常见的事件:

    worker.on('error', (error) => {});

    只要 worker 中有未捕获的异常,就会发出 error 事件。然后终止 worker,错误可以作为提供的回调中的第一个参数。

    worker.on('exit', (exitCode) => {});

    在 worker 退出时会发出 exit 事件。如果在worker中调用了 process.exit(),那么 exitCode 将被提供给回调。如果 worker 以 worker.terminate() 终止,则代码为1。

    worker.on('online', () => {});

    只要 worker 停止解析 JavaScript 代码并开始执行,就会发出 online 事件。它不常用,但在特定情况下可以提供信息。

    worker.on('message', (data) => {});

    只要 worker 将数据发送到父线程,就会发出 message 事件。

    现在让我们来看看如何在线程之间共享数据。

    在线程之间交换数据

    要将数据发送到另一个线程,可以用 port.postMessage() 方法。它的原型如下:

    port.postMessage(data[, transferList])

    port 对象可以是 parentPort,也可以是 MessagePort 的实例 —— 稍后会详细讲解。

    数据参数

    第一个参数 —— 这里被称为 data —— 是一个被复制到另一个线程的对象。它可以是复制算法所支持的任何内容。

    数据由结构化克隆算法进行复制(包含function的对象引用都会报错DataCloneError:xxxx could not be cloned)。引用自 Mozilla:

    它通过递归输入对象来进行克隆,同时保持之前访问过的引用的映射,以避免无限遍历循环。

    该算法不复制函数、错误、属性描述符或原型链。还需要注意的是,以这种方式复制对象与使用 JSON 不同,因为它可以包含循环引用和类型化数组,而 JSON 不能。

    由于能够复制类型化数组,该算法可以在线程之间共享内存。

    实例:

    1、代码

    server.js

    const express = require('express');
    const ws = require('ws');
    const convertMessage = require('./worker');//引入worker中的方法
    
    const app = express()
    const wsServer = new ws.Server({ noServer: true });
    wsServer.on('connection', (socket, req) => {
        socket.on('message', message => {
            console.log(message);
        });
    });
    const port = 3002
    app.get('/test', (req, res) => {
        //1.接收到test请求,调用convertMessage,发起子线程
        convertMessage().then(() => {
    //5.converMessage resove后向客户端发送success res.send(
    'success') }) }) app.get('/', async (req, res) => { res.send('Hello World!') }) //启动服务 const server = app.listen(port, () => { console.log(`Example app listening at http://localhost:${port}`) }) server.on('upgrade', (request, socket, head) => { wsServer.handleUpgrade(request, socket, head, socket => { wsServer.emit('connection', socket, request); }); });

    worker.js

    const { Worker, workerData } = require('worker_threads')
    
    module.exports = function convertMessage() {
        return new Promise((resolve, reject) => {
            //2.子线程中执行./index.js文件
            const worker = new Worker('./index.js');
            worker.on('message', (message) => {
                //4.接收到子线程通过postMessage传回的message
                console.log(message)
                resolve()
            });
            worker.on('error', reject);
            worker.on('exit', (code) => {
                //子线程执行完成后触发exit事件
                if (code !== 0) {
                    reject(new Error(`Worker stopped with exit code ${code}`));
                }
            })
        })
    }

    index.js

    const { parentPort, workerData } = require('worker_threads')
    
    for (let i = 0; i < 100; i++) {    
        //3.通过主线程的parentPort,向主线程发送消息
        parentPort.postMessage(`index.js执行中${i}`)
    }

    2、测试结果

    后端通过node server.js启动服务,前端通过http://localhost:3002/test发起请求:

    后端log如下:

     

    前端结果:

     

  • 相关阅读:
    linux下SVN迁移
    hive-site.xml
    5.扩展 GROUP BY
    4.锁——避免重复启动同一程序
    1.执行计划探究(一)
    1.日期_星期
    3.放弃CHAR吧,在铸成大错之前!
    2.NULL 的问题
    1.ORACLE 尽量不使用隐式转换
    webservice
  • 原文地址:https://www.cnblogs.com/vickylinj/p/15673067.html
Copyright © 2011-2022 走看看