Cómo manejar tareas asíncronas con Node.js y BullMQ

El autor seleccionó a la Sociedad de Ingenieras para recibir una donación como parte del programa Write for Donations.

Introducción

Las aplicaciones web tienen ciclos de solicitud/respuesta. Cuando visitas una URL, el navegador envía una solicitud al servidor que ejecuta una aplicación para procesar datos o realizar consultas en la base de datos. Mientras esto sucede, el usuario debe esperar hasta que la aplicación devuelva una respuesta. Para algunas tareas, el usuario puede obtener una respuesta rápidamente; para tareas que consumen mucho tiempo, como procesar imágenes, analizar datos, generar informes o enviar correos electrónicos, estas tareas tardan mucho en completarse y pueden ralentizar el ciclo de solicitud/respuesta. Por ejemplo, supongamos que tienes una aplicación donde los usuarios cargan imágenes. En ese caso, es posible que necesites cambiar el tamaño, comprimir o convertir la imagen a otro formato para preservar el espacio en disco de tu servidor antes de mostrar la imagen al usuario. Procesar una imagen es una tarea intensiva en CPU, que puede bloquear un hilo de Node.js hasta que se complete la tarea. Eso podría llevar unos segundos o minutos. Los usuarios deben esperar a que la tarea termine para obtener una respuesta del servidor.

Para evitar ralentizar el ciclo de solicitud/respuesta, puedes usar bullmq, una cola de tareas (jobs) distribuida que te permite descargar tareas que consumen mucho tiempo de tu aplicación Node.js a bullmq, liberando así el ciclo de solicitud/respuesta. Esta herramienta permite que tu aplicación envíe respuestas al usuario rápidamente mientras bullmq ejecuta las tareas de forma asíncrona en segundo plano e independientemente de tu aplicación. Para hacer un seguimiento de los trabajos, bullmq utiliza Redis para almacenar una breve descripción de cada trabajo en una cola. Luego, un worker de bullmq extrae y ejecuta cada trabajo en la cola, marcándolo como completado una vez terminado.

En este artículo, usarás bullmq para descargar una tarea que consume mucho tiempo en segundo plano, lo que permitirá que una aplicación responda rápidamente a los usuarios. Primero, crearás una aplicación con una tarea que consume mucho tiempo sin usar bullmq. Luego, usarás bullmq para ejecutar la tarea de forma asíncrona. Finalmente, instalarás un panel de control visual para gestionar los trabajos de bullmq en una cola de Redis.

Requisitos previos

Para seguir este tutorial, necesitarás lo siguiente:

Paso 1: Configuración del Directorio del Proyecto

En este paso, crearás un directorio e instalarás las dependencias necesarias para tu aplicación. La aplicación que construirás en este tutorial permitirá a los usuarios cargar una imagen, que luego se procesará utilizando el paquete sharp. El procesamiento de imágenes es intensivo en tiempo y puede ralentizar el ciclo de solicitud/respuesta, convirtiendo la tarea en un buen candidato para que bullmq lo maneje en segundo plano. La técnica que utilizarás para transferir la tarea también funcionará para otras tareas intensivas en tiempo.

Para empezar, crea un directorio llamado image_processor y navega hacia él:

  1. mkdir image_processor && cd image_processor

Luego, inicializa el directorio como un paquete npm:

  1. npm init -y

El comando crea un archivo package.json. La opción -y le indica a npm que acepte todas las configuraciones predeterminadas.

Al ejecutar el comando, tu salida coincidirá con lo siguiente:

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

La salida confirma que se ha creado el archivo package.json. Las propiedades importantes incluyen el nombre de tu aplicación (name), la versión de tu aplicación (version), y el punto de inicio de tu proyecto (main). Si deseas obtener más información sobre las otras propiedades, puedes revisar la documentación de package.json de npm.

La aplicación que construirás en este tutorial requerirá las siguientes dependencias:

  • express: un marco web para construir aplicaciones web.
  • express-fileupload: un middleware que permite que tus formularios suban archivos.
  • sharp: una biblioteca de procesamiento de imágenes.
  • ejs: un lenguaje de plantillas que te permite generar marcado HTML con Node.js.
  • bullmq: una cola de tareas distribuida.
  • bull-board: un panel de control que se basa en bullmq y muestra el estado de los trabajos con una interfaz de usuario (UI) agradable.

