Как обрабатывать асинхронные задачи с помощью Node.js и BullMQ

Автор выбрал Общество женщин-инженеров для получения пожертвования в рамках программы Пишите в поддержку.

Введение

Веб-приложения имеют циклы запроса/ответа. Когда вы посещаете URL-адрес, браузер отправляет запрос на сервер, запускающий приложение, которое обрабатывает данные или выполняет запросы в базе данных. В это время пользователь ожидает ответа, пока приложение не вернет ответ. Для некоторых задач пользователь может получить ответ быстро; для задач, требующих много времени, таких как обработка изображений, анализ данных, создание отчетов или отправка электронных писем, эти задачи занимают много времени и могут замедлить цикл запроса/ответа. Например, предположим, у вас есть приложение, в котором пользователи загружают изображения. В этом случае вам может потребоваться изменить размер, сжать или конвертировать изображение в другой формат, чтобы сохранить дисковое пространство сервера перед отображением изображения пользователю. Обработка изображения – это задача, требующая много ресурсов ЦП, которая может заблокировать поток Node.js, пока задача не будет завершена. Это может занять несколько секунд или минут. Пользователи должны ждать завершения задачи, чтобы получить ответ от сервера.

Чтобы избежать замедления цикла запроса/ответа, вы можете использовать bullmq, распределенную очередь задач (job), которая позволяет вам перенести трудоемкие задачи из вашего приложения на Node.js на bullmq, освобождая цикл запроса/ответа. Этот инструмент позволяет вашему приложению быстро отправлять ответы пользователю, в то время как bullmq выполняет задачи асинхронно в фоновом режиме и независимо от вашего приложения. Для отслеживания задач bullmq использует Redis, чтобы хранить краткое описание каждой задачи в очереди. Затем bullmq worker извлекает и выполняет каждую задачу в очереди, помечая ее как завершенную после выполнения.

В этой статье вы будете использовать bullmq, чтобы перенести трудоемкую задачу в фоновый режим, что позволит приложению быстро реагировать на пользователей. Сначала вы создадите приложение с трудоемкой задачей без использования bullmq. Затем вы будете использовать bullmq, чтобы выполнить задачу асинхронно. Наконец, вы установите визуальную панель управления задачами bullmq в очереди Redis.

Предварительные требования

Для выполнения этого руководства вам понадобится следующее:

Шаг 1 — Настройка каталога проекта

На этом этапе вы создадите каталог и установите необходимые зависимости для вашего приложения. Приложение, которое вы создадите в этом уроке, позволит пользователям загружать изображение, которое затем будет обрабатываться с использованием пакета sharp. Обработка изображений требует много времени и может замедлить цикл запроса/ответа, что делает эту задачу хорошим кандидатом для bullmq для оффлайн-загрузки в фоновом режиме. Техника, которую вы будете использовать для оффлайн-загрузки задачи, также будет работать для других задач, требующих много времени.

Для начала создайте каталог с именем image_processor и перейдите в него:

  1. mkdir image_processor && cd image_processor

Затем инициализируйте каталог как пакет npm:

  1. npm init -y

Команда создает файл package.json. Опция -y указывает npm принять все значения по умолчанию.

После выполнения команды ваш вывод будет соответствовать следующему:

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

Вывод подтверждает, что файл package.json был создан. Важными свойствами являются имя вашего приложения (name), номер версии вашего приложения (version) и точка начала вашего проекта (main). Если вы хотите узнать больше о других свойствах, вы можете ознакомиться с документацией по package.json в npm.

Приложение, которое вы создадите в этом руководстве, потребует следующих зависимостей:

  • express: веб-фреймворк для создания веб-приложений.
  • express-fileupload: промежуточное ПО, позволяющее вашим формам загружать файлы.
  • sharp: библиотека обработки изображений.
  • ejs: язык шаблонов, который позволяет генерировать разметку HTML с помощью Node.js.
  • bullmq: распределенная очередь задач.
  • bull-board: панель инструментов, которая базируется на bullmq и отображает состояние задач с помощью удобного пользовательского интерфейса (UI).

Чтобы установить все эти зависимости, выполните следующую команду:

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

Помимо установленных вами зависимостей, вы также будете использовать следующее изображение позже в этом руководстве:

