如何使用Node.js和BullMQ处理异步任务

作者选择了女性工程师协会作为为捐赠而写计划的捐赠对象。

介绍

网络应用程序具有请求/响应循环。当您访问一个URL时,浏览器会向运行应用程序的服务器发送请求,该服务器会处理数据或在数据库中运行查询。在此过程中,用户会被保持等待,直到应用程序返回响应。对于一些任务,用户可以很快地得到响应;对于耗时的任务,例如处理图像、分析数据、生成报告或发送电子邮件,这些任务需要很长时间才能完成,可能会减慢请求/响应循环的速度。例如,假设您有一个应用程序,用户可以上传图像。在这种情况下,您可能需要调整大小、压缩或将图像转换为其他格式,以节省服务器的磁盘空间,然后再向用户显示图像。处理图像是一个消耗CPU资源的任务,它会阻塞一个Node.js线程,直到任务完成。这可能需要几秒钟或几分钟。用户必须等待任务完成,才能从服务器获得响应。

为了避免减慢请求/响应循环,您可以使用 bullmq,这是一个分布式任务(作业)队列,允许您将耗时任务从您的 Node.js 应用程序卸载到 bullmq 中,释放请求/响应循环。该工具使您的应用程序能够快速向用户发送响应,而 bullmq 则在后台异步执行任务,与您的应用程序独立运行。为了跟踪作业,bullmq 使用 Redis 在队列中存储每个作业的简要描述。然后,bullmq worker 出队并执行队列中的每个作业,完成后标记为完成。

在本文中,您将使用 bullmq 将耗时任务卸载到后台,从而使应用程序能够快速响应用户。首先,您将创建一个包含耗时任务的应用程序,而不使用 bullmq。然后,您将使用 bullmq 异步执行任务。最后,您将安装一个可视化仪表板以管理在 Redis 队列中的 bullmq 作业。

先决条件

要按照本教程操作,您需要以下内容:

步骤 1 — 设置项目目录

在这一步中,您将创建一个目录并安装应用程序所需的依赖项。在本教程中,您将构建的应用程序允许用户上传图像,然后使用 sharp 包进行处理。图像处理需要大量时间,可能会减缓请求/响应周期,因此使用 bullmq 将其转移到后台是个不错的选择。您将使用的转移任务的技术也适用于其他耗时的任务。

首先,创建一个名为 image_processor 的目录并进入该目录:

  1. mkdir image_processor && cd image_processor

然后,将目录初始化为一个 npm 包:

  1. npm init -y

该命令会创建一个 package.json 文件。选项 -y 告诉 npm 接受所有默认值。

运行该命令后,您的输出将匹配以下内容:

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

输出确认 package.json 文件已创建。重要属性包括您的应用程序名称 (name)、应用程序版本号 (version) 和项目的起始点 (main)。如果您想了解更多关于其他属性的信息,可以查看 npm 的 package.json 文档

您将在本教程中构建的应用程序将需要以下依赖项:

  • express:用于构建 Web 应用程序的 Web 框架。
  • express-fileupload:允许您的表单上传文件的中间件。
  • sharp:图像处理库。
  • ejs:一种模板语言,可让您使用 Node.js 生成 HTML 标记。
  • bullmq:分布式任务队列。
  • bull-board:在 bullmq 基础上构建的仪表板,并以良好的用户界面(UI)显示作业的状态。

要安装所有这些依赖项,请运行以下命令:

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

除了您安装的依赖项外,您还将在本教程的后面使用以下图像:

使用 curl 将图像下载到本地计算机上您选择的位置

  1. curl -O https://deved-images.nyc3.cdn.digitaloceanspaces.com/CART-68886/underwater.png

您已经有了构建不包含 bullmq 的 Node.js 应用程序所需的必要依赖项,接下来您将执行此操作。

第2步 —— 在没有 bullmq 的情况下实现耗时任务

在这一步中,您将使用Express构建一个应用程序,允许用户上传图像。该应用程序将使用 sharp 启动一个耗时任务,将图像调整为多个尺寸,然后在响应发送后显示给用户。这一步将帮助您了解耗时任务如何影响请求/响应周期。