Para instalar todas estas dependencias, ejecuta el siguiente comando:

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

Además de las dependencias que instalaste, también usarás la siguiente imagen más adelante en este tutorial:

Usa curl para descargar la imagen en la ubicación que elijas en tu computadora local

  1. curl -O https://deved-images.nyc3.cdn.digitaloceanspaces.com/CART-68886/underwater.png

Tienes las dependencias necesarias para construir una aplicación Node.js que no tiene bullmq, lo cual harás a continuación.

Paso 2 — Implementación de una tarea intensiva en tiempo sin bullmq

En este paso, construirás una aplicación con Express que permita a los usuarios subir imágenes. La aplicación iniciará una tarea intensiva en tiempo usando sharp para redimensionar la imagen en múltiples tamaños, los cuales serán luego mostrados al usuario después de que se envíe una respuesta. Este paso te ayudará a entender cómo las tareas intensivas en tiempo afectan el ciclo de solicitud/respuesta.

Usando nano, o tu editor de texto preferido, crea el archivo index.js:

  1. nano index.js

En tu archivo index.js, agrega el siguiente código para importar las dependencias:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

En la primera línea, importas el módulo path para calcular rutas de archivo con Node. En la segunda línea, importas el módulo fs para interactuar con directorios. Luego importas el framework web express. Importas el módulo body-parser para agregar middleware para analizar datos en las solicitudes HTTP. Después de eso, importas el módulo sharp para el procesamiento de imágenes. Finalmente, importas express-fileupload para manejar las subidas desde un formulario HTML.

Luego, agrega el siguiente código para implementar middleware en tu aplicación:

image_processor/index.js
...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

Primero, estableces la variable app como una instancia de Express. Segundo, utilizando la variable app, el método set() configura Express para usar el lenguaje de plantillas ejs. Luego, agregas el middleware del módulo body-parser con el método use() para transformar los datos JSON en las solicitudes HTTP en variables que se pueden acceder con JavaScript. En la siguiente línea, haces lo mismo con la entrada codificada en URL.

A continuación, agrega las siguientes líneas para añadir más middleware para manejar la carga de archivos y servir archivos estáticos:

image_processor/index.js
...
app.use(fileUpload());
app.use(express.static("public"));

Añades middleware para analizar archivos cargados llamando al método fileUpload(), y estableces un directorio donde Express buscará y servirá archivos estáticos, como imágenes y CSS.

Con el middleware configurado, crea una ruta que muestre un formulario HTML para cargar una imagen:

image_processor/index.js
...
app.get("/", function (req, res) {
  res.render("form");
});

Aquí, utilizas el método get() del módulo Express para especificar la ruta / y la función de retorno que se ejecutará cuando el usuario visite la página de inicio o la ruta /. En la función de retorno, invocas res.render() para renderizar el archivo form.ejs en el directorio views. Aún no has creado el archivo form.ejs ni el directorio views.

Para crearlo, primero guarda y cierra tu archivo. En tu terminal, ingresa el siguiente comando para crear el directorio views en el directorio raíz de tu proyecto:

  1. mkdir views

Ingresa al directorio views:

  1. cd views

Crea el archivo form.ejs en tu editor:

  1. nano form.ejs

En tu archivo form.ejs, agrega el siguiente código para crear el formulario:

image_processor/views/form.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

Primero, haces referencia al archivo head.ejs, el cual aún no has creado. El archivo head.ejs contendrá el elemento HTML head al que podrás hacer referencia en otras páginas HTML.

En la etiqueta body, creas un formulario con los siguientes atributos:

  • action especifica la ruta a la que deben enviarse los datos del formulario cuando se envíe el formulario.
  • method especifica el método HTTP para enviar datos. El método POST incrusta los datos en una solicitud HTTP.
  • encytype especifica cómo deben codificarse los datos del formulario. El valor multipart/form-data permite a los elementos HTML input cargar datos de archivo.

En el elemento form, creas una etiqueta input para cargar archivos. Luego defines el elemento button con el atributo type establecido en submit, lo que te permite enviar formularios.

Una vez terminado, guarda y cierra tu archivo.

A continuación, crea un archivo head.ejs:

  1. nano head.ejs

En tu archivo head.ejs, agrega el siguiente código para crear la sección de encabezado de la aplicación:

image_processor/views/head.ejs
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