Используйте curl, чтобы загрузить изображение в выбранное вами местоположение на вашем локальном компьютере

  1. curl -O https://deved-images.nyc3.cdn.digitaloceanspaces.com/CART-68886/underwater.png

У вас есть необходимые зависимости для создания приложения на Node.js без bullmq, что вы и сделаете далее.

Шаг 2 — Реализация времязатратной задачи без bullmq

На этом этапе вы создадите приложение с помощью Express, которое позволяет пользователям загружать изображения. Приложение запустит времязатратную задачу с использованием sharp для изменения размера изображения на несколько размеров, которые затем будут отображены пользователю после отправки ответа. Этот шаг поможет вам понять, как времязатратные задачи влияют на цикл запроса/ответа.

Используя nano или ваш текстовый редактор по умолчанию, создайте файл index.js:

  1. nano index.js

В вашем файле index.js добавьте следующий код для импорта зависимостей:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

На первой строке вы импортируете модуль path для вычисления путей к файлам с помощью Node. На второй строке вы импортируете модуль fs для взаимодействия с каталогами. Затем вы импортируете веб-фреймворк express. Вы импортируете модуль body-parser для добавления промежуточного программного обеспечения для разбора данных в HTTP-запросах. Затем вы импортируете модуль sharp для обработки изображений. Наконец, вы импортируете express-fileupload для обработки загрузок из HTML-формы.

Затем добавьте следующий код для реализации промежуточного программного обеспечения в вашем приложении:

image_processor/index.js
...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

Сначала вы устанавливаете переменную app на экземпляр Express. Затем, используя переменную app, метод set() настраивает Express на использование языка шаблонов ejs. Затем вы добавляете промежуточное ПО модуля body-parser с помощью метода use(), чтобы преобразовать данные JSON в HTTP-запросах в переменные, к которым можно обращаться с помощью JavaScript. На следующей строке вы делаете то же самое с закодированным URL-вводом.

Затем добавьте следующие строки, чтобы добавить еще промежуточное ПО для обработки загрузки файлов и обслуживания статических файлов:

image_processor/index.js
...
app.use(fileUpload());
app.use(express.static("public"));

Вы добавляете промежуточное ПО для разбора загруженных файлов, вызывая метод fileUpload(), и устанавливаете каталог, где Express будет искать и обслуживать статические файлы, такие как изображения и CSS.

С установленным промежуточным ПО создайте маршрут, который отображает HTML-форму для загрузки изображения:

image_processor/index.js
...
app.get("/", function (req, res) {
  res.render("form");
});

Здесь вы используете метод get() модуля Express, чтобы указать маршрут / и функцию обратного вызова, которая должна выполняться, когда пользователь посещает домашнюю страницу или маршрут /. В обратном вызове вы вызываете res.render(), чтобы отрисовать файл form.ejs в каталоге views. Вы еще не создали файл form.ejs или каталог views.

Чтобы создать его, сначала сохраните и закройте файл. В терминале введите следующую команду, чтобы создать каталог views в корневом каталоге вашего проекта:

  1. mkdir views

Перейдите в каталог views:

  1. cd views

Создайте файл form.ejs в вашем редакторе:

  1. nano form.ejs

В вашем файле form.ejs добавьте следующий код для создания формы:

image_processor/views/form.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

Сначала вы ссылаетесь на файл head.ejs, который вы еще не создали. Файл head.ejs будет содержать элемент HTML head, на который вы можете ссылаться в других HTML-страницах.

В теге body создайте форму с следующими атрибутами:

  • action указывает маршрут, по которому данные формы должны быть отправлены при отправке формы.
  • method указывает метод HTTP для отправки данных. Метод POST встраивает данные в HTTP-запрос.
  • encytype указывает, как данные формы должны быть закодированы. Значение multipart/form-data позволяет элементам HTML input загружать файлы.

В элементе form создайте тег input для загрузки файлов. Затем определите элемент button с атрибутом type, установленным в submit, что позволяет отправлять формы.

После завершения сохраните и закройте файл.

Затем создайте файл head.ejs:

  1. nano head.ejs

В своем файле head.ejs добавьте следующий код, чтобы создать раздел заголовка приложения:

image_processor/views/head.ejs
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

Здесь вы ссылаетесь на файл main.css, который вы создадите в каталоге public позже на этом этапе. Этот файл будет содержать стили для этого приложения. Пока вы продолжите настройку процессов для статических ресурсов.