使用 nano 或您首选的文本编辑器创建 index.js 文件:

  1. nano index.js

在您的 index.js 文件中,添加以下代码以导入依赖项:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

在第一行中,您导入 path 模块以使用Node计算文件路径。在第二行中,您导入 fs 模块以与目录交互。然后导入 express web框架。您导入 body-parser 模块以添加中间件以解析HTTP请求中的数据。接下来,您导入 sharp 模块进行图像处理。最后,导入 express-fileupload 以处理来自HTML表单的上传。

接下来,添加以下代码以在应用程序中实现中间件:

image_processor/index.js
...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

首先,将app变量设置为Express的实例。其次,使用app变量,set()方法配置Express以使用ejs模板语言。然后,使用use()方法添加body-parser模块中间件,将HTTP请求中的JSON数据转换为JavaScript可访问的变量。接下来的一行,您可以使用URL编码输入做同样的操作。

然后,添加以下代码以添加更多中间件来处理文件上传和提供静态文件:

image_processor/index.js
...
app.use(fileUpload());
app.use(express.static("public"));

通过调用fileUpload()方法添加解析上传文件的中间件,并设置Express将查找并提供静态文件(如图像和CSS)的目录。

有了中间件设置后,创建一个路由来显示上传图像的HTML表单:

image_processor/index.js
...
app.get("/", function (req, res) {
  res.render("form");
});

在这里,您使用Express模块的get()方法指定/路由,并在用户访问主页或/路由时应运行的回调函数。在回调函数中,调用res.render()来渲染views目录中的form.ejs文件。您还没有创建form.ejs文件或views目录。

要创建它,首先保存并关闭您的文件。在终端中,输入以下命令以在项目根目录中创建views目录:

  1. mkdir views

进入views目录:

  1. cd views

在您的编辑器中创建form.ejs文件:

  1. nano form.ejs

在您的form.ejs文件中,添加以下代码以创建表单:

image_processor/views/form.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

首先,您需要引用尚未创建的head.ejs文件。head.ejs文件将包含您可以在其他HTML页面中引用的HTML head元素。

body标记中,您需要创建一个带有以下属性的表单:

  • action指定表单数据在提交时应发送到的路由。
  • method指定发送数据的HTTP方法。 POST方法将数据嵌入到HTTP请求中。
  • enctype指定表单数据的编码方式。值multipart/form-data使HTML input元素能够上传文件数据。

form元素中,您需要创建一个input标记来上传文件。然后,您可以定义button元素,其中type属性设置为submit,以便您可以提交表单。

完成后,请保存并关闭您的文件。

接下来,创建一个head.ejs文件:

  1. nano head.ejs

在您的head.ejs文件中,添加以下代码以创建应用程序的头部部分:

image_processor/views/head.ejs
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

在这里,您需要引用稍后将在此步骤中创建的public目录中的main.css文件。该文件将包含此应用程序的样式。暂时,您将继续设置静态资产的流程。

保存并关闭文件。

要处理从表单提交的数据,您必须在Express中定义一个post方法。为此,请返回到项目的根目录:

  1. cd ..

再次打开您的index.js文件:

  1. nano index.js

在您的index.js文件中,添加以下突出显示的行来定义处理路由/upload上表单提交的方法:

image_processor/index.js
app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

您使用app变量调用post()方法,该方法将处理/upload路由上提交的表单。接下来,您从HTTP请求中提取上传的图像数据到image变量中。然后,如果用户未上传图像,您设置一个响应以返回400状态码。

为设置上传图像的处理过程,添加以下突出显示的代码:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

这些行代表您的应用程序将如何处理图像。首先,您从上传的图像中删除图像扩展名,并将名称保存在imageName变量中。接下来,您定义processImage()函数。此函数接受size参数,其值将用于确定调整大小期间的图像尺寸。在函数中,您使用sharp()调用image.data,这是一个包含上传图像的二进制数据的缓冲区sharp根据size参数的值调整图像大小。您使用sharpwebp()方法将图像转换为webp图像格式。然后,您将图像保存在public/images/目录中。