Aquí, haces referencia al archivo main.css, el cual crearás en el directorio public más adelante en este paso. Ese archivo contendrá los estilos para esta aplicación. Por ahora, continuarás configurando los procesos para los activos estáticos.

Guarda y cierra el archivo.

Para manejar los datos enviados desde el formulario, debes definir un método post en Express. Para hacer eso, regresa al directorio raíz de tu proyecto:

  1. cd ..

Abre tu archivo index.js de nuevo:

  1. nano index.js

En tu archivo index.js, agrega las líneas resaltadas para definir un método que maneje envíos de formularios en la ruta /upload:

image_processor/index.js
app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

Utilizas la variable app para llamar al método post(), que manejará el formulario enviado en la ruta /upload. Después, extraes los datos de la imagen cargada desde la solicitud HTTP en la variable image. Posteriormente, estableces una respuesta para devolver un código de estado 400 si el usuario no carga una imagen.

Para configurar el proceso de la imagen cargada, agrega el siguiente código resaltado:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

Estas líneas representan cómo tu aplicación procesará la imagen. Primero, eliminas la extensión de la imagen cargada y guardas el nombre en la variable imageName. Luego, defines la función processImage(). Esta función toma el parámetro size, cuyo valor se usará para determinar las dimensiones de la imagen durante el cambio de tamaño. En la función, invocas sharp() con image.data, que es un buffer que contiene los datos binarios de la imagen cargada. sharp cambia el tamaño de la imagen según el valor en el parámetro size. Utilizas el método webp() de sharp para convertir la imagen al formato de imagen webp. Luego, guardas la imagen en el directorio public/images/.

La siguiente lista de números define los tamaños que se utilizarán para redimensionar la imagen cargada. Luego, se utiliza el método map() de JavaScript para invocar processImage() para cada elemento en el array sizes, después de lo cual se devuelve un nuevo array. Cada vez que el método map() llama a la función processImage(), devuelve una promesa al nuevo array. Se utiliza el método Promise.all() para resolverlas.

Las velocidades de procesamiento de la computadora varían, al igual que el tamaño de las imágenes que un usuario puede cargar, lo que puede afectar la velocidad de procesamiento de imágenes. Para retrasar este código con fines de demostración, inserta las líneas resaltadas para agregar un bucle de incremento intensivo de CPU y una redirección a una página que mostrará las imágenes redimensionadas con las líneas resaltadas:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

El bucle se ejecutará 10 mil millones de veces para incrementar la variable counter. Se invoca la función res.redirect() para redirigir la aplicación a la ruta /result. La ruta representará una página HTML que mostrará las imágenes en el directorio public/images.

La ruta /result aún no existe. Para crearla, agrega el código resaltado en tu archivo index.js:

image_processor/index.js
...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

Defines la ruta /result con el método app.get(). En la función, defines la variable imgDirPath con la ruta completa al directorio public/images. Utilizas el método readdirSync() del módulo fs para leer todos los archivos en el directorio dado. A partir de ahí, encadenas el método map() para devolver un nuevo array con las rutas de las imágenes precedidas de images/.

Finalmente, llamas a res.render() para renderizar el archivo result.ejs, que aún no existe. Pasas la variable imgFiles, que contiene un array de todas las rutas relativas de las imágenes, al archivo result.ejs.

Guarda y cierra tu archivo.

Para crear el archivo result.ejs, regresa al directorio views:

  1. cd views

Crea y abre el archivo result.ejs en tu editor:

  1. nano result.ejs

En tu archivo result.ejs, agrega las siguientes líneas para mostrar las imágenes:

image_processor/views/result.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

Primero, haces referencia al archivo head.ejs. En la etiqueta body, verificas si la variable imgFiles está vacía. Si tiene datos, iteras sobre cada archivo y creas una imagen para cada elemento del array. Si imgFiles está vacío, imprimes un mensaje que indica al usuario que Actualice después de unos segundos para ver las imágenes redimensionadas..

Guarda y cierra tu archivo.

A continuación, regresa al directorio raíz y crea el directorio public que contendrá tus activos estáticos:

  1. cd .. && mkdir public

Mueva el contenido al directorio public:

  1. cd public

Cree un directorio images que mantendrá las imágenes cargadas:

  1. mkdir images

A continuación, cree el directorio css y navegue hasta él:

  1. mkdir css && cd css

