Node.jsとBullMQを使用した非同期タスクの処理方法

著者は、女性エンジニア協会を寄付の受取先として、寄付のために書くプログラムの一環として選びました。

導入

Webアプリケーションにはリクエスト/レスポンスのサイクルがあります。URLを訪れると、ブラウザはデータを処理したりデータベースでクエリを実行したりするアプリケーションを実行しているサーバーにリクエストを送信します。この間、ユーザーはアプリケーションがレスポンスを返すまで待たされます。一部のタスクでは、ユーザーはすぐにレスポンスを得ることができますが、画像の処理、データの分析、レポートの生成、またはメールの送信などの時間がかかるタスクでは、リクエスト/レスポンスのサイクルが遅くなります。たとえば、ユーザーが画像をアップロードするアプリケーションを持っている場合、画像を表示する前にサーバーのディスクスペースを節約するために画像のサイズを変更、圧縮、または別の形式に変換する必要があるかもしれません。画像の処理はCPUを使用するタスクであり、タスクが完了するまでNode.jsのスレッドをブロックすることがあります。それには数秒または数分かかるかもしれません。ユーザーはタスクが完了するのを待たなければならず、その後にサーバーからのレスポンスを受け取ります。

bullmqを使用すると、リクエスト/レスポンスサイクルを遅らせることなく、Node.jsアプリから時間のかかるタスクをオフロードできる分散タスク(ジョブ)キューを利用できます。このツールを使用すると、bullmqはバックグラウンドでタスクを非同期に実行し、アプリとは独立してタスクを処理できるため、アプリがユーザーに素早く応答できるようになります。ジョブの追跡には、Redisを使用して、キュー内の各ジョブの短い説明を保存します。次に、bullmqワーカーがキュー内の各ジョブをデキューして実行し、完了したらマークします。

この記事では、bullmqを使用して時間のかかるタスクをバックグラウンドにオフロードし、アプリケーションがユーザーに素早く応答できるようにします。まず、bullmqを使用せずに時間のかかるタスクを持つアプリを作成します。次に、bullmqを使用してタスクを非同期で実行します。最後に、Redisキュー内のbullmqジョブを管理するためのビジュアルダッシュボードをインストールします。

前提条件

このチュートリアルに従うには、以下が必要です:

ステップ1 — プロジェクトディレクトリの設定

このステップでは、アプリケーションのためのディレクトリを作成し、必要な依存関係をインストールします。このチュートリアルで構築するアプリケーションでは、ユーザーが画像をアップロードし、それをsharpパッケージを使用して処理します。画像処理は時間がかかるため、リクエスト/レスポンスのサイクルが遅くなる可能性があり、このタスクはbullmqを使用してバックグラウンドにオフロードすることが適しています。タスクをオフロードするために使用する手法は、他の時間のかかるタスクにも適用することができます。

まず、image_processorという名前のディレクトリを作成し、そのディレクトリに移動します:

  1. mkdir image_processor && cd image_processor

次に、ディレクトリをnpmパッケージとして初期化します:

  1. npm init -y

このコマンドはpackage.jsonファイルを作成します。オプションの-yは、npmにすべてのデフォルトを受け入れるように指示します。

コマンドを実行すると、出力は以下のようになります:

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

出力によって、package.jsonファイルが作成されたことが確認されます。重要なプロパティには、アプリの名前(name)、アプリのバージョン番号(version)、プロジェクトの開始ポイント(main)などが含まれます。他のプロパティについて詳しく知りたい場合は、npmのpackage.jsonドキュメントを参照してください。

アプリケーションをビルドするには、次の依存関係が必要です:

  • express: ウェブアプリを構築するためのウェブフレームワーク。
  • express-fileupload: フォームからファイルをアップロードするためのミドルウェア。
  • sharp: 画像処理ライブラリ。
  • ejs: Node.jsでHTMLマークアップを生成するためのテンプレート言語。
  • bullmq: 分散タスクキュー。
  • bull-board: bullmq を基にしたダッシュボードで、ジョブの状態を素敵なユーザーインターフェース(UI)で表示します。

これらの依存関係をすべてインストールするには、次のコマンドを実行してください:

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

インストールした依存関係に加えて、後でこのチュートリアルで以下の画像も使用します:

