如何使用 Node.js 和 BullMQ 處理異步任務

作者選擇了女性工程師協會作為為捐贈寫作計劃的一部分。

介紹

Web應用程序具有請求/響應周期。當您訪問URL時,瀏覽器會向運行處理數據或在數據庫中運行查詢的應用程序的服務器發送請求。在此期間,用戶會等待,直到應用程序返回響應。對於某些任務,用戶可以快速獲得響應;對於耗時的任務,例如處理圖像、分析數據、生成報告或發送電子郵件,這些任務需要很長時間才能完成,並且可能會減慢請求/響應周期。例如,假設您有一個應用程序,用戶上傳圖像。在這種情況下,您可能需要調整、壓縮或將圖像轉換為其他格式,以保留服務器的磁盤空間,然後再將圖像顯示給用戶。處理圖像是一個CPU密集型任務,它可以阻塞一個Node.js線程,直到任務完成。這可能需要幾秒鐘或幾分鐘。用戶必須等待任務完成才能從服務器獲得響應。

為了避免降低請求/響應循環的速度,您可以使用bullmq,這是一個分佈式任務(工作)隊列,允許您將耗時的任務從您的Node.js應用程序卸載到bullmq,從而釋放請求/響應循環。該工具使您的應用程序能夠快速向用戶發送響應,同時bullmq在後台異步執行任務,並與您的應用程序獨立執行。為了跟踪任務,bullmq使用Redis來存儲隊列中每個任務的簡短描述。bullmq工作程序然後從隊列中出列並執行每個任務,在完成後標記它為完成。

在本文中,您將使用bullmq將一個耗時的任務卸載到後台,從而使應用程序能夠快速響應用戶。首先,您將創建一個具有耗時任務的應用程序,而不使用bullmq。然後,您將使用bullmq異步執行該任務。最後,您將安裝一個可視化儀表板來管理Redis隊列中的bullmq任務。

先決條件

要完成本教程,您需要以下物品:

步驟 1 – 設置專案目錄

在這一步驟中,您將創建一個目錄並為您的應用程序安裝必要的依賴項。在本教程中,您將構建的應用程序將允許用戶上傳圖片,然後使用 sharp 軟件包進行處理。圖像處理是一項耗時的任務,可能會減慢請求/響應周期,因此將此任務轉移為 bullmq 背景處理是一個很好的選擇。您將使用的技術也適用於其他耗時任務。

首先,創建一個名為 image_processor 的目錄並進入此目錄:

  1. mkdir image_processor && cd image_processor

然後,將此目錄初始化為 npm 包:

  1. npm init -y

此命令將創建一個 package.json 文件。 -y 選項告訴 npm 接受所有默認值。

執行命令後,您的輸出將匹配以下內容:

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

輸出確認已創建了 package.json 文件。重要的屬性包括您的應用程序名稱(name),應用程序版本號(version)和您項目的起始點(main)。如果您想了解更多關於其他屬性的信息,您可以查看 npm 的 package.json 文檔

應用程式在本教程中將需要以下依賴項目:

  • express:用於構建Web應用程序的Web框架。
  • express-fileupload:允許您的表單上傳文件的中間件。
  • sharp:一個圖像處理庫。
  • ejs:一種模板語言,允許您使用Node.js生成HTML標記。
  • bullmq:一個分佈式任務隊列。
  • bull-board:在bullmq的基礎上構建的顯示作業狀態的優美用戶界面(UI)的儀表板。

要安裝所有這些依賴項目,運行以下命令:

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

除了您安裝的依賴項目外,您還將在本教程的後面使用以下圖像:

使用curl下載圖像到本地計算機上您選擇的位置

  1. curl -O https://deved-images.nyc3.cdn.digitaloceanspaces.com/CART-68886/underwater.png

您具有構建不包含bullmq的Node.js應用程序所需的所有依賴項目,接下來將執行。

第二步 — 不使用bullmq實現耗時任務

在這一步中,您將使用Express構建一個應用程序,允許用戶上傳圖片。應用程序將使用sharp開始一個耗時的任務,將圖片調整為多個尺寸,然後在響應發送後顯示給用戶。這一步將幫助您了解耗時任務如何影響請求/響應循環。

使用nano或您首選的文本編輯器,創建index.js文件:

  1. nano index.js