En su editor, cree y abra el archivo main.css, al que hizo referencia anteriormente en el archivo head.ejs:

  1. nano main.css

En su archivo main.css, agregue los siguientes estilos:

image_processor/public/css/main.css
body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** Estilos para el botón "Elegir archivo" **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** Estilos para el botón "Cargar imagen" **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

Estas líneas darán estilo a elementos en la aplicación. Usando atributos HTML, da estilo al fondo del botón Elegir archivo con el código hexadecimal #2196f3 (un tono de azul) y el borde del botón Cargar imagen a naranja. También da estilo a los elementos en la ruta /result para hacerlos más presentables.

Una vez finalizado, guarde y cierre su archivo.

Vuelva al directorio raíz del proyecto:

  1. cd ../..

Abra index.js en su editor:

  1. nano index.js

En su index.js, agregue el siguiente código, que iniciará el servidor:

image_processor/index.js
...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

El archivo index.js completo ahora coincidirá con lo siguiente:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

Una vez que haya terminado de realizar los cambios, guarde y cierre su archivo.

Ejecute la aplicación usando el comando node:

  1. node index.js

Recibirá una salida como esta:

Output
Server running on port 3000

Esta salida confirma que el servidor se está ejecutando sin problemas.

Abre tu navegador preferido y visita http://localhost:3000/.

Nota: Si estás siguiendo el tutorial en un servidor remoto, puedes acceder a la aplicación en tu navegador local usando el reenvío de puertos.

Mientras el servidor Node.js esté en funcionamiento, abre otra terminal e introduce el siguiente comando:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

Una vez que te hayas conectado al servidor, ejecuta node index.js y luego navega a http://localhost:3000/ en el navegador web de tu máquina local.

Cuando la página cargue, debería coincidir con lo siguiente:

A continuación, presiona el botón Elegir archivo y selecciona la imagen underwater.png en tu máquina local. El display cambiará de Ningún archivo seleccionado a underwater.png. Después de eso, presiona el botón Subir imagen. La aplicación cargará durante un tiempo mientras procesa la imagen y ejecuta el bucle de incremento.

Una vez que la tarea haya terminado, la ruta /result se cargará con las imágenes redimensionadas:

Ahora puedes detener el servidor con CTRL+C. Node.js no recarga automáticamente el servidor cuando se cambian los archivos, así que necesitarás detener y reiniciar el servidor cada vez que actualices los archivos.

Ahora sabes cómo una tarea que consume mucho tiempo puede afectar el ciclo de solicitud/respuesta de una aplicación. La ejecutarás de forma asíncrona la próxima vez.

Paso 3 — Ejecutar tareas intensivas en tiempo de forma asíncrona con bullmq

En este paso, descargarás una tarea intensiva en tiempo al fondo utilizando bullmq. Este ajuste liberará el ciclo de solicitud/respuesta y permitirá que tu aplicación responda a los usuarios de inmediato mientras se procesa la imagen.

Para hacer eso, necesitas crear una descripción concisa del trabajo y agregarlo a una cola con bullmq. Una cola es una estructura de datos que funciona de manera similar a una cola en la vida real. Cuando las personas hacen fila para entrar en un espacio, la primera persona en la fila será la primera en entrar en el espacio. Cualquier persona que llegue después se pondrá al final de la fila y entrará en el espacio después de todos los que la preceden en la fila hasta que la última persona entre en el espacio. Con el proceso Primero en entrar, primero en salir (FIFO) de la estructura de datos de la cola, el primer elemento agregado a la cola es el primero en ser removido (desencolar). Con bullmq, un productor agregará un trabajo en una cola, y un consumidor (o trabajador) eliminará un trabajo de la cola y lo ejecutará.

La cola en bullmq está en Redis. Cuando describes un trabajo y lo añades a la cola, se crea una entrada para el trabajo en una cola de Redis. Una descripción de trabajo puede ser una cadena o un objeto con propiedades que contienen datos mínimos o referencias a los datos que permitirán a bullmq ejecutar el trabajo más tarde. Una vez que defines la funcionalidad para agregar trabajos a la cola, mueves el código intensivo en tiempo a una función separada. Más tarde, bullmq llamará a esta función con los datos que almacenaste en la cola cuando se desencole el trabajo. Una vez que la tarea haya finalizado, bullmq la marcará como completada, extraerá otro trabajo de la cola y lo ejecutará.

