介绍
Node.js在单线程中运行JavaScript代码,这意味着你的代码一次只能执行一个任务。然而,Node.js本身是多线程的,并通过libuv
库提供隐藏线程,用于处理诸如从磁盘读取文件或网络请求等I/O操作。通过使用隐藏线程,Node.js提供了允许代码进行I/O请求而不阻塞主线程的异步方法。
尽管Node.js有隐藏线程,但不能用它们来卸载CPU密集型任务,例如复杂的计算、图像调整或视频压缩。由于JavaScript是单线程的,当运行CPU密集型任务时,它会阻塞主线程,直到任务完成为止。在不使用其他线程的情况下,加速CPU绑定任务的唯一方法是提高处理器速度。
然而,近年来,CPU的速度并未显著提升。相反,计算机配备了额外的核心,如今拥有8个或更多核心的计算机变得更为普遍。尽管存在这种趋势,但由于JavaScript是单线程的,你的代码无法利用计算机上额外的核心来加速CPU密集型任务或避免破坏主线程。
为了解决这个问题,Node.js引入了worker-threads
模块,它允许你创建线程并并行执行多个JavaScript任务。一旦一个线程完成一个任务,它会向主线程发送包含操作结果的消息,以便在代码的其他部分使用。使用工作线程的优势在于CPU密集型任务不会阻塞主线程,你可以将任务分割并分发给多个工作线程以进行优化。
在本教程中,你将创建一个具有阻塞主线程的CPU密集型任务的Node.js应用程序。接下来,你将使用worker-threads
模块将CPU密集型任务转移到另一个线程以避免阻塞主线程。最后,你将分割CPU密集型任务,并让四个线程并行处理以加速任务。
先决条件
要完成本教程,你需要:
-
具有四个或更多核心的多核系统。在双核系统上仍然可以按照步骤1至6的教程操作。然而,步骤7需要四个核心才能看到性能的提升。
-
一个Node.js开发环境。如果您使用的是Ubuntu 22.04,请按照如何在Ubuntu 22.04上安装Node.js的第3步安装最新版本的Node.js。如果您使用的是其他操作系统,请参阅如何安装Node.js并创建本地开发环境。
-
对JavaScript中事件循环、回调和Promise有良好的理解,您可以在我们的教程理解JavaScript中的事件循环、回调、Promise和异步/等待中找到相关内容。
-
基本了解如何使用Express web框架。查看我们的指南如何开始使用Node.js和Express。
设置项目并安装依赖
在这一步中,您将创建项目目录,初始化npm
,并安装所有必要的依赖项。
首先,创建并移动到项目目录:
- mkdir multi-threading_demo
- cd multi-threading_demo
mkdir
命令创建一个目录,cd
命令将工作目录更改为新创建的目录。
随后,使用npm init
命令初始化项目目录:
- npm init -y
-y
选项接受所有默认选项。
当命令运行时,您的输出将类似于这样:
Wrote to /home/sammy/multi-threading_demo/package.json:
{
"name": "multi-threading_demo",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC"
}
接下来,安装express
,一个Node.js的web框架:
- npm install express
您将使用Express创建一个具有阻塞和非阻塞端点的服务器应用程序。
Node.js默认附带worker-threads
模块,因此您不需要安装它。
您现在已经安装了必要的软件包。接下来,您将学习更多关于进程和线程以及它们如何在计算机上执行的知识。
理解进程和线程
在开始编写与CPU绑定的任务并将其分配到单独的线程之前,首先需要了解进程和线程是什么,以及它们之间的区别。最重要的是,您将了解进程和线程在单核或多核计算机系统上的执行方式。
进程
A process is a running program in the operating system. It has its own memory and cannot see nor access the memory of other running programs. It also has an instruction pointer, which indicates the instruction currently being executed in a program. Only one task can be executed at a time.
为了理解这一点,您将创建一个带有无限循环的Node.js程序,以便在运行时不退出。
使用nano
或您喜欢的文本编辑器,创建并打开process.js
文件:
- nano process.js
在您的process.js
文件中,输入以下代码:
const process_name = process.argv.slice(2)[0];
count = 0;
while (true) {
count++;
if (count == 2000 || count == 4000) {
console.log(`${process_name}: ${count}`);
}
}
在第一行中,process.argv
属性返回包含程序命令行参数的数组。然后,附加JavaScript的slice()
方法,参数为2
,以创建数组的从索引2开始的浅拷贝。这样做会跳过前两个参数,它们是Node.js路径和程序文件名。接下来,您使用方括号表示法检索切片数组的第一个参数,并将其存储在process_name
变量中。
之后,您定义了一个while
循环,并传递给它一个true
条件,使循环永远运行。在循环内,count
变量在每次迭代期间递增1
。接下来是一个if
语句,检查count
是否等于2000
或4000
。如果条件评估为真,console.log()
方法会在终端中记录一条消息。
使用CTRL+X
保存并关闭您的文件,然后按Y
保存更改。
使用node
命令运行程序:
- node process.js A &
A
is a command-line argument that is passed to the program and stored in the process_name
variable. The &
at end the allows the Node program to run in the background, which lets you enter more commands in the shell.
运行程序时,您将看到类似以下的输出:
Output[1] 7754
A: 2000
A: 4000
数字7754
是操作系统分配给它的进程ID。A: 2000
和A: 4000
是程序的输出。
使用node
命令运行程序时,您创建了一个进程。操作系统为程序分配内存,找到计算机磁盘上的程序可执行文件,并将程序加载到内存中。然后,它分配一个进程ID并开始执行程序。此时,您的程序现在已成为一个进程。
当进程运行时,其进程ID将添加到操作系统的进程列表中,并可以使用诸如htop
,top
或ps
等工具查看。这些工具提供有关进程的更多详细信息,以及停止或设置其优先级的选项。
要快速总结一个 Node 进程,只需在终端中按下ENTER
以恢复提示。接下来,运行ps
命令来查看 Node 进程:
- ps |grep node
ps
命令列出与系统上当前用户相关联的所有进程。管道操作符|
将所有ps
输出传递给grep
,以过滤出只有 Node 进程的列表。
运行该命令将产生类似以下的输出:
Output7754 pts/0 00:21:49 node
你可以从单个程序创建无数个进程。例如,使用以下命令创建另外三个带有不同参数的进程,并将它们放在后台运行:
- node process.js B & node process.js C & node process.js D &
在该命令中,你创建了三个process.js
程序的实例。符号&
将每个进程放在后台运行。
运行该命令后,输出看起来类似以下内容(尽管顺序可能不同):
Output[2] 7821
[3] 7822
[4] 7823
D: 2000
D: 4000
B: 2000
B: 4000
C: 2000
C: 4000
从输出中可以看到,每个进程在计数达到2000
和4000
时都会将进程名称记录到终端。每个进程不知道其他进程是否在运行:进程D
不知道进程C
的存在,反之亦然。任何在其中一个进程中发生的事情都不会影响其他 Node.js 进程。
如果你仔细检查输出,你会发现输出的顺序与你创建这三个进程时的顺序不同。在运行命令时,进程的参数顺序是按照 B
、C
和 D
的顺序。但现在,顺序是 D
、B
和 C
。原因是操作系统有调度算法,决定在给定时间在 CPU 上运行哪个进程。
在单核机器上,进程会 并发 执行。也就是说,操作系统在规律的间隔内切换进程。例如,进程 D
执行一段时间,然后它的状态被保存在某个地方,操作系统安排进程 B
执行一段时间,依此类推。这样来回进行,直到所有任务都完成。从输出来看,每个进程似乎都已经执行完毕,但实际上,操作系统调度程序不断地在它们之间切换。
在多核系统上 —— 假设你有四个核心 —— 操作系统将每个进程调度到每个核心上同时执行。这被称为 并行处理。然而,如果你创建了四个额外的进程(总数为八个),每个核心将同时执行两个进程,直到它们完成为止。
线程
线程就像进程一样:它们有自己的指令指针,一次可以执行一个JavaScript任务。与进程不同,线程没有自己的内存。相反,它们驻留在进程的内存中。当您创建一个进程时,可以使用worker_threads
模块创建多个线程,以并行执行JavaScript代码。此外,线程可以通过消息传递或在进程内存中共享数据来彼此通信。这使它们相对于进程而言更轻量级,因为生成线程不会向操作系统请求更多内存。
在执行线程时,它们的行为与进程类似。如果在单核系统上运行多个线程,操作系统将在规律的间隔内在它们之间进行切换,使每个线程有机会直接在单个CPU上执行。在多核系统上,操作系统会将线程调度到所有核心上,并同时执行JavaScript代码。如果您创建的线程多于可用的核心数量,每个核心将同时执行多个线程。
有了这些,按下ENTER
,然后使用kill
命令停止所有当前运行的Node进程:
- sudo kill -9 `pgrep node`
pgrep
返回所有四个Node进程的进程ID给kill
命令。-9
选项指示kill
发送SIGKILL信号。
运行该命令时,您将看到类似以下的输出:
Output[1] Killed node process.js A
[2] Killed node process.js B
[3] Killed node process.js C
[4] Killed node process.js D
有时候输出可能会延迟,并在您稍后运行其他命令时显示出来。
现在您已经了解了进程和线程之间的区别,接下来您将在下一部分中使用 Node.js 隐藏线程。
理解 Node.js 中的隐藏线程
Node.js 确实提供了额外的线程,这就是为什么它被认为是多线程的原因。在本节中,您将查看 Node.js 中的隐藏线程,这有助于使 I/O 操作非阻塞。
正如在介绍中提到的,JavaScript 是单线程的,所有 JavaScript 代码都在单个线程中执行。这包括您的程序源代码以及您在程序中包含的第三方库。当程序进行 I/O 操作以读取文件或网络请求时,这会阻塞主线程。
然而,Node.js 实现了 libuv
库,为 Node.js 进程提供了四个额外的线程。利用这些线程,I/O 操作被单独处理,当它们完成时,事件循环将与 I/O 任务相关联的回调添加到微任务队列中。当主线程的调用栈清空时,回调被推送到调用栈上,然后执行。为了明确这一点,与给定 I/O 任务相关联的回调不会并行执行;然而,通过线程的帮助,读取文件或网络请求的任务本身是并行进行的。一旦 I/O 任务完成,回调在主线程中运行。
除了这四个线程外,V8 引擎 还提供了两个线程来处理诸如自动垃圾回收之类的事务。这将进程中的线程总数增加到七个:一个主线程,四个 Node.js 线程和两个 V8 线程。
要确认每个 Node.js 进程都有七个线程,再次运行 process.js
文件并将其放入后台:
- node process.js A &
终端将记录进程 ID,以及程序的输出:
Output[1] 9933
A: 2000
A: 4000
在某个地方记下进程 ID,然后按 ENTER
键,以便您可以再次使用提示符。
要查看线程,请运行 top
命令并传递在输出中显示的进程 ID:
- top -H -p 9933
-H
指示 top
显示进程中的线程。-p
标志指示 top
仅监视给定进程 ID 的活动。
运行该命令时,您的输出将类似于以下内容:
Outputtop - 09:21:11 up 15:00, 1 user, load average: 0.99, 0.60, 0.26
Threads: 7 total, 1 running, 6 sleeping, 0 stopped, 0 zombie
%Cpu(s): 24.8 us, 0.3 sy, 0.0 ni, 75.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
MiB Mem : 7951.2 total, 6756.1 free, 248.4 used, 946.7 buff/cache
MiB Swap: 0.0 total, 0.0 free, 0.0 used. 7457.4 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
9933 node-us+ 20 0 597936 51864 33956 R 99.9 0.6 4:19.64 node
9934 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
9935 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.84 node
9936 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
9937 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.93 node
9938 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
9939 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
正如您在输出中所看到的,Node.js 进程总共有七个线程:一个用于执行 JavaScript 的主线程,四个 Node.js 线程和两个 V8 线程。
正如之前讨论的那样,这四个 Node.js 线程用于执行 I/O 操作,使它们变为非阻塞。它们在这项任务上表现良好,并且自行创建线程进行 I/O 操作可能会更糟糕。但对于 CPU 绑定的任务就不能这样说了。CPU 绑定的任务不会使用进程中的任何额外线程,并会阻塞主线程。
现在按 q
键退出 top
,然后使用以下命令停止 Node 进程:
- kill -9 9933
现在您已经了解了 Node.js 进程中的线程,接下来您将在下一节中编写一个 CPU 绑定的任务,并观察它如何影响主线程。
创建一个不使用工作线程的 CPU 绑定任务
在本节中,您将构建一个 Express 应用程序,其中包含一个非阻塞路由和一个运行 CPU 绑定任务的阻塞路由。
首先,在您喜欢的编辑器中打开 index.js
:
- nano index.js
在您的 index.js
文件中,添加以下代码以创建一个基本服务器:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
在下面的代码块中,您使用Express创建了一个HTTP服务器。首先,您导入了express
模块。接下来,您将app
变量设置为保存Express实例。然后,您定义了port
变量,它保存了服务器应该监听的端口号。
之后,您使用app.get('/non-blocking')
来定义应发送GET
请求的路由。最后,您调用app.listen()
方法指示服务器开始监听端口3000
。
接下来,定义另一个路由,/blocking/
,其中将包含一个CPU密集型任务:
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
您使用app.get("/blocking")
定义了/blocking
路由,它接受一个异步回调作为第二个参数,回调使用async
关键字作为前缀来运行CPU密集型任务。在回调函数中,您创建了一个for
循环,它迭代了200亿次,每次迭代都会将counter
变量增加1
。这个任务在CPU上运行,需要几秒钟才能完成。
这时,您的index.js
文件看起来应该是这样:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存并退出您的文件,然后使用以下命令启动服务器:
- node index.js
运行该命令时,您将看到类似以下的输出:
OutputApp listening on port 3000
这表明服务器正在运行并且可以提供服务。
现在,在您喜欢的浏览器中访问http://localhost:3000/non-blocking
。您将立即看到带有消息This page is non-blocking
的响应。
注意:如果你正在远程服务器上按照教程操作,你可以使用端口转发在浏览器中测试应用程序。
在Express服务器仍在运行时,在本地计算机上打开另一个终端,并输入以下命令:
- ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip
连接到服务器后,在本地计算机的Web浏览器中导航到http://localhost:3000/non-blocking
。在整个教程的剩余部分,保持第二个终端保持打开状态。
接下来,打开一个新标签并访问http://localhost:3000/blocking
。随着页面加载,快速打开两个额外的标签并再次访问http://localhost:3000/non-blocking
。你会发现你不会立即得到响应,页面将一直尝试加载。只有在/blocking
路由完成加载并返回响应result is 20000000000
之后,其余路由才会返回响应。
所有/non-blocking
路由之所以在/blocking
路由加载时无法工作,是因为CPU绑定的for
循环阻塞了主线程。当主线程被阻塞时,Node.js在CPU绑定任务完成之前无法提供任何请求服务。因此,如果你的应用程序对/non-blocking
路由有数千个同时的GET
请求,只需访问一次/blocking
路由就足以使所有应用程序路由变得无响应。
正如您所见,阻塞主线程可能会影响用户对您的应用的体验。为了解决这个问题,您需要将 CPU 绑定的任务转移到另一个线程,以便主线程可以继续处理其他 HTTP 请求。
有了这个方法后,通过按下CTRL+C
停止服务器。在对index.js
文件进行更改后,您将在下一节重新启动服务器。服务器停止的原因是 Node.js 在文件发生新更改时不会自动刷新。
现在您已经了解了 CPU 密集型任务对应用程序的负面影响,接下来将尝试通过使用 promises 避免阻塞主线程。
使用 Promises 转移 CPU 绑定的任务
通常,当开发人员了解到 CPU 绑定任务的阻塞效果时,他们会转向 promises,使代码变得非阻塞。这种直觉源自于使用非阻塞的基于 promise 的 I/O 方法,例如readFile()
和writeFile()
。但正如您已经了解到的那样,I/O 操作利用了 Node.js 的隐藏线程,而 CPU 绑定任务则没有。尽管如此,在本节中,您将尝试将 CPU 绑定任务包装在一个 promise 中,以尝试使其非阻塞。这不会起作用,但它将帮助您看到使用工作线程的价值,这是您将在下一节中做的事情。
再次在您的编辑器中打开index.js
文件:
- nano index.js
在你的index.js
文件中,删除包含CPU密集型任务的高亮代码:
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
...
接下来,添加以下高亮代码,其中包含返回Promise的函数:
...
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
res.status(200).send(`result is ${counter}`);
}
calculateCount()
函数现在包含了你在/blocking
处理函数中的计算。该函数返回一个Promise,使用new Promise
语法初始化。该Promise接受一个带有resolve
和reject
参数的回调函数,用于处理成功或失败。当for
循环运行结束时,Promise将以counter
变量中的值解析:
接下来,在index.js
文件中的/blocking/
处理函数中调用calculateCount()
函数:
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
在这里,你使用await
关键字前缀调用calculateCount()
函数以等待Promise解析。一旦Promise解析完成,counter
变量将设置为解析后的值:
你的完整代码现在看起来像这样:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存并退出你的文件,然后再次启动服务器:
- node index.js
在你的网络浏览器中,访问http://localhost:3000/blocking
,当它加载时,快速重新加载http://localhost:3000/non-blocking
标签页。你会注意到,non-blocking
路由仍然受到影响,它们都将等待/blocking
路由完成加载。因为路由仍然受到影响,Promise不能使JavaScript代码并行执行,也不能用于使CPU密集型任务变为非阻塞。
随着这个,使用CTRL+C
停止应用服务器。
现在您知道,Promise 不提供任何机制使 CPU 密集型任务非阻塞,您将使用 Node.js 的worker-threads
模块将 CPU 密集型任务转移到单独的线程中。
使用worker-threads
模块转移 CPU 密集型任务
在这个部分,您将使用worker-threads
模块将 CPU 密集型任务转移到另一个线程,以避免阻塞主线程。为此,您将创建一个worker.js
文件,其中包含 CPU 密集型任务。在index.js
文件中,您将使用worker-threads
模块初始化线程,并在worker.js
文件中开始任务以并行运行到主线程。一旦任务完成,工作线程将发送包含结果的消息回到主线程。
首先,验证您是否有2个或更多核心,使用nproc
命令:
- nproc
Output4
如果显示两个或更多核心,则可以继续进行此步骤。
接下来,在您的文本编辑器中创建并打开worker.js
文件:
- nano worker.js
在您的worker.js
文件中,添加以下代码来导入worker-threads
模块并执行 CPU 密集型任务:
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
第一行加载worker_threads
模块并提取parentPort
类。该类提供了可用于向主线程发送消息的方法。接下来,您有一个CPU密集型任务,当前在index.js
文件的calculateCount()
函数中。在此步骤的后续操作中,您将从index.js
中删除此函数。
在此之后,添加下面突出显示的代码:
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
parentPort.postMessage(counter);
在这里,调用parentPort
类的postMessage()
方法,该方法向主线程发送包含存储在counter
变量中的CPU绑定任务结果的消息。
保存并退出您的文件。在文本编辑器中打开index.js
:
- nano index.js
由于您已经在worker.js
中有了CPU绑定任务,因此从index.js
中删除下面突出显示的代码:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
接下来,在app.get("/blocking")
回调中,添加以下代码以初始化线程:
const express = require("express");
const { Worker } = require("worker_threads");
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
...
首先,导入worker_threads
模块并解压Worker
类。在app.get("/blocking")
回调内,使用new
关键字创建Worker
的实例,后跟对Worker
的调用,其参数是worker.js
文件路径。这将创建一个新的线程,并且worker.js
文件中的代码将在另一个核心的线程上运行。
随后,您可以使用on("message")
方法将事件附加到worker
实例,以侦听消息事件。当接收到包含来自worker.js
文件的结果的消息时,它将作为参数传递给方法的回调函数,该回调函数返回包含 CPU 绑定任务结果的响应给用户。
接下来,您可以使用on("error")
方法将另一个事件附加到 worker 实例,以侦听错误事件。如果发生错误,回调函数将返回包含错误消息的404
响应给用户。
您的完整文件现在看起来像下面这样:
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存并退出您的文件,然后运行服务器:
- node index.js
在您的 Web 浏览器中再次访问http://localhost:3000/blocking
选项卡。在它完成加载之前,刷新所有http://localhost:3000/non-blocking
选项卡。现在,您应该注意到它们立即加载,而不需要等待/blocking
路由完成加载。这是因为 CPU 绑定任务被转移到另一个线程,而主线程处理所有传入请求。
现在,使用CTRL+C
停止您的服务器。
现在,您已经知道如何使用工作线程使 CPU 密集型任务非阻塞,您将使用四个工作线程来提高 CPU 密集型任务的性能。
使用四个工作线程优化 CPU 密集型任务
在这一部分中,您将把 CPU 密集型任务分配给四个工作线程,以便它们可以更快地完成任务并缩短 /blocking
路由的加载时间。
为了让更多的工作线程共同处理同一任务,您需要拆分任务。由于任务涉及循环 200 亿次,您将 200 亿除以您想要使用的线程数。在这种情况下,是 4
。计算 20_000_000_000 / 4
将得到 5_000_000_000
。因此,每个线程将循环从 0
到 5_000_000_000
并通过 1
递增 counter
。当每个线程完成时,它将向主线程发送包含结果的消息。一旦主线程分别收到来自四个线程的消息,您将合并结果并向用户发送响应。
如果您有一个迭代大型数组的任务,也可以使用相同的方法。例如,如果您想要调整目录中的 800 张图像大小,您可以创建一个包含所有图像文件路径的数组。接下来,将 800
除以 4
(线程数)并让每个线程处理一个范围。线程一将调整数组索引从 0
到 199
的图像大小,线程二从索引 200
到 399
,依此类推。
首先,请验证您是否有四个或更多核心:
- nproc
Output4
使用cp
命令复制worker.js
文件:
- cp worker.js four_workers.js
当前的index.js
和worker.js
文件将保持不变,以便您稍后可以再次运行它们,以便与此部分的更改性能进行比较。
接下来,在您的文本编辑器中打开four_workers.js
文件:
- nano four_workers.js
在您的four_workers.js
文件中,添加突出显示的代码以导入workerData
对象:
const { workerData, parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000 / workerData.thread_count; i++) {
counter++;
}
parentPort.postMessage(counter);
首先,您提取WorkerData
对象,该对象将包含从主线程传递的数据,当初始化线程时(您将很快在index.js
文件中执行此操作)。
该对象具有一个thread_count
属性,其中包含线程的数量,即4
。然后在for
循环中,将值20_000_000_000
除以4
,结果为5_000_000_000
。
保存并关闭您的文件,然后复制index.js
文件:
- cp index.js index_four_workers.js
在您的编辑器中打开index_four_workers.js
文件:
- nano index_four_workers.js
在您的index_four_workers.js
文件中,添加突出显示的代码以创建线程实例:
...
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
});
}
app.get("/blocking", async (req, res) => {
...
})
...
首先,定义包含您要创建的线程数量的THREAD_COUNT
常量。稍后,当您的服务器上有更多核心时,扩展将涉及将THREAD_COUNT
的值更改为您要使用的线程数量。
接下来,createWorker()
函数创建并返回一个Promise。在Promise的回调中,通过将Worker
类作为第一个参数传递文件路径到four_workers.js
文件来初始化一个新的线程。然后,将一个对象作为第二个参数传递。接下来,给该对象分配一个具有另一个对象作为其值的workerData
属性。最后,给该对象分配一个thread_count
属性,其值为THREAD_COUNT
常量中的线程数。在workers.js
文件中,workerData
对象是您之前引用的对象。
为了确保Promise解析或抛出错误,请添加以下突出显示的行:
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
...
当工作线程向主线程发送消息时,Promise将以返回的数据解析。然而,如果发生错误,Promise将返回一个错误消息。
现在,您已经定义了初始化新线程并从线程返回数据的函数,您将在app.get("/blocking")
中使用该函数来生成新线程。
但首先,请删除以下突出显示的代码,因为您已经在createWorker()
函数中定义了此功能:
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error ocurred: ${msg}`);
});
});
...
删除代码后,添加以下代码以初始化四个工作线程:
...
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
});
...
首先,您创建一个workerPromises
变量,其中包含一个空数组。接下来,您根据THREAD_COUNT
的值(即4
)进行迭代。在每次迭代期间,您调用createWorker()
函数来创建一个新线程。然后,您使用JavaScript的push
方法将函数返回的promise对象推入workerPromises
数组中。当循环结束时,workerPromises
将有四个promise对象,每个都是从调用createWorker()
函数四次返回的。
现在,请在下面的突出显示的代码中添加以下内容,以等待promise解析并向用户返回响应:
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
由于workerPromises
数组包含调用createWorker()
返回的promise,您需要使用await
语法前缀Promise.all()
方法,并使用workerPromises
作为其参数调用all()
方法。Promise.all()
方法等待数组中的所有promise解析。当这种情况发生时,thread_results
变量包含promise解析的值。由于计算被分割到四个工作线程中,您可以使用方括号表示法从thread_results
中获取每个值,然后将它们全部加在一起。加完后,您将总值返回到页面上。
您的完整文件现在应该是这样的:
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存并关闭您的文件。在运行此文件之前,请先运行index.js
来测量其响应时间。
- node index.js
接下来,在您的本地计算机上打开一个新的终端,并输入以下curl
命令,该命令测量从/blocking
路由获取响应所需的时间:
- time curl --get http://localhost:3000/blocking
time
命令用于测量curl
命令的运行时间。 curl
命令向指定的URL发送HTTP请求,--get
选项指示curl
进行GET
请求。
当命令运行时,您的输出将类似于以下内容:
Outputreal 0m28.882s
user 0m0.018s
sys 0m0.000s
突出显示的输出显示从收到请求到获取响应大约需要28秒,这可能会因计算机而异。
接下来,使用CTRL+C
停止服务器,并运行index_four_workers.js
文件:
- node index_four_workers.js
再次在第二个终端访问/blocking
路由:
- time curl --get http://localhost:3000/blocking
您将看到与以下内容一致的输出:
Outputreal 0m8.491s
user 0m0.011s
sys 0m0.005s
输出显示大约需要8秒,这意味着您将加载时间缩短了约70%。
您成功地使用四个工作线程优化了CPU绑定的任务。 如果您有多于四个核心的机器,请将THREAD_COUNT
更新为该数字,您将进一步缩短加载时间。
结论
在这篇文章中,您使用 Node 应用程序构建了一个占用 CPU 的任务,导致主线程被阻塞。然后,您尝试使用 promises 使任务变为非阻塞,但未成功。随后,您使用了 worker_threads
模块,将 CPU 密集型任务转移到另一个线程以使其非阻塞。最后,您使用 worker_threads
模块创建了四个线程,以加速 CPU 密集型任务。
作为下一步,查看 Node.js Worker threads 文档以了解更多选项。此外,您还可以查看 piscina
库,该库允许您为 CPU 密集型任务创建一个工作线程池。如果您想继续学习 Node.js,请参阅教程系列 How To Code in Node.js。
Source:
https://www.digitalocean.com/community/tutorials/how-to-use-multithreading-in-node-js