Сохраните и закройте файл.

Чтобы обработать данные, отправленные из формы, вы должны определить метод post в Express. Для этого вернитесь в корневой каталог вашего проекта:

  1. cd ..

Откройте ваш файл index.js снова:

  1. nano index.js

В вашем файле index.js добавьте выделенные строки для определения метода обработки отправки формы по маршруту /upload:

image_processor/index.js
app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

Вы используете переменную app для вызова метода post(), который будет обрабатывать отправленную форму по маршруту /upload. Затем вы извлекаете данные загруженного изображения из HTTP-запроса в переменную image. После этого вы устанавливаете ответ для возврата статуса 400, если пользователь не загружает изображение.

Для настройки обработки загруженного изображения добавьте следующий выделенный код:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

Эти строки представляют, как ваше приложение будет обрабатывать изображение. Сначала вы удаляете расширение изображения из загруженного изображения и сохраняете имя в переменной imageName. Затем вы определяете функцию processImage(). Эта функция принимает параметр size, значение которого будет использоваться для определения размеров изображения во время изменения размера. В функции вы вызываете sharp() с image.data, который представляет собой буфер, содержащий двоичные данные для загруженного изображения. sharp изменяет размер изображения в соответствии со значением параметра size. Вы используете метод webp() из sharp для преобразования изображения в формат изображения webp. Затем вы сохраняете изображение в каталоге public/images/.

Последующий список чисел определяет размеры, которые будут использоваться для изменения размера загруженного изображения. Затем вы используете метод JavaScript map() для вызова processImage() для каждого элемента в массиве sizes, после чего он вернет новый массив. Каждый раз, когда метод map() вызывает функцию processImage(), он возвращает обещание новому массиву. Вы используете метод Promise.all() для их разрешения.

Скорости обработки компьютеров различаются, как и размер изображений, которые пользователь может загрузить, что может повлиять на скорость обработки изображений. Чтобы задержать этот код в демонстрационных целях, вставьте выделенные строки, чтобы добавить интенсивный по ресурсам цикл увеличения и перенаправление на страницу, которая будет отображать измененные размеры изображений с выделенными строками:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

Цикл будет выполняться 10 миллиардов раз для увеличения переменной counter. Вы вызываете функцию res.redirect() для перенаправления приложения на маршрут /result. Маршрут будет отображать HTML-страницу, которая будет показывать изображения в директории public/images.

Маршрут /result еще не существует. Чтобы создать его, добавьте выделенный код в ваш файл index.js:

image_processor/index.js
...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

Вы определяете маршрут /result с использованием метода app.get(). В функции вы создаете переменную imgDirPath с полным путем к каталогу public/images. Используется метод readdirSync() модуля fs для чтения всех файлов в указанном каталоге. Затем вы применяете метод map() для создания нового массива с путями к изображениям, предварительно добавив префикс images/.

Наконец, вызывается res.render() для отображения файла result.ejs, которого пока не существует. В файл result.ejs передается переменная imgFiles, содержащая массив относительных путей ко всем изображениям.

Сохраните и закройте ваш файл.

Чтобы создать файл result.ejs, вернитесь в каталог views:

  1. cd views

Создайте и откройте файл result.ejs в вашем редакторе:

  1. nano result.ejs

В вашем файле result.ejs добавьте следующие строки для отображения изображений:

image_processor/views/result.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

Сначала вы ссылаетесь на файл head.ejs. В теге body проверяется, пуста ли переменная imgFiles. Если у нее есть данные, вы перебираете каждый файл и создаете изображение для каждого элемента массива. Если imgFiles пуста, выводится сообщение, предлагающее пользователю Обновите страницу через несколько секунд, чтобы увидеть измененные изображения..

Сохраните и закройте ваш файл.

Затем вернитесь в корневой каталог и создайте каталог public, который будет содержать ваши статические ресурсы:

  1. cd .. && mkdir public

Переместите в каталог public:

  1. cd public

Создайте каталог images, который будет содержать загруженные изображения:

  1. mkdir images

Затем создайте каталог css и перейдите в него:

  1. mkdir css && cd css

В вашем редакторе создайте и откройте файл main.css, на который вы ссылаетесь ранее в файле head.ejs:

  1. nano main.css