在您的index.js文件中,添加以下代碼以導入相依項目:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

第一行,您導入path模塊以計算Node中的文件路徑。第二行,您導入fs模塊以與目錄交互。然後您導入express網絡框架。您導入body-parser模塊以添加中間件來解析HTTP請求中的數據。隨後,您導入sharp模塊進行圖像處理。最後,您導入express-fileupload來處理來自HTML表單的上傳。

接下來,添加以下代碼以實現應用程序的中間件:

image_processor/index.js
...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

首先,將app變數設置為Express的一個實例。其次,使用app變數,set()方法配置Express使用ejs模板語言。然後,使用use()方法添加body-parser模塊中間件,將HTTP請求中的JSON數據轉換為可以使用JavaScript訪問的變量。在下一行,您以相同的方式處理URL編碼輸入。

接下來,添加以下代碼以添加更多處理文件上傳和提供靜態文件的中間件:

image_processor/index.js
...
app.use(fileUpload());
app.use(express.static("public"));

通過調用fileUpload()方法添加處理上傳文件的中間件,並設置Express將查找並提供靜態文件(如圖像和CSS)的目錄。

設置中間件後,創建一個路由,用於顯示用於上傳圖像的HTML表單:

image_processor/index.js
...
app.get("/", function (req, res) {
  res.render("form");
});

在這裡,使用Express模塊的get()方法指定/路由以及當用戶訪問主頁或/路由時應運行的回調。在回調中,調用res.render()來渲染views目錄中的form.ejs文件。您還沒有創建form.ejs文件或views目錄。

要創建它,首先保存並關閉您的文件。在終端中,輸入以下命令在項目根目錄中創建views目錄:

  1. mkdir views

進入views目錄:

  1. cd views

在您的編輯器中創建form.ejs文件:

  1. nano form.ejs

form.ejs文件中,添加以下代碼以創建表單:

image_processor/views/form.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

首先,您引用尚未创建的head.ejs文件。 head.ejs文件将包含您可以在其他HTML页面中引用的HTML head元素。

body标签中,您创建一个带有以下属性的表单:

  • action指定表单数据在提交时应发送到的路由。
  • method指定发送数据的HTTP方法。 POST方法将数据嵌入到HTTP请求中。
  • encytype指定表单数据应如何编码。 值multipart/form-data使HTML input元素能够上传文件数据。

form元素中,您创建一个input标签来上传文件。 然后,您定义button元素,其type属性设置为submit,这样您就可以提交表单。

完成后,保存并关闭文件。

接下来,创建一个head.ejs文件:

  1. nano head.ejs

在您的head.ejs文件中,添加以下代码以创建应用程序的头部部分:

image_processor/views/head.ejs
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

在这里,您引用稍后将在此步骤中创建的public目录中的main.css文件。 该文件将包含此应用程序的样式。 现在,您将继续设置静态资产的流程。

保存并关闭文件。

要处理从表单提交的数据,必须在Express中定义一个post方法。 为此,请返回到您项目的根目录:

  1. cd ..

打開你的index.js文件:

  1. nano index.js

在你的index.js文件中,添加以下突出顯示的代碼以定義處理在路由/upload上提交表單的方法:

image_processor/index.js
app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

使用app變量調用post()方法,該方法將處理在/upload路由上提交的表單。接下來,從HTTP請求中提取上傳的圖像數據到image變量中。然後,如果用戶未上傳圖像,設置響應以返回400狀態代碼。

為設置上傳圖像的處理過程,添加以下突出顯示的代碼:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

這些行代表應用程序將如何處理圖像。首先,從上傳的圖像中刪除圖像擴展名並將名稱保存在imageName變量中。接下來,定義processImage()函數。此函數接受size參數,其值將用於在調整大小期間確定圖像尺寸。在函數中,使用包含上傳圖像的二進制數據的緩衝器調用sharp()sharp根據大小參數中的值調整圖像的大小。使用sharpwebp()方法將圖像轉換為webp圖像格式。然後,將圖像保存在public/images/目錄中。

下列數字清單定義了用於調整上傳圖片大小的尺寸。然後使用JavaScript的map()方法來對sizes陣列中的每個元素調用processImage()函數,之後它將返回一個新的陣列。每次map()方法調用processImage()函數時,它都會對新的陣列返回一個promise。您使用Promise.all()方法來解析它們。