Abre index.js en tu editor:

  1. nano index.js

En tu archivo index.js, agrega las líneas resaltadas para crear una cola en Redis con bullmq:

image_processor/index.js
...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

Comienzas extrayendo la clase Queue de bullmq, que se utiliza para crear una cola en Redis. Luego, estableces la variable redisOptions a un objeto con propiedades que la instancia de la clase Queue utilizará para establecer una conexión con Redis. Estableces el valor de la propiedad host en localhost porque Redis se está ejecutando en tu máquina local.

Nota: Si Redis se estuviera ejecutando en un servidor remoto separado de tu aplicación, actualizarías el valor de la propiedad host a la dirección IP del servidor remoto. También estableces el valor de la propiedad port en 6379, el puerto predeterminado que Redis utiliza para escuchar conexiones.

Si ha configurado el reenvío de puertos a un servidor remoto que ejecuta Redis y la aplicación juntos, no es necesario actualizar la propiedad host, pero deberá usar la conexión de reenvío de puertos cada vez que inicie sesión en su servidor para ejecutar la aplicación.

A continuación, establezca la variable imageJobQueue en una instancia de la clase Queue, tomando el nombre de la cola como su primer argumento y un objeto como segundo argumento. El objeto tiene una propiedad connection con el valor configurado en un objeto en la variable redisOptions. Después de instanciar la clase Queue, se creará una cola llamada imageJobQueue en Redis.

Finalmente, defina la función addJob() que utilizará para agregar un trabajo en la imageJobQueue. La función toma un parámetro de job que contiene la información sobre el trabajo ⁠(llamará a la función addJob() con los datos que desea guardar en una cola). En la función, invoque el método add() de la imageJobQueue, tomando el nombre del trabajo como primer argumento y los datos del trabajo como segundo argumento.

Agregue el código resaltado para llamar a la función addJob() y agregar un trabajo en la cola:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

Aquí, llama a la función addJob() con un objeto que describe el trabajo. El objeto tiene el atributo type con un valor igual al nombre del trabajo. La segunda propiedad, image, se establece como un objeto que contiene los datos de la imagen que el usuario ha cargado. Dado que los datos de la imagen en image.data están en un búfer (forma binaria), invocas el método toString() de JavaScript para convertirlo en una cadena que se puede almacenar en Redis, lo cual establecerá la propiedad data como resultado. La propiedad image se establece con el nombre de la imagen cargada (incluida la extensión de la imagen).

Ahora has definido la información necesaria para que bullmq ejecute este trabajo más adelante. Dependiendo de tu trabajo, puedes agregar más información o menos.

Advertencia: Dado que Redis es una base de datos en memoria, evita almacenar grandes cantidades de datos para trabajos en la cola. Si tienes un archivo grande que un trabajo necesita procesar, guárdalo en el disco o en la nube, luego guarda el enlace al archivo como una cadena en la cola. Cuando bullmq ejecuta el trabajo, recuperará el archivo desde el enlace guardado en Redis.

Guarda y cierra tu archivo.

A continuación, crea y abre el archivo utils.js que contendrá el código de procesamiento de imágenes:

  1. nano utils.js

En tu archivo utils.js, agrega el siguiente código para definir la función de procesamiento de imágenes:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

Importas los módulos necesarios para procesar imágenes y calcular rutas en las dos primeras líneas. Luego defines la función processUploadedImages(), que contendrá la tarea de procesamiento de imágenes que consume tiempo. Esta función toma un parámetro job que se poblara cuando el trabajador obtenga los datos del trabajo de la cola y luego invoque la función processUploadedImages() con los datos de la cola. También exportas la función processUploadedImages() para que puedas hacer referencia a ella en otros archivos.

Guarda y cierra tu archivo.

Vuelve al archivo index.js:

  1. nano index.js

Copia las líneas resaltadas del archivo index.js, luego bórralas de este archivo. Necesitarás el código copiado en breve, así que guárdalo en el portapapeles. Si estás utilizando nano, puedes resaltar estas líneas y hacer clic derecho con tu ratón para copiar las líneas:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

El método post para la ruta upload ahora coincidirá con lo siguiente:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

Guarda y cierra este archivo, luego abre el archivo utils.js:

  1. nano utils.js