В вашем файле main.css добавьте следующие стили:

image_processor/public/css/main.css
body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** Стили для кнопки "Выбрать файл" **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** Стили для кнопки "Загрузить изображение" **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

Эти строки стилизуют элементы в приложении. Используя атрибуты HTML, вы стилизуете фон кнопки Выбрать файл с шестнадцатеричным кодом #2196f3 (оттенок синего цвета) и границу кнопки Загрузить изображение в цвет оранжевый. Вы также стилизуете элементы на маршруте /result, чтобы сделать их более представительными.

После завершения сохраните и закройте файл.

Вернитесь в корневой каталог проекта:

  1. cd ../..

Откройте index.js в вашем редакторе:

  1. nano index.js

В вашем файле index.js добавьте следующий код, который запустит сервер:

image_processor/index.js
...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

Теперь полный файл index.js будет соответствовать следующему:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

После завершения внесения изменений сохраните и закройте файл.

Запустите приложение с помощью команды node:

  1. node index.js

Вы получите вывод следующего вида:

Output
Server running on port 3000

Этот вывод подтверждает, что сервер работает без проблем.

Откройте ваш браузер по умолчанию и перейдите по адресу http://localhost:3000/.

Примечание: Если вы следуете пошаговому руководству на удаленном сервере, вы можете получить доступ к приложению в своем локальном браузере с использованием переадресации портов.

Пока сервер Node.js работает, откройте еще один терминал и выполните следующую команду:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

Как только вы подключитесь к серверу, выполните команду node index.js, а затем перейдите по адресу http://localhost:3000/ в веб-браузере вашего локального компьютера.

Когда страница загрузится, она будет соответствовать следующему:

Затем нажмите кнопку Выбрать файл и выберите изображение underwater.png на вашем локальном компьютере. Отображение изменится с Нет выбранного файла на underwater.png. Затем нажмите кнопку Загрузить изображение. Приложение будет загружаться некоторое время, так как оно обрабатывает изображение и запускает инкрементный цикл.

После завершения задачи, маршрут /result загрузится с измененными изображениями:

Теперь вы можете остановить сервер с помощью CTRL+C. Node.js не автоматически перезагружает сервер при изменении файлов, поэтому вам нужно будет остановить и перезапустить сервер при каждом обновлении файлов.

Теперь вы знаете, как времязатратная задача может повлиять на цикл запроса/ответа приложения. Вы выполните следующую задачу асинхронно.

Шаг 3 — Выполнение времязатратных задач асинхронно с помощью bullmq

На этом этапе вы будете перемещать времязатратную задачу на фоновый план, используя bullmq. Это позволит освободить цикл запроса/ответа и позволит вашему приложению немедленно реагировать на пользователей в то время, как изображение обрабатывается.

Для этого вам нужно создать краткое описание задачи и добавить его в очередь с помощью bullmq. Очередь — это структура данных, работающая аналогично тому, как очередь работает в реальной жизни. Когда люди выстраиваются в очередь, чтобы войти в пространство, первый человек в очереди будет первым, кто войдет в пространство. Любой, кто приходит позже, выстроится в конце очереди и войдет в пространство после всех, кто стоит перед ними в очереди, пока в пространство не зайдет последний человек. С использованием процесса First-In, First-Out (FIFO) очередь данных, первый элемент, добавленный в очередь, является первым элементом, который будет удален (dequeue). С bullmq производитель добавляет задачу в очередь, а потребитель (или рабочий) удаляет задачу из очереди и выполняет ее.

Очередь в bullmq хранится в Redis. Когда вы описываете задачу и добавляете её в очередь, создаётся запись для задачи в очереди Redis. Описание задачи может быть строкой или объектом с свойствами, которые содержат минимальные данные или ссылки на данные, которые позволят bullmq выполнить задачу позже. После того как вы определите функциональность добавления задач в очередь, вы перемещаете затратный по времени код в отдельную функцию. Позже bullmq вызовет эту функцию с данными, которые вы сохранили в очереди, когда задача будет извлечена из очереди. После завершения задачи bullmq отметит её как завершённую, извлечёт следующую задачу из очереди и выполнит её.

Откройте index.js в вашем редакторе:

  1. nano index.js

