作者選擇 開源心理疾病 作為 Write for DOnations 計劃的一部分捐贈對象。
介紹
Node.js 在單個線程中運行 JavaScript 代碼,這意味著您的代碼一次只能執行一個任務。但是,Node.js 本身是多線程的,並通過 libuv
库提供隱藏的線程,該庫處理諸如從磁盤讀取文件或網絡請求等 I/O 操作。通過使用隱藏的線程,Node.js 提供了異步方法,允許您的代碼進行 I/O 請求而不會阻塞主線程。
儘管 Node.js 有隱藏的線程,但您無法使用它們來卸載 CPU 密集型任務,例如復雜的計算、圖像調整或視頻壓縮。由於 JavaScript 是單線程的,當 CPU 密集型任務運行時,它會阻塞主線程,直到任務完成為止。如果不使用其他線程,加速 CPU 綁定任務的唯一方法是提高處理器速度。
然而,近年來,中央處理器(CPU)並未變得更快。相反地,電腦配備了額外的核心,現在更常見的是電腦擁有8個或更多的核心。儘管如此,由於JavaScript是單線程的,您的代碼將無法利用計算機上的額外核心來加快處理器密集型任務的速度或避免中斷主線程。
為了解決這個問題,Node.js引入了worker-threads
模組,允許您創建線程並且同時執行多個JavaScript任務。一旦線程完成任務,它會向主線程發送一個包含操作結果的消息,以便與代碼的其他部分一起使用。使用工作線程的優點是,CPU密集型任務不會阻塞主線程,並且您可以將任務分割並分配給多個工作線程以進行優化。
在本教程中,您將創建一個具有阻塞主線程的CPU密集型任務的Node.js應用程序。接下來,您將使用worker-threads
模組將CPU密集型任務轉移到另一個線程中,以避免阻塞主線程。最後,您將將CPU密集型任務分割成四個線程並且並行處理,以加快任務的速度。
先決條件
完成本教程,您需要:
-
具有四個或更多核心的多核系統。您仍然可以在雙核系統上按照步驟1到6的教程進行操作。但是,第7步需要四個核心才能看到性能改善。
-
一個 Node.js 開發環境。如果您使用的是 Ubuntu 22.04,請按照 在 Ubuntu 22.04 上安裝 Node.js 的第 3 步 來安裝最新版本的 Node.js。如果您使用的是其他操作系統,請參閱 如何安裝 Node.js 並建立本地開發環境。
-
對於 JavaScript 中的事件循環、回調和承諾有良好的理解,您可以在我們的教程中找到相關信息,理解 JavaScript 中的事件循環、回調、承諾和異步/等待。
-
基本了解如何使用 Express Web 框架。查看我們的指南,如何開始使用 Node.js 和 Express。
設置項目並安裝依賴項
在這一步,您將創建項目目錄,初始化npm
,並安裝所有必要的依賴項。
首先,創建並移動到項目目錄:
- mkdir multi-threading_demo
- cd multi-threading_demo
mkdir
命令創建一個目錄,cd
命令將工作目錄更改為新創建的目錄。
隨後,使用npm init
命令初始化項目目錄:
- npm init -y
-y
選項接受所有默認選項。
當命令運行時,您的輸出將類似於這樣:
Wrote to /home/sammy/multi-threading_demo/package.json:
{
"name": "multi-threading_demo",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC"
}
接下來,安裝express
,一個Node.js Web框架:
- npm install express
您將使用Express創建一個具有阻塞和非阻塞端點的伺服器應用程序。
Node.js默認附帶worker-threads
模塊,因此您無需安裝它。
您現在已經安裝了必要的套件。接下來,您將更多了解有關進程和線程以及它們在計算機上的執行方式。
理解進程和線程
在開始編寫與 CPU 繫結的任務並將其卸載到獨立的線程之前,您首先需要了解進程和線程是什麼,以及它們之間的區別。最重要的是,您將查看進程和線程在單核或多核計算機系統上的執行方式。
進程
A process is a running program in the operating system. It has its own memory and cannot see nor access the memory of other running programs. It also has an instruction pointer, which indicates the instruction currently being executed in a program. Only one task can be executed at a time.
要了解這一點,您將創建一個具有無限循環的 Node.js 程序,以便在運行時不退出。
使用 nano
或您偏好的文本編輯器,創建並打開 process.js
文件:
- nano process.js
在您的 process.js
文件中,輸入以下代碼:
const process_name = process.argv.slice(2)[0];
count = 0;
while (true) {
count++;
if (count == 2000 || count == 4000) {
console.log(`${process_name}: ${count}`);
}
}
在第一行中,process.argv
屬性返回包含程序命令行參數的數組。然後,附加 JavaScript 的 slice()
方法,參數為 2
,以製作從索引 2 開始的數組的淺拷貝。這樣做可以跳過前兩個參數,即 Node.js 路徑和程序文件名。接下來,使用括號表示法檢索切片數組的第一個參數並將其存儲在 process_name
變量中。
之後,您定義一個while
迴圈並傳遞一個true
條件使迴圈永遠運行。在迴圈內,count
變數在每次迭代時增加1
。接下來是一個if
語句,檢查count
是否等於2000
或4000
。如果條件求值為真,console.log()
方法會在終端中記錄一條消息。
使用CTRL+X
保存並關閉您的文件,然後按Y
保存更改。
使用node
命令運行該程序:
- node process.js A &
A
is a command-line argument that is passed to the program and stored in the process_name
variable. The &
at end the allows the Node program to run in the background, which lets you enter more commands in the shell.
運行該程序時,您將看到類似以下的輸出:
Output[1] 7754
A: 2000
A: 4000
數字7754
是操作系統分配給它的進程ID。A:2000
和A:4000
是程序的輸出。
使用node
命令運行程序時,您會創建一個進程。操作系統為該程序分配內存,定位計算機磁盤上的程序可執行文件並將程序加載到內存中。然後,它會為其分配一個進程ID並開始執行該程序。此時,您的程序現在已成為一個進程。
當進程正在運行時,其進程ID將添加到操作系統的進程列表中,可以使用htop
、top
或ps
等工具查看。這些工具提供有關進程的更多詳細信息,以及停止或優先處理它們的選項。
要快速獲得 Node 進程的摘要,請在終端中按下ENTER
,以獲得提示返回。接著,執行ps
命令查看 Node 進程:
- ps |grep node
ps
命令列出系統上當前用戶的所有進程。管道運算符|
將所有ps
輸出傳遞給grep
,以篩選出僅列出 Node 進程。
運行該命令將產生類似以下的輸出:
Output7754 pts/0 00:21:49 node
您可以從單個程序創建無數個進程。例如,使用以下命令創建三個帶有不同參數的進程並將它們放在後台運行:
- node process.js B & node process.js C & node process.js D &
在該命令中,您創建了process.js
程序的三個更多實例。&
符號將每個進程放在後台運行。
運行該命令後,輸出將類似於以下內容(順序可能有所不同):
Output[2] 7821
[3] 7822
[4] 7823
D: 2000
D: 4000
B: 2000
B: 4000
C: 2000
C: 4000
如您所見,在輸出中,當計數達到2000
和4000
時,每個進程都將進程名稱記錄到終端中。每個進程都不知道其他進程的存在:進程D
不知道進程C
的存在,反之亦然。在任何一個進程中發生的任何事情都不會影響其他 Node.js 進程。
如果你仔細檢查輸出,你會發現輸出的順序不同於你創建三個進程時的順序。執行命令時,進程的參數順序是按照 B
、C
和 D
。但現在,順序是 D
、B
和 C
。原因是作業系統有排程算法,決定在給定時間內在 CPU 上運行哪個進程。
在單核機器上,進程會並行執行。也就是說,操作系統會在固定間隔內在進程之間切換。例如,進程 D
執行一段時間,然後它的狀態被保存在某個地方,作業系統安排進程 B
執行一段時間,依此類推。這樣來回進行,直到所有任務完成。從輸出來看,每個進程可能看起來都已完成,但實際上,作業系統調度程序不斷在它們之間切換。
在多核系統上——假設你有四個核心——作業系統將每個進程排定在同一時間在每個核心上執行。這被稱為並行。但是,如果你創建四個以上的進程(總共八個),每個核心將同時執行兩個進程,直到它們完成。
線程
執行緒就像是進程一樣:它們有自己的指令指標,並且一次可以執行一個 JavaScript 任務。與進程不同,執行緒沒有自己的記憶體。相反,它們存在於進程的記憶體中。當您創建一個進程時,可以使用worker_threads
模塊創建多個執行 JavaScript 代碼的執行緒並行執行。此外,執行緒可以通過消息傳遞或在進程的記憶體中共享數據與彼此通信。這使它們與進程相比較輕量級,因為生成一個執行緒不需要向操作系統請求更多記憶體。
就執行緒的執行而言,它們的行為與進程的行為類似。如果在單核系統上運行多個執行緒,操作系統將在固定間隔內在它們之間進行切換,使每個執行緒有機會直接在單個 CPU 上執行。在多核系統上,操作系統會將執行緒安排在所有核心上並同時執行 JavaScript 代碼。如果您創建的執行緒多於可用的核心數,則每個核心將同時執行多個執行緒。
有了這個,按下ENTER
,然後使用kill
命令停止所有當前運行的 Node 進程:
- sudo kill -9 `pgrep node`
pgrep
返回所有四個 Node 進程的進程 ID 給kill
命令。-9
選項指示kill
發送一個SIGKILL信號。
當您運行命令時,您將看到類似以下的輸出:
Output[1] Killed node process.js A
[2] Killed node process.js B
[3] Killed node process.js C
[4] Killed node process.js D
有時候輸出可能會延遲,並且在您稍後運行其他命令時顯示出來。
現在您已經知道了進程和線程之間的區別,接下來您將在下一節中使用Node.js的隱藏線程。
理解Node.js中的隱藏線程
Node.js確實提供了額外的線程,這就是為什麼它被認為是多線程的原因。在本節中,您將查看Node.js中的隱藏線程,這有助於使I/O操作非阻塞。
正如介紹中提到的,JavaScript是單線程的,所有的JavaScript代碼都在一個線程中執行。這包括您的程序源代碼和您在程序中包含的第三方庫。當程序執行I/O操作以讀取文件或網絡請求時,這會阻塞主線程。
然而,Node.js 實現了 libuv
庫,該庫為 Node.js 進程提供了四個額外的線程。通過這些線程,I/O 操作被分開處理,當它們完成時,事件循環會將與 I/O 任務關聯的回調添加到微任務隊列中。當主線程的調用堆棧清空時,回調會被推送到調用堆棧,然後執行。為了澄清這一點,與給定 I/O 任務相關的回調並不是並行執行的;然而,讀取文件或網絡請求的任務本身是在幫助線程的情況下並行執行的。一旦 I/O 任務完成,回調就在主線程中運行。
除了這四個線程之外,V8 引擎 還提供了兩個線程來處理自動垃圾回收等事情。這將進程中的總線程數增加到七個:一個主線程,四個 Node.js 線程和兩個 V8 線程。
為了確認每個 Node.js 進程都有七個線程,再次運行 process.js
文件並將其放在後台運行:
- node process.js A &
終端將記錄進程 ID,以及程序的輸出:
Output[1] 9933
A: 2000
A: 4000
請在某處記下進程 ID,然後按 ENTER
鍵,以便您可以再次使用提示符。
要查看線程,運行 top
命令並將其傳遞給輸出中顯示的進程 ID:
- top -H -p 9933
-H
指示 top
顯示進程中的線程。 -p
標誌指示 top
僅監視給定進程 ID 中的活動。
運行命令時,您的輸出將類似於以下內容:
Outputtop - 09:21:11 up 15:00, 1 user, load average: 0.99, 0.60, 0.26
Threads: 7 total, 1 running, 6 sleeping, 0 stopped, 0 zombie
%Cpu(s): 24.8 us, 0.3 sy, 0.0 ni, 75.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
MiB Mem : 7951.2 total, 6756.1 free, 248.4 used, 946.7 buff/cache
MiB Swap: 0.0 total, 0.0 free, 0.0 used. 7457.4 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
9933 node-us+ 20 0 597936 51864 33956 R 99.9 0.6 4:19.64 node
9934 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
9935 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.84 node
9936 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
9937 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.93 node
9938 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
9939 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
正如您在輸出中所看到的,Node.js進程總共有七個線程:一個主線程用於執行JavaScript,四個Node.js線程和兩個V8線程。
正如之前討論的,四個Node.js線程用於I/O操作,以使它們非阻塞。對於這個任務它們工作得很好,而且自己為I/O操作創建線程可能會更加降低應用程序的性能。對於CPU密集型任務則不能這麼說。CPU密集型任務不使用進程中的任何額外線程並阻塞主線程。
現在按下q
退出top
並使用以下命令停止Node進程:
- kill -9 9933
現在您已經了解了Node.js進程中的線程,接下來的部分將編寫一個CPU密集型任務並觀察它對主線程的影響。
在沒有Worker線程的情況下創建CPU密集型任務
在本部分中,您將構建一個具有非阻塞路由和運行CPU密集型任務的阻塞路由的Express應用程序。
首先,在您首選的編輯器中打開index.js
:
- nano index.js
在您的index.js
文件中,添加以下代碼以創建一個基本服務器:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
在下面的程式碼塊中,您使用 Express 建立了一個 HTTP 伺服器。在第一行中,您導入了 express
模組。接下來,您設置了 app
變數以保存 Express 的一個實例。然後,您定義了 port
變數,該變數保存了伺服器應該監聽的埠號。
在此之後,您使用 app.get('/non-blocking')
定義了 GET
請求應該發送的路由。最後,您調用了 app.listen()
方法來指示伺服器開始監聽埠號 3000
。
接下來,定義另一個路由,/blocking/
,其中將包含一個 CPU 密集型任務:
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
您使用 app.get("/blocking")
定義了 /blocking
路由,該路由接受一個異步回調作為第二個參數,該回調以 async
關鍵字為前綴運行一個 CPU 密集型任務。在回調中,您創建了一個 for
循環,它迭代 200 億次,並在每次迭代期間將 counter
變數增加 1
。此任務在 CPU 上運行,需要幾秒鐘才能完成。
此時,您的 index.js
檔現在看起來是這樣的:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存並退出您的檔案,然後使用以下命令啟動伺服器:
- node index.js
執行該命令後,您將看到類似以下的輸出:
OutputApp listening on port 3000
這顯示伺服器正在運行並準備提供服務。
現在,請在您偏好的瀏覽器中訪問 http://localhost:3000/non-blocking
。您將立即收到一條帶有消息 This page is non-blocking
的回應。
注意:如果您正在远程服务器上按照教程操作,您可以使用端口转发在浏览器中测试应用程序。
在Express服务器仍在运行时,在本地计算机上打开另一个终端,并输入以下命令:
- ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip
连接到服务器后,在本地计算机的Web浏览器中导航至http://localhost:3000/non-blocking
。在整个教程中保持第二个终端保持打开状态。
接下来,打开一个新标签并访问http://localhost:3000/blocking
。在页面加载时,快速打开两个额外的标签并再次访问http://localhost:3000/non-blocking
。您会发现您不会立即得到响应,页面将一直尝试加载。只有在/blocking
路由完成加载并返回响应result is 20000000000
后,其余的路由才会返回响应。
之所以所有/non-blocking
路由在/blocking
路由加载时无法正常工作,是因为CPU绑定的for
循环阻塞了主线程。当主线程被阻塞时,Node.js无法为任何请求提供服务,直到CPU绑定的任务完成。因此,如果您的应用程序对/non-blocking
路由有数千个同时的GET
请求,仅需访问/blocking
路由即可使所有应用程序路由不响应。
正如您所見,阻塞主線程會損害用戶對您應用的體驗。為了解決這個問題,您需要將 CPU 繁重任務轉移至另一個線程,以便主線程可以繼續處理其他 HTTP 請求。
因此,通過按下 CTRL+C
停止服務器。您將在下一節再次啟動服務器,屆時會對 index.js
檔案進行更多更改。停止服務器的原因是 Node.js 在檔案有新變更時不會自動刷新。
現在您已經了解了 CPU 密集型任務對您的應用程序可能造成的負面影響,您現在將嘗試使用 promises 來避免阻塞主線程。
使用 Promises 來分擔 CPU 繁重任務
通常當開發者了解到 CPU 繁重任務的阻塞效應時,他們會轉向使用 promises 來使代碼非阻塞。這種本能來自於使用非阻塞的 promise-based I/O 方法,如 readFile()
和 writeFile()
的知識。但正如您所學到的,I/O 操作利用的是 Node.js 的隱藏線程,而 CPU 繁重任務則不是。儘管如此,在本節中,您將嘗試將 CPU 繁重任務包裹在一個 promise 中,以試圖使其非阻塞。這不會奏效,但它將幫助您看到使用工作線程的價值,這是您將在下一節中做的。
再次在您的編輯器中打開 index.js
檔案:
- nano index.js
在您的index.js
文件中,請移除包含 CPU 密集任務的突顯代碼:
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
...
接著,添加以下突顯代碼,其中包含一個返回 promise 的函數:
...
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
res.status(200).send(`result is ${counter}`);
}
calculateCount()
函數現在包含您在/blocking
處理函數中的計算。此函數返回一個 promise,使用new Promise
語法初始化。該 promise 接受一個帶有resolve
和reject
參數的回調函數,用於處理成功或失敗。當for
循環運行完成時,該 promise 將使用counter
變數中的值解析。
接著,在index.js
文件中的/blocking/
處理函數中調用calculateCount()
函數:
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
在此,使用帶有await
關鍵字前綴的方式調用calculateCount()
函數,以等待 promise 解析。一旦 promise 解析,counter
變數將設置為解析的值。
您的完整代碼現在看起來如下:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存並退出文件,然後重新啟動服務器:
- node index.js
在網頁瀏覽器中,訪問http://localhost:3000/blocking
並在它加載時快速重新加載http://localhost:3000/non-blocking
選項卡。正如您會注意到的那樣,non-blocking
路由仍然受影響,它們都將等待/blocking
路由完成加載。由於路由仍然受影響,promise 無法使 JavaScript 代碼並行執行,也無法用於使 CPU 綁定任務非阻塞。
隨後,使用CTRL+C
停止應用伺服器。
現在您已經知道,Promise並不提供任何機制來使CPU繁重的任務變得非阻塞,您將使用Node.js的worker-threads
模組將CPU繁重的任務轉移到單獨的線程中。
使用worker-threads
模組卸載CPU繁重的任務
在本節中,您將使用worker-threads
模組將一個CPU密集型任務卸載到另一個線程中,以避免阻塞主線程。為此,您將創建一個worker.js
文件,其中將包含CPU密集型任務。在index.js
文件中,您將使用worker-threads
模組來初始化線程並在worker.js
文件中啟動任務,使其與主線程並行運行。一旦任務完成,工作線程將向主線程發送包含結果的消息。
首先,請確認您使用nproc
命令擁有2個或更多核心:
- nproc
Output4
如果顯示兩個或更多核心,則可以繼續進行此步驟。
接下來,在您的文本編輯器中創建並打開worker.js
文件:
- nano worker.js
在您的worker.js
文件中,添加以下代碼以導入worker-threads
模組並執行CPU密集型任務:
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
第一行加載worker_threads
模塊並提取parentPort
類。該類提供了您可以使用的方法,用於向主線程發送消息。接下來,您有目前在index.js
文件的calculateCount()
函數中的CPU密集型任務。在這一步之後,您將從index.js
中刪除此函數。
隨後,添加下面突出顯示的代碼:
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
parentPort.postMessage(counter);
在這裡,您調用parentPort
類的postMessage()
方法,該方法將一個消息發送到主線程,該消息包含存儲在counter
變量中的CPU綁定任務的結果。
保存並退出您的文件。在您的文本編輯器中打開index.js
:
- nano index.js
由於您已經在worker.js
中具有CPU綁定任務,請從index.js
中刪除突出顯示的代碼:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
接下來,在app.get("/blocking")
回調中,添加以下代碼來初始化線程:
const express = require("express");
const { Worker } = require("worker_threads");
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
...
首先,您導入worker_threads
模塊並解包Worker
類。在app.get("/blocking")
回調內,使用new
關鍵字創建Worker
的實例,該關鍵字後面是對Worker
的調用,其參數是worker.js
文件路徑。這將創建一個新的線程,並且worker.js
文件中的代碼將在另一個核心上的線程中運行。
隨後,您使用`on(“message”)`方法將事件附加到`worker`實例,以監聽消息事件。當接收到包含從`worker.js`文件中獲得的結果的消息時,將其作為參數傳遞給該方法的回調,該回調返回一個包含CPU密集任務結果的響應給用戶。
接下來,您使用`on(“error”)`方法將另一個事件附加到worker實例,以監聽錯誤事件。如果發生錯誤,回調將返回一個包含錯誤消息的`404`響應給用戶。
現在,您完整的文件看起來像下面這樣:
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存並退出您的文件,然後運行服務器:
- node index.js
再次訪問您的網絡瀏覽器中的`http://localhost:3000/blocking`頁面。在它完成加載之前,刷新所有`http://localhost:3000/non-blocking`頁面。您現在應該注意到它們立即加載,而無需等待`/blocking`路由完成加載。這是因為CPU密集任務被轉移到另一個線程,而主線程處理所有傳入的請求。
現在,使用`CTRL+C`停止您的服務器。
現在,您可以使用工作線程將CPU密集任務設置為非阻塞,以提高性能。
使用四個工作線程優化 CPU 密集型任務
在這一節中,您將把 CPU 密集型任務分配給四個工作線程,以便它們可以更快地完成任務,縮短 /blocking
路由的加載時間。
要讓更多工作線程處理同一任務,您需要將任務分割。由於該任務涉及循環執行 200 億次,您將把 200 億除以您想要使用的線程數。在本例中,它是 4
。計算 20_000_000_000 / 4
將得到 5_000_000_000
。因此,每個線程將從 0
循環到 5_000_000_000
,並將 counter
每次增加 1
。當每個線程完成時,它將向主線程發送包含結果的消息。一旦主線程分別收到來自所有四個線程的消息,您將合併結果並向用戶發送響應。
如果您有一個需要迭代大型數組的任務,您也可以使用相同的方法。例如,如果您想要調整目錄中的 800 張圖像大小,您可以創建一個包含所有圖像文件路徑的數組。接下來,將 800
除以 4
(線程計數),並讓每個線程處理一個範圍。線程一將調整從數組索引 0
到 199
的圖像,線程二從索引 200
到 399
,依此類推。
首先,請確認您有四個或更多的核心:
- nproc
Output4
使用cp
命令複製worker.js
文件:
- cp worker.js four_workers.js
當您稍後再次運行這些文件以比較此部分變更後的性能時,當前的index.js
和worker.js
文件將保持不變。
接下來,在您的文本編輯器中打開four_workers.js
文件:
- nano four_workers.js
在您的four_workers.js
文件中,添加突出顯示的代碼以導入workerData
對象:
const { workerData, parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000 / workerData.thread_count; i++) {
counter++;
}
parentPort.postMessage(counter);
首先,您提取WorkerData
對象,該對象將包含從主線程傳遞的數據,在線程初始化時(您很快將在index.js
文件中執行)進行。該對象具有一個thread_count
屬性,其中包含線程的數量,即4
。然後在for
循環中,將值20_000_000_000
除以4
,結果為5_000_000_000
。
保存並關閉您的文件,然後複製index.js
文件:
- cp index.js index_four_workers.js
在您的編輯器中打開 index_four_workers.js
文件:
- nano index_four_workers.js
在您的index_four_workers.js
文件中,添加突出顯示的代碼以創建線程實例:
...
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
});
}
app.get("/blocking", async (req, res) => {
...
})
...
首先,定義包含您要創建的線程數的THREAD_COUNT
常量。稍後,當您的服務器上有更多核心時,擴展將涉及將THREAD_COUNT
的值更改為您要使用的線程數。
接下來,createWorker()
函數創建並返回一個 promise。在 promise 回調函數中,通過將文件路徑傳遞給 Worker
類的第一個參數來初始化一個新的線程,文件路徑為 four_workers.js
。然後,將對象作為第二個參數傳遞。接下來,將對象分配給具有另一個對象作為其值的 workerData
屬性。最後,將對象分配給具有數值為常量 THREAD_COUNT
中線程數量的 thread_count
屬性。workerData
對象是您之前在 workers.js
文件中引用的對象。
為了確保 promise 解析或拋出錯誤,請添加以下突出顯示的行:
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
...
當工作線程向主線程發送消息時,promise 解析並返回返回的數據。但是,如果發生錯誤,promise 將返回一個錯誤消息。
現在,您已經定義了初始化新線程並返回線程數據的函數,您將在 app.get("/blocking")
中使用該函數來生成新的線程。
但首先,請刪除以下突出顯示的代碼,因為您已經在 createWorker()
函數中定義了此功能:
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error ocurred: ${msg}`);
});
});
...
刪除代碼後,添加以下代碼來初始化四個工作線程:
...
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
});
...
首先,您創建一個 `workerPromises` 變量,其中包含一個空數組。接下來,您遍歷與 `THREAD_COUNT` 的值相同的次數,該值為 `4`。在每次迭代期間,您調用 `createWorker()` 函數來創建一個新的線程。然後,您使用 JavaScript 的 `push` 方法將函數返回的 promise 對象推入 `workerPromises` 數組中。當循環結束時,`workerPromises` 將有四個 promise 對象,每個對象都是調用 `createWorker()` 函數四次返回的。
現在,將下面突出顯示的代碼添加到下面,以等待承諾解析並向用戶返回響應:
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
由於 `workerPromises` 數組包含調用 `createWorker()` 返回的承諾,您將 `Promise.all()` 方法前綴為 `await` 語法,並使用 `all()` 方法調用 `workerPromises` 作為其參數。`Promise.all()` 方法等待數組中的所有承諾解析。當發生這種情況時,`thread_results` 變量包含承諾解析的值。由於計算被分配給四個工作線程,您可以通過使用方括號表示法從 `thread_results` 中獲取每個值來將它們全部加在一起。一旦添加,您將總值返回到頁面。
您的完整文件現在應該如下所示:
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存並關閉您的文件。在運行此文件之前,首先運行 `index.js` 來測量其響應時間:
- node index.js
接下來,在您的本地計算機上打開一個新的終端並輸入以下curl
命令,該命令測量從/blocking
路徑獲取響應所需的時間:
- time curl --get http://localhost:3000/blocking
time
命令測量curl
命令運行的時間。 curl
命令向給定的URL發送HTTP請求,--get
選項指示curl
發出GET
請求。
當命令運行時,您的輸出將類似於這樣:
Outputreal 0m28.882s
user 0m0.018s
sys 0m0.000s
突出顯示的輸出顯示獲取響應大約需要28秒,這可能因您的計算機而異。
接下來,使用CTRL+C
停止服務器,然後運行index_four_workers.js
文件:
- node index_four_workers.js
再次在第二個終端訪問/blocking
路徑:
- time curl --get http://localhost:3000/blocking
您將看到與以下內容一致的輸出:
Outputreal 0m8.491s
user 0m0.011s
sys 0m0.005s
輸出顯示大約需要8秒,這意味著您將負載時間減少了大約70%。
您成功地使用四個工作線程優化了CPU綁定任務。 如果您有擁有超過四個核心的計算機,請將THREAD_COUNT
更新為該數字,您將進一步減少負載時間。
結論
在這篇文章中,您使用 Node 應用程式建立了一個佔用 CPU 的任務,導致主執行緒被阻塞。接著,您嘗試使用 promises 使該任務非同步,但未成功。之後,您使用 worker_threads
模組將 CPU 任務卸載到另一個執行緒,使其非同步。最後,您使用 worker_threads
模組創建了四個執行緒以加速這個需要大量 CPU 運算的任務。
作為下一步,請查閱 Node.js Worker threads 文件以深入了解選項。此外,您可以查看 piscina
函式庫,它允許您為 CPU 密集型任務建立工作執行緒池。如果想繼續學習 Node.js,請參閱教程系列 How To Code in Node.js。
Source:
https://www.digitalocean.com/community/tutorials/how-to-use-multithreading-in-node-js