En tu archivo utils.js, pega las líneas que acabas de copiar para el callback de la ruta /upload en la función processUploadedImages:

image_processor/utils.js
...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

Ahora que has movido el código para procesar una imagen, necesitas actualizarlo para usar los datos de imagen del parámetro job de la función processUploadedImages() que definiste anteriormente.

Para hacer eso, agrega y actualiza las líneas resaltadas a continuación:

image_processor/utils.js

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

Conviertes la versión en cadena de datos de la imagen de nuevo a binario con el método Buffer.from(). Luego actualizas path.parse() con una referencia al nombre de la imagen guardada en la cola. Después, actualizas el método sharp() para que tome los datos binarios de la imagen almacenados en la variable imageFileData.

El archivo utils.js completo ahora coincidirá con lo siguiente:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

Guarda y cierra tu archivo, luego vuelve al archivo index.js:

  1. nano index.js

La variable sharp ya no es necesaria como dependencia ya que la imagen ahora se procesa en el archivo utils.js. Elimina la línea resaltada del archivo:

image_processor/index.js
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

Guarda y cierra tu archivo.

Ahora has definido la funcionalidad para crear una cola en Redis y agregar un trabajo. También definiste la función processUploadedImages() para procesar imágenes cargadas.

La tarea restante es crear un consumidor (o trabajador) que retirará un trabajo de la cola y llamará a la función processUploadedImages() con los datos del trabajo.

Crea un archivo worker.js en tu editor:

  1. nano worker.js

En tu archivo worker.js, agrega el siguiente código:

image_processor/worker.js
const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

En la primera línea, importas la clase Worker de bullmq; al instanciarla, esto iniciará un trabajador que extraerá trabajos de la cola en Redis y los ejecutará. Luego, haces referencia a la función processUploadedImages() del archivo utils.js para que el trabajador pueda llamar a la función con los datos en la cola.

Define una función workerHandler() que tome un parámetro job que contenga los datos del trabajo en la cola. En la función, registras que el trabajo ha comenzado, luego invocas processUploadedImages() con los datos del trabajo. Después de eso, registras un mensaje de éxito y devuelves null.

Para permitir que el trabajador se conecte a Redis, desencola un trabajo de la cola y llama a workerHandler() con los datos del trabajo, agrega las siguientes líneas al archivo:

image_processor/worker.js
...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

Aquí, defines la variable workerOptions como un objeto que contiene la configuración de conexión de Redis. Estableces la variable worker como una instancia de la clase Worker que toma los siguientes parámetros:

  • imageJobQueue: el nombre de la cola de trabajos.
  • workerHandler: la función que se ejecutará después de que se desencole un trabajo de la cola de Redis.
  • workerOptions: la configuración de Redis que el trabajador utiliza para establecer una conexión con Redis.

Finalmente, registras un mensaje de éxito.

Después de agregar las líneas, guarda y cierra tu archivo.

Ahora has definido la funcionalidad del trabajador bullmq para desencolar trabajos de la cola y ejecutarlos.