В вашем файле index.js добавьте выделенные строки, чтобы создать очередь в Redis с помощью bullmq:

image_processor/index.js
...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

Вы начинаете с извлечения класса Queue из bullmq, который используется для создания очереди в Redis. Затем вы устанавливаете переменную redisOptions в объект с свойствами, которые экземпляр класса Queue будет использовать для установления соединения с Redis. Вы устанавливаете значение свойства host в localhost, потому что Redis работает на вашем локальном компьютере.

Примечание: Если Redis работает на удалённом сервере, отличном от вашего приложения, вы бы обновили значение свойства host на IP-адрес удалённого сервера. Вы также устанавливаете значение свойства port в 6379, порт по умолчанию, который Redis использует для прослушивания подключений.

Если вы настроили переадресацию портов на удаленном сервере, где работают Redis и приложение вместе, вам не нужно обновлять свойство host, но вам придется использовать соединение с переадресацией портов каждый раз при входе на сервер для запуска приложения.

Затем вы устанавливаете переменную imageJobQueue как экземпляр класса Queue, передавая имя очереди в качестве первого аргумента и объект в качестве второго аргумента. В объекте есть свойство connection со значением, установленным в объекте переменной redisOptions. После создания экземпляра класса Queue будет создана очередь с именем imageJobQueue в Redis.

Наконец, вы определяете функцию addJob(), которую вы будете использовать для добавления задания в imageJobQueue. Функция принимает параметр job, содержащий информацию о задании (вы вызовете функцию addJob() с данными, которые вы хотите сохранить в очереди). В функции вы вызываете метод add() у imageJobQueue, передавая имя задания в качестве первого аргумента и данные задания в качестве второго аргумента.

Добавьте выделенный код для вызова функции addJob() и добавления задания в очередь:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

Вот, вы вызываете функцию addJob() с объектом, описывающим задачу. У объекта есть атрибут type со значением имени задачи. Второе свойство, image, устанавливается в объект, содержащий данные изображения, загруженного пользователем. Поскольку данные изображения в image.data представлены в виде буфера (в двоичной форме), вы вызываете метод toString() JavaScript для их преобразования в строку, которая может быть сохранена в Redis. Это установит свойство data в качестве результата. Свойство image устанавливается в имя загруженного изображения (включая расширение изображения).

Теперь вы определили информацию, необходимую для выполнения этой задачи bullmq позднее. В зависимости от вашей задачи вы можете добавить больше или меньше информации о задаче.

Предупреждение: Поскольку Redis – это база данных в памяти, избегайте сохранения больших объемов данных для задач в очереди. Если у вас есть большой файл, который задача должна обработать, сохраните файл на диске или в облаке, затем сохраните ссылку на файл как строку в очереди. Когда bullmq выполнит задачу, он извлечет файл из ссылки, сохраненной в Redis.

Сохраните и закройте ваш файл.

Затем создайте и откройте файл utils.js, который будет содержать код обработки изображения:

  1. nano utils.js

В вашем файле utils.js добавьте следующий код для определения функции обработки изображения:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

Вы импортируете необходимые модули для обработки изображений и вычисления путей в первых двух строках. Затем вы определяете функцию processUploadedImages(), которая будет содержать трудоемкую задачу по обработке изображений. Эта функция принимает параметр job, который будет заполнен, когда рабочий извлечет данные задачи из очереди, а затем вызовет функцию processUploadedImages() с данными из очереди. Вы также экспортируете функцию processUploadedImages(), чтобы вы могли ссылаться на нее в других файлах.

Сохраните и закройте ваш файл.

Вернитесь в файл index.js:

  1. nano index.js

Скопируйте выделенные строки из файла index.js, затем удалите их из этого файла. Вам понадобится скопированный код в данный момент, так что сохраните его в буфер обмена. Если вы используете nano, вы можете выделить эти строки и щелкнуть правой кнопкой мыши, чтобы скопировать их:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

Метод post для маршрута upload теперь будет соответствовать следующему:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

Сохраните и закройте этот файл, затем откройте файл utils.js:

  1. nano utils.js

В вашем файле utils.js вставьте строки, которые вы только что скопировали для обратного вызова маршрута /upload, в функцию processUploadedImages:

image_processor/utils.js
...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