接下来的数字列表定义了用于调整上传图像大小的尺寸。然后,您可以使用JavaScript的map()方法来调用processImage(),对sizes数组中的每个元素进行处理,之后它将返回一个新数组。每次map()方法调用processImage()函数时,它都会返回一个对新数组的Promise。您可以使用Promise.all()方法来解析它们。

计算机处理速度各不相同,用户可以上传的图像大小也会不同,这可能会影响图像处理速度。为了演示目的延迟此代码,插入以下突出显示的行以添加一个CPU密集型的增量循环和一个重定向到一个页面,该页面将显示调整大小后的图像:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

循环将运行100亿次以增加counter变量。您调用res.redirect()函数将应用程序重定向到/result路由。该路由将呈现一个HTML页面,该页面将显示public/images目录中的图像。

/result 路由目前不存在。要创建它,请在您的index.js文件中添加以下突出显示的代码:

image_processor/index.js
...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

您使用app.get()方法定义/result路由。在函数中,您使用完整路径定义imgDirPath变量,指向public/images目录。您使用fs模块的readdirSync()方法读取给定目录中的所有文件。然后,您链式调用map()方法,返回以images/为前缀的新数组,其中包含图像路径。

最后,您调用res.render()来渲染result.ejs文件,该文件尚不存在。您将包含所有图像相对路径的数组imgFiles变量传递给result.ejs文件。

保存并关闭您的文件。

要创建result.ejs文件,请返回views目录:

  1. cd views

在编辑器中创建并打开result.ejs文件:

  1. nano result.ejs

result.ejs文件中添加以下行以显示图像:

image_processor/views/result.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

首先,引用head.ejs文件。在body标签中,检查imgFiles变量是否为空。如果有数据,您会遍历每个文件,并为每个数组元素创建一个图像。如果imgFiles为空,您会打印一条消息,告诉用户稍后几秒钟后刷新以查看调整大小的图像。

保存并关闭您的文件。

接下来,返回根目录并创建包含静态资源的public目录:

  1. cd .. && mkdir public

移动到public目录:

  1. cd public

创建一个images目录,用于保存上传的图像:

  1. mkdir images

接下来,创建css目录并导航到该目录:

  1. mkdir css && cd css

在你的编辑器中,创建并打开main.css文件,这是你之前在head.ejs文件中引用的:

  1. nano main.css

在你的main.css文件中,添加以下样式:

image_processor/public/css/main.css
body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** "选择文件" 按钮的样式 **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** "上传图片" 按钮的样式 **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

这些行将为应用程序中的元素添加样式。使用HTML属性,你可以将选择文件按钮的背景样式设置为十六进制代码#2196f3(一种蓝色)并将上传图片按钮的边框样式设置为orange。你还可以为/result路由上的元素添加样式,使它们更具吸引力。

完成后,保存并关闭文件。

返回到项目根目录:

  1. cd ../..

在你的编辑器中打开index.js

  1. nano index.js

在你的index.js中,添加以下代码,用于启动服务器:

image_processor/index.js
...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

现在,完整的index.js文件将如下所示:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

完成所有更改后,保存并关闭文件。

使用node命令运行应用程序:

  1. node index.js

你将收到以下输出:

Output
Server running on port 3000

这个输出确认服务器正在运行且没有任何问题。

打开您首选的浏览器,访问http://localhost:3000/

注意:如果您正在远程服务器上进行教程,可以使用端口转发在本地浏览器中访问该应用程序。

当Node.js服务器正在运行时,打开另一个终端并输入以下命令:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

一旦连接到服务器,运行node index.js然后在您本地机器的Web浏览器中导航至http://localhost:3000/

页面加载后,会匹配以下内容:

接下来,点击选择文件按钮,然后在您本地机器上选择underwater.png图像。显示将从未选择文件切换到underwater.png。之后,点击上传图片按钮。应用程序会加载一段时间,因为它处理图像并运行增量循环。

任务完成后,将加载/result路由与调整大小的图像:

现在可以使用CTRL+C停止服务器。当文件更改时,Node.js不会自动重新加载服务器,因此您需要在更新文件时停止并重新启动服务器。

您现在知道一个耗时的任务如何影响应用程序的请求/响应周期。接下来,您将异步执行该任务。

第3步 – 使用bullmq异步执行耗时任务

在这一步中,您将使用bullmq将耗时任务转移到后台。这个调整将释放请求/响应循环,并允许您的应用在图像处理过程中立即响应用户。

为此,您需要创建工作的简要描述,并将其添加到bullmq队列中。一个队列是一种类似于现实生活中队列的数据结构。当人们排队进入一个空间时,排在最前面的人将是第一个进入空间的人。稍后到达的任何人都将排在队列的末尾,并将在排在他们前面的每个人之后进入空间,直到最后一个人进入空间。使用队列数据结构的先进先出(FIFO)过程,添加到队列的第一项是被移除的第一项(出队)。通过bullmq,一个生产者将在队列中添加一个作业,而消费者(或工作者)将从队列中移除作业并执行它。

bullmq中的队列位于Redis中。当您描述一个作业并将其添加到队列时,Redis队列中会创建一个作业条目。作业描述可以是一个字符串或一个具有包含最小数据或引用数据的属性的对象,这些数据将允许bullmq稍后执行该作业。一旦您定义了将作业添加到队列的功能,就将耗时的代码移动到一个单独的函数中。稍后,bullmq将使用您在队列中存储的数据调用此函数,当作业被出列时。一旦任务完成,bullmq将标记为已完成,然后从队列中拉取另一个作业并执行它。

在编辑器中打开index.js:

  1. nano index.js

在您的index.js文件中,添加下面突出显示的行以在Redis中使用bullmq创建一个队列:

image_processor/index.js
...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

您首先从bullmq中提取Queue类,该类用于在Redis中创建一个队列。然后,您将redisOptions变量设置为一个具有Queue类实例将用于与Redis建立连接的属性的对象。您将host属性值设置为localhost,因为Redis正在您的本地计算机上运行。

注意:如果Redis在与您的应用程序分开的远程服务器上运行,您将更新host属性值为远程服务器的IP地址。您还将port属性值设置为6379,这是Redis用于侦听连接的默认端口。

如果您已经设置了端口转发到远程运行Redis和应用程序的服务器,则无需更新host属性,但您需要在每次登录服务器运行应用程序时使用端口转发连接。

接下来,您将imageJobQueue变量设置为Queue类的实例,将队列的名称作为第一个参数,将对象作为第二个参数。该对象具有一个connection属性,其值设置为redisOptions变量中的对象。在实例化Queue类之后,将在Redis中创建一个名为imageJobQueue的队列。

最后,您定义addJob()函数,该函数用于向imageJobQueue中添加作业。该函数接受一个包含作业信息的job参数(您将使用希望保存在队列中的数据调用addJob()函数)。在函数中,调用imageJobQueueadd()方法,将作业的名称作为第一个参数,作业数据作为第二个参数。

将突出显示的代码添加到调用addJob()函数以在队列中添加作业:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

在这里,您使用描述作业的对象调用addJob()函数。该对象具有type属性,其值为作业名称。第二个属性image设置为包含用户上传的图像数据的对象。因为image.data中的图像数据是以缓冲区(二进制形式)的形式存在的,您需要调用JavaScript的toString()方法将其转换为可以存储在Redis中的字符串,这将将data属性设置为结果。image属性设置为已上传图像的名称(包括图像扩展名)。

您现在已经定义了bullmq以便稍后执行此作业所需的信息。根据您的作业,您可能会添加更多或更少的作业信息。

警告:由于Redis是一个内存数据库,避免在队列中存储大量的作业数据。如果有一个大文件需要作业处理,请将文件保存在磁盘或云中,然后将文件的链接保存为队列中的字符串。当bullmq执行作业时,它将从Redis中保存的链接中获取文件。

保存并关闭您的文件。

接下来,创建并打开utils.js文件,其中将包含图像处理代码:

  1. nano utils.js

