著者は、女性エンジニア協会を寄付の受取先として、寄付のために書くプログラムの一環として選びました。
導入
Webアプリケーションにはリクエスト/レスポンスのサイクルがあります。URLを訪れると、ブラウザはデータを処理したりデータベースでクエリを実行したりするアプリケーションを実行しているサーバーにリクエストを送信します。この間、ユーザーはアプリケーションがレスポンスを返すまで待たされます。一部のタスクでは、ユーザーはすぐにレスポンスを得ることができますが、画像の処理、データの分析、レポートの生成、またはメールの送信などの時間がかかるタスクでは、リクエスト/レスポンスのサイクルが遅くなります。たとえば、ユーザーが画像をアップロードするアプリケーションを持っている場合、画像を表示する前にサーバーのディスクスペースを節約するために画像のサイズを変更、圧縮、または別の形式に変換する必要があるかもしれません。画像の処理はCPUを使用するタスクであり、タスクが完了するまでNode.jsのスレッドをブロックすることがあります。それには数秒または数分かかるかもしれません。ユーザーはタスクが完了するのを待たなければならず、その後にサーバーからのレスポンスを受け取ります。
bullmq
を使用すると、リクエスト/レスポンスサイクルを遅らせることなく、Node.jsアプリから時間のかかるタスクをオフロードできる分散タスク(ジョブ)キューを利用できます。このツールを使用すると、bullmq
はバックグラウンドでタスクを非同期に実行し、アプリとは独立してタスクを処理できるため、アプリがユーザーに素早く応答できるようになります。ジョブの追跡には、Redisを使用して、キュー内の各ジョブの短い説明を保存します。次に、bullmq
ワーカーがキュー内の各ジョブをデキューして実行し、完了したらマークします。
この記事では、bullmq
を使用して時間のかかるタスクをバックグラウンドにオフロードし、アプリケーションがユーザーに素早く応答できるようにします。まず、bullmq
を使用せずに時間のかかるタスクを持つアプリを作成します。次に、bullmq
を使用してタスクを非同期で実行します。最後に、Redisキュー内のbullmq
ジョブを管理するためのビジュアルダッシュボードをインストールします。
前提条件
このチュートリアルに従うには、以下が必要です:
-
Node.js開発環境のセットアップ。Ubuntu 22.04の場合は、弊社のチュートリアル「Ubuntu 22.04にNode.jsをインストールする方法」に従ってください。他のシステムの場合は、「Node.jsのインストールとローカル開発環境の作成方法」を参照してください。
-
システムにRedisがインストールされています。Ubuntu 22では、弊社のチュートリアル「Ubuntu 22.04にRedisをインストールしてセキュアにする方法」の手順1から3を実行してください。他のシステムの場合は、弊社のチュートリアル「Redisのインストールとセキュアな設定方法」を参照してください。
-
JavaScriptでのイベントループ、コールバック、プロミス、およびAsync/Awaitの理解
のチュートリアルで開発できる、プロミスとasync/await関数についての理解が必要です。
-
Expressの基本的な使用方法についての基礎知識が必要です。 Node.jsとExpressの始め方のチュートリアルを参照してください。
-
Embedded JavaScript (EJS)の使い方についての理解。 詳細については、Nodeアプリケーションのテンプレート作成にEJSを使用する方法のチュートリアルを参照してください。
-
sharp
を使用した画像の処理の基本的な理解。 詳細については、Node.jsで画像を処理する方法(Sharpを使用)のチュートリアルを参照してください。
ステップ1 — プロジェクトディレクトリの設定
このステップでは、アプリケーションのためのディレクトリを作成し、必要な依存関係をインストールします。このチュートリアルで構築するアプリケーションでは、ユーザーが画像をアップロードし、それをsharp
パッケージを使用して処理します。画像処理は時間がかかるため、リクエスト/レスポンスのサイクルが遅くなる可能性があり、このタスクはbullmq
を使用してバックグラウンドにオフロードすることが適しています。タスクをオフロードするために使用する手法は、他の時間のかかるタスクにも適用することができます。
まず、image_processor
という名前のディレクトリを作成し、そのディレクトリに移動します:
- mkdir image_processor && cd image_processor
次に、ディレクトリをnpmパッケージとして初期化します:
- npm init -y
このコマンドはpackage.json
ファイルを作成します。オプションの-y
は、npmにすべてのデフォルトを受け入れるように指示します。
コマンドを実行すると、出力は以下のようになります:
OutputWrote to /home/sammy/image_processor/package.json:
{
"name": "image_processor",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC"
}
出力によって、package.json
ファイルが作成されたことが確認されます。重要なプロパティには、アプリの名前(name
)、アプリのバージョン番号(version
)、プロジェクトの開始ポイント(main
)などが含まれます。他のプロパティについて詳しく知りたい場合は、npmのpackage.jsonドキュメントを参照してください。
アプリケーションをビルドするには、次の依存関係が必要です:
express
: ウェブアプリを構築するためのウェブフレームワーク。express-fileupload
: フォームからファイルをアップロードするためのミドルウェア。sharp
: 画像処理ライブラリ。ejs
: Node.jsでHTMLマークアップを生成するためのテンプレート言語。bullmq
: 分散タスクキュー。bull-board
:bullmq
を基にしたダッシュボードで、ジョブの状態を素敵なユーザーインターフェース(UI)で表示します。
これらの依存関係をすべてインストールするには、次のコマンドを実行してください:
- npm install express express-fileupload sharp ejs bullmq @bull-board/express
インストールした依存関係に加えて、後でこのチュートリアルで以下の画像も使用します:
お好みの場所に画像をダウンロードするにはcurl
を使用してください。
- curl -O https://deved-images.nyc3.cdn.digitaloceanspaces.com/CART-68886/underwater.png
これで、次に進むNode.jsアプリケーションを構築するために必要な依存関係が揃いました。
ステップ2 — bullmq
を使用しない時間集中型タスクの実装
このステップでは、Expressを使用して画像をアップロードできるアプリケーションを構築します。アプリはsharp
を使用して画像を複数のサイズにリサイズする時間集中型のタスクを開始し、その後にレスポンスが送信されます。このステップでは、時間集中型のタスクがリクエスト/レスポンスサイクルにどのように影響するかを理解するのに役立ちます。
nano
、またはお好みのテキストエディタを使用して、index.js
ファイルを作成します:
- nano index.js
index.js
ファイルに、以下のコードを追加して依存関係をインポートします:
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
最初の行では、Nodeでファイルパスを計算するためにpath
モジュールをインポートします。2行目では、ディレクトリとのやり取りにfs
モジュールをインポートします。次に、express
ウェブフレームワークをインポートします。HTTPリクエスト内のデータを解析するためのミドルウェアを追加するためにbody-parser
モジュールをインポートします。その後、画像処理のためにsharp
モジュールをインポートします。最後に、HTMLフォームからのアップロードを処理するためにexpress-fileupload
をインポートします。
次に、アプリ内でミドルウェアを実装するための以下のコードを追加します:
...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
bodyParser.urlencoded({
extended: true,
})
);
最初に、app
変数をExpressのインスタンスに設定します。次に、app
変数を使用して、set()
メソッドを使用してExpressがejs
テンプレート言語を使用するように設定します。次に、use()
メソッドを使用して、HTTPリクエストでのJSONデータをJavaScriptでアクセスできる変数に変換するためのbody-parser
モジュールミドルウェアを追加します。次の行では、同様にURLエンコードされた入力を行います。
次に、ファイルのアップロードを処理し静的ファイルを提供するためのさらなるミドルウェアを追加するために、次の行を追加します:
...
app.use(fileUpload());
app.use(express.static("public"));
fileUpload()
メソッドを呼び出してアップロードされたファイルを解析するためのミドルウェアを追加し、Expressが画像やCSSなどの静的ファイルを参照し提供するディレクトリを設定します。
ミドルウェアが設定されたら、次のHTMLフォームを表示するためのルートを作成します:
...
app.get("/", function (req, res) {
res.render("form");
});
ここでは、Expressモジュールのget()
メソッドを使用して、/
ルートとユーザーがホームページまたは/
ルートを訪れたときに実行するコールバックを指定します。コールバックでは、res.render()
を呼び出してviews
ディレクトリ内のform.ejs
ファイルをレンダリングします。まだform.ejs
ファイルまたはviews
ディレクトリを作成していません。
それを作成するには、まずファイルを保存して閉じます。ターミナルで、プロジェクトのルートディレクトリにviews
ディレクトリを作成するために次のコマンドを入力します:
- mkdir views
views
ディレクトリに移動します:
- cd views
エディタでform.ejs
ファイルを作成します:
- nano form.ejs
form.ejs
ファイルに、以下のコードを追加してフォームを作成します:
<!DOCTYPE html>
<html lang="en">
<%- include('./head'); %>
<body>
<div class="home-wrapper">
<h1>Image Processor</h1>
<p>
Resizes an image to multiple sizes and converts it to a
<a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
</p>
<form action="/upload" method="POST" enctype="multipart/form-data">
<input
type="file"
name="image"
placeholder="Select image from your computer"
/>
<button type="submit">Upload Image</button>
</form>
</div>
</body>
</html>
最初に、head.ejs
ファイルを参照していることに言及しますが、まだ作成していません。 head.ejs
ファイルには、他のHTMLページで参照できるHTML head
要素が含まれます。
body
タグ内で、次の属性を持つフォームを作成します:
action
は、フォームが送信されたときにフォームデータを送信するルートを指定します。method
はデータを送信するためのHTTPメソッドを指定します。POST
メソッドは、データをHTTPリクエストに埋め込みます。encytype
は、フォームデータのエンコード方法を指定します。multipart/form-data
の値は、HTMLinput
要素がファイルデータをアップロードできるようにします。
form
要素内に、ファイルをアップロードするためのinput
タグを作成します。その後、type
属性がsubmit
に設定されたbutton
要素を定義します。これにより、フォームを送信できるようになります。
作業が完了したら、ファイルを保存して閉じます。
次に、head.ejs
ファイルを作成します:
- nano head.ejs
head.ejs
ファイルで、アプリのヘッドセクションを作成するために以下のコードを追加します:
<head>
<meta charset="UTF-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Image Processor</title>
<link rel="stylesheet" href="css/main.css" />
</head>
ここでは、このアプリケーションのスタイルを含むmain.css
ファイルを参照しています。このファイルは、この手順の後半でpublic
ディレクトリに作成します。今のところ、静的アセットのプロセスを設定し続けます。
ファイルを保存して閉じます。
フォームから送信されたデータを処理するには、Expressでpost
メソッドを定義する必要があります。これを行うには、プロジェクトのルートディレクトリに戻ります。
- cd ..
再度、あなたのindex.js
ファイルを開いてください:
- nano index.js
次に、/upload
ルートでフォームの送信を処理するためのメソッドを定義するために、index.js
ファイルに以下の行を追加してください:
app.get("/", function (req, res) {
...
});
app.post("/upload", async function (req, res) {
const { image } = req.files;
if (!image) return res.sendStatus(400);
});
あなたはapp
変数を使用して/upload
ルートで送信されたフォームを処理するためにpost()
メソッドを呼び出します。次に、HTTPリクエストからアップロードされた画像データをimage
変数に抽出します。その後、ユーザーが画像をアップロードしない場合は、400
ステータスコードを返すようにレスポンスを設定します。
アップロードされた画像の処理手順を設定するには、以下のハイライトされたコードを追加してください:
...
app.post("/upload", async function (req, res) {
const { image } = req.files;
if (!image) return res.sendStatus(400);
const imageName = path.parse(image.name).name;
const processImage = (size) =>
sharp(image.data)
.resize(size, size)
.webp({ lossless: true })
.toFile(`./public/images/${imageName}-${size}.webp`);
sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
Promise.all(sizes.map(processImage));
});
これらの行は、アプリが画像を処理する方法を表します。まず、アップロードされた画像から画像の拡張子を削除し、名前をimageName
変数に保存します。次に、processImage()
関数を定義します。この関数はsize
パラメータを取り、その値はリサイズ中の画像の寸法を決定するために使用されます。関数内で、sharp()
をimage.data
とともに呼び出します。これはアップロードされた画像のバイナリデータを含むbufferです。sharp
は、サイズパラメータの値に従って画像のサイズを変更します。次に、sharp
からwebp()
メソッドを使用して画像をwebp画像形式に変換します。その後、画像をpublic/images/
ディレクトリに保存します。
次の番号のリストは、アップロードされた画像をリサイズするために使用されるサイズを定義しています。その後、JavaScriptのmap()
メソッドを使用して、sizes
配列内の各要素にprocessImage()
を呼び出します。その後、新しい配列が返されます。 map()
メソッドがprocessImage()
関数を呼び出すたびに、新しい配列に対するプロミスが返されます。それらを解決するためにPromise.all()
メソッドを使用します。
コンピュータの処理速度は異なりますし、ユーザーがアップロードできる画像のサイズも異なります。これは画像処理速度に影響する可能性があります。このコードをデモンストレーション目的で遅延させるために、CPU負荷の高いインクリメントループと、リサイズされた画像が表示されるページへのリダイレクトを追加するには、以下のハイライトされた行を挿入します:
...
app.post("/upload", async function (req, res) {
...
let counter = 0;
for (let i = 0; i < 10_000_000_000; i++) {
counter++;
}
res.redirect("/result");
});
ループはcounter
変数を増分させるために100億回実行されます。 res.redirect()
関数を使用してアプリを/result
ルートにリダイレクトします。そのルートは、public/images
ディレクトリ内の画像を表示するHTMLページをレンダリングします。
/result
ルートはまだ存在しません。これを作成するには、index.js
ファイルに以下のハイライトされたコードを追加してください:
...
app.get("/", function (req, res) {
...
});
app.get("/result", (req, res) => {
const imgDirPath = path.join(__dirname, "./public/images");
let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
return `images/${image}`;
});
res.render("result", { imgFiles });
});
app.post("/upload", async function (req, res) {
...
});
あなたは/result
ルートをapp.get()
メソッドで定義します。その関数内で、imgDirPath
変数をpublic/images
ディレクトリの完全なパスで定義します。fs
モジュールのreaddirSync()
メソッドを使用して指定されたディレクトリ内のすべてのファイルを読み込みます。それから、map()
メソッドをチェーンして、images/
でプレフィックスが付いた新しい配列を返します。
最後に、res.render()
を呼び出してまだ存在しないresult.ejs
ファイルをレンダリングします。imgFiles
変数(すべての画像の相対パスが含まれる配列)をresult.ejs
ファイルに渡します。
ファイルを保存して閉じます。
result.ejs
ファイルを作成するには、views
ディレクトリに戻ります:
- cd views
result.ejs
ファイルを作成して開きます:
- nano result.ejs
result.ejs
ファイルに以下の行を追加して画像を表示します:
<!DOCTYPE html>
<html lang="en">
<%- include('./head'); %>
<body>
<div class="gallery-wrapper">
<% if (imgFiles.length > 0){%>
<p>The following are the processed images:</p>
<ul>
<% for (let imgFile of imgFiles){ %>
<li><img src=<%= imgFile %> /></li>
<% } %>
</ul>
<% } else{ %>
<p>
The image is being processed. Refresh after a few seconds to view the
resized images.
</p>
<% } %>
</div>
</body>
</html>
まず、head.ejs
ファイルを参照します。body
タグでは、imgFiles
変数が空かどうかを確認します。データがある場合、各ファイルに対して画像を作成します。 imgFiles
が空の場合、数秒後にリサイズされた画像を表示するにはリフレッシュしてください。
とユーザーに伝えるメッセージを表示します。
ファイルを保存して閉じます。
次に、ルートディレクトリに戻り、静的アセットを含むpublic
ディレクトリを作成します:
- cd .. && mkdir public
public
ディレクトリに移動してください:
- cd public
アップロードされた画像を保持するimages
ディレクトリを作成してください:
- mkdir images
次に、css
ディレクトリを作成し、そこに移動してください:
- mkdir css && cd css
エディタで、head.ejs
ファイルで以前参照したmain.css
ファイルを作成して開いてください:
- nano main.css
main.css
ファイルで、次のスタイルを追加してください:
body {
background: #f8f8f8;
}
h1 {
text-align: center;
}
p {
margin-bottom: 20px;
}
a:link,
a:visited {
color: #00bcd4;
}
/** 「ファイルを選択」ボタンのスタイル **/
button[type="submit"] {
background: none;
border: 1px solid orange;
padding: 10px 30px;
border-radius: 30px;
transition: all 1s;
}
button[type="submit"]:hover {
background: orange;
}
/** 「画像をアップロード」ボタンのスタイル **/
input[type="file"]::file-selector-button {
border: 2px solid #2196f3;
padding: 10px 20px;
border-radius: 0.2em;
background-color: #2196f3;
}
ul {
list-style: none;
padding: 0;
display: flex;
flex-wrap: wrap;
gap: 20px;
}
.home-wrapper {
max-width: 500px;
margin: 0 auto;
padding-top: 100px;
}
.gallery-wrapper {
max-width: 1200px;
margin: 0 auto;
}
これらの行は、アプリ内の要素のスタイルを設定します。HTML属性を使用して、ファイルを選択ボタンの背景を16進数コード#2196f3
(青のシェード)に、画像をアップロードボタンの境界線をorange
にスタイル設定します。また、/result
ルート上の要素をより見栄え良くスタイル設定します。
作業が完了したら、ファイルを保存して閉じてください。
プロジェクトのルートディレクトリに戻ってください:
- cd ../..
エディタでindex.js
を開いてください:
- nano index.js
index.js
に、次のコードを追加してサーバーを起動してください:
...
app.listen(3000, function () {
console.log("Server running on port 3000");
});
完全なindex.js
ファイルは次のようになります:
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
bodyParser.urlencoded({
extended: true,
})
);
app.use(fileUpload());
app.use(express.static("public"));
app.get("/", function (req, res) {
res.render("form");
});
app.get("/result", (req, res) => {
const imgDirPath = path.join(__dirname, "./public/images");
let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
return `images/${image}`;
});
res.render("result", { imgFiles });
});
app.post("/upload", async function (req, res) {
const { image } = req.files;
if (!image) return res.sendStatus(400);
const imageName = path.parse(image.name).name;
const processImage = (size) =>
sharp(image.data)
.resize(size, size)
.webp({ lossless: true })
.toFile(`./public/images/${imageName}-${size}.webp`);
sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
Promise.all(sizes.map(processImage));
let counter = 0;
for (let i = 0; i < 10_000_000_000; i++) {
counter++;
}
res.redirect("/result");
});
app.listen(3000, function () {
console.log("Server running on port 3000");
});
変更を終えたら、ファイルを保存して閉じてください。
node
コマンドを使用してアプリを実行してください:
- node index.js
次のような出力が表示されます:
OutputServer running on port 3000
この出力は、サーバーが問題なく実行されていることを確認します。
好きなブラウザを開き、http://localhost:3000/
にアクセスします。
注意: リモートサーバーでチュートリアルに従っている場合は、ポートフォワーディングを使用してローカルのブラウザからアプリにアクセスできます。
Node.jsサーバーが実行されている状態で、別のターミナルを開き、次のコマンドを入力します:
- ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip
サーバーに接続したら、node index.js
を実行し、次にローカルマシンのウェブブラウザでhttp://localhost:3000/
に移動します。
ページが読み込まれると、以下のようになります:
次に、ファイルを選択ボタンを押し、ローカルマシン上のunderwater.png
画像を選択します。表示がファイルが選択されていませんからunderwater.pngに切り替わります。その後、画像をアップロードボタンを押します。アプリが画像を処理し、インクリメントループを実行するためにしばらく読み込みます。
タスクが完了すると、リサイズされた画像が/result
ルートで表示されます:
サーバーをCTRL+C
で停止できます。Node.jsはファイルが変更された場合にサーバーを自動的に再読み込みしないため、ファイルを更新するたびにサーバーを停止して再起動する必要があります。
時間のかかるタスクがアプリケーションのリクエスト/レスポンスサイクルにどのように影響を与えるかを理解しました。次に、タスクを非同期で実行します。
ステップ3 — bullmq
を使用して時間のかかるタスクを非同期に実行する
このステップでは、bullmq
を使用して時間のかかるタスクをバックグラウンドで処理します。この調整により、リクエスト/レスポンスサイクルが解放され、画像が処理されている間にアプリがユーザーに即座に応答できるようになります。
そのためには、ジョブの簡潔な説明を作成し、bullmq
でキューに追加する必要があります。キューは、実際の生活でのキューの動作と同様に機能するデータ構造です。人々がスペースに入るために列に並ぶとき、列の最初の人物が最初にスペースに入ります。後から来た人は列の最後尾に並び、列の先頭にいるすべての人がスペースに入るまで、列の最後尾に並びます。キューのデータ構造の先入れ先出し(FIFO)プロセスでは、キューに追加された最初のアイテムが最初に削除されます(デキュー)。bullmq
では、プロデューサーがジョブをキューに追加し、コンシューマー(またはワーカー)がキューからジョブを削除して実行します。
bullmq
のキューはRedisにあります。ジョブを記述してキューに追加すると、Redisキューにジョブのエントリが作成されます。ジョブの説明は、最小限のデータを含む文字列またはオブジェクトであり、後でbullmq
がジョブを実行するためのデータへの参照を含みます。キューにジョブを追加する機能を定義したら、時間のかかるコードを別の関数に移動します。その後、bullmq
はこの関数を呼び出し、ジョブがキューから取り出されるときにキューに格納されているデータで呼び出します。タスクが完了すると、bullmq
はそれを完了とマークし、キューから別のジョブを取り出して実行します。
エディターでindex.js
を開きます:
- nano index.js
index.js
ファイルで、Redisでbullmq
キューを作成するためにハイライトされた行を追加します:
...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const redisOptions = { host: "localhost", port: 6379 };
const imageJobQueue = new Queue("imageJobQueue", {
connection: redisOptions,
});
async function addJob(job) {
await imageJobQueue.add(job.type, job);
}
...
Queue
クラスをbullmq
から抽出し、Redisでキューを作成するために使用されます。次に、redisOptions
変数を設定し、Queue
クラスのインスタンスがRedisとの接続を確立するために使用するプロパティを含むオブジェクトに設定します。host
プロパティの値をlocalhost
に設定します。これは、Redisがローカルマシンで実行されているためです。
注意: Redisがアプリとは別のリモートサーバーで実行されている場合、host
プロパティの値をリモートサーバーのIPアドレスに更新します。また、port
プロパティの値をRedisが接続を待ち受けるデフォルトポートである6379
に設定します。
リモートサーバーでRedisとアプリを一緒に実行している場合は、ポートフォワーディングを設定している場合は、host
プロパティを更新する必要はありませんが、アプリを実行するためにサーバーにログインするたびにポートフォワーディング接続を使用する必要があります。
次に、imageJobQueue
変数をQueue
クラスのインスタンスに設定し、キューの名前を第1引数として、オブジェクトを第2引数として渡します。オブジェクトには、redisOptions
変数内のオブジェクトに値が設定されたconnection
プロパティがあります。Queue
クラスをインスタンス化した後、RedisにimageJobQueue
という名前のキューが作成されます。
最後に、imageJobQueue
にジョブを追加するために使用するaddJob()
関数を定義します。この関数は、ジョブの情報を含むjob
パラメーターを取ります(キューに保存したいデータでaddJob()
関数を呼び出します)。関数内では、imageJobQueue
のadd()
メソッドを呼び出し、ジョブの名前を第1引数として、ジョブデータを第2引数として渡します。
強調表示されたコードを追加して、キューにジョブを追加するためにaddJob()
関数を呼び出します:
...
app.post("/upload", async function (req, res) {
const { image } = req.files;
if (!image) return res.sendStatus(400);
const imageName = path.parse(image.name).name;
...
await addJob({
type: "processUploadedImages",
image: {
data: image.data.toString("base64"),
name: image.name,
},
});
res.redirect("/result");
});
...
ここでは、addJob()
関数を呼び出して、ジョブを説明するオブジェクトを渡します。オブジェクトには、ジョブの名前を示すtype
属性があります。2番目のプロパティimage
は、ユーザーがアップロードした画像データを含むオブジェクトに設定されます。 image.data
内の画像データはバッファ(バイナリ形式)なので、JavaScriptのtoString()
メソッドを呼び出してRedisに保存できる文字列に変換します。data
プロパティがその結果として設定されます。image
プロパティは、アップロードされた画像の名前(画像の拡張子を含む)に設定されます。
bullmq
が後でこのジョブを実行するために必要な情報がこれで定義されました。ジョブに応じて、追加のジョブ情報を追加するか、減らすことができます。
注意: Redisはインメモリデータベースなので、キュー内のジョブの大量のデータの保存は避けてください。ジョブが処理する必要のある大きなファイルがある場合は、ファイルをディスクまたはクラウドに保存し、そのリンクをキュー内の文字列として保存してください。bullmq
がジョブを実行するとき、Redisに保存されたリンクからファイルを取得します。
ファイルを保存して閉じます。
次に、画像処理コードを含むutils.js
ファイルを作成して開きます。
- nano utils.js
utils.js
ファイルに、画像処理関数を定義するための次のコードを追加します:
const path = require("path");
const sharp = require("sharp");
function processUploadedImages(job) {
}
module.exports = { processUploadedImages };
最初の2行で画像を処理し、パスを計算するために必要なモジュールをインポートします。次に、processUploadedImages()
関数を定義します。この関数には、時間のかかる画像処理タスクが含まれます。この関数は job
パラメータを取ります。このパラメータは、ワーカーがジョブデータをキューから取得し、その後キューデータを使用して processUploadedImages()
関数を呼び出す際に補完されます。また、processUploadedImages()
関数をエクスポートして、他のファイルから参照できるようにします。
ファイルを保存して閉じます。
index.js
ファイルに戻ります:
- nano index.js
index.js
ファイルからハイライトされた行をコピーし、その後、このファイルから削除します。これらのコードは後で必要になるため、クリップボードに保存してください。もし nano
を使用している場合は、これらの行をハイライトしてマウスで右クリックしてコピーできます:
...
app.post("/upload", async function (req, res) {
const { image } = req.files;
if (!image) return res.sendStatus(400);
const imageName = path.parse(image.name).name;
const processImage = (size) =>
sharp(image.data)
.resize(size, size)
.webp({ lossless: true })
.toFile(`./public/images/${imageName}-${size}.webp`);
sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
Promise.all(sizes.map(processImage))
let counter = 0;
for (let i = 0; i < 10_000_000_000; i++) {
counter++;
};
...
res.redirect("/result");
});
upload
ルートの post
メソッドは、以下と一致します:
...
app.post("/upload", async function (req, res) {
const { image } = req.files;
if (!image) return res.sendStatus(400);
await addJob({
type: "processUploadedImages",
image: {
data: image.data.toString("base64"),
name: image.name,
},
});
res.redirect("/result");
});
...
このファイルを保存して閉じ、utils.js
ファイルを開きます:
- nano utils.js
utils.js
ファイルで、/upload
ルートのコールバックのためにコピーした行を processUploadedImages
関数に貼り付けます:
...
function processUploadedImages(job) {
const imageName = path.parse(image.name).name;
const processImage = (size) =>
sharp(image.data)
.resize(size, size)
.webp({ lossless: true })
.toFile(`./public/images/${imageName}-${size}.webp`);
sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
Promise.all(sizes.map(processImage));
let counter = 0;
for (let i = 0; i < 10_000_000_000; i++) {
counter++;
};
}
...
画像の処理のコードを移動したので、以前に定義した processUploadedImages()
関数の job
パラメータから画像データを使用するように更新する必要があります。
以下のハイライトされた行を追加および更新してください:
function processUploadedImages(job) {
const imageFileData = Buffer.from(job.image.data, "base64");
const imageName = path.parse(job.image.name).name;
const processImage = (size) =>
sharp(imageFileData)
.resize(size, size)
.webp({ lossless: true })
.toFile(`./public/images/${imageName}-${size}.webp`);
...
}
画像データの文字列化されたバージョンをBuffer.from()
メソッドを使ってバイナリに変換します。次に、path.parse()
を更新し、キューに保存されている画像名への参照を追加します。その後、sharp()
メソッドを更新して、imageFileData
変数に格納されている画像バイナリデータを取るようにします。
完全なutils.js
ファイルは次のようになります:
const path = require("path");
const sharp = require("sharp");
function processUploadedImages(job) {
const imageFileData = Buffer.from(job.image.data, "base64");
const imageName = path.parse(job.image.name).name;
const processImage = (size) =>
sharp(imageFileData)
.resize(size, size)
.webp({ lossless: true })
.toFile(`./public/images/${imageName}-${size}.webp`);
sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
Promise.all(sizes.map(processImage));
let counter = 0;
for (let i = 0; i < 10_000_000_000; i++) {
counter++;
};
}
module.exports = { processUploadedImages };
ファイルを保存して閉じ、index.js
に戻ります:
- nano index.js
画像が今やutils.js
ファイルで処理されるので、sharp
変数はもはや必要ありません。ファイルからハイライトされた行を削除します:
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...
ファイルを保存して閉じます。
これで、Redisでキューを作成し、ジョブを追加する機能を定義しました。また、processUploadedImages()
関数を定義して、アップロードされた画像を処理します。
残りのタスクは、キューからジョブを引き出し、ジョブデータでprocessUploadedImages()
関数を呼び出すconsumer(またはworker)を作成することです。
エディターにworker.js
ファイルを作成します:
- nano worker.js
worker.js
ファイルに、次のコードを追加します:
const { Worker } = require("bullmq");
const { processUploadedImages } = require("./utils");
const workerHandler = (job) => {
console.log("Starting job:", job.name);
processUploadedImages(job.data);
console.log("Finished job:", job.name);
return;
};
最初の行では、bullmq
からWorker
クラスをインポートしています。これはインスタンス化されると、Redisのキューからジョブをデキューして実行するワーカーを開始します。次に、utils.js
ファイルからprocessUploadedImages()
関数を参照して、ワーカーがキュー内のデータで関数を呼び出せるようにします。
あなたは、job
パラメーターを含むキュー内のジョブデータを取る workerHandler()
関数を定義します。関数内で、ジョブが開始されたことをログに記録し、その後、processUploadedImages()
をジョブデータで呼び出します。その後、成功メッセージをログに記録して、null
を返します。
ワーカーが Redis に接続し、キューからジョブをデキューし、ジョブデータを使って workerHandler()
を呼び出すために、次の行をファイルに追加します:
...
const workerOptions = {
connection: {
host: "localhost",
port: 6379,
},
};
const worker = new Worker("imageJobQueue", workerHandler, workerOptions);
console.log("Worker started!");
ここでは、workerOptions
変数を Redis の接続設定を含むオブジェクトに設定します。worker
変数を Worker
クラスのインスタンスに設定します。これには次のパラメーターが必要です:
imageJobQueue
: ジョブキューの名前。workerHandler
: Redis キューからジョブがデキューされた後に実行される関数。workerOptions
: ワーカーが Redis との接続を確立するために使用する Redis の構成設定。
最後に、成功メッセージをログに記録します。
これらの行を追加した後、ファイルを保存して閉じます。
これで、bullmq
ワーカー機能がキューからジョブをデキューして実行されるようになりました。
ターミナルで、public/images
ディレクトリ内の画像を削除してアプリをテストするためにリセットします:
- rm public/images/*
次に、index.js
ファイルを実行します:
- node index.js
アプリが起動します:
OutputServer running on port 3000
これで、ワーカーを開始します。プロジェクトディレクトリに移動して、2 番目のターミナルセッションを開きます。
- cd image_processor/
作業者を以下のコマンドで起動します:
- node worker.js
作業者は次のように開始します:
OutputWorker started!
ブラウザでhttp://localhost:3000/
を訪れます。 ファイルを選択ボタンを押し、コンピューターからunderwater.png
を選択し、画像をアップロードボタンを押します。
数秒後にページを更新するように促す即時応答が表示される場合があります:
代わりに、一部の画像が処理中のままである場合、ページにいくつかの処理済み画像が表示される場合があります:
すべてのリサイズされた画像を読み込むためにページを数回更新できます。
作業者が実行されているターミナルに戻ります。そのターミナルには次のようなメッセージが表示されます:
OutputWorker started!
Starting job: processUploadedImages
Finished job: processUploadedImages
出力は、bullmq
がジョブを正常に実行したことを確認します。
作業者が実行されていなくても、アプリは引き続き時間のかかるタスクをオフロードできます。これを示すために、2番目のターミナルで作業者を停止します:CTRL+C
で止めます。
初めてのターミナルセッションで、Expressサーバーを停止し、public/images
内の画像を削除します:
- rm public/images/*
その後、サーバーを再起動します:
- node index.js
ブラウザでhttp://localhost:3000/
を訪れ、underwater.png
画像を再度アップロードします。 /result
パスにリダイレクトされると、作業者が実行されていないため、画像がページに表示されません:
作業者を実行しているターミナルに戻り、作業者を再度起動します:
- node worker.js
出力は、ジョブが開始されたことを示します:
OutputWorker started!
Starting job: processUploadedImages
作業が完了し、出力に Finished job: processUploadedImages
という行が含まれている場合は、ブラウザを更新してください。 画像は今、ロードされます:
サーバーとワーカーを停止します。
これで、時間のかかるタスクをバックグラウンドにオフロードし、bullmq
を使用して非同期に実行できるようになりました。次のステップでは、キューの状態を監視するダッシュボードを設定します。
ステップ4 — bullmq
キューを監視するダッシュボードを追加
このステップでは、bull-board
パッケージを使用して、Redis キュー内のジョブを視覚的なダッシュボードから監視します。このパッケージは、Redis キューに保存されている bullmq
ジョブの情報を表示して整理するユーザーインターフェース(UI)ダッシュボードを自動的に作成します。ブラウザを使用して、Redis CLI をターミナルで開かずに、完了したジョブ、待機中のジョブ、または失敗したジョブを監視できます。
テキストエディターで index.js
ファイルを開きます:
- nano index.js
ハイライトされたコードを追加して bull-board
をインポートします:
...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...
先行するコードでは、bull-board
からcreateBullBoard()
メソッドをインポートします。また、bull-board
からbullmq
キューへのアクセスを可能にするBullMQAdapter
と、Expressがダッシュボードを表示する機能を提供するExpressAdapter
もインポートします。
次に、bull-board
をbullmq
に接続するために、次のハイライトされたコードを追加します:
...
async function addJob(job) {
...
}
const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
queues: [new BullMQAdapter(imageJobQueue)],
serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");
const app = express();
...
まず、serverAdapter
をExpressAdapter
のインスタンスに設定します。次に、createBullBoard()
を呼び出して、bullmq
キューデータでダッシュボードを初期化します。関数にはqueues
とserverAdapter
プロパティを持つオブジェクト引数を渡します。最初のプロパティであるqueues
は、bullmq
で定義したキューの配列を受け入れます(ここではimageJobQueue
)。2番目のプロパティであるserverAdapter
には、Expressサーバーアダプターのインスタンスを含むオブジェクトが含まれます。その後、/admin
パスをsetBasePath()
メソッドでダッシュボードにアクセスできるように設定します。
次に、/admin
ルート用のserverAdapter
ミドルウェアを追加します:
app.use(express.static("public"))
app.use("/admin", serverAdapter.getRouter());
app.get("/", function (req, res) {
...
});
index.js
ファイル全体は次のように一致します:
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
const redisOptions = { host: "localhost", port: 6379 };
const imageJobQueue = new Queue("imageJobQueue", {
connection: redisOptions,
});
async function addJob(job) {
await imageJobQueue.add(job.type, job);
}
const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
queues: [new BullMQAdapter(imageJobQueue)],
serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
bodyParser.urlencoded({
extended: true,
})
);
app.use(fileUpload());
app.use(express.static("public"));
app.use("/admin", serverAdapter.getRouter());
app.get("/", function (req, res) {
res.render("form");
});
app.get("/result", (req, res) => {
const imgDirPath = path.join(__dirname, "./public/images");
let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
return `images/${image}`;
});
res.render("result", { imgFiles });
});
app.post("/upload", async function (req, res) {
const { image } = req.files;
if (!image) return res.sendStatus(400);
await addJob({
type: "processUploadedImages",
image: {
data: Buffer.from(image.data).toString("base64"),
name: image.name,
},
});
res.redirect("/result");
});
app.listen(3000, function () {
console.log("Server running on port 3000");
});
変更が完了したら、ファイルを保存して閉じます。
index.js
ファイルを実行します:
- node index.js
ブラウザに戻り、http://localhost:3000/admin
を訪れます。ダッシュボードが読み込まれます。
ダッシュボードでは、ジョブのタイプ、消費されるデータ、およびその他のジョブに関する情報を確認できます。完了したジョブに関する情報は、完了タブ、失敗したジョブに関する情報は失敗タブ、一時停止したジョブに関する情報は一時停止タブなど、他のタブに切り替えることもできます。
今後は、bull-board
ダッシュボードを使用してキューを監視できます。
結論
この記事では、bullmq
を使用して時間のかかるタスクをジョブキューにオフロードしました。最初に、bullmq
を使用せずに、遅いリクエスト/レスポンスサイクルを持つ時間のかかるタスクを持つアプリを作成しました。その後、bullmq
を使用して時間のかかるタスクをオフロードし、非同期で実行することで、リクエスト/レスポンスサイクルを向上させました。その後、bull-board
を使用して、Redis内のbullmq
キューを監視するダッシュボードを作成しました。
bullmq
ドキュメントを参照して、このチュートリアルでカバーされていないbullmq
の機能について詳しく学ぶことができます。ジョブのスケジューリング、優先順位付け、またはリトライなどの機能の設定、およびワーカーの並行性設定を構成する方法も学ぶことができます。また、bull-board
ドキュメントを参照して、ダッシュボードの機能について詳しく学ぶことができます。