Теперь, когда вы переместили код для обработки изображения, вам нужно обновить его для использования данных изображения из параметра job функции processUploadedImages(), которую вы определили ранее.

Для этого добавьте и обновите выделенные строки ниже:

image_processor/utils.js

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

Вы преобразуете строковое представление данных изображения обратно в двоичный формат с помощью метода Buffer.from(). Затем вы обновляете вызов path.parse(), указав ссылку на имя изображения, сохраненное в очереди. После этого вы обновляете метод sharp(), чтобы он принимал двоичные данные изображения, хранящиеся в переменной imageFileData.

Полный файл utils.js теперь будет выглядеть следующим образом:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

Сохраните и закройте файл, затем вернитесь к файлу index.js:

  1. nano index.js

Переменная sharp больше не нужна в качестве зависимости, так как изображение теперь обрабатывается в файле utils.js. Удалите выделенную строку из файла:

image_processor/index.js
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

Сохраните и закройте файл.

Теперь вы определили функциональность для создания очереди в Redis и добавления задачи. Вы также определили функцию processUploadedImages() для обработки загруженных изображений.

Оставшаяся задача – создать потребителя (или рабочего), который будет извлекать задачу из очереди и вызывать функцию processUploadedImages() с данными задачи.

Создайте файл worker.js в вашем редакторе:

  1. nano worker.js

В вашем файле worker.js добавьте следующий код:

image_processor/worker.js
const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

На первой строке вы импортируете класс Worker из bullmq; при его создании этот класс запускает рабочего, который извлекает задачи из очереди в Redis и выполняет их. Затем вы ссылаетесь на функцию processUploadedImages() из файла utils.js, чтобы рабочий мог вызвать эту функцию с данными из очереди.

Вы определяете функцию workerHandler(), которая принимает параметр job, содержащий данные задания в очереди. Внутри функции вы записываете, что задание началось, затем вызываете processUploadedImages() с данными задания. После этого вы записываете сообщение об успешном выполнении и возвращаете null.

Чтобы разрешить воркеру подключаться к Redis, извлечь задание из очереди и вызвать workerHandler() с данными задания, добавьте следующие строки в файл:

image_processor/worker.js
...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

Здесь вы устанавливаете переменную workerOptions в объект, содержащий параметры подключения к Redis. Вы устанавливаете переменную worker в экземпляр класса Worker, который принимает следующие параметры:

  • imageJobQueue: имя очереди заданий.
  • workerHandler: функция, которая будет выполняться после извлечения задания из очереди Redis.
  • workerOptions: настройки конфигурации Redis, которые воркер использует для установки соединения с Redis.

Наконец, вы записываете сообщение об успешном выполнении.

После добавления этих строк сохраните и закройте файл.

Теперь вы определили функциональность воркера bullmq для извлечения заданий из очереди и их выполнения.