在您的utils.js文件中,添加以下代码以定义处理图像的函数:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

您在前两行导入了处理图像和计算路径所需的模块。然后定义了processUploadedImages()函数,其中包含耗时的图像处理任务。此函数接受一个job参数,当工作者从队列中获取作业数据时,该参数将被填充,然后使用队列数据调用processUploadedImages()函数。您还导出了processUploadedImages()函数,以便在其他文件中引用。

保存并关闭您的文件。

返回index.js文件:

  1. nano index.js

index.js文件中复制突出显示的行,然后从此文件中删除它们。您稍后会需要复制的代码,所以将其保存到剪贴板中。如果您正在使用nano,您可以突出显示这些行,并使用鼠标右键单击复制这些行:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

upload路由的post方法现在将匹配以下内容:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

保存并关闭此文件,然后打开utils.js文件:

  1. nano utils.js

在您的utils.js文件中,将刚刚复制的/upload路由回调的行粘贴到processUploadedImages函数中:

image_processor/utils.js
...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

现在,您已经移动了处理图像的代码,需要更新它以使用您之前定义的processUploadedImages()函数的job参数中的图像数据。

为此,请添加并更新下面突出显示的行:

image_processor/utils.js

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

您可以使用Buffer.from()方法将图像数据的字符串版本转换回二进制。然后,您可以更新path.parse(),引用保存在队列中的图像名称。接下来,您需要更新sharp()方法,以接受存储在imageFileData变量中的图像二进制数据。

现在完整的utils.js文件应如下所示:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

保存并关闭文件,然后返回到index.js

  1. nano index.js

由于图像现在在utils.js文件中处理,sharp变量不再需要作为依赖项。从文件中删除突出显示的行:

image_processor/index.js
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

保存并关闭文件。

您现在已经定义了在Redis中创建队列并添加作业的功能。您还定义了processUploadedImages()函数来处理上传的图像。

剩下的任务是创建一个消费者(或工作者),该消费者将从队列中拉取作业,并使用作业数据调用processUploadedImages()函数。

在您的编辑器中创建一个worker.js文件:

  1. nano worker.js

在您的worker.js文件中,添加以下代码:

image_processor/worker.js
const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

在第一行,您从bullmq中导入Worker类;当实例化时,这将启动一个工作者,从Redis队列中出列作业并执行它们。接下来,您引用utils.js文件中的processUploadedImages()函数,以便工作者可以使用队列中的数据调用该函数。

你定义一个workerHandler()函数,接受一个包含队列中作业数据的job参数。在函数中,你记录作业已启动,然后调用processUploadedImages()并传入作业数据。之后,你记录一个成功消息并返回null

为了让工作进程连接到Redis,从队列中取出一个作业并调用workerHandler()处理作业数据,将以下代码添加到文件中:

image_processor/worker.js
...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

在这里,你将workerOptions变量设置为一个包含Redis连接设置的对象。你将worker变量设置为Worker类的实例,并传入以下参数:

  • imageJobQueue:作业队列的名称。
  • workerHandler:从Redis队列中取出作业后将运行的函数。
  • workerOptions:工作进程用于与Redis建立连接的Redis配置设置。

最后,你记录一个成功消息。

添加完代码后,保存并关闭你的文件。

现在你已经定义了从队列中取出作业并执行的bullmq工作进程功能。