お好みの場所に画像をダウンロードするにはcurlを使用してください。

  1. curl -O https://deved-images.nyc3.cdn.digitaloceanspaces.com/CART-68886/underwater.png

これで、次に進むNode.jsアプリケーションを構築するために必要な依存関係が揃いました。

ステップ2 — bullmqを使用しない時間集中型タスクの実装

このステップでは、Expressを使用して画像をアップロードできるアプリケーションを構築します。アプリはsharpを使用して画像を複数のサイズにリサイズする時間集中型のタスクを開始し、その後にレスポンスが送信されます。このステップでは、時間集中型のタスクがリクエスト/レスポンスサイクルにどのように影響するかを理解するのに役立ちます。

nano、またはお好みのテキストエディタを使用して、index.jsファイルを作成します:

  1. nano index.js

index.jsファイルに、以下のコードを追加して依存関係をインポートします:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

最初の行では、Nodeでファイルパスを計算するためにpathモジュールをインポートします。2行目では、ディレクトリとのやり取りにfsモジュールをインポートします。次に、expressウェブフレームワークをインポートします。HTTPリクエスト内のデータを解析するためのミドルウェアを追加するためにbody-parserモジュールをインポートします。その後、画像処理のためにsharpモジュールをインポートします。最後に、HTMLフォームからのアップロードを処理するためにexpress-fileuploadをインポートします。

次に、アプリ内でミドルウェアを実装するための以下のコードを追加します:

image_processor/index.js
...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

最初に、app変数をExpressのインスタンスに設定します。次に、app変数を使用して、set()メソッドを使用してExpressがejsテンプレート言語を使用するように設定します。次に、use()メソッドを使用して、HTTPリクエストでのJSONデータをJavaScriptでアクセスできる変数に変換するためのbody-parserモジュールミドルウェアを追加します。次の行では、同様にURLエンコードされた入力を行います。

次に、ファイルのアップロードを処理し静的ファイルを提供するためのさらなるミドルウェアを追加するために、次の行を追加します:

image_processor/index.js
...
app.use(fileUpload());
app.use(express.static("public"));

fileUpload()メソッドを呼び出してアップロードされたファイルを解析するためのミドルウェアを追加し、Expressが画像やCSSなどの静的ファイルを参照し提供するディレクトリを設定します。

ミドルウェアが設定されたら、次のHTMLフォームを表示するためのルートを作成します:

image_processor/index.js
...
app.get("/", function (req, res) {
  res.render("form");
});

ここでは、Expressモジュールのget()メソッドを使用して、/ルートとユーザーがホームページまたは/ルートを訪れたときに実行するコールバックを指定します。コールバックでは、res.render()を呼び出してviewsディレクトリ内のform.ejsファイルをレンダリングします。まだform.ejsファイルまたはviewsディレクトリを作成していません。

それを作成するには、まずファイルを保存して閉じます。ターミナルで、プロジェクトのルートディレクトリにviewsディレクトリを作成するために次のコマンドを入力します:

  1. mkdir views

viewsディレクトリに移動します:

  1. cd views

エディタでform.ejsファイルを作成します:

  1. nano form.ejs

form.ejsファイルに、以下のコードを追加してフォームを作成します:

image_processor/views/form.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

最初に、head.ejsファイルを参照していることに言及しますが、まだ作成していません。 head.ejsファイルには、他のHTMLページで参照できるHTML head要素が含まれます。

bodyタグ内で、次の属性を持つフォームを作成します:

  • actionは、フォームが送信されたときにフォームデータを送信するルートを指定します。
  • methodはデータを送信するためのHTTPメソッドを指定します。POSTメソッドは、データをHTTPリクエストに埋め込みます。
  • encytypeは、フォームデータのエンコード方法を指定します。multipart/form-dataの値は、HTML input要素がファイルデータをアップロードできるようにします。

form要素内に、ファイルをアップロードするためのinputタグを作成します。その後、type属性がsubmitに設定されたbutton要素を定義します。これにより、フォームを送信できるようになります。

作業が完了したら、ファイルを保存して閉じます。

次に、head.ejsファイルを作成します:

  1. nano head.ejs

head.ejsファイルで、アプリのヘッドセクションを作成するために以下のコードを追加します:

image_processor/views/head.ejs
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

ここでは、このアプリケーションのスタイルを含むmain.cssファイルを参照しています。このファイルは、この手順の後半でpublicディレクトリに作成します。今のところ、静的アセットのプロセスを設定し続けます。

ファイルを保存して閉じます。

フォームから送信されたデータを処理するには、Expressでpostメソッドを定義する必要があります。これを行うには、プロジェクトのルートディレクトリに戻ります。

  1. cd ..

再度、あなたのindex.jsファイルを開いてください:

  1. nano index.js

次に、/uploadルートでフォームの送信を処理するためのメソッドを定義するために、index.jsファイルに以下の行を追加してください:

image_processor/index.js
app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

あなたはapp変数を使用して/uploadルートで送信されたフォームを処理するためにpost()メソッドを呼び出します。次に、HTTPリクエストからアップロードされた画像データをimage変数に抽出します。その後、ユーザーが画像をアップロードしない場合は、400ステータスコードを返すようにレスポンスを設定します。

アップロードされた画像の処理手順を設定するには、以下のハイライトされたコードを追加してください:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

これらの行は、アプリが画像を処理する方法を表します。まず、アップロードされた画像から画像の拡張子を削除し、名前をimageName変数に保存します。次に、processImage()関数を定義します。この関数はsizeパラメータを取り、その値はリサイズ中の画像の寸法を決定するために使用されます。関数内で、sharp()image.dataとともに呼び出します。これはアップロードされた画像のバイナリデータを含むbufferです。sharpは、サイズパラメータの値に従って画像のサイズを変更します。次に、sharpからwebp()メソッドを使用して画像をwebp画像形式に変換します。その後、画像をpublic/images/ディレクトリに保存します。

次の番号のリストは、アップロードされた画像をリサイズするために使用されるサイズを定義しています。その後、JavaScriptのmap()メソッドを使用して、sizes配列内の各要素にprocessImage()を呼び出します。その後、新しい配列が返されます。 map()メソッドがprocessImage()関数を呼び出すたびに、新しい配列に対するプロミスが返されます。それらを解決するためにPromise.all()メソッドを使用します。

コンピュータの処理速度は異なりますし、ユーザーがアップロードできる画像のサイズも異なります。これは画像処理速度に影響する可能性があります。このコードをデモンストレーション目的で遅延させるために、CPU負荷の高いインクリメントループと、リサイズされた画像が表示されるページへのリダイレクトを追加するには、以下のハイライトされた行を挿入します:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

ループはcounter変数を増分させるために100億回実行されます。 res.redirect()関数を使用してアプリを/resultルートにリダイレクトします。そのルートは、public/imagesディレクトリ内の画像を表示するHTMLページをレンダリングします。

/resultルートはまだ存在しません。これを作成するには、index.jsファイルに以下のハイライトされたコードを追加してください:

image_processor/index.js
...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

あなたは/resultルートをapp.get()メソッドで定義します。その関数内で、imgDirPath変数をpublic/imagesディレクトリの完全なパスで定義します。fsモジュールのreaddirSync()メソッドを使用して指定されたディレクトリ内のすべてのファイルを読み込みます。それから、map()メソッドをチェーンして、images/でプレフィックスが付いた新しい配列を返します。

最後に、res.render()を呼び出してまだ存在しないresult.ejsファイルをレンダリングします。imgFiles変数(すべての画像の相対パスが含まれる配列)をresult.ejsファイルに渡します。

ファイルを保存して閉じます。

result.ejsファイルを作成するには、viewsディレクトリに戻ります:

  1. cd views

result.ejsファイルを作成して開きます:

  1. nano result.ejs

result.ejsファイルに以下の行を追加して画像を表示します:

image_processor/views/result.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

まず、head.ejsファイルを参照します。bodyタグでは、imgFiles変数が空かどうかを確認します。データがある場合、各ファイルに対して画像を作成します。 imgFilesが空の場合、数秒後にリサイズされた画像を表示するにはリフレッシュしてください。とユーザーに伝えるメッセージを表示します。

ファイルを保存して閉じます。

次に、ルートディレクトリに戻り、静的アセットを含むpublicディレクトリを作成します:

  1. cd .. && mkdir public