В терминале удалите изображения в каталоге public/images, чтобы начать с чистого листа для тестирования вашего приложения:

  1. rm public/images/*

Затем запустите файл index.js:

  1. node index.js

Приложение начнется:

Output
Server running on port 3000

Теперь вы можете запустить воркер. Откройте вторую сессию терминала и перейдите в каталог проекта:

  1. cd image_processor/

Запустите рабочего с помощью следующей команды:

  1. node worker.js

Рабочий начнет выполнение:

Output
Worker started!

Посетите http://localhost:3000/ в вашем браузере. Нажмите кнопку Выбрать файл и выберите underwater.png с вашего компьютера, затем нажмите кнопку Загрузить изображение.

Возможно, вы получите мгновенный ответ, который скажет вам обновить страницу через несколько секунд:

В альтернативе, вы можете получить мгновенный ответ с обработанными изображениями на странице, в то время как другие все еще обрабатываются:

Вы можете обновить страницу несколько раз, чтобы загрузить все измененные изображения.

Вернитесь в терминал, где работает ваш рабочий процесс. В этом терминале будет сообщение, соответствующее следующему:

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

Результат подтверждает, что bullmq успешно выполнил задачу.

Ваше приложение может все равно выполнять трудоемкие задачи, даже если рабочий процесс не выполняется. Чтобы продемонстрировать это, остановите рабочего во втором терминале с помощью CTRL+C.

В вашей первой сессии терминала остановите сервер Express и удалите изображения в public/images:

  1. rm public/images/*

После этого снова запустите сервер:

  1. node index.js

В вашем браузере посетите http://localhost:3000/ и загрузите изображение underwater.png снова. Когда вы будете перенаправлены на путь /result, изображения не будут отображаться на странице, потому что рабочий процесс не выполняется:

Вернитесь в терминал, где вы запускали рабочего, и запустите рабочего снова:

  1. node worker.js

Вывод будет соответствовать следующему, что дает вам знать, что задача началась:

Output
Worker started! Starting job: processUploadedImages

После завершения работы и вывода строки Finished job: processUploadedImages обновите браузер. Теперь изображения будут загружены:

Остановите сервер и рабочего.

Теперь вы можете передать тяжелую задачу на фоновый режим и выполнять ее асинхронно с помощью bullmq. На следующем шаге вы настроите панель мониторинга для отслеживания статуса очереди.

Шаг 4 — Добавление панели мониторинга для отслеживания очередей bullmq

На этом этапе вы будете использовать пакет bull-board для отслеживания заданий в очереди Redis с помощью визуальной панели мониторинга. Этот пакет автоматически создаст интерфейс пользователя (UI), отображающий и организующий информацию о заданиях bullmq, которые хранятся в очереди Redis. С помощью вашего браузера вы сможете отслеживать выполненные задания, ожидающие выполнения или завершившиеся с ошибкой, не открывая Redis CLI в терминале.

Откройте файл index.js в вашем текстовом редакторе:

  1. nano index.js

Добавьте выделенный код для импорта bull-board:

image_processor/index.js
...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

В предшествующем коде вы импортируете метод `createBullBoard()` из `bull-board`. Вы также импортируете `BullMQAdapter`, который позволяет `bull-board` получать доступ к очередям `bullmq`, и `ExpressAdapter`, который предоставляет функциональность для отображения панели управления Express.

Затем добавьте выделенный код для соединения `bull-board` с `bullmq`:

image_processor/index.js
...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

Сначала установите `serverAdapter` как экземпляр `ExpressAdapter`. Затем вызовите `createBullBoard()` для инициализации панели управления данными очереди `bullmq`. Передайте функции объектный аргумент с свойствами `queues` и `serverAdapter`. Первое свойство, `queues`, принимает массив очередей, определенных вами с помощью `bullmq`, который здесь `imageJobQueue`. Второе свойство, `serverAdapter`, содержит объект, который принимает экземпляр адаптера сервера Express. Затем установите путь `/admin` для доступа к панели управления с помощью метода `setBasePath()`.

Затем добавьте промежуточное ПО `serverAdapter` для маршрута `/admin`:

image_processor/index.js
app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

Полный файл `index.js` будет соответствовать следующему:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

После внесения изменений сохраните и закройте файл.

Запустите файл `index.js`:

  1. node index.js

Вернитесь в браузер и посетите `http://localhost:3000/admin`. Панель управления загрузится:

На панели инструментов вы можете просмотреть тип задания, данные, которые он использует, и дополнительную информацию о задании. Вы также можете переключиться на другие вкладки, такие как вкладка Завершено для информации о завершенных заданиях, вкладка Сбой для получения дополнительной информации о заданиях, которые завершились неудачей, и вкладка Приостановлено для получения дополнительной информации о приостановленных заданиях.

Теперь вы можете использовать панель управления bull-board для мониторинга очередей.

Вывод

В этой статье вы перенесли трудоемкую задачу в очередь заданий с использованием bullmq. Сначала, не используя bullmq, вы создали приложение с трудоемкой задачей, имеющей медленный цикл запроса/ответа. Затем вы использовали bullmq, чтобы перенести трудоемкую задачу и выполнить ее асинхронно, что увеличило скорость цикла запроса/ответа. После этого вы использовали bull-board, чтобы создать панель мониторинга очередей bullmq в Redis.

Вы можете посетить документацию bullmq, чтобы узнать больше о функциях bullmq, не рассмотренных в этом учебнике, таких как планирование, установка приоритетов или повторное выполнение заданий и настройка параметров параллелизма для рабочих процессов. Вы также можете посетить документацию bull-board, чтобы узнать больше о функциях панели управления.

Source:
https://www.digitalocean.com/community/tutorials/how-to-handle-asynchronous-tasks-with-node-js-and-bullmq