著者は、Write for DOnationsプログラムの一環として、Open Sourcing Mental Illnessを寄付先に選択しました。
はじめに
Node.jsはJavaScriptコードをシングルスレッドで実行します。つまり、コードは一度に1つのタスクしか実行できません。ただし、Node.js自体はマルチスレッドであり、libuv
ライブラリを介して隠されたスレッドを提供し、ディスクからのファイルの読み取りやネットワークリクエストなどのI/O操作を処理します。隠されたスレッドを使用することで、Node.jsはメインスレッドをブロックせずにI/Oリクエストを行う非同期メソッドを提供します。
Node.jsには隠されたスレッドがありますが、それらを使用してCPU集中型のタスク(複雑な計算、画像のリサイズ、ビデオの圧縮など)をオフロードすることはできません。JavaScriptがシングルスレッドであるため、CPU集中型のタスクが実行されると、メインスレッドがブロックされ、そのタスクが完了するまで他のコードが実行されません。他のスレッドを使用しない場合、CPUにバインドされたタスクの処理速度を上げる唯一の方法は、プロセッサの速度を上げることです。
しかし、近年、CPUは速くなっていません。代わりに、コンピュータは余分なコアを搭載しており、8つ以上のコアを持つコンピュータが一般的になっています。それにもかかわらず、JavaScriptはシングルスレッドであるため、コードはコンピュータの余分なコアを利用してCPUバウンドのタスクを高速化したり、メインスレッドを壊れないようにすることができません。
この問題を解決するために、Node.jsはworker-threads
モジュールを導入しました。このモジュールを使用すると、スレッドを作成し、複数のJavaScriptタスクを並行して実行できます。スレッドがタスクを完了すると、その結果を含むメッセージをメインスレッドに送信し、コードの他の部分で使用できるようになります。ワーカースレッドを使用する利点は、CPUバウンドのタスクがメインスレッドをブロックせず、タスクを複数のワーカーに分割して最適化できることです。
このチュートリアルでは、メインスレッドをブロックするCPU集中型のタスクを持つNode.jsアプリを作成します。次に、worker-threads
モジュールを使用してCPU集中型のタスクをメインスレッドをブロックせずに別のスレッドにオフロードします。最後に、CPUバウンドのタスクを分割し、4つのスレッドで並行して作業するようにします。
前提条件
このチュートリアルを完了するには、以下が必要です:
-
4つ以上のコアを持つマルチコアシステム。デュアルコアシステムでもステップ1から6までのチュートリアルに従うことができます。ただし、パフォーマンスの向上を確認するには、ステップ7で4つのコアが必要です。
-
Node.js開発環境。Ubuntu 22.04を使用している場合は、Ubuntu 22.04にNode.jsをインストールする方法のステップ3に従って最新バージョンのNode.jsをインストールしてください。別のオペレーティングシステムを使用している場合は、Node.jsをインストールしてローカル開発環境を作成する方法を参照してください。
-
JavaScriptのイベントループ、コールバック、およびプロミスに関する十分な理解が必要です。当社のチュートリアル、JavaScriptでのイベントループ、コールバック、プロミス、および非同期/待機の理解で見つけることができます。
-
Expressウェブフレームワークの基本的な使用方法に関する知識が必要です。当社のガイド、Node.jsとExpressの始め方をチェックしてください。
プロジェクトの設定と依存関係のインストール
このステップでは、プロジェクトディレクトリを作成し、npm
を初期化し、すべての必要な依存関係をインストールします。
まず、プロジェクトディレクトリを作成して移動します:
- mkdir multi-threading_demo
- cd multi-threading_demo
mkdir
コマンドはディレクトリを作成し、cd
コマンドは作成したディレクトリに作業ディレクトリを変更します。
次に、npm init
コマンドを使用してnpmでプロジェクトディレクトリを初期化します:
- npm init -y
コマンドが実行されると、出力は次のようになります:
Wrote to /home/sammy/multi-threading_demo/package.json:
{
"name": "multi-threading_demo",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC"
}
次に、Node.jsのWebフレームワークであるexpress
をインストールします:
- npm install express
Expressを使用して、ブロッキングおよび非同期エンドポイントを持つサーバーアプリケーションを作成します。
Node.jsにはデフォルトでworker-threads
モジュールが付属しているため、インストールする必要はありません。
必要なパッケージがインストールされました。次に、プロセスとスレッドについて詳しく学び、コンピュータでどのように実行されるかを学びます。
プロセスとスレッドの理解
CPU バウンドタスクを書き始め、それらを別々のスレッドにオフロードする前に、まずプロセスとスレッドが何であるか、およびそれらの間の違いを理解する必要があります。最も重要なのは、プロセスとスレッドが単一またはマルチコアのコンピュータシステムでどのように実行されるかを確認することです。
プロセス
A process is a running program in the operating system. It has its own memory and cannot see nor access the memory of other running programs. It also has an instruction pointer, which indicates the instruction currently being executed in a program. Only one task can be executed at a time.
これを理解するために、無限ループを持つ Node.js プログラムを作成し、実行時に終了しないようにします。
お好みのテキストエディタ nano
を使用して、process.js
ファイルを作成および開きます:
- nano process.js
process.js
ファイルに次のコードを入力します:
const process_name = process.argv.slice(2)[0];
count = 0;
while (true) {
count++;
if (count == 2000 || count == 4000) {
console.log(`${process_name}: ${count}`);
}
}
最初の行では、process.argv
プロパティがプログラムのコマンドライン引数を含む配列を返します。次に、インデックス 2 以降の配列の浅いコピーを作成するために JavaScript の slice()
メソッドを引数 2
で使用します。これにより、最初の 2 つの引数(Node.js パスとプログラムファイル名)がスキップされます。次に、スライスされた配列から最初の引数を取得し、process_name
変数に格納します。
その後、while
ループを定義し、true
条件を渡してループを無限に実行します。ループ内で、count
変数は各反復ごとに1
ずつ増加します。その後に、count
が2000
または4000
と等しいかどうかをチェックするif
文が続きます。条件がtrueに評価されると、console.log()
メソッドがターミナルにメッセージをログします。
ファイルを保存して閉じるには、CTRL+X
を使用し、その後Y
を押して変更内容を保存します。
次に、node
コマンドを使用してプログラムを実行します:
- node process.js A &
A
is a command-line argument that is passed to the program and stored in the process_name
variable. The &
at end the allows the Node program to run in the background, which lets you enter more commands in the shell.
プログラムを実行すると、次のような出力が表示されます:
Output[1] 7754
A: 2000
A: 4000
数字7754
は、オペレーティングシステムがそれに割り当てたプロセスIDです。A: 2000
およびA: 4000
は、プログラムの出力です。
node
コマンドを使用してプログラムを実行すると、プロセスが作成されます。オペレーティングシステムは、プログラムのためにメモリを割り当て、コンピュータのディスク上にプログラム実行可能ファイルを特定し、プログラムをメモリにロードします。その後、プロセスIDを割り当て、プログラムの実行を開始します。その時点で、あなたのプログラムはプロセスになります。
プロセスが実行されている間、そのプロセスIDはオペレーティングシステムのプロセスリストに追加され、htop
、top
、またはps
などのツールで確認することができます。これらのツールは、プロセスに関する詳細情報を提供し、それらを停止または優先させるオプションも提供します。
Nodeプロセスの簡単な概要を取得するには、ターミナルでENTER
を押してプロンプトを取得します。次に、ps
コマンドを実行してNodeプロセスを表示します:
- ps |grep node
ps
コマンドは、システム上の現在のユーザーに関連するすべてのプロセスをリストします。パイプ演算子|
を使用してps
の出力をgrep
に渡すと、プロセスをNodeプロセスのみにフィルタリングします。
コマンドを実行すると、以下のような出力が得られます:
Output7754 pts/0 00:21:49 node
1つのプログラムから無数のプロセスを作成できます。たとえば、異なる引数で3つの追加プロセスを作成してバックグラウンドに配置するには、次のコマンドを使用します:
- node process.js B & node process.js C & node process.js D &
このコマンドでは、process.js
プログラムのインスタンスを3つ作成しました。&
シンボルは各プロセスをバックグラウンドに配置します。
コマンドを実行すると、以下のような出力が得られます(順序は異なる可能性があります):
Output[2] 7821
[3] 7822
[4] 7823
D: 2000
D: 4000
B: 2000
B: 4000
C: 2000
C: 4000
出力では、各プロセスがカウントが2000
および4000
に達したときにプロセス名をターミナルに記録していることがわかります。各プロセスは他の実行中のプロセスを認識していません:プロセスD
はプロセスC
の存在を認識しておらず、その逆もまた然りです。どちらのプロセスで何が起こっても、他のNode.jsプロセスには影響を与えません。
もし出力をよく見ると、3つのプロセスを作成したときの順序と同じではないことがわかります。コマンドを実行するとき、プロセスの引数はB
、C
、D
の順でした。しかし今、順序はD
、B
、C
です。その理由は、OSにはCPUで実行するプロセスを決定するスケジューリングアルゴリズムがあるためです。
単一コアのマシンでは、プロセスは同時に実行されます。つまり、オペレーティングシステムは定期的な間隔でプロセス間を切り替えます。例えば、プロセスD
が限られた時間だけ実行され、その状態がどこかに保存され、OSはプロセスB
を実行するためにスケジュールされ、以降同様に行われます。これは全てのタスクが完了するまで続きます。出力から見ると、それぞれのプロセスが完了まで実行されたように見えるかもしれませんが、実際にはOSのスケジューラが常にこれらの間を切り替えています。
マルチコアシステムでは、4つのコアがあると仮定して、OSは各プロセスを同時に各コアで実行するようにスケジュールします。これは並列処理として知られています。ただし、プロセスをさらに4つ作成すると(合計8つ)、各コアはそれぞれ2つのプロセスを完了するまで同時に実行します。
スレッド
スレッドはプロセスのようなものです:それらには独自の命令ポインタがあり、1つのJavaScriptタスクを実行できます。プロセスとは異なり、スレッドには独自のメモリがありません。代わりに、プロセスのメモリ内に存在します。プロセスを作成すると、worker_threads
モジュールで複数のスレッドを作成し、JavaScriptコードを並行して実行できます。さらに、スレッドはメッセージパッシングを介して互いに通信したり、プロセスのメモリ内でデータを共有したりできます。これにより、プロセスと比較して軽量です。スレッドを生成すると、オペレーティングシステムによりさらに多くのメモリが要求されません。
スレッドの実行に関しては、プロセスのそれと類似した動作を示します。単一コアシステムで複数のスレッドが実行されている場合、オペレーティングシステムは定期的な間隔でそれらを切り替え、各スレッドに単一CPU上で直接実行する機会を与えます。マルチコアシステムでは、OSはスレッドをすべてのコアにスケジュールし、JavaScriptコードを同時に実行します。利用可能なコアよりも多くのスレッドを作成した場合、各コアは複数のスレッドを同時に実行します。
このようにして、ENTER
を押し、次にkill
コマンドで現在実行中のすべてのNodeプロセスを停止します:
- sudo kill -9 `pgrep node`
pgrep
はすべての四つのNodeプロセスのプロセスIDをkill
コマンドに返します。 -9
オプションはkill
にSIGKILLシグナルを送信するように指示します。
コマンドを実行すると、次のような出力が表示されます:
Output[1] Killed node process.js A
[2] Killed node process.js B
[3] Killed node process.js C
[4] Killed node process.js D
出力が遅れることがあり、後で別のコマンドを実行すると表示されることがあります。
プロセスとスレッドの違いがわかったので、次のセクションではNode.jsの隠れたスレッドを使用します。
Node.jsにおける隠れたスレッドの理解
Node.jsは追加のスレッドを提供しているため、マルチスレッドと見なされます。このセクションでは、Node.jsの隠れたスレッドについて調べ、I/O操作を非同期にするのに役立ちます。
導入で説明したように、JavaScriptはシングルスレッドであり、すべてのJavaScriptコードは単一のスレッドで実行されます。これには、プログラムのソースコードやプログラムに含まれるサードパーティのライブラリも含まれます。プログラムがファイルの読み取りやネットワークリクエストなどのI/O操作を行う場合、これはメインスレッドをブロックします。
ただし、Node.jsはlibuv
ライブラリを実装しており、これによりNode.jsプロセスには4つの追加スレッドが提供されます。これらのスレッドを使用すると、I/O操作が個別に処理され、完了するとイベントループがI/Oタスクに関連付けられたコールバックをマイクロタスクキューに追加します。メインスレッドの呼び出しスタックがクリアされると、コールバックがコールスタックにプッシュされ、それが実行されます。これを明確にするために、指定されたI/Oタスクに関連付けられたコールバックは並行して実行されません。ただし、ファイルの読み取りやネットワークリクエストのタスク自体は、スレッドの助けを借りて並行して発生します。I/Oタスクが完了すると、コールバックはメインスレッドで実行されます。
これらの4つのスレッドに加えて、V8エンジンは、自動ガベージコレクションなどの処理を行うための2つのスレッドも提供します。これにより、プロセス内のスレッドの合計数は7になります。メインスレッド1つ、Node.jsスレッド4つ、V8スレッド2つです。
すべてのNode.jsプロセスが7つのスレッドを持っていることを確認するには、process.js
ファイルを再度実行してバックグラウンドに置きます。
- node process.js A &
ターミナルにはプロセスIDとプログラムからの出力が表示されます。
Output[1] 9933
A: 2000
A: 4000
プロセスIDをどこかにメモして、プロンプトを再度使用できるようにENTER
を押してください。
スレッドを表示するには、top
コマンドを実行して出力に表示されるプロセスIDを渡します。
- top -H -p 9933
-H
はtop
にスレッドをプロセス内に表示するよう指示します。-p
フラグは、指定されたプロセスIDのアクティビティのみをtop
が監視するよう指示します。
コマンドを実行すると、出力は次のようになります。
Outputtop - 09:21:11 up 15:00, 1 user, load average: 0.99, 0.60, 0.26
Threads: 7 total, 1 running, 6 sleeping, 0 stopped, 0 zombie
%Cpu(s): 24.8 us, 0.3 sy, 0.0 ni, 75.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
MiB Mem : 7951.2 total, 6756.1 free, 248.4 used, 946.7 buff/cache
MiB Swap: 0.0 total, 0.0 free, 0.0 used. 7457.4 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
9933 node-us+ 20 0 597936 51864 33956 R 99.9 0.6 4:19.64 node
9934 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
9935 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.84 node
9936 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
9937 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.93 node
9938 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
9939 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
お使いのディレクトリに表示されるように、Node.js プロセスには合計7つのスレッドがあります:JavaScriptを実行するためのメインスレッド1つ、Node.jsスレッド4つ、およびV8スレッド2つ。
先に述べたように、Node.jsスレッド4つはI/O操作に使用され、それにより非同期になります。これらはそのタスクに適しており、I/O操作のために自分でスレッドを作成すると、アプリケーションのパフォーマンスが悪化する可能性があります。同じことはCPUバウンドタスクには当てはまりません。CPUバウンドタスクはプロセス内の余分なスレッドを使用せず、メインスレッドをブロックします。
今、q
を押してtop
を終了し、次のコマンドでNodeプロセスを停止します:
- kill -9 9933
Node.jsプロセスのスレッドについて知ったので、次のセクションではCPUバウンドタスクを記述し、それがメインスレッドにどのように影響するかを観察します。
ワーカースレッドを使用しないCPUバウンドタスクの作成
このセクションでは、ノンブロッキングのルートと、CPUバウンドタスクを実行するブロッキングのルートを備えたExpressアプリを構築します。
まず、好きなエディタでindex.js
を開きます:
- nano index.js
お好みのエディタでindex.js
ファイルを開き、次のコードを追加して基本的なサーバーを作成します:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
次のコードブロックで、Expressを使用してHTTPサーバーを作成します。最初の行で、express
モジュールをインポートします。次に、app
変数を設定して、Expressのインスタンスを保持します。その後、サーバーがリッスンするポート番号を保持するport
変数を定義します。
その後、app.get('/non-blocking')
を使用して、GET
リクエストが送信されるルートを定義します。最後に、app.listen()
メソッドを呼び出して、サーバーがポート3000
でリッスンを開始するように指示します。
次に、別のルート/blocking/
を定義し、CPU集中型のタスクを含めます。
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
/blocking
ルートをapp.get("/blocking")
を使用して定義し、2番目の引数としてasync
キーワードで修飾された非同期コールバックを取ります。このコールバック内で、for
ループを作成し、200億回反復します。各反復中に、counter
変数を1
ずつ増やします。このタスクはCPUで実行され、完了するまで数秒かかります。
この時点で、index.js
ファイルは次のようになります。
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
ファイルを保存して終了し、次のコマンドでサーバーを起動します。
- node index.js
このコマンドを実行すると、次のような出力が表示されます。
OutputApp listening on port 3000
これにより、サーバーが実行され、提供する準備ができていることが示されます。
今、お好みのブラウザでhttp://localhost:3000/non-blocking
を訪問します。メッセージThis page is non-blocking
が即座に表示されます。
注意:リモートサーバーでチュートリアルを実行している場合、ポートフォワーディングを使用してブラウザでアプリをテストできます。
Expressサーバーがまだ実行中の場合、ローカルコンピューターの別のターミナルを開き、次のコマンドを入力します。
- ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip
サーバーに接続したら、ローカルマシンのWebブラウザで http://localhost:3000/non-blocking
に移動します。このチュートリアルの残りの部分で、2番目のターミナルを開いたままにしておきます。
次に、新しいタブを開き、 http://localhost:3000/blocking
を訪問します。ページが読み込まれる間に、さらに2つのタブを開いて、再び http://localhost:3000/non-blocking
を訪問します。インスタントな応答が得られず、ページが読み込みを続けることがわかります。 /blocking
ルートの読み込みが完了し、応答 result is 20000000000
を返すまで、残りのルートが応答を返すことはありません。
/non-blocking
ルートが /blocking
ルートの読み込みと同時に機能しない理由は、CPUバウンドの for
ループがメインスレッドをブロックするためです。メインスレッドがブロックされている間、Node.js はCPUバウンドのタスクが完了するまで、リクエストを処理できません。したがって、アプリケーションに数千の同時 GET
リクエストがある場合、 /blocking
ルートを1回訪問するだけで、すべてのアプリケーションルートが応答しなくなります。
メインスレッドをブロックすることは、ユーザーのアプリ体験に悪影響を与える可能性があります。この問題を解決するには、CPUに負荷のかかるタスクを別のスレッドにオフロードし、メインスレッドが他のHTTPリクエストを処理できるようにする必要があります。
そのため、CTRL+C
を押してサーバーを停止します。次のセクションでindex.js
ファイルにさらに変更を加えた後、サーバーを再起動します。サーバーを停止する理由は、Node.jsがファイルに新しい変更が加えられた場合に自動的にリフレッシュされないためです。
アプリケーションにCPU集中タスクが及ぼすネガティブな影響を理解したので、プロミスを使用してメインスレッドをブロックするのを回避しようとします。
プロミスを使用したCPUに負荷のかかるタスクのオフロード
開発者がCPUに負荷のかかるタスクのブロック効果について学ぶと、非ブロッキングなプロミスベースのI/Oメソッド(readFile()
やwriteFile()
など)を使用することがあります。しかし、I/O操作はNode.jsの非表示スレッドを使用しており、CPUに負荷のかかるタスクにはそれがありません。それにもかかわらず、このセクションでは、CPUに負荷のかかるタスクをプロミスでラップして非ブロッキングにしようと試みます。うまくいきませんが、次のセクションでワーカースレッドを使用する価値を理解するのに役立ちます。
エディタで再度index.js
ファイルを開いてください。
- nano index.js
あなたのindex.js
ファイルで、CPU集約タスクを含む強調されたコードを削除してください:
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
...
次に、次の強調されたコードを追加して、プロミスを返す関数を含めてください:
...
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
res.status(200).send(`result is ${counter}`);
}
calculateCount()
関数には、/blocking
ハンドラ関数で行っていた計算が含まれています。この関数はプロミスを返し、new Promise
構文で初期化されます。プロミスはresolve
とreject
パラメータを取るコールバックを使用し、成功または失敗を処理します。for
ループの実行が完了すると、プロミスはcounter
変数の値で解決します。
次に、index.js
ファイルで/blocking/
ハンドラ関数でcalculateCount()
関数を呼び出してください:
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
ここで、await
キーワードが前置されてcalculateCount()
関数を呼び出し、プロミスが解決するのを待ちます。プロミスが解決すると、counter
変数が解決された値に設定されます。
完全なコードは以下のようになります:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
ファイルを保存して終了し、サーバーを再起動してください:
- node index.js
Webブラウザでhttp://localhost:3000/blocking
を訪れ、読み込まれる間に素早くhttp://localhost:3000/non-blocking
タブをリロードしてください。非同期の影響を受けるため、non-blocking
ルートは引き続き影響を受け、すべてが/blocking
ルートの読み込みが完了するのを待ちます。ルートが引き続き影響を受けるため、JavaScriptコードを並列で実行させることはできず、CPUバウンドタスクをノンブロッキングにするためにプロミスは使用できません。
それで、CTRL+C
でアプリケーションサーバーを停止します。
約束はCPUバウンドタスクを非ブロッキングにするためのメカニズムを提供しないことがわかったので、Node.jsのworker-threads
モジュールを使用してCPUバウンドタスクを別のスレッドにオフロードします。
worker-threads
モジュールを使用したCPUバウンドタスクのオフロード
このセクションでは、メインスレッドをブロックせずにCPU集中タスクを別のスレッドにオフロードするために、worker-threads
モジュールを使用します。これを行うために、CPU集中タスクを含むworker.js
ファイルを作成します。index.js
ファイルでは、worker-threads
モジュールを使用してスレッドを初期化し、worker.js
ファイルでタスクをメインスレッドと並行して実行します。タスクが完了すると、ワーカースレッドは結果を含むメッセージをメインスレッドに送信します。
まず、nproc
コマンドを使用して2つ以上のコアを持っていることを確認します:
- nproc
Output4
2つ以上のコアが表示される場合、この手順を進めることができます。
次に、テキストエディタでworker.js
ファイルを作成して開きます:
- nano worker.js
worker.js
ファイルに、次のコードを追加してworker-threads
モジュールをインポートし、CPU集中タスクを実行します。
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
最初の行では、worker_threads
モジュールをロードし、parentPort
クラスを抽出しています。このクラスは、メインスレッドにメッセージを送信するために使用できるメソッドを提供しています。次に、index.js
ファイルのcalculateCount()
関数で現在実行中のCPU集中タスクがあります。このステップでは、index.js
からこの関数を削除します。
これに続いて、以下のハイライトされたコードを追加します:
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
parentPort.postMessage(counter);
ここでは、parentPort
クラスのpostMessage()
メソッドを呼び出して、counter
変数に格納されたCPUバウンドタスクの結果を含むメッセージをメインスレッドに送信します。
ファイルを保存して終了します。テキストエディタでindex.js
を開きます:
- nano index.js
すでにworker.js
でCPUバウンドタスクがあるため、index.js
からハイライトされたコードを削除します:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
次に、app.get("/blocking")
のコールバックで、スレッドを初期化するために次のコードを追加します:
const express = require("express");
const { Worker } = require("worker_threads");
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
...
最初に、worker_threads
モジュールをインポートし、Worker
クラスを展開します。そして、app.get("/blocking")
のコールバック内で、new
キーワードを使用してWorker
をworker.js
ファイルパスとともに呼び出します。これにより新しいスレッドが作成され、worker.js
ファイルのコードが別のコアでスレッドで実行されます。
次に、メッセージイベントをリッスンするために、on("message")
メソッドを使用してworker
インスタンスにイベントをアタッチします。メッセージがworker.js
ファイルからの結果を含む場合、それはメソッドのコールバックにパラメータとして渡され、CPUバウンドタスクの結果を含むユーザへの応答が返されます。
次に、エラーイベントをリッスンするために、on("error")
メソッドを使用してworkerインスタンスに別のイベントをアタッチします。エラーが発生した場合、コールバックはユーザにエラーメッセージを含む404
応答を返します。
完全なファイルは以下のようになります:
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
ファイルを保存して終了し、サーバーを実行してください:
- node index.js
ウェブブラウザーでhttp://localhost:3000/blocking
タブを再度訪問してください。読み込みが完了する前に、すべてのhttp://localhost:3000/non-blocking
タブをリフレッシュしてください。今度は、/blocking
ルートの読み込みが完了するのを待たずに、即座に読み込まれることに気付くはずです。これは、CPUバウンドタスクが別のスレッドにオフロードされ、メインスレッドがすべての着信リクエストを処理しているためです。
これで、CTRL+C
を使用してサーバーを停止してください。
これで、CPU集約型タスクをワーカースレッドを使用してノンブロッキングにすることができるようになりました。CPU集約型タスクのパフォーマンスを向上させるために、4つのワーカースレッドを使用します。
4つのワーカースレッドを使用してCPU集中型タスクを最適化する
このセクションでは、CPU集中型のタスクを4つのワーカースレッドに分割して、タスクをより速く完了させ、/blocking
ルートの読み込み時間を短縮します。
同じタスクにより多くのワーカースレッドを使用するには、タスクを分割する必要があります。タスクには200億回のループが含まれるため、200億を使用するスレッド数で割ります。この場合、4
です。 20_000_000_000 / 4
を計算すると、5_000_000_000
になります。したがって、各スレッドは0
から5_000_000_000
までのループを行い、counter
を1
ずつ増やします。各スレッドが終了すると、結果を含むメッセージをメインスレッドに送信します。メインスレッドが4つのスレッドそれぞれからメッセージを受信したら、結果を結合してユーザーに応答を送信します。
大きな配列を反復処理するタスクがある場合は、同じアプローチを使用することもできます。たとえば、ディレクトリ内の800枚の画像をリサイズしたい場合、すべての画像ファイルパスを含む配列を作成します。次に、800
を4
(スレッド数)で割り、各スレッドが範囲で作業するようにします。スレッド1は、配列のインデックス0
から199
までの画像をリサイズし、スレッド2はインデックス200
から399
までの画像をリサイズし、以降のスレッドも同様です。
最初に、4つ以上のコアがあることを確認してください。
- nproc
Output4
worker.js
ファイルのコピーを作成するには、cp
コマンドを使用します:
- cp worker.js four_workers.js
現在のindex.js
およびworker.js
ファイルはそのままにしておきます。後でこのセクションの変更を比較するために、再度実行できるようにします。
次に、テキストエディタでfour_workers.js
ファイルを開きます:
- nano four_workers.js
four_workers.js
ファイルで、workerData
オブジェクトをインポートするために、ハイライトされたコードを追加します:
const { workerData, parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000 / workerData.thread_count; i++) {
counter++;
}
parentPort.postMessage(counter);
まず、WorkerData
オブジェクトを抽出します。これには、スレッドが初期化されるときにメインスレッドから渡されるデータが含まれています(これはすぐにindex.js
ファイルで行います)。オブジェクトには、スレッド数を含むthread_count
プロパティがあります。この値は4
です。次に、for
ループで、20_000_000_000
を4
で割ります。結果は5_000_000_000
です。
ファイルを保存して閉じたら、index.js
ファイルをコピーします:
- cp index.js index_four_workers.js
エディタでindex_four_workers.js
ファイルを開きます:
- nano index_four_workers.js
index_four_workers.js
ファイルで、スレッドインスタンスを作成するために、ハイライトされたコードを追加します:
...
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
});
}
app.get("/blocking", async (req, res) => {
...
})
...
まず、作成するスレッドの数を含むTHREAD_COUNT
定数を定義します。後でサーバーでより多くのコアが利用可能になると、スケーリングにはTHREAD_COUNT
の値を使用するスレッド数に変更する必要があります。
次に、createWorker()
関数がプロミスを作成して返します。プロミスのコールバック内で、Worker
クラスにfour_workers.js
ファイルへのファイルパスを第1引数として渡して新しいスレッドを初期化します。次に、2番目の引数としてオブジェクトを渡します。その後、そのオブジェクトに別のオブジェクトを値として持つworkerData
プロパティを割り当てます。最後に、thread_count
プロパティにはTHREAD_COUNT
定数のスレッド数が値として割り当てられます。workerData
オブジェクトは、以前にworkers.js
ファイルで参照したものです。
プロミスが解決されるかエラーが発生するかを確認するために、以下のハイライトされた行を追加してください:
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
...
ワーカースレッドがメインスレッドにメッセージを送信すると、プロミスが返されたデータで解決されます。ただし、エラーが発生した場合は、プロミスがエラーメッセージを返します。
新しいスレッドを初期化し、スレッドからデータを返す関数が定義されたので、app.get("/blocking")
でその関数を使用します。
ただし、createWorker()
関数で既にこの機能を定義しているので、以下のハイライトされたコードを削除してください:
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error ocurred: ${msg}`);
});
});
...
削除されたコードの代わりに、次のコードを追加して4つのワーカースレッドを初期化してください:
...
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
});
...
最初に、空の配列を含むworkerPromises
変数を作成します。次に、THREAD_COUNT
の値、つまり4
回分繰り返します。各繰り返しで、新しいスレッドを作成するためにcreateWorker()
関数を呼び出します。次に、その関数が返すプロミスオブジェクトをJavaScriptのpush
メソッドを使用してworkerPromises
配列にプッシュします。ループが終了すると、workerPromises
にはcreateWorker()
関数を4回呼び出した際にそれぞれ返される4つのプロミスオブジェクトが含まれます。
次に、以下のハイライトされたコードを追加して、プロミスが解決するのを待ち、ユーザーに応答を返します:
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
workerPromises
配列はcreateWorker()
を呼び出して返されるプロミスを含んでいるため、Promise.all()
メソッドにawait
構文を付け加え、all()
メソッドをworkerPromises
を引数として呼び出します。Promise.all()
メソッドは、配列内のすべてのプロミスが解決するのを待ちます。それが起こると、thread_results
変数にプロミスが解決した値が含まれます。計算が4つのワーカーに分割されたため、thread_results
から各値をブラケット表記構文を使用して取得し、それらをすべて合算します。合算が完了したら、合計値をページに返します。
完全なファイルは以下のようになります:
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
ファイルを保存して閉じます。このファイルを実行する前に、まずindex.js
を実行して応答時間を計測してください。
- node index.js
次に、ローカルコンピューターで新しいターミナルを開き、以下のcurl
コマンドを入力してください。このコマンドは、/blocking
ルートからの応答にかかる時間を計測します:
- time curl --get http://localhost:3000/blocking
time
コマンドは、curl
コマンドの実行時間を計測します。 curl
コマンドは指定されたURLにHTTPリクエストを送信し、--get
オプションはcurl
にGET
リクエストを行うよう指示します。
コマンドを実行すると、出力は次のようになります:
Outputreal 0m28.882s
user 0m0.018s
sys 0m0.000s
ハイライトされた出力によると、応答に約28秒かかることが示されており、これはコンピューターによって異なる場合があります。
次に、CTRL+C
を使用してサーバーを停止し、index_four_workers.js
ファイルを実行してください:
- node index_four_workers.js
2番目のターミナルで再度/blocking
ルートにアクセスしてください:
- time curl --get http://localhost:3000/blocking
以下のような一貫した出力が表示されます:
Outputreal 0m8.491s
user 0m0.011s
sys 0m0.005s
この出力によると、約8秒かかることが示されており、これは負荷時間が約70%削減されたことを意味します。
これで、CPUに負荷がかかるタスクを4つのワーカースレッドを使用して最適化しました。 4つ以上のコアを備えたマシンを使用している場合は、THREAD_COUNT
をその数に更新すると、負荷時間がさらに短縮されます。
結論
この記事では、メインスレッドをブロックするCPUバウンドタスクを持つNodeアプリを構築しました。その後、約束を使用してタスクを非同期にしようとしましたが、失敗しました。その後、worker_threads
モジュールを使用して、CPUバウンドタスクを別のスレッドにオフロードして非同期にしました。最後に、worker_threads
モジュールを使用して、CPU集中型のタスクを高速化するために4つのスレッドを作成しました。
次のステップとして、オプションについて詳しくはNode.js Workerスレッドのドキュメントを参照してください。さらに、CPU集中型のタスク用のワーカープールを作成するpiscina
ライブラリを確認できます。Node.jsの学習を続けたい場合は、チュートリアルシリーズHow To Code in Node.jsを参照してください。
Source:
https://www.digitalocean.com/community/tutorials/how-to-use-multithreading-in-node-js