如何在Node.js中使用多线程

作者选择了开源精神疾病作为为捐赠而写计划的一部分接受捐赠。

介绍

Node.js在单线程中运行JavaScript代码,这意味着你的代码一次只能执行一个任务。然而,Node.js本身是多线程的,并通过libuv库提供隐藏线程,用于处理诸如从磁盘读取文件或网络请求等I/O操作。通过使用隐藏线程,Node.js提供了允许代码进行I/O请求而不阻塞主线程的异步方法。

尽管Node.js有隐藏线程,但不能用它们来卸载CPU密集型任务,例如复杂的计算、图像调整或视频压缩。由于JavaScript是单线程的,当运行CPU密集型任务时,它会阻塞主线程,直到任务完成为止。在不使用其他线程的情况下,加速CPU绑定任务的唯一方法是提高处理器速度。

然而,近年来,CPU的速度并未显著提升。相反,计算机配备了额外的核心,如今拥有8个或更多核心的计算机变得更为普遍。尽管存在这种趋势,但由于JavaScript是单线程的,你的代码无法利用计算机上额外的核心来加速CPU密集型任务或避免破坏主线程。

为了解决这个问题,Node.js引入了worker-threads模块,它允许你创建线程并并行执行多个JavaScript任务。一旦一个线程完成一个任务,它会向主线程发送包含操作结果的消息,以便在代码的其他部分使用。使用工作线程的优势在于CPU密集型任务不会阻塞主线程,你可以将任务分割并分发给多个工作线程以进行优化。

在本教程中,你将创建一个具有阻塞主线程的CPU密集型任务的Node.js应用程序。接下来,你将使用worker-threads模块将CPU密集型任务转移到另一个线程以避免阻塞主线程。最后,你将分割CPU密集型任务,并让四个线程并行处理以加速任务。

先决条件

要完成本教程,你需要:

设置项目并安装依赖

在这一步中,您将创建项目目录,初始化npm,并安装所有必要的依赖项。

首先,创建并移动到项目目录:

  1. mkdir multi-threading_demo
  2. cd multi-threading_demo

mkdir命令创建一个目录,cd命令将工作目录更改为新创建的目录。

随后,使用npm init命令初始化项目目录:

  1. npm init -y

-y选项接受所有默认选项。

当命令运行时,您的输出将类似于这样:

Wrote to /home/sammy/multi-threading_demo/package.json:

{
  "name": "multi-threading_demo",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

接下来,安装express,一个Node.js的web框架:

  1. npm install express

您将使用Express创建一个具有阻塞和非阻塞端点的服务器应用程序。

Node.js默认附带worker-threads模块,因此您不需要安装它。

您现在已经安装了必要的软件包。接下来,您将学习更多关于进程和线程以及它们如何在计算机上执行的知识。

理解进程和线程

在开始编写与CPU绑定的任务并将其分配到单独的线程之前,首先需要了解进程和线程是什么,以及它们之间的区别。最重要的是,您将了解进程和线程在单核或多核计算机系统上的执行方式。

进程

A process is a running program in the operating system. It has its own memory and cannot see nor access the memory of other running programs. It also has an instruction pointer, which indicates the instruction currently being executed in a program. Only one task can be executed at a time.

为了理解这一点,您将创建一个带有无限循环的Node.js程序,以便在运行时不退出。

使用nano或您喜欢的文本编辑器,创建并打开process.js文件:

  1. nano process.js

在您的process.js文件中,输入以下代码:

multi-threading_demo/process.js
const process_name = process.argv.slice(2)[0];

count = 0;
while (true) {
  count++;
  if (count == 2000 || count == 4000) {
    console.log(`${process_name}: ${count}`);
  }
}

在第一行中,process.argv属性返回包含程序命令行参数的数组。然后,附加JavaScript的slice()方法,参数为2,以创建数组的从索引2开始的浅拷贝。这样做会跳过前两个参数,它们是Node.js路径和程序文件名。接下来,您使用方括号表示法检索切片数组的第一个参数,并将其存储在process_name变量中。

之后,您定义了一个while循环,并传递给它一个true条件,使循环永远运行。在循环内,count变量在每次迭代期间递增1。接下来是一个if语句,检查count是否等于20004000。如果条件评估为真,console.log()方法会在终端中记录一条消息。

使用CTRL+X保存并关闭您的文件,然后按Y保存更改。

使用node命令运行程序:

  1. node process.js A &

A is a command-line argument that is passed to the program and stored in the process_name variable. The & at end the allows the Node program to run in the background, which lets you enter more commands in the shell.

运行程序时,您将看到类似以下的输出:

Output
[1] 7754 A: 2000 A: 4000

数字7754是操作系统分配给它的进程ID。A: 2000A: 4000是程序的输出。

使用node命令运行程序时,您创建了一个进程。操作系统为程序分配内存,找到计算机磁盘上的程序可执行文件,并将程序加载到内存中。然后,它分配一个进程ID并开始执行程序。此时,您的程序现在已成为一个进程。

当进程运行时,其进程ID将添加到操作系统的进程列表中,并可以使用诸如htoptopps等工具查看。这些工具提供有关进程的更多详细信息,以及停止或设置其优先级的选项。

要快速总结一个 Node 进程,只需在终端中按下ENTER以恢复提示。接下来,运行ps命令来查看 Node 进程:

  1. ps |grep node

ps 命令列出与系统上当前用户相关联的所有进程。管道操作符|将所有ps输出传递给grep,以过滤出只有 Node 进程的列表。

运行该命令将产生类似以下的输出:

Output
7754 pts/0 00:21:49 node

你可以从单个程序创建无数个进程。例如,使用以下命令创建另外三个带有不同参数的进程,并将它们放在后台运行:

  1. node process.js B & node process.js C & node process.js D &

在该命令中,你创建了三个process.js程序的实例。符号&将每个进程放在后台运行。

运行该命令后,输出看起来类似以下内容(尽管顺序可能不同):

Output
[2] 7821 [3] 7822 [4] 7823 D: 2000 D: 4000 B: 2000 B: 4000 C: 2000 C: 4000

从输出中可以看到,每个进程在计数达到20004000时都会将进程名称记录到终端。每个进程不知道其他进程是否在运行:进程D不知道进程C的存在,反之亦然。任何在其中一个进程中发生的事情都不会影响其他 Node.js 进程。

如果你仔细检查输出,你会发现输出的顺序与你创建这三个进程时的顺序不同。在运行命令时,进程的参数顺序是按照 BCD 的顺序。但现在,顺序是 DBC。原因是操作系统有调度算法,决定在给定时间在 CPU 上运行哪个进程。

在单核机器上,进程会 并发 执行。也就是说,操作系统在规律的间隔内切换进程。例如,进程 D 执行一段时间,然后它的状态被保存在某个地方,操作系统安排进程 B 执行一段时间,依此类推。这样来回进行,直到所有任务都完成。从输出来看,每个进程似乎都已经执行完毕,但实际上,操作系统调度程序不断地在它们之间切换。

在多核系统上 —— 假设你有四个核心 —— 操作系统将每个进程调度到每个核心上同时执行。这被称为 并行处理。然而,如果你创建了四个额外的进程(总数为八个),每个核心将同时执行两个进程,直到它们完成为止。

线程

线程就像进程一样:它们有自己的指令指针,一次可以执行一个JavaScript任务。与进程不同,线程没有自己的内存。相反,它们驻留在进程的内存中。当您创建一个进程时,可以使用worker_threads模块创建多个线程,以并行执行JavaScript代码。此外,线程可以通过消息传递或在进程内存中共享数据来彼此通信。这使它们相对于进程而言更轻量级,因为生成线程不会向操作系统请求更多内存。

在执行线程时,它们的行为与进程类似。如果在单核系统上运行多个线程,操作系统将在规律的间隔内在它们之间进行切换,使每个线程有机会直接在单个CPU上执行。在多核系统上,操作系统会将线程调度到所有核心上,并同时执行JavaScript代码。如果您创建的线程多于可用的核心数量,每个核心将同时执行多个线程。

有了这些,按下ENTER,然后使用kill命令停止所有当前运行的Node进程:

  1. sudo kill -9 `pgrep node`

pgrep返回所有四个Node进程的进程ID给kill命令。-9选项指示kill发送SIGKILL信号

运行该命令时,您将看到类似以下的输出:

Output
[1] Killed node process.js A [2] Killed node process.js B [3] Killed node process.js C [4] Killed node process.js D

有时候输出可能会延迟,并在您稍后运行其他命令时显示出来。

现在您已经了解了进程和线程之间的区别,接下来您将在下一部分中使用 Node.js 隐藏线程。

理解 Node.js 中的隐藏线程

Node.js 确实提供了额外的线程,这就是为什么它被认为是多线程的原因。在本节中,您将查看 Node.js 中的隐藏线程,这有助于使 I/O 操作非阻塞。

正如在介绍中提到的,JavaScript 是单线程的,所有 JavaScript 代码都在单个线程中执行。这包括您的程序源代码以及您在程序中包含的第三方库。当程序进行 I/O 操作以读取文件或网络请求时,这会阻塞主线程。

然而,Node.js 实现了 libuv 库,为 Node.js 进程提供了四个额外的线程。利用这些线程,I/O 操作被单独处理,当它们完成时,事件循环将与 I/O 任务相关联的回调添加到微任务队列中。当主线程的调用栈清空时,回调被推送到调用栈上,然后执行。为了明确这一点,与给定 I/O 任务相关联的回调不会并行执行;然而,通过线程的帮助,读取文件或网络请求的任务本身是并行进行的。一旦 I/O 任务完成,回调在主线程中运行。

除了这四个线程外,V8 引擎 还提供了两个线程来处理诸如自动垃圾回收之类的事务。这将进程中的线程总数增加到七个:一个主线程,四个 Node.js 线程和两个 V8 线程。

要确认每个 Node.js 进程都有七个线程,再次运行 process.js 文件并将其放入后台:

  1. node process.js A &

终端将记录进程 ID,以及程序的输出:

Output
[1] 9933 A: 2000 A: 4000

在某个地方记下进程 ID,然后按 ENTER 键,以便您可以再次使用提示符。

要查看线程,请运行 top 命令并传递在输出中显示的进程 ID:

  1. top -H -p 9933

-H 指示 top 显示进程中的线程。-p 标志指示 top 仅监视给定进程 ID 的活动。

运行该命令时,您的输出将类似于以下内容:

Output
top - 09:21:11 up 15:00, 1 user, load average: 0.99, 0.60, 0.26 Threads: 7 total, 1 running, 6 sleeping, 0 stopped, 0 zombie %Cpu(s): 24.8 us, 0.3 sy, 0.0 ni, 75.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st MiB Mem : 7951.2 total, 6756.1 free, 248.4 used, 946.7 buff/cache MiB Swap: 0.0 total, 0.0 free, 0.0 used. 7457.4 avail Mem PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 9933 node-us+ 20 0 597936 51864 33956 R 99.9 0.6 4:19.64 node 9934 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node 9935 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.84 node 9936 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node 9937 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.93 node 9938 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node 9939 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node

正如您在输出中所看到的,Node.js 进程总共有七个线程:一个用于执行 JavaScript 的主线程,四个 Node.js 线程和两个 V8 线程。

正如之前讨论的那样,这四个 Node.js 线程用于执行 I/O 操作,使它们变为非阻塞。它们在这项任务上表现良好,并且自行创建线程进行 I/O 操作可能会更糟糕。但对于 CPU 绑定的任务就不能这样说了。CPU 绑定的任务不会使用进程中的任何额外线程,并会阻塞主线程。

现在按 q 键退出 top,然后使用以下命令停止 Node 进程:

  1. kill -9 9933

现在您已经了解了 Node.js 进程中的线程,接下来您将在下一节中编写一个 CPU 绑定的任务,并观察它如何影响主线程。

创建一个不使用工作线程的 CPU 绑定任务

在本节中,您将构建一个 Express 应用程序,其中包含一个非阻塞路由和一个运行 CPU 绑定任务的阻塞路由。

首先,在您喜欢的编辑器中打开 index.js

  1. nano index.js

在您的 index.js 文件中,添加以下代码以创建一个基本服务器:

multi-threading_demo/index.js
const express = require("express");

const app = express();
const port = process.env.PORT || 3000;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

在下面的代码块中,您使用Express创建了一个HTTP服务器。首先,您导入了express模块。接下来,您将app变量设置为保存Express实例。然后,您定义了port变量,它保存了服务器应该监听的端口号。

之后,您使用app.get('/non-blocking')来定义应发送GET请求的路由。最后,您调用app.listen()方法指示服务器开始监听端口3000

接下来,定义另一个路由,/blocking/,其中将包含一个CPU密集型任务:

multi-threading_demo/index.js
...
app.get("/blocking", async (req, res) => {
  let counter = 0;
  for (let i = 0; i < 20_000_000_000; i++) {
    counter++;
  }
  res.status(200).send(`result is ${counter}`);
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

您使用app.get("/blocking")定义了/blocking路由,它接受一个异步回调作为第二个参数,回调使用async关键字作为前缀来运行CPU密集型任务。在回调函数中,您创建了一个for循环,它迭代了200亿次,每次迭代都会将counter变量增加1。这个任务在CPU上运行,需要几秒钟才能完成。

这时,您的index.js文件看起来应该是这样:

multi-threading_demo/index.js
const express = require("express");

const app = express();
const port = process.env.PORT || 3000;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

app.get("/blocking", async (req, res) => {
  let counter = 0;
  for (let i = 0; i < 20_000_000_000; i++) {
    counter++;
  }
  res.status(200).send(`result is ${counter}`);
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

保存并退出您的文件,然后使用以下命令启动服务器:

  1. node index.js

运行该命令时,您将看到类似以下的输出:

Output
App listening on port 3000

这表明服务器正在运行并且可以提供服务。

现在,在您喜欢的浏览器中访问http://localhost:3000/non-blocking。您将立即看到带有消息This page is non-blocking的响应。

注意:如果你正在远程服务器上按照教程操作,你可以使用端口转发在浏览器中测试应用程序。

在Express服务器仍在运行时,在本地计算机上打开另一个终端,并输入以下命令:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

连接到服务器后,在本地计算机的Web浏览器中导航到http://localhost:3000/non-blocking。在整个教程的剩余部分,保持第二个终端保持打开状态。

接下来,打开一个新标签并访问http://localhost:3000/blocking。随着页面加载,快速打开两个额外的标签并再次访问http://localhost:3000/non-blocking。你会发现你不会立即得到响应,页面将一直尝试加载。只有在/blocking路由完成加载并返回响应result is 20000000000之后,其余路由才会返回响应。

所有/non-blocking路由之所以在/blocking路由加载时无法工作,是因为CPU绑定的for循环阻塞了主线程。当主线程被阻塞时,Node.js在CPU绑定任务完成之前无法提供任何请求服务。因此,如果你的应用程序对/non-blocking路由有数千个同时的GET请求,只需访问一次/blocking路由就足以使所有应用程序路由变得无响应。

正如您所见,阻塞主线程可能会影响用户对您的应用的体验。为了解决这个问题,您需要将 CPU 绑定的任务转移到另一个线程,以便主线程可以继续处理其他 HTTP 请求。

有了这个方法后,通过按下CTRL+C停止服务器。在对index.js文件进行更改后,您将在下一节重新启动服务器。服务器停止的原因是 Node.js 在文件发生新更改时不会自动刷新。

现在您已经了解了 CPU 密集型任务对应用程序的负面影响,接下来将尝试通过使用 promises 避免阻塞主线程。

使用 Promises 转移 CPU 绑定的任务

通常,当开发人员了解到 CPU 绑定任务的阻塞效果时,他们会转向 promises,使代码变得非阻塞。这种直觉源自于使用非阻塞的基于 promise 的 I/O 方法,例如readFile()writeFile()。但正如您已经了解到的那样,I/O 操作利用了 Node.js 的隐藏线程,而 CPU 绑定任务则没有。尽管如此,在本节中,您将尝试将 CPU 绑定任务包装在一个 promise 中,以尝试使其非阻塞。这不会起作用,但它将帮助您看到使用工作线程的价值,这是您将在下一节中做的事情。

再次在您的编辑器中打开index.js文件:

  1. nano index.js

在你的index.js文件中,删除包含CPU密集型任务的高亮代码:

multi-threading_demo/index.js
...
app.get("/blocking", async (req, res) => {
  let counter = 0;
  for (let i = 0; i < 20_000_000_000; i++) {
    counter++;
  }
  res.status(200).send(`result is ${counter}`);
});
...

接下来,添加以下高亮代码,其中包含返回Promise的函数:

multi-threading_demo/index.js
...
function calculateCount() {
  return new Promise((resolve, reject) => {
    let counter = 0;
    for (let i = 0; i < 20_000_000_000; i++) {
      counter++;
    }
    resolve(counter);
  });
}

app.get("/blocking", async (req, res) => {
  res.status(200).send(`result is ${counter}`);
}

calculateCount()函数现在包含了你在/blocking处理函数中的计算。该函数返回一个Promise,使用new Promise语法初始化。该Promise接受一个带有resolvereject参数的回调函数,用于处理成功或失败。当for循环运行结束时,Promise将以counter变量中的值解析:

接下来,在index.js文件中的/blocking/处理函数中调用calculateCount()函数:

multi-threading_demo/index.js
app.get("/blocking", async (req, res) => {
  const counter = await calculateCount();
  res.status(200).send(`result is ${counter}`);
});

在这里,你使用await关键字前缀调用calculateCount()函数以等待Promise解析。一旦Promise解析完成,counter变量将设置为解析后的值:

你的完整代码现在看起来像这样:

multi-threading_demo/index.js
const express = require("express");

const app = express();
const port = process.env.PORT || 3000;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

function calculateCount() {
  return new Promise((resolve, reject) => {
    let counter = 0;
    for (let i = 0; i < 20_000_000_000; i++) {
      counter++;
    }
    resolve(counter);
  });
}

app.get("/blocking", async (req, res) => {
  const counter = await calculateCount();
  res.status(200).send(`result is ${counter}`);
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

保存并退出你的文件,然后再次启动服务器:

  1. node index.js

在你的网络浏览器中,访问http://localhost:3000/blocking,当它加载时,快速重新加载http://localhost:3000/non-blocking标签页。你会注意到,non-blocking路由仍然受到影响,它们都将等待/blocking路由完成加载。因为路由仍然受到影响,Promise不能使JavaScript代码并行执行,也不能用于使CPU密集型任务变为非阻塞。

随着这个,使用CTRL+C停止应用服务器。

现在您知道,Promise 不提供任何机制使 CPU 密集型任务非阻塞,您将使用 Node.js 的worker-threads模块将 CPU 密集型任务转移到单独的线程中。

使用worker-threads模块转移 CPU 密集型任务

在这个部分,您将使用worker-threads模块将 CPU 密集型任务转移到另一个线程,以避免阻塞主线程。为此,您将创建一个worker.js文件,其中包含 CPU 密集型任务。在index.js文件中,您将使用worker-threads模块初始化线程,并在worker.js文件中开始任务以并行运行到主线程。一旦任务完成,工作线程将发送包含结果的消息回到主线程。

首先,验证您是否有2个或更多核心,使用nproc命令:

  1. nproc
Output
4

如果显示两个或更多核心,则可以继续进行此步骤。

接下来,在您的文本编辑器中创建并打开worker.js文件:

  1. nano worker.js

在您的worker.js文件中,添加以下代码来导入worker-threads模块并执行 CPU 密集型任务:

multi-threading_demo/worker.js
const { parentPort } = require("worker_threads");

let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
  counter++;
}

第一行加载worker_threads模块并提取parentPort类。该类提供了可用于向主线程发送消息的方法。接下来,您有一个CPU密集型任务,当前在index.js文件的calculateCount()函数中。在此步骤的后续操作中,您将从index.js中删除此函数。

在此之后,添加下面突出显示的代码:

multi-threading_demo/worker.js
const { parentPort } = require("worker_threads");

let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
  counter++;
}

parentPort.postMessage(counter);

在这里,调用parentPort类的postMessage()方法,该方法向主线程发送包含存储在counter变量中的CPU绑定任务结果的消息。

保存并退出您的文件。在文本编辑器中打开index.js

  1. nano index.js

由于您已经在worker.js中有了CPU绑定任务,因此从index.js中删除下面突出显示的代码:

multi-threading_demo/index.js
const express = require("express");

const app = express();
const port = process.env.PORT || 3000;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

function calculateCount() {
  return new Promise((resolve, reject) => {
    let counter = 0;
    for (let i = 0; i < 20_000_000_000; i++) {
      counter++;
    }
    resolve(counter);
  });
}

app.get("/blocking", async (req, res) => {
  const counter = await calculateCount();
  res.status(200).send(`result is ${counter}`);
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

接下来,在app.get("/blocking")回调中,添加以下代码以初始化线程:

multi-threading_demo/index.js
const express = require("express");
const { Worker } = require("worker_threads");
...
app.get("/blocking", async (req, res) => {
  const worker = new Worker("./worker.js");
  worker.on("message", (data) => {
    res.status(200).send(`result is ${data}`);
  });
  worker.on("error", (msg) => {
    res.status(404).send(`An error occurred: ${msg}`);
  });
});
...

首先,导入worker_threads模块并解压Worker类。在app.get("/blocking")回调内,使用new关键字创建Worker的实例,后跟对Worker的调用,其参数是worker.js文件路径。这将创建一个新的线程,并且worker.js文件中的代码将在另一个核心的线程上运行。

随后,您可以使用on("message")方法将事件附加到worker实例,以侦听消息事件。当接收到包含来自worker.js文件的结果的消息时,它将作为参数传递给方法的回调函数,该回调函数返回包含 CPU 绑定任务结果的响应给用户。

接下来,您可以使用on("error")方法将另一个事件附加到 worker 实例,以侦听错误事件。如果发生错误,回调函数将返回包含错误消息的404响应给用户。

您的完整文件现在看起来像下面这样:

multi-threading_demo/index.js
const express = require("express");
const { Worker } = require("worker_threads");

const app = express();
const port = process.env.PORT || 3000;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

app.get("/blocking", async (req, res) => {
  const worker = new Worker("./worker.js");
  worker.on("message", (data) => {
    res.status(200).send(`result is ${data}`);
  });
  worker.on("error", (msg) => {
    res.status(404).send(`An error occurred: ${msg}`);
  });
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

保存并退出您的文件,然后运行服务器:

  1. node index.js

在您的 Web 浏览器中再次访问http://localhost:3000/blocking选项卡。在它完成加载之前,刷新所有http://localhost:3000/non-blocking选项卡。现在,您应该注意到它们立即加载,而不需要等待/blocking路由完成加载。这是因为 CPU 绑定任务被转移到另一个线程,而主线程处理所有传入请求。

现在,使用CTRL+C停止您的服务器。

现在,您已经知道如何使用工作线程使 CPU 密集型任务非阻塞,您将使用四个工作线程来提高 CPU 密集型任务的性能。

使用四个工作线程优化 CPU 密集型任务

在这一部分中,您将把 CPU 密集型任务分配给四个工作线程,以便它们可以更快地完成任务并缩短 /blocking 路由的加载时间。

为了让更多的工作线程共同处理同一任务,您需要拆分任务。由于任务涉及循环 200 亿次,您将 200 亿除以您想要使用的线程数。在这种情况下,是 4。计算 20_000_000_000 / 4 将得到 5_000_000_000。因此,每个线程将循环从 05_000_000_000 并通过 1 递增 counter。当每个线程完成时,它将向主线程发送包含结果的消息。一旦主线程分别收到来自四个线程的消息,您将合并结果并向用户发送响应。

如果您有一个迭代大型数组的任务,也可以使用相同的方法。例如,如果您想要调整目录中的 800 张图像大小,您可以创建一个包含所有图像文件路径的数组。接下来,将 800 除以 4(线程数)并让每个线程处理一个范围。线程一将调整数组索引从 0199 的图像大小,线程二从索引 200399,依此类推。

首先,请验证您是否有四个或更多核心:

  1. nproc
Output
4

使用cp命令复制worker.js文件:

  1. cp worker.js four_workers.js

当前的index.jsworker.js文件将保持不变,以便您稍后可以再次运行它们,以便与此部分的更改性能进行比较。

接下来,在您的文本编辑器中打开four_workers.js文件:

  1. nano four_workers.js

在您的four_workers.js文件中,添加突出显示的代码以导入workerData对象:

multi-threading_demo/four_workers.js
const { workerData, parentPort } = require("worker_threads");

let counter = 0;
for (let i = 0; i < 20_000_000_000 / workerData.thread_count; i++) {
  counter++;
}

parentPort.postMessage(counter);

首先,您提取WorkerData对象,该对象将包含从主线程传递的数据,当初始化线程时(您将很快在index.js文件中执行此操作)。
该对象具有一个thread_count属性,其中包含线程的数量,即4。然后在for循环中,将值20_000_000_000除以4,结果为5_000_000_000

保存并关闭您的文件,然后复制index.js文件:

  1. cp index.js index_four_workers.js

在您的编辑器中打开index_four_workers.js文件:

  1. nano index_four_workers.js

在您的index_four_workers.js文件中,添加突出显示的代码以创建线程实例:

multi-threading_demo/index_four_workers.js
...
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
...
function createWorker() {
  return new Promise(function (resolve, reject) {
    const worker = new Worker("./four_workers.js", {
      workerData: { thread_count: THREAD_COUNT },
    });
  });
}

app.get("/blocking", async (req, res) => {
  ...
})
...

首先,定义包含您要创建的线程数量的THREAD_COUNT常量。稍后,当您的服务器上有更多核心时,扩展将涉及将THREAD_COUNT的值更改为您要使用的线程数量。

接下来,createWorker()函数创建并返回一个Promise。在Promise的回调中,通过将Worker类作为第一个参数传递文件路径到four_workers.js文件来初始化一个新的线程。然后,将一个对象作为第二个参数传递。接下来,给该对象分配一个具有另一个对象作为其值的workerData属性。最后,给该对象分配一个thread_count属性,其值为THREAD_COUNT常量中的线程数。在workers.js文件中,workerData对象是您之前引用的对象。

为了确保Promise解析或抛出错误,请添加以下突出显示的行:

multi-threading_demo/index_four_workers.js
...
function createWorker() {
  return new Promise(function (resolve, reject) {
    const worker = new Worker("./four_workers.js", {
      workerData: { thread_count: THREAD_COUNT },
    });
    worker.on("message", (data) => {
      resolve(data);
    });
    worker.on("error", (msg) => {
      reject(`An error ocurred: ${msg}`);
    });
  });
}
...

当工作线程向主线程发送消息时,Promise将以返回的数据解析。然而,如果发生错误,Promise将返回一个错误消息。

现在,您已经定义了初始化新线程并从线程返回数据的函数,您将在app.get("/blocking")中使用该函数来生成新线程。

但首先,请删除以下突出显示的代码,因为您已经在createWorker()函数中定义了此功能:

multi-threading_demo/index_four_workers.js
...
app.get("/blocking", async (req, res) => {
  const worker = new Worker("./worker.js");
  worker.on("message", (data) => {
    res.status(200).send(`result is ${data}`);
  });
  worker.on("error", (msg) => {
    res.status(404).send(`An error ocurred: ${msg}`);
  });
});
...

删除代码后,添加以下代码以初始化四个工作线程:

multi-threading_demo/index_four_workers.js
...
app.get("/blocking", async (req, res) => {
  const workerPromises = [];
  for (let i = 0; i < THREAD_COUNT; i++) {
    workerPromises.push(createWorker());
  }
});
...

首先,您创建一个workerPromises变量,其中包含一个空数组。接下来,您根据THREAD_COUNT的值(即4)进行迭代。在每次迭代期间,您调用createWorker()函数来创建一个新线程。然后,您使用JavaScript的push方法将函数返回的promise对象推入workerPromises数组中。当循环结束时,workerPromises将有四个promise对象,每个都是从调用createWorker()函数四次返回的。

现在,请在下面的突出显示的代码中添加以下内容,以等待promise解析并向用户返回响应:

multi-threading_demo/index_four_workers.js
app.get("/blocking", async (req, res) => {
  const workerPromises = [];
  for (let i = 0; i < THREAD_COUNT; i++) {
    workerPromises.push(createWorker());
  }

  const thread_results = await Promise.all(workerPromises);
  const total =
    thread_results[0] +
    thread_results[1] +
    thread_results[2] +
    thread_results[3];
  res.status(200).send(`result is ${total}`);
});

由于workerPromises数组包含调用createWorker()返回的promise,您需要使用await语法前缀Promise.all()方法,并使用workerPromises作为其参数调用all()方法。Promise.all()方法等待数组中的所有promise解析。当这种情况发生时,thread_results变量包含promise解析的值。由于计算被分割到四个工作线程中,您可以使用方括号表示法从thread_results中获取每个值,然后将它们全部加在一起。加完后,您将总值返回到页面上。

您的完整文件现在应该是这样的:

multi-threading_demo/index_four_workers.js
const express = require("express");
const { Worker } = require("worker_threads");

const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

function createWorker() {
  return new Promise(function (resolve, reject) {
    const worker = new Worker("./four_workers.js", {
      workerData: { thread_count: THREAD_COUNT },
    });
    worker.on("message", (data) => {
      resolve(data);
    });
    worker.on("error", (msg) => {
      reject(`An error ocurred: ${msg}`);
    });
  });
}

app.get("/blocking", async (req, res) => {
  const workerPromises = [];
  for (let i = 0; i < THREAD_COUNT; i++) {
    workerPromises.push(createWorker());
  }
  const thread_results = await Promise.all(workerPromises);
  const total =
    thread_results[0] +
    thread_results[1] +
    thread_results[2] +
    thread_results[3];
  res.status(200).send(`result is ${total}`);
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

保存并关闭您的文件。在运行此文件之前,请先运行index.js来测量其响应时间。

  1. node index.js

接下来,在您的本地计算机上打开一个新的终端,并输入以下curl命令,该命令测量从/blocking路由获取响应所需的时间:

  1. time curl --get http://localhost:3000/blocking

time命令用于测量curl命令的运行时间。 curl命令向指定的URL发送HTTP请求,--get选项指示curl进行GET请求。

当命令运行时,您的输出将类似于以下内容:

Output
real 0m28.882s user 0m0.018s sys 0m0.000s

突出显示的输出显示从收到请求到获取响应大约需要28秒,这可能会因计算机而异。

接下来,使用CTRL+C停止服务器,并运行index_four_workers.js文件:

  1. node index_four_workers.js

再次在第二个终端访问/blocking路由:

  1. time curl --get http://localhost:3000/blocking

您将看到与以下内容一致的输出:

Output
real 0m8.491s user 0m0.011s sys 0m0.005s

输出显示大约需要8秒,这意味着您将加载时间缩短了约70%。

您成功地使用四个工作线程优化了CPU绑定的任务。 如果您有多于四个核心的机器,请将THREAD_COUNT更新为该数字,您将进一步缩短加载时间。

结论

在这篇文章中,您使用 Node 应用程序构建了一个占用 CPU 的任务,导致主线程被阻塞。然后,您尝试使用 promises 使任务变为非阻塞,但未成功。随后,您使用了 worker_threads 模块,将 CPU 密集型任务转移到另一个线程以使其非阻塞。最后,您使用 worker_threads 模块创建了四个线程,以加速 CPU 密集型任务。

作为下一步,查看 Node.js Worker threads 文档以了解更多选项。此外,您还可以查看 piscina 库,该库允许您为 CPU 密集型任务创建一个工作线程池。如果您想继续学习 Node.js,请参阅教程系列 How To Code in Node.js

Source:
https://www.digitalocean.com/community/tutorials/how-to-use-multithreading-in-node-js