En tu terminal, elimina las imágenes en el directorio public/images para que puedas comenzar desde cero para probar tu aplicación:

  1. rm public/images/*

Luego, ejecuta el archivo index.js:

  1. node index.js

La aplicación se iniciará:

Output
Server running on port 3000

Ahora iniciarás el trabajador. Abre una segunda sesión de terminal y navega hasta el directorio del proyecto:

  1. cd image_processor/

Inicia el trabajador con el siguiente comando:

  1. node worker.js

El trabajador se iniciará:

Output
Worker started!

Visita http://localhost:3000/ en tu navegador. Presiona el botón Elegir archivo y selecciona underwater.png desde tu computadora, luego presiona el botón Subir imagen.

Puedes recibir una respuesta instantánea que te indique que actualices la página después de unos segundos:

Alternativamente, podrías recibir una respuesta instantánea con algunas imágenes procesadas en la página mientras otras aún se están procesando:

Puedes actualizar la página varias veces para cargar todas las imágenes redimensionadas.

Regresa al terminal donde se está ejecutando tu trabajador. Ese terminal tendrá un mensaje que coincida con lo siguiente:

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

La salida confirma que bullmq ejecutó el trabajo exitosamente.

Tu aplicación aún puede descargar tareas que consumen mucho tiempo incluso si el trabajador no está corriendo. Para demostrar esto, detén el trabajador en el segundo terminal con CTRL+C.

En tu sesión de terminal inicial, detén el servidor Express y elimina las imágenes en public/images:

  1. rm public/images/*

Después, inicia el servidor nuevamente:

  1. node index.js

En tu navegador, visita http://localhost:3000/ y carga la imagen underwater.png nuevamente. Cuando seas redirigido a la ruta /result, las imágenes no se mostrarán en la página porque el trabajador no está corriendo:

Regresa al terminal donde ejecutaste el trabajador y comienza el trabajador nuevamente:

  1. node worker.js

La salida coincidirá con lo siguiente, lo que te permite saber que el trabajo ha comenzado:

Output
Worker started! Starting job: processUploadedImages

Después de que se haya completado el trabajo y la salida incluya una línea que dice Trabajo finalizado: processUploadedImages, actualiza el navegador. Las imágenes cargarán ahora:

Detén el servidor y el trabajador.

Ahora puedes transferir una tarea intensiva en tiempo al fondo y ejecutarla de forma asíncrona usando bullmq. En el siguiente paso, configurarás un panel para monitorear el estado de la cola.

Paso 4 — Agregar un Panel para Monitorear las Colas de bullmq

En este paso, usarás el paquete bull-board para monitorear los trabajos en la cola de Redis desde un panel visual. Este paquete creará automáticamente un panel de interfaz de usuario (UI) que muestra y organiza la información sobre los trabajos de bullmq almacenados en la cola de Redis. Usando tu navegador, puedes monitorear los trabajos completados, los que están en espera o han fallado sin abrir la CLI de Redis en la terminal.

Abre el archivo index.js en tu editor de texto:

  1. nano index.js

Agrega el código resaltado para importar bull-board:

image_processor/index.js
...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

En el código anterior, importas el método createBullBoard() de bull-board. También importas BullMQAdapter, que permite que bull-board acceda a las colas de bullmq, y ExpressAdapter, que proporciona funcionalidad para que Express muestre el panel.

A continuación, agrega el código resaltado para conectar bull-board con bullmq:

image_processor/index.js
...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

Primero, estableces el serverAdapter como una instancia de ExpressAdapter. Luego, invocas createBullBoard() para inicializar el panel con los datos de la cola de bullmq. Le pasas a la función un argumento de objeto con las propiedades queues y serverAdapter. La primera propiedad, queues, acepta un array de las colas que definiste con bullmq, que en este caso es imageJobQueue. La segunda propiedad, serverAdapter, contiene un objeto que acepta una instancia del adaptador del servidor Express. Después de eso, estableces la ruta /admin para acceder al panel con el método setBasePath().

A continuación, agrega el middleware serverAdapter para la ruta /admin:

image_processor/index.js
app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

El archivo completo index.js se verá así:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

Después de realizar los cambios, guarda y cierra tu archivo.

Ejecuta el archivo index.js:

  1. node index.js

Regresa a tu navegador y visita http://localhost:3000/admin. El panel se cargará:

En el panel, puedes revisar el tipo de trabajo, los datos que consume y más información sobre el trabajo. También puedes cambiar a otras pestañas, como la pestaña Completados para obtener información sobre los trabajos completados, la pestaña Fallidos para obtener más información sobre los trabajos que fallaron y la pestaña Pausados para obtener más información sobre los trabajos que han sido pausados.

Ahora puedes utilizar el panel bull-board para monitorizar colas.

Conclusión

En este artículo, delegaste una tarea intensiva en tiempo a una cola de trabajos utilizando bullmq. Primero, sin utilizar bullmq, creaste una aplicación con una tarea intensiva en tiempo que tiene un ciclo de solicitud / respuesta lento. Luego utilizaste bullmq para delegar la tarea intensiva en tiempo y ejecutarla de forma asincrónica, lo que aumenta el ciclo de solicitud / respuesta. Después de eso, utilizaste bull-board para crear un panel para monitorear las colas de bullmq en Redis.

Puedes visitar la documentación de bullmq para aprender más sobre las características de bullmq no cubiertas en este tutorial, como la programación, la priorización o la reintentar trabajos, y la configuración de la concurrencia para los trabajadores. También puedes visitar la documentación de bull-board para aprender más sobre las características del panel.

Source:
https://www.digitalocean.com/community/tutorials/how-to-handle-asynchronous-tasks-with-node-js-and-bullmq