在你的终端中,删除public/images目录中的图像,以便你可以为测试你的应用程序开始新的:

  1. rm public/images/*

接下来,运行index.js文件:

  1. node index.js

应用程序将启动:

Output
Server running on port 3000

现在你将启动工作进程。打开第二个终端会话并导航到项目目录:

  1. cd image_processor/

开始工作人员,请使用以下命令:

  1. node worker.js

工作人员将启动:

Output
Worker started!

在浏览器中访问http://localhost:3000/。点击选择文件按钮并从计算机中选择underwater.png,然后点击上传图片按钮。

您可能会收到即时响应,告诉您在几秒钟后刷新页面:

或者,您可能会收到一些处理后的图像,而其他图像仍在处理中:

您可以多次刷新页面以加载所有调整大小后的图像。

返回运行工作人员的终端。该终端将显示与以下内容匹配的消息:

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

输出确认bullmq成功运行了作业。

即使工作人员没有运行,您的应用程序仍可以卸载耗时的任务。为了演示这一点,使用CTRL+C停止第二个终端中的工作人员。

在您的初始终端会话中,停止Express服务器并删除public/images中的图像:

  1. rm public/images/*

之后,再次启动服务器:

  1. node index.js

在浏览器中访问http://localhost:3000/并再次上传underwater.png图像。当您被重定向到/result路径时,图像将不会显示在页面上,因为工作人员没有运行:

返回您运行工作人员的终端并重新启动工作人员:

  1. node worker.js

输出将与以下内容匹配,让您知道作业已经开始:

Output
Worker started! Starting job: processUploadedImages

完成工作后,如果输出包含一行文字 `Finished job: processUploadedImages`,请刷新浏览器。现在图片将会加载:

停止服务器和工作进程。

您现在可以将耗时任务转移到后台,并使用 bullmq 异步执行。接下来,您将设置一个仪表板来监控队列的状态。

第四步 —— 添加监控 bullmq 队列的仪表板

在这一步中,您将使用 bull-board 包来从可视化仪表板监控 Redis 队列中的作业。该包将自动创建一个用户界面(UI)仪表板,显示和组织关于存储在 Redis 队列中的 bullmq 作业的信息。通过浏览器,您可以监控已完成的作业、正在等待的作业或失败的作业,而无需在终端中打开 Redis CLI。

打开您的文本编辑器中的 index.js 文件:

  1. nano index.js

添加突出显示的代码以导入 bull-board

image_processor/index.js
...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

在上面的代码中,你从`bull-board`中导入了`createBullBoard()`方法。你还导入了`BullMQAdapter`,它允许`bull-board`访问`bullmq`队列,并导入了`ExpressAdapter`,它提供了Express显示仪表板的功能。

接下来,将下面的代码添加到连接`bull-board`和`bullmq`的地方:

image_processor/index.js
...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

首先,将`serverAdapter`设置为`ExpressAdapter`的实例。然后,调用`createBullBoard()`来使用`bullmq`队列数据初始化仪表板。你将一个对象参数传递给函数,其中包含`queues`和`serverAdapter`属性。第一个属性`queues`接受一个由你用`bullmq`定义的队列组成的数组,这里是`imageJobQueue`。第二个属性`serverAdapter`包含一个对象,接受Express服务器适配器的实例。之后,将`/admin`路径设置为使用`setBasePath()`方法访问仪表板。

接下来,为`/admin`路由添加`serverAdapter`中间件:

image_processor/index.js
app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

完整的`index.js`文件如下:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

在完成更改后,保存并关闭文件。

运行`index.js`文件:

  1. node index.js

返回浏览器并访问`http://localhost:3000/admin`。仪表板将加载:

在仪表板中,您可以查看作业类型、其消耗的数据以及有关作业的更多信息。您还可以切换到其他选项卡,例如“已完成”选项卡,以获取有关已完成作业的信息,“失败”选项卡,以获取有关失败作业的更多信息,以及“已暂停”选项卡,以获取有关已暂停作业的更多信息。

现在,您可以使用bull-board仪表板监视队列。

结论

在本文中,您将一个耗时的任务转移到了使用bullmq的作业队列中。首先,您创建了一个具有较慢请求/响应周期的耗时任务的应用程序,而没有使用bullmq。然后,您使用bullmq将耗时任务转移到异步执行,从而提高了请求/响应周期。之后,您使用bull-board创建了一个仪表板,用于监视Redis中的bullmq队列。

您可以访问bullmq文档,了解本教程未涵盖的bullmq功能,例如调度、设置作业优先级或重试作业以及为工作人员配置并发设置。您还可以访问bull-board文档,了解有关仪表板功能的更多信息。

Source:
https://www.digitalocean.com/community/tutorials/how-to-handle-asynchronous-tasks-with-node-js-and-bullmq