計算機處理速度各不相同,用戶可以上傳的圖片大小也可能會影響圖片處理速度。為了演示目的,請插入突出顯示的代碼行來添加一個佔用CPU的增量循環和一個重定向到顯示調整大小圖片的頁面:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

循環將運行100億次來增加counter變量。您調用res.redirect()函數將應用程序重定向到/result路由。該路由將渲染一個HTML頁面,其中將顯示public/images目錄中的圖片。

目前還不存在/result路由。要創建它,請在index.js文件中添加突出顯示的代碼:

image_processor/index.js
...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

你使用app.get()方法定義/result路由。在該函數中,你使用完整路徑定義了imgDirPath變數,指向public/images目錄。然後,你使用fs模塊的readdirSync()方法讀取給定目錄中的所有文件。接著,你串接map()方法返回一個新的數組,其中包含以images/為前綴的圖片路徑。

最後,你調用res.render()來渲染result.ejs文件,該文件目前尚不存在。你將包含所有圖片相對路徑的數組imgFiles變量傳遞給result.ejs文件。

保存並關閉你的文件。

要創建result.ejs文件,返回views目錄:

  1. cd views

在你的編輯器中創建並打開result.ejs文件:

  1. nano result.ejs

result.ejs文件中添加以下行以顯示圖片:

image_processor/views/result.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

首先,引用head.ejs文件。在body標籤中,檢查imgFiles變量是否為空。如果有數據,則遍歷每個文件並為每個數組元素創建一個圖片。如果imgFiles為空,則打印一條消息,告訴用戶幾秒後刷新以查看調整大小後的圖片。

保存並關閉你的文件。

接下來,返回根目錄並創建包含你的靜態資源的public目錄。

  1. cd .. && mkdir public

移動到public目錄:

  1. cd public

創建一個images目錄,用於保存上傳的圖片:

  1. mkdir images

接下來,創建css目錄並進入:

  1. mkdir css && cd css

在您的編輯器中,創建並打開main.css文件,該文件您在head.ejs文件中引用過:

  1. nano main.css

在您的main.css文件中,添加以下樣式:

image_processor/public/css/main.css
body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** 用於 "選擇文件" 按鈕的樣式 **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** 用於 "上傳圖片" 按鈕的樣式 **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

這些行將為應用程序中的元素設置樣式。使用HTML屬性,您可以將選擇文件按鈕的背景設置為十六進制代碼#2196f3(藍色的一種色調),將上傳圖片按鈕的邊框設置為orange。您還可以設置/result路由上的元素以使它們更具可呈現性。

完成後,保存並關閉文件。

返回到項目根目錄:

  1. cd ../..

在您的編輯器中打開index.js

  1. nano index.js

在您的index.js中添加以下代碼,將啟動服務器:

image_processor/index.js
...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

完整的index.js文件現在將與以下匹配:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

完成更改後,保存並關閉文件。

使用node命令運行應用程序:

  1. node index.js

您將收到以下輸出:

Output
Server running on port 3000

此輸出確認服務器正在運行,沒有任何問題。

打開你偏好的瀏覽器並訪問http://localhost:3000/

注意: 如果您正在遠程服務器上跟隨教程,可以使用端口轉發在本地瀏覽器中訪問應用程序。

當Node.js服務器運行時,打開另一個終端並輸入以下命令:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

一旦連接到服務器,運行node index.js然後在本機機器的網絡瀏覽器中導航到http://localhost:3000/

當頁面加載完成後,將匹配以下內容:

接下來,按下選擇文件按鈕並選擇本機機器上的underwater.png圖像。顯示將從未選擇文件切換到underwater.png。之後,按下上傳圖像按鈕。應用程序將加載一段時間,因為它處理圖像並運行增量循環。

任務完成後,/result路徑將加載調整大小的圖像:

您現在可以使用CTRL+C停止服務器。當文件更改時,Node.js不會自動重新加載服務器,因此您需要在更新文件時停止並重新啟動服務器。

您現在知道一個耗時的任務如何影響應用程序的請求/響應循環。接下來,您將異步執行該任務。

步驟 3 — 使用 bullmq 非同步執行耗時任務

在這一步中,您將使用 bullmq 將一個耗時的任務移至後台。這樣的調整將釋放請求/響應循環,並允許您的應用程序立即響應用戶,同時圖像正在處理中。

為此,您需要為工作創建簡潔的描述,並將其添加到 bullmq 的隊列中。隊列是一種數據結構,其工作方式類似於現實生活中的隊列。當人們排隊進入一個空間時,排在最前面的人將是第一個進入該空間的人。稍後到來的任何人都將排在隊列的末尾,並在排在他們前面的每個人進入空間之後進入空間,直到最後一個人進入空間。使用隊列數據結構的先進先出(FIFO)過程,添加到隊列的第一個項目是要刪除的第一個項目(出隊)。使用 bullmq,生產者將在隊列中添加一個工作,而消費者(或工作者)將從隊列中刪除一個工作並執行它。隊列先進先出(FIFO)出隊生產者消費者工作者

bullmq 中的排隊是在 Redis 中進行的。 當您描述一個工作並將其添加到隊列時,將在 Redis 隊列中創建一個工作項目。 工作描述可以是一個字符串,也可以是一個包含最小數據或引用數據的屬性的對象,這將允許 bullmq 在以後執行該工作。 一旦您定義了將工作添加到隊列的功能,就將耗時的代碼移入一個單獨的函數中。 稍後,bullmq 將使用您在隊列中存儲的數據調用此函數,當工作出列時。 任務完成後,bullmq 將標記為已完成,從隊列中拉出另一個工作並執行它。

打開您的編輯器中的 index.js

  1. nano index.js

在您的 index.js 文件中,添加以下突出顯示的行以在 Redis 中創建一個隊列,使用 bullmq

image_processor/index.js
...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

您首先從 bullmq 中提取 Queue 類,該類用於在 Redis 中創建一個隊列。 然後,您將 redisOptions 變量設置為一個對象,該對象具有 Queue 類實例將用於與 Redis 建立連接的屬性。 您將 host 屬性值設置為 localhost,因為 Redis 在您的本地機器上運行。

注意: 如果 Redis 在與應用程序分開的遠程服務器上運行,則應將 host 屬性值更新為遠程服務器的 IP 地址。 您還將 port 屬性值設置為 6379,這是 Redis 用於監聽連接的默認端口。

如果您已經設置了對遠程運行Redis和應用程序的服務器的端口轉發,則無需更新host屬性,但您每次登錄到服務器運行應用程序時都需要使用端口轉發連接。

接下來,您將imageJobQueue變量設置為Queue類的一個實例,將隊列的名稱作為第一個參數,將對象作為第二個參數。該對象具有connection屬性,其值設置為redisOptions變量中的對象。在實例化Queue類之後,Redis中將創建一個名為imageJobQueue的隊列。

最後,您定義addJob()函數,您將使用該函數在imageJobQueue中添加作業。該函數接受一個包含作業信息的job參數⁠(您將使用要保存在隊列中的數據調用addJob()函數)。在函數中,您調用imageJobQueueadd()方法,將作業名稱作為第一個參數,作業數據作為第二個參數。

添加突出顯示的代碼以調用addJob()函數將作業添加到隊列中:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

在這裡,您使用描述工作的物件呼叫addJob()函數。該物件具有type屬性,其值為工作名稱。第二個屬性image被設置為一個包含使用者上傳的圖片數據的物件。因為image.data中的圖片數據是一個緩衝區(二進制形式),您調用JavaScript的toString()方法將其轉換為可以存儲在Redis中的字符串,這將設置data屬性。image屬性被設置為上傳圖片的名稱(包括圖片擴展名)。

您現在已經定義了供bullmq稍後執行此工作所需的信息。根據您的工作,您可以添加更多或更少的工作信息。

警告:由於Redis是一個內存數據庫,請避免將大量數據存儲在隊列中的工作中。如果您有一個需要處理的大文件,請將文件保存在磁盤或雲中,然後將文件的鏈接以字符串形式保存在隊列中。當bullmq執行工作時,它將從Redis中保存的鏈接中獲取文件。

保存並關閉您的文件。

接下來,創建並打開包含圖片處理代碼的utils.js文件:

  1. nano utils.js

在您的utils.js文件中,添加以下代碼以定義處理圖片的函數:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

您在前兩行導入了處理圖片和計算路徑所需的模塊。然後定義了processUploadedImages()函數,該函數將包含耗時的圖片處理任務。此函數接受一個job參數,當工作者從隊列中獲取作業數據時,該參數將被填充,然後調用帶有隊列數據的processUploadedImages()函數。您還將processUploadedImages()函數導出,以便在其他文件中引用它。

保存並關閉您的文件。

返回到index.js文件:

  1. nano index.js

index.js文件中複製突出顯示的行,然後從此文件中刪除它們。稍後您將需要複製的代碼,請將其保存到剪貼板。如果您正在使用nano,您可以突出顯示這些行,然後右鍵單擊鼠標以複製這些行:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

upload路由的post方法現在將匹配以下內容:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

保存並關閉此文件,然後打開utils.js文件:

  1. nano utils.js

在您的utils.js文件中,將剛剛複製的/upload路由回調的代碼粘貼到processUploadedImages函數中:

image_processor/utils.js
...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

現在,您已經移動了處理圖片代碼,需要更新它以使用您之前定義的processUploadedImages()函數的job參數中的圖片數據。

為此,請添加並更新下面突出顯示的行:

image_processor/utils.js

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

您可以使用Buffer.from()方法將圖像數據的字符串版本轉換回二進制。然後,您可以使用保存在隊列中的圖像名稱更新path.parse()。之後,您可以更新sharp()方法以使用保存在imageFileData變量中的圖像二進制數據。

現在,完整的utils.js文件將如下所示:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

保存並關閉文件,然後返回index.js

  1. nano index.js

由於圖像現在在utils.js文件中處理,因此不再需要sharp變量作為依賴。從文件中刪除突出顯示的行:

image_processor/index.js
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

保存並關閉文件。

您現在已經定義了在Redis中創建隊列並添加作業的功能。您還定義了processUploadedImages()函數來處理上傳的圖像。

剩下的任務是創建一個消費者(或工作者),它將從隊列中拉取作業並使用作業數據調用processUploadedImages()函數。

在您的編輯器中創建一個worker.js文件:

  1. nano worker.js

在您的worker.js文件中,添加以下代碼:

image_processor/worker.js
const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

在第一行,您從bullmq中導入Worker類;當實例化時,這將啟動一個工作者,該工作者從Redis中的隊列中出列作業並執行它們。接下來,您從utils.js文件中引用processUploadedImages()函數,以便工作者可以使用隊列中的數據調用該函數。

你定義了一個 `workerHandler()` 函數,它接受一個 `job` 參數,其中包含佇列中的作業數據。在這個函數中,你記錄了作業已經開始,然後使用作業數據調用 `processUploadedImages()`。之後,你記錄了一個成功的消息並返回 `null`。

為了讓工作者能夠連接到Redis,從佇列中出列一個作業,並使用作業數據調用 `workerHandler()`,請在文件中添加以下行:

image_processor/worker.js
...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

在這裡,你將 `workerOptions` 變量設置為包含Redis連接設置的對象。你將 `worker` 變量設置為 `Worker` 類的一個實例,它帶有以下參數:

  • `imageJobQueue`:作業佇列的名稱。
  • `workerHandler`:當從Redis佇列出列作業後將運行的函數。
  • `workerOptions`:工作者用於與Redis建立連接的Redis配置設置。

最後,你記錄了一個成功的消息。

添加完這些行後,保存並關閉你的文件。

你現在已經定義了 `bullmq` 工作者功能,用於從佇列中出列作業並執行它們。

在終端中,刪除 `public/images` 目錄中的圖像,以便你可以為測試你的應用程序重新開始:

  1. rm public/images/*

接下來,運行 `index.js` 文件:

  1. node index.js

應用程序將啟動:

Output
Server running on port 3000

現在,你將開始工作者。打開第二個終端會話,並導航到項目目錄:

  1. cd image_processor/

開始工作的命令是:

  1. node worker.js

工作程序將啟動:

Output
Worker started!

在瀏覽器中訪問http://localhost:3000/。點擊選擇文件按鈕,選擇來自您計算機的underwater.png,然後按上傳圖片按鈕。

您可能會在幾秒鐘後收到一條即時響應,告訴您在刷新頁面之前等待:

或者,您可能會立即收到一些處理中的圖像,而其他圖像仍在處理中:

您可以刷新頁面幾次以加載所有調整大小的圖像。

返回正在運行工作程序的終端。該終端將顯示與以下相匹配的消息:

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

該輸出確認bullmq成功運行了作業。

即使工作程序未運行,您的應用程序仍然可以卸載耗時的任務。為了演示這一點,在第二個終端中使用CTRL+C停止工作程序。

在您的初始終端會話中,停止Express服務器並刪除public/images中的圖像:

  1. rm public/images/*

之後,再次啟動服務器:

  1. node index.js

在瀏覽器中,訪問http://localhost:3000/並重新上傳underwater.png圖像。當您被重定向到/result路徑時,圖像將不會顯示在頁面上,因為工作程序未運行:

返回您運行工作程序的終端,然後再次啟動工作程序:

  1. node worker.js

輸出將與以下相匹配,這讓您知道作業已經開始:

Output
Worker started! Starting job: processUploadedImages

完成工作並且輸出包含一行文字完成工作: processUploadedImages後,請重新整理瀏覽器。 現在圖像將加載:

停止服務器和工作程序。

您現在可以將耗時的任務卸載到後台並使用bullmq異步執行它。在下一步中,您將設置一個儀表板來監控佇列的狀態。

步驟 4 — 添加儀表板以監控bullmq佇列

在此步驟中,您將使用bull-board套件來從視覺儀表板監控 Redis 佇列中的作業。此套件將自動創建一個用戶界面(UI)儀表板,顯示和組織有關存儲在 Redis 佇列中的bullmq作業的信息。使用您的瀏覽器,您可以監控已完成的作業、正在等待的作業或失敗的作業,而無需在終端中打開 Redis CLI。

在您的文本編輯器中打開index.js文件:

  1. nano index.js

添加以下突出顯示的代碼來導入bull-board

image_processor/index.js
...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

在前面的代碼中,您從bull-board導入createBullBoard()方法。同時,您還導入了BullMQAdapter,該適配器允許bull-board訪問bullmq隊列,以及ExpressAdapter,它提供了Express顯示儀表板的功能。

接下來,在代碼中添加突出顯示的部分,以將bull-boardbullmq連接起來:

image_processor/index.js
...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

首先,將serverAdapter設置為ExpressAdapter的一個實例。然後,調用createBullBoard()來使用bullmq隊列數據初始化儀表板。您向該函數傳遞一個包含queuesserverAdapter屬性的對象參數。第一個屬性queues接受一個包含您使用bullmq定義的隊列的數組,這裡是imageJobQueue。第二個屬性serverAdapter包含一個對象,該對象接受一個Express伺服器適配器的實例。之後,使用setBasePath()方法將/admin路徑設置為訪問儀表板的路徑。

接下來,在/admin路由上添加serverAdapter中間件:

image_processor/index.js
app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

完整的index.js文件將如下所示:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

在完成更改後,保存並關閉文件。

運行index.js文件:

  1. node index.js

返回到您的瀏覽器,訪問http://localhost:3000/admin。儀表板將加載:

在儀表板中,您可以審查工作類型、它消耗的數據以及有關工作的更多信息。您還可以切換到其他標籤,例如已完成標籤查看已完成工作的信息,失敗標籤查看失敗工作的更多信息,以及暫停標籤查看已被暫停工作的更多信息。

您現在可以使用bull-board儀表板來監控隊列。

結論

在本文中,您使用bullmq將一項耗時的任務卸載到工作隊列中。首先,在未使用bullmq的情況下,您創建了一個包含耗時任務的應用,該任務具有慢速的請求/響應週期。然後您使用bullmq卸載耗時任務並異步執行,這提高了請求/響應週期。之後,您使用bull-board創建了一個儀表板來監控Redis中的bullmq隊列。

您可以訪問bullmq文檔以了解本教程未涵蓋的bullmq功能,例如排程、優先級或重試工作,以及配置工人的並發設置。您也可以訪問bull-board文檔以了解更多關於儀表板功能。

Source:
https://www.digitalocean.com/community/tutorials/how-to-handle-asynchronous-tasks-with-node-js-and-bullmq