publicディレクトリに移動してください:

  1. cd public

アップロードされた画像を保持するimagesディレクトリを作成してください:

  1. mkdir images

次に、cssディレクトリを作成し、そこに移動してください:

  1. mkdir css && cd css

エディタで、head.ejsファイルで以前参照したmain.cssファイルを作成して開いてください:

  1. nano main.css

main.cssファイルで、次のスタイルを追加してください:

image_processor/public/css/main.css
body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** 「ファイルを選択」ボタンのスタイル **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** 「画像をアップロード」ボタンのスタイル **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

これらの行は、アプリ内の要素のスタイルを設定します。HTML属性を使用して、ファイルを選択ボタンの背景を16進数コード#2196f3(青のシェード)に、画像をアップロードボタンの境界線をorangeにスタイル設定します。また、/resultルート上の要素をより見栄え良くスタイル設定します。

作業が完了したら、ファイルを保存して閉じてください。

プロジェクトのルートディレクトリに戻ってください:

  1. cd ../..

エディタでindex.jsを開いてください:

  1. nano index.js

index.jsに、次のコードを追加してサーバーを起動してください:

image_processor/index.js
...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

完全なindex.jsファイルは次のようになります:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

変更を終えたら、ファイルを保存して閉じてください。

nodeコマンドを使用してアプリを実行してください:

  1. node index.js

次のような出力が表示されます:

Output
Server running on port 3000

この出力は、サーバーが問題なく実行されていることを確認します。

好きなブラウザを開き、http://localhost:3000/にアクセスします。

注意: リモートサーバーでチュートリアルに従っている場合は、ポートフォワーディングを使用してローカルのブラウザからアプリにアクセスできます。

Node.jsサーバーが実行されている状態で、別のターミナルを開き、次のコマンドを入力します:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

サーバーに接続したら、node index.jsを実行し、次にローカルマシンのウェブブラウザでhttp://localhost:3000/に移動します。

ページが読み込まれると、以下のようになります:

次に、ファイルを選択ボタンを押し、ローカルマシン上のunderwater.png画像を選択します。表示がファイルが選択されていませんからunderwater.pngに切り替わります。その後、画像をアップロードボタンを押します。アプリが画像を処理し、インクリメントループを実行するためにしばらく読み込みます。

タスクが完了すると、リサイズされた画像が/resultルートで表示されます:

サーバーをCTRL+Cで停止できます。Node.jsはファイルが変更された場合にサーバーを自動的に再読み込みしないため、ファイルを更新するたびにサーバーを停止して再起動する必要があります。

時間のかかるタスクがアプリケーションのリクエスト/レスポンスサイクルにどのように影響を与えるかを理解しました。次に、タスクを非同期で実行します。

ステップ3 — bullmqを使用して時間のかかるタスクを非同期に実行する

このステップでは、bullmqを使用して時間のかかるタスクをバックグラウンドで処理します。この調整により、リクエスト/レスポンスサイクルが解放され、画像が処理されている間にアプリがユーザーに即座に応答できるようになります。

そのためには、ジョブの簡潔な説明を作成し、bullmqでキューに追加する必要があります。キューは、実際の生活でのキューの動作と同様に機能するデータ構造です。人々がスペースに入るために列に並ぶとき、列の最初の人物が最初にスペースに入ります。後から来た人は列の最後尾に並び、列の先頭にいるすべての人がスペースに入るまで、列の最後尾に並びます。キューのデータ構造の先入れ先出し(FIFO)プロセスでは、キューに追加された最初のアイテムが最初に削除されます(デキュー)。bullmqでは、プロデューサーがジョブをキューに追加し、コンシューマー(またはワーカー)がキューからジョブを削除して実行します。

bullmqのキューはRedisにあります。ジョブを記述してキューに追加すると、Redisキューにジョブのエントリが作成されます。ジョブの説明は、最小限のデータを含む文字列またはオブジェクトであり、後でbullmqがジョブを実行するためのデータへの参照を含みます。キューにジョブを追加する機能を定義したら、時間のかかるコードを別の関数に移動します。その後、bullmqはこの関数を呼び出し、ジョブがキューから取り出されるときにキューに格納されているデータで呼び出します。タスクが完了すると、bullmqはそれを完了とマークし、キューから別のジョブを取り出して実行します。

エディターでindex.jsを開きます:

  1. nano index.js

index.jsファイルで、Redisでbullmqキューを作成するためにハイライトされた行を追加します:

image_processor/index.js
...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

Queueクラスをbullmqから抽出し、Redisでキューを作成するために使用されます。次に、redisOptions変数を設定し、QueueクラスのインスタンスがRedisとの接続を確立するために使用するプロパティを含むオブジェクトに設定します。hostプロパティの値をlocalhostに設定します。これは、Redisがローカルマシンで実行されているためです。

注意: Redisがアプリとは別のリモートサーバーで実行されている場合、hostプロパティの値をリモートサーバーのIPアドレスに更新します。また、portプロパティの値をRedisが接続を待ち受けるデフォルトポートである6379に設定します。

リモートサーバーでRedisとアプリを一緒に実行している場合は、ポートフォワーディングを設定している場合は、hostプロパティを更新する必要はありませんが、アプリを実行するためにサーバーにログインするたびにポートフォワーディング接続を使用する必要があります。

次に、imageJobQueue変数をQueueクラスのインスタンスに設定し、キューの名前を第1引数として、オブジェクトを第2引数として渡します。オブジェクトには、redisOptions変数内のオブジェクトに値が設定されたconnectionプロパティがあります。Queueクラスをインスタンス化した後、RedisにimageJobQueueという名前のキューが作成されます。

最後に、imageJobQueueにジョブを追加するために使用するaddJob()関数を定義します。この関数は、ジョブの情報を含むjobパラメーターを取ります(キューに保存したいデータでaddJob()関数を呼び出します)。関数内では、imageJobQueueadd()メソッドを呼び出し、ジョブの名前を第1引数として、ジョブデータを第2引数として渡します。

強調表示されたコードを追加して、キューにジョブを追加するためにaddJob()関数を呼び出します:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

ここでは、addJob() 関数を呼び出して、ジョブを説明するオブジェクトを渡します。オブジェクトには、ジョブの名前を示すtype属性があります。2番目のプロパティimageは、ユーザーがアップロードした画像データを含むオブジェクトに設定されます。 image.data内の画像データはバッファ(バイナリ形式)なので、JavaScriptのtoString()メソッドを呼び出してRedisに保存できる文字列に変換します。dataプロパティがその結果として設定されます。imageプロパティは、アップロードされた画像の名前(画像の拡張子を含む)に設定されます。

bullmqが後でこのジョブを実行するために必要な情報がこれで定義されました。ジョブに応じて、追加のジョブ情報を追加するか、減らすことができます。

注意: Redisはインメモリデータベースなので、キュー内のジョブの大量のデータの保存は避けてください。ジョブが処理する必要のある大きなファイルがある場合は、ファイルをディスクまたはクラウドに保存し、そのリンクをキュー内の文字列として保存してください。bullmqがジョブを実行するとき、Redisに保存されたリンクからファイルを取得します。

ファイルを保存して閉じます。

次に、画像処理コードを含むutils.jsファイルを作成して開きます。

  1. nano utils.js

utils.jsファイルに、画像処理関数を定義するための次のコードを追加します:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

最初の2行で画像を処理し、パスを計算するために必要なモジュールをインポートします。次に、processUploadedImages() 関数を定義します。この関数には、時間のかかる画像処理タスクが含まれます。この関数は job パラメータを取ります。このパラメータは、ワーカーがジョブデータをキューから取得し、その後キューデータを使用して processUploadedImages() 関数を呼び出す際に補完されます。また、processUploadedImages() 関数をエクスポートして、他のファイルから参照できるようにします。

ファイルを保存して閉じます。

index.js ファイルに戻ります:

  1. nano index.js

index.js ファイルからハイライトされた行をコピーし、その後、このファイルから削除します。これらのコードは後で必要になるため、クリップボードに保存してください。もし nano を使用している場合は、これらの行をハイライトしてマウスで右クリックしてコピーできます:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

upload ルートの post メソッドは、以下と一致します:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

このファイルを保存して閉じ、utils.js ファイルを開きます:

  1. nano utils.js

utils.js ファイルで、/upload ルートのコールバックのためにコピーした行を processUploadedImages 関数に貼り付けます:

image_processor/utils.js
...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

画像の処理のコードを移動したので、以前に定義した processUploadedImages() 関数の job パラメータから画像データを使用するように更新する必要があります。

以下のハイライトされた行を追加および更新してください:

image_processor/utils.js

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

画像データの文字列化されたバージョンをBuffer.from()メソッドを使ってバイナリに変換します。次に、path.parse()を更新し、キューに保存されている画像名への参照を追加します。その後、sharp()メソッドを更新して、imageFileData変数に格納されている画像バイナリデータを取るようにします。

完全なutils.jsファイルは次のようになります:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

ファイルを保存して閉じ、index.jsに戻ります:

  1. nano index.js

画像が今やutils.jsファイルで処理されるので、sharp変数はもはや必要ありません。ファイルからハイライトされた行を削除します:

image_processor/index.js
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

ファイルを保存して閉じます。

これで、Redisでキューを作成し、ジョブを追加する機能を定義しました。また、processUploadedImages()関数を定義して、アップロードされた画像を処理します。

残りのタスクは、キューからジョブを引き出し、ジョブデータでprocessUploadedImages()関数を呼び出すconsumer(またはworker)を作成することです。

エディターにworker.jsファイルを作成します:

  1. nano worker.js

worker.jsファイルに、次のコードを追加します:

image_processor/worker.js
const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

最初の行では、bullmqからWorkerクラスをインポートしています。これはインスタンス化されると、Redisのキューからジョブをデキューして実行するワーカーを開始します。次に、utils.jsファイルからprocessUploadedImages()関数を参照して、ワーカーがキュー内のデータで関数を呼び出せるようにします。

あなたは、job パラメーターを含むキュー内のジョブデータを取る workerHandler() 関数を定義します。関数内で、ジョブが開始されたことをログに記録し、その後、processUploadedImages() をジョブデータで呼び出します。その後、成功メッセージをログに記録して、null を返します。

ワーカーが Redis に接続し、キューからジョブをデキューし、ジョブデータを使って workerHandler() を呼び出すために、次の行をファイルに追加します:

image_processor/worker.js
...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

ここでは、workerOptions 変数を Redis の接続設定を含むオブジェクトに設定します。worker 変数を Worker クラスのインスタンスに設定します。これには次のパラメーターが必要です:

  • imageJobQueue: ジョブキューの名前。
  • workerHandler: Redis キューからジョブがデキューされた後に実行される関数。
  • workerOptions: ワーカーが Redis との接続を確立するために使用する Redis の構成設定。

最後に、成功メッセージをログに記録します。

これらの行を追加した後、ファイルを保存して閉じます。

これで、bullmq ワーカー機能がキューからジョブをデキューして実行されるようになりました。

ターミナルで、public/images ディレクトリ内の画像を削除してアプリをテストするためにリセットします:

  1. rm public/images/*

次に、index.js ファイルを実行します:

  1. node index.js

アプリが起動します:

Output
Server running on port 3000

これで、ワーカーを開始します。プロジェクトディレクトリに移動して、2 番目のターミナルセッションを開きます。

  1. cd image_processor/

作業者を以下のコマンドで起動します:

  1. node worker.js

作業者は次のように開始します:

Output
Worker started!

ブラウザでhttp://localhost:3000/を訪れます。 ファイルを選択ボタンを押し、コンピューターからunderwater.pngを選択し、画像をアップロードボタンを押します。

数秒後にページを更新するように促す即時応答が表示される場合があります:

代わりに、一部の画像が処理中のままである場合、ページにいくつかの処理済み画像が表示される場合があります:

すべてのリサイズされた画像を読み込むためにページを数回更新できます。

作業者が実行されているターミナルに戻ります。そのターミナルには次のようなメッセージが表示されます:

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

出力は、bullmqがジョブを正常に実行したことを確認します。

作業者が実行されていなくても、アプリは引き続き時間のかかるタスクをオフロードできます。これを示すために、2番目のターミナルで作業者を停止します:CTRL+Cで止めます。

初めてのターミナルセッションで、Expressサーバーを停止し、public/images内の画像を削除します:

  1. rm public/images/*

その後、サーバーを再起動します:

  1. node index.js

ブラウザでhttp://localhost:3000/を訪れ、underwater.png画像を再度アップロードします。 /resultパスにリダイレクトされると、作業者が実行されていないため、画像がページに表示されません:

作業者を実行しているターミナルに戻り、作業者を再度起動します:

  1. node worker.js

出力は、ジョブが開始されたことを示します:

Output
Worker started! Starting job: processUploadedImages

作業が完了し、出力に Finished job: processUploadedImages という行が含まれている場合は、ブラウザを更新してください。 画像は今、ロードされます:

サーバーとワーカーを停止します。

これで、時間のかかるタスクをバックグラウンドにオフロードし、bullmq を使用して非同期に実行できるようになりました。次のステップでは、キューの状態を監視するダッシュボードを設定します。

ステップ4 — bullmq キューを監視するダッシュボードを追加

このステップでは、bull-board パッケージを使用して、Redis キュー内のジョブを視覚的なダッシュボードから監視します。このパッケージは、Redis キューに保存されている bullmq ジョブの情報を表示して整理するユーザーインターフェース(UI)ダッシュボードを自動的に作成します。ブラウザを使用して、Redis CLI をターミナルで開かずに、完了したジョブ、待機中のジョブ、または失敗したジョブを監視できます。

テキストエディターで index.js ファイルを開きます:

  1. nano index.js

ハイライトされたコードを追加して bull-board をインポートします:

image_processor/index.js
...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

先行するコードでは、bull-boardからcreateBullBoard()メソッドをインポートします。また、bull-boardからbullmqキューへのアクセスを可能にするBullMQAdapterと、Expressがダッシュボードを表示する機能を提供するExpressAdapterもインポートします。

次に、bull-boardbullmqに接続するために、次のハイライトされたコードを追加します:

image_processor/index.js
...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

まず、serverAdapterExpressAdapterのインスタンスに設定します。次に、createBullBoard()を呼び出して、bullmqキューデータでダッシュボードを初期化します。関数にはqueuesserverAdapterプロパティを持つオブジェクト引数を渡します。最初のプロパティであるqueuesは、bullmqで定義したキューの配列を受け入れます(ここではimageJobQueue)。2番目のプロパティであるserverAdapterには、Expressサーバーアダプターのインスタンスを含むオブジェクトが含まれます。その後、/adminパスをsetBasePath()メソッドでダッシュボードにアクセスできるように設定します。

次に、/adminルート用のserverAdapterミドルウェアを追加します:

image_processor/index.js
app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

index.jsファイル全体は次のように一致します:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

変更が完了したら、ファイルを保存して閉じます。

index.jsファイルを実行します:

  1. node index.js

ブラウザに戻り、http://localhost:3000/adminを訪れます。ダッシュボードが読み込まれます。

ダッシュボードでは、ジョブのタイプ、消費されるデータ、およびその他のジョブに関する情報を確認できます。完了したジョブに関する情報は、完了タブ、失敗したジョブに関する情報は失敗タブ、一時停止したジョブに関する情報は一時停止タブなど、他のタブに切り替えることもできます。

今後は、bull-boardダッシュボードを使用してキューを監視できます。

結論

この記事では、bullmqを使用して時間のかかるタスクをジョブキューにオフロードしました。最初に、bullmqを使用せずに、遅いリクエスト/レスポンスサイクルを持つ時間のかかるタスクを持つアプリを作成しました。その後、bullmqを使用して時間のかかるタスクをオフロードし、非同期で実行することで、リクエスト/レスポンスサイクルを向上させました。その後、bull-boardを使用して、Redis内のbullmqキューを監視するダッシュボードを作成しました。

bullmqドキュメントを参照して、このチュートリアルでカバーされていないbullmqの機能について詳しく学ぶことができます。ジョブのスケジューリング、優先順位付け、またはリトライなどの機能の設定、およびワーカーの並行性設定を構成する方法も学ぶことができます。また、bull-boardドキュメントを参照して、ダッシュボードの機能について詳しく学ぶことができます。

Source:
https://www.digitalocean.com/community/tutorials/how-to-handle-asynchronous-tasks-with-node-js-and-bullmq