Comment gérer les tâches asynchrones avec Node.js et BullMQ

L’auteur a choisi la Société des ingénieures en génie civil pour recevoir un don dans le cadre du programme Write for Donations.

Introduction

Les applications Web ont des cycles de demande/réponse. Lorsque vous visitez une URL, le navigateur envoie une demande au serveur exécutant une application qui traite les données ou exécute des requêtes dans la base de données. Pendant ce temps, l’utilisateur doit attendre jusqu’à ce que l’application renvoie une réponse. Pour certaines tâches, l’utilisateur peut obtenir une réponse rapidement ; pour les tâches qui prennent beaucoup de temps, telles que le traitement d’images, l’analyse de données, la génération de rapports ou l’envoi de courriels, ces tâches prennent beaucoup de temps pour se terminer et peuvent ralentir le cycle de demande/réponse. Par exemple, supposons que vous ayez une application où les utilisateurs téléchargent des images. Dans ce cas, vous devrez peut-être redimensionner, compresser ou convertir l’image dans un autre format pour préserver l’espace disque de votre serveur avant de montrer l’image à l’utilisateur. Le traitement d’une image est une tâche intensive en termes de processeur, qui peut bloquer un thread Node.js jusqu’à ce que la tâche soit terminée. Cela peut prendre quelques secondes ou quelques minutes. Les utilisateurs doivent attendre que la tâche soit terminée pour obtenir une réponse du serveur.

Pour éviter de ralentir le cycle de demande/réponse, vous pouvez utiliser bullmq, une file de tâches (jobs) distribuée qui vous permet de décharger les tâches consommatrices de temps de votre application Node.js vers bullmq, libérant ainsi le cycle de demande/réponse. Cet outil permet à votre application d’envoyer rapidement des réponses à l’utilisateur tandis que bullmq exécute les tâches de manière asynchrone en arrière-plan et indépendamment de votre application. Pour suivre l’état des jobs, bullmq utilise Redis pour stocker une brève description de chaque job dans une file d’attente. Un worker bullmq défile alors et exécute chaque job dans la file d’attente, le marquant comme terminé une fois terminé.

Dans cet article, vous utiliserez bullmq pour décharger une tâche consommatrice de temps en arrière-plan, ce qui permettra à une application de répondre rapidement aux utilisateurs. Tout d’abord, vous créerez une application avec une tâche consommatrice de temps sans utiliser bullmq. Ensuite, vous utiliserez bullmq pour exécuter la tâche de manière asynchrone. Enfin, vous installerez un tableau de bord visuel pour gérer les jobs bullmq dans une file d’attente Redis.

Prérequis

Pour suivre ce tutoriel, vous aurez besoin des éléments suivants :

Étape 1 – Configuration du répertoire du projet

Dans cette étape, vous allez créer un répertoire et installer les dépendances nécessaires pour votre application. L’application que vous allez construire dans ce tutoriel permettra aux utilisateurs de télécharger une image, qui sera ensuite traitée à l’aide du package sharp. Le traitement d’image est intensif en temps et peut ralentir le cycle demande/réponse, ce qui en fait une tâche idéale à décharger avec bullmq en arrière-plan. La technique que vous utiliserez pour décharger la tâche fonctionnera également pour d’autres tâches intensives en temps.

Pour commencer, créez un répertoire appelé image_processor et naviguez dans le répertoire :

  1. mkdir image_processor && cd image_processor

Ensuite, initialisez le répertoire en tant que package npm :

  1. npm init -y

La commande crée un fichier package.json. L’option -y indique à npm d’accepter toutes les valeurs par défaut.

Après avoir exécuté la commande, votre sortie correspondra à ce qui suit :

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

La sortie confirme que le fichier package.json a été créé. Les propriétés importantes incluent le nom de votre application (name), le numéro de version de votre application (version) et le point de départ de votre projet (main). Si vous souhaitez en savoir plus sur les autres propriétés, vous pouvez consulter la documentation de package.json de npm.

L’application que vous allez construire dans ce tutoriel nécessitera les dépendances suivantes :

  • express : un framework web pour construire des applications web.
  • express-fileupload : un middleware qui permet à vos formulaires de télécharger des fichiers.
  • sharp : une bibliothèque de traitement d’images.
  • ejs : un langage de modèle qui vous permet de générer du code HTML avec Node.js.
  • bullmq : une file de tâches distribuée.
  • bull-board : un tableau de bord qui s’appuie sur bullmq et affiche l’état des tâches avec une interface utilisateur (UI) agréable.

Pour installer toutes ces dépendances, exécutez la commande suivante :

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

En plus des dépendances que vous avez installées, vous utiliserez également l’image suivante plus tard dans ce tutoriel :

Utilisez curl pour télécharger l’image à l’emplacement de votre choix sur votre ordinateur local

  1. curl -O https://deved-images.nyc3.cdn.digitaloceanspaces.com/CART-68886/underwater.png

Vous disposez des dépendances nécessaires pour construire une application Node.js qui n’a pas bullmq, ce que vous ferez ensuite.

Étape 2 – Mise en œuvre d’une tâche intensive en temps sans bullmq

Dans cette étape, vous allez construire une application avec Express qui permet aux utilisateurs de télécharger des images. L’application lancera une tâche intensive en temps utilisant sharp pour redimensionner l’image en plusieurs tailles, qui seront ensuite affichées à l’utilisateur après l’envoi d’une réponse. Cette étape vous aidera à comprendre comment les tâches intensives en temps affectent le cycle requête/réponse.

À l’aide de nano, ou de votre éditeur de texte préféré, créez le fichier index.js :

  1. nano index.js

Dans votre fichier index.js, ajoutez le code suivant pour importer les dépendances :

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

Dans la première ligne, vous importez le module path pour calculer les chemins de fichiers avec Node. Dans la deuxième ligne, vous importez le module fs pour interagir avec les répertoires. Ensuite, vous importez le framework web express. Vous importez le module body-parser pour ajouter un middleware pour analyser les données dans les requêtes HTTP. Ensuite, vous importez le module sharp pour le traitement des images. Enfin, vous importez express-fileupload pour la gestion des téléchargements à partir d’un formulaire HTML.

Ensuite, ajoutez le code suivant pour mettre en œuvre le middleware dans votre application :

image_processor/index.js
...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

Tout d’abord, vous définissez la variable app comme une instance d’Express. Ensuite, en utilisant la variable app, la méthode set() configure Express pour utiliser le langage de modèle ejs. Ensuite, vous ajoutez le middleware du module body-parser avec la méthode use() pour transformer les données JSON des requêtes HTTP en variables accessibles avec JavaScript. À la ligne suivante, vous faites de même avec les données encodées en URL.

Ensuite, ajoutez les lignes suivantes pour ajouter plus de middleware pour gérer les téléchargements de fichiers et servir des fichiers statiques :

image_processor/index.js
...
app.use(fileUpload());
app.use(express.static("public"));

Vous ajoutez un middleware pour analyser les fichiers téléchargés en appelant la méthode fileUpload(), et vous définissez un répertoire où Express cherchera et servira les fichiers statiques, tels que les images et les fichiers CSS.

Avec le middleware configuré, créez une route qui affiche un formulaire HTML pour télécharger une image :

image_processor/index.js
...
app.get("/", function (req, res) {
  res.render("form");
});

Ici, vous utilisez la méthode get() du module Express pour spécifier la route / et la fonction de rappel qui sera exécutée lorsque l’utilisateur visite la page d’accueil ou la route /. Dans la fonction de rappel, vous appelez res.render() pour afficher le fichier form.ejs dans le répertoire views. Vous n’avez pas encore créé le fichier form.ejs ni le répertoire views.

Pour le créer, d’abord, enregistrez et fermez votre fichier. Dans votre terminal, saisissez la commande suivante pour créer le répertoire views dans votre répertoire racine de projet :

  1. mkdir views

Déplacez-vous dans le répertoire views :

  1. cd views

Créez le fichier form.ejs dans votre éditeur :

  1. nano form.ejs

Dans votre fichier form.ejs, ajoutez le code suivant pour créer le formulaire :

image_processor/views/form.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

Tout d’abord, vous faites référence au fichier head.ejs, que vous n’avez pas encore créé. Le fichier head.ejs contiendra l’élément HTML head que vous pourrez référencer dans d’autres pages HTML.

Dans la balise body, vous créez un formulaire avec les attributs suivants :

  • action spécifie la route vers laquelle les données du formulaire doivent être envoyées lorsque le formulaire est soumis.
  • method spécifie la méthode HTTP d’envoi des données. La méthode POST incorpore les données dans une requête HTTP.
  • encytype spécifie comment les données du formulaire doivent être encodées. La valeur multipart/form-data permet aux éléments HTML input d’envoyer des données de fichier.

Dans l’élément form, vous créez une balise input pour télécharger des fichiers. Ensuite, vous définissez l’élément button avec l’attribut type défini sur submit, ce qui vous permet de soumettre les formulaires.

Une fois terminé, enregistrez et fermez votre fichier.

Ensuite, créez un fichier head.ejs :

  1. nano head.ejs

Dans votre fichier head.ejs, ajoutez le code suivant pour créer la section head de l’application:

image_processor/views/head.ejs
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

Ici, vous référencez le fichier main.css, que vous créerez dans le répertoire public plus tard dans cette étape. Ce fichier contiendra les styles pour cette application. Pour l’instant, vous continuerez à configurer les processus pour les ressources statiques.

Enregistrez et fermez le fichier.

Pour gérer les données soumises par le formulaire, vous devez définir une méthode post dans Express. Pour ce faire, retournez dans le répertoire racine de votre projet :

  1. cd ..

Ouvrez à nouveau votre fichier index.js :

  1. nano index.js

Dans votre fichier index.js, ajoutez les lignes surlignées pour définir une méthode de gestion des soumissions de formulaire sur la route /upload :

image_processor/index.js
app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

Vous utilisez la variable app pour appeler la méthode post(), qui gérera le formulaire soumis sur la route /upload. Ensuite, vous extrayez les données de l’image téléchargée de la requête HTTP dans la variable image. Ensuite, vous définissez une réponse pour renvoyer un code d’état 400 si l’utilisateur ne télécharge pas d’image.

Pour définir le processus pour l’image téléchargée, ajoutez le code surligné suivant :

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

Ces lignes représentent la façon dont votre application traitera l’image. Tout d’abord, vous supprimez l’extension de l’image téléchargée et enregistrez le nom dans la variable imageName. Ensuite, vous définissez la fonction processImage(). Cette fonction prend le paramètre size, dont la valeur sera utilisée pour déterminer les dimensions de l’image lors du redimensionnement. Dans la fonction, vous appelez sharp() avec image.data, qui est un buffer contenant les données binaires de l’image téléchargée. sharp redimensionne l’image en fonction de la valeur du paramètre size. Vous utilisez la méthode webp() de sharp pour convertir l’image au format d’image webp. Ensuite, vous enregistrez l’image dans le répertoire public/images/.

La liste suivante de nombres définit les tailles qui seront utilisées pour redimensionner l’image téléchargée. Ensuite, vous utilisez la méthode map() de JavaScript pour invoquer processImage() pour chaque élément du tableau sizes, après quoi il renverra un nouveau tableau. Chaque fois que la méthode map() appelle la fonction processImage(), elle renvoie une promesse au nouveau tableau. Vous utilisez la méthode Promise.all() pour les résoudre.

Les vitesses de traitement des ordinateurs varient, de même que la taille des images qu’un utilisateur peut télécharger, ce qui peut affecter la vitesse de traitement des images. Pour retarder ce code à des fins de démonstration, insérez les lignes surlignées pour ajouter une boucle d’incrémentation intensive en CPU et une redirection vers une page qui affichera les images redimensionnées avec les lignes surlignées :

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

La boucle s’exécutera 10 milliards de fois pour incrémenter la variable counter. Vous invoquez la fonction res.redirect() pour rediriger l’application vers l’itinéraire /result. L’itinéraire affichera une page HTML qui affichera les images du répertoire public/images.

L’itinéraire /result n’existe pas encore. Pour le créer, ajoutez le code surligné dans votre fichier index.js:

image_processor/index.js
...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

Vous définissez la route /result avec la méthode app.get(). Dans la fonction, vous définissez la variable imgDirPath avec le chemin complet du répertoire public/images. Vous utilisez la méthode readdirSync() du module fs pour lire tous les fichiers du répertoire donné. Ensuite, vous enchaînez la méthode map() pour renvoyer un nouveau tableau avec les chemins des images précédés de images/.

Enfin, vous appelez res.render() pour rendre le fichier result.ejs, qui n’existe pas encore. Vous transmettez la variable imgFiles, qui contient un tableau de tous les chemins relatifs des images, au fichier result.ejs.

Sauvegardez et fermez votre fichier.

Pour créer le fichier result.ejs, retournez dans le répertoire views:

  1. cd views

Créez et ouvrez le fichier result.ejs dans votre éditeur:

  1. nano result.ejs

Dans votre fichier result.ejs, ajoutez les lignes suivantes pour afficher les images:

image_processor/views/result.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

Tout d’abord, vous faites référence au fichier head.ejs. Dans la balise body, vous vérifiez si la variable imgFiles est vide. Si elle contient des données, vous itérez sur chaque fichier et créez une image pour chaque élément du tableau. Si imgFiles est vide, vous affichez un message indiquant à l’utilisateur de Rafraîchir après quelques secondes pour afficher les images redimensionnées..

Sauvegardez et fermez votre fichier.

Ensuite, retournez au répertoire racine et créez le répertoire public qui contiendra vos ressources statiques.

  1. cd .. && mkdir public

Déplacez-vous dans le répertoire public:

  1. cd public

Créez un répertoire images pour stocker les images téléchargées:

  1. mkdir images

Ensuite, créez le répertoire css et naviguez-y:

  1. mkdir css && cd css

Dans votre éditeur, créez et ouvrez le fichier main.css, que vous avez référencé précédemment dans le fichier head.ejs:

  1. nano main.css

Dans votre fichier main.css, ajoutez les styles suivants:

image_processor/public/css/main.css
body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** Styles pour le bouton "Choisir un fichier" **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** Styles pour le bouton "Télécharger une image" **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

Ces lignes permettent de styliser les éléments de l’application. En utilisant des attributs HTML, vous stylisez le bouton Choisir un fichier avec le code hexadécimal #2196f3 (une nuance de bleu) et la bordure du bouton Télécharger une image en orange. Vous stylisez également les éléments de la route /result pour les rendre plus présentables.

Une fois terminé, enregistrez et fermez votre fichier.

Retournez au répertoire racine du projet:

  1. cd ../..

Ouvrez le fichier index.js dans votre éditeur:

  1. nano index.js

Dans votre fichier index.js, ajoutez le code suivant, qui démarrera le serveur:

image_processor/index.js
...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

Le fichier index.js complet correspondra maintenant au code suivant:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

Une fois que vous avez terminé de apporter les modifications, enregistrez et fermez votre fichier.

Exécutez l’application en utilisant la commande node:

  1. node index.js

Vous obtiendrez une sortie comme celle-ci:

Output
Server running on port 3000

Cette sortie confirme que le serveur fonctionne sans problème.

Ouvrez votre navigateur préféré et visitez http://localhost:3000/.

Note: Si vous suivez le tutoriel sur un serveur distant, vous pouvez accéder à l’application dans votre navigateur local en utilisant le port forwarding.

Tandis que le serveur Node.js est en cours d’exécution, ouvrez un autre terminal et entrez la commande suivante:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

Une fois que vous vous êtes connecté au serveur, exécutez node index.js et ensuite accédez à http://localhost:3000/ dans le navigateur web de votre machine locale.

Une fois que la page est chargée, elle correspondra à ce qui suit:

Ensuite, appuyez sur le bouton Choisir un fichier et sélectionnez l’image underwater.png sur votre machine locale. L’affichage passera de Aucun fichier sélectionné à underwater.png. Ensuite, appuyez sur le bouton Télécharger l’image. L’application se chargera pendant un moment alors qu’elle traite l’image et exécute la boucle d’incrémentation.

Une fois la tâche terminée, la route /result se chargera avec les images redimensionnées:

Vous pouvez maintenant arrêter le serveur avec CTRL+C. Node.js ne recharge pas automatiquement le serveur lorsque les fichiers sont modifiés, vous devrez donc arrêter et redémarrer le serveur chaque fois que vous mettez à jour les fichiers.

Vous savez maintenant comment une tâche intensive en temps peut affecter le cycle de demande/réponse d’une application. Vous exécuterez la tâche de manière asynchrone ensuite.

Étape 3 — Exécution de tâches chronophages de manière asynchrone avec bullmq

Dans cette étape, vous allez décharger une tâche chronophage en arrière-plan en utilisant bullmq. Cette modification permettra de libérer le cycle requête/réponse et permettra à votre application de répondre immédiatement aux utilisateurs pendant que l’image est en cours de traitement.

Pour cela, vous devez créer une description concise de la tâche et l’ajouter à une file d’attente avec bullmq. Une file d’attente est une structure de données qui fonctionne de manière similaire à une file d’attente dans la vie réelle. Lorsque les gens font la queue pour entrer dans un espace, la première personne en ligne sera la première à entrer dans l’espace. Toute personne arrivant ensuite fera la queue à la fin de la file et entrera dans l’espace après toutes les personnes qui la précèdent dans la file jusqu’à ce que la dernière personne entre dans l’espace. Avec le processus First-In, First-Out (FIFO) de la structure de données de la file d’attente, le premier élément ajouté à la file est le premier élément à être supprimé (défiler). Avec bullmq, un producteur ajoutera un travail dans une file d’attente, et un consommateur (ou travailleur) supprimera un travail de la file d’attente et l’exécutera.

La file d’attente dans bullmq se trouve dans Redis. Lorsque vous décrivez un travail et l’ajoutez à la file d’attente, une entrée pour le travail est créée dans une file d’attente Redis. Une description de travail peut être une chaîne de caractères ou un objet avec des propriétés contenant des données minimales ou des références aux données qui permettront à bullmq d’exécuter le travail ultérieurement. Une fois que vous avez défini la fonctionnalité pour ajouter des travaux à la file d’attente, vous déplacez le code intensif en temps dans une fonction séparée. Plus tard, bullmq appellera cette fonction avec les données que vous avez stockées dans la file d’attente lorsque le travail sera retiré de la file. Une fois la tâche terminée, bullmq la marquera comme terminée, retirera un autre travail de la file d’attente et l’exécutera.

Ouvrez index.js dans votre éditeur:

  1. nano index.js

Dans votre fichier index.js, ajoutez les lignes surlignées pour créer une file d’attente dans Redis avec bullmq:

image_processor/index.js
...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

Vous commencez par extraire la classe Queue de bullmq, qui est utilisée pour créer une file d’attente dans Redis. Ensuite, vous définissez la variable redisOptions sur un objet avec des propriétés que l’instance de la classe Queue utilisera pour établir une connexion avec Redis. Vous définissez la valeur de la propriété host sur localhost car Redis s’exécute sur votre machine locale.

Note: Si Redis s’exécutait sur un serveur distant séparé de votre application, vous mettriez à jour la valeur de la propriété host avec l’adresse IP du serveur distant. Vous définissez également la valeur de la propriété port sur 6379, le port par défaut utilisé par Redis pour écouter les connexions.

Si vous avez configuré la redirection de port vers un serveur distant exécutant Redis et l’application ensemble, vous n’avez pas besoin de mettre à jour la propriété host, mais vous devrez utiliser la connexion de redirection de port à chaque fois que vous vous connectez à votre serveur pour exécuter l’application.

Ensuite, vous définissez la variable imageJobQueue sur une instance de la classe Queue, en prenant le nom de la file d’attente comme premier argument et un objet comme deuxième argument. L’objet a une propriété connection avec la valeur définie sur un objet dans la variable redisOptions. Après avoir instancié la classe Queue, une file d’attente appelée imageJobQueue sera créée dans Redis.

Enfin, vous définissez la fonction addJob() que vous utiliserez pour ajouter un travail dans la file d’attente imageJobQueue. La fonction prend un paramètre job contenant les informations sur le travail (vous appellerez la fonction addJob() avec les données que vous souhaitez enregistrer dans une file d’attente). Dans la fonction, vous invoquez la méthode add() de imageJobQueue, en prenant le nom du travail comme premier argument et les données du travail comme deuxième argument.

Ajoutez le code surligné pour appeler la fonction addJob() afin d’ajouter un travail dans la file d’attente.

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

Ici, vous appelez la fonction addJob() avec un objet qui décrit le travail. L’objet a l’attribut type avec une valeur correspondant au nom du travail. La deuxième propriété, image, est définie comme un objet contenant les données de l’image téléchargée par l’utilisateur. Comme les données de l’image dans image.data sont sous forme de tampon (forme binaire), vous invoquez la méthode toString() de JavaScript pour les convertir en une chaîne de caractères pouvant être stockée dans Redis, qui définira la propriété data en conséquence. La propriété image est définie comme le nom de l’image téléchargée (y compris l’extension de l’image).

Vous avez maintenant défini les informations nécessaires pour que bullmq exécute ce travail ultérieurement. Selon votre travail, vous pouvez ajouter plus ou moins d’informations sur le travail.

Avertissement : Étant donné que Redis est une base de données en mémoire, évitez de stocker de grandes quantités de données pour les travaux dans la file d’attente. Si vous avez un gros fichier qu’un travail doit traiter, enregistrez le fichier sur le disque ou le cloud, puis enregistrez le lien vers le fichier en tant que chaîne dans la file d’attente. Lorsque bullmq exécute le travail, il récupérera le fichier à partir du lien enregistré dans Redis.

Enregistrez et fermez votre fichier.

Ensuite, créez et ouvrez le fichier utils.js qui contiendra le code de traitement de l’image :

  1. nano utils.js

Dans votre fichier utils.js, ajoutez le code suivant pour définir la fonction de traitement d’une image :

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

Vous importez les modules nécessaires pour traiter les images et calculer les trajets dans les deux premières lignes. Ensuite, vous définissez la fonction processUploadedImages(), qui contiendra la tâche intensive en temps de traitement des images. Cette fonction prend un paramètre job qui sera renseigné lorsque le travailleur récupérera les données de travail de la file d’attente, puis invoquera la fonction processUploadedImages() avec les données de la file d’attente. Vous exportez également la fonction processUploadedImages() afin de pouvoir y faire référence dans d’autres fichiers.

Enregistrez et fermez votre fichier.

Revenez au fichier index.js:

  1. nano index.js

Copiez les lignes surlignées du fichier index.js, puis supprimez-les de ce fichier. Vous aurez besoin du code copié dans un instant, alors enregistrez-le dans le presse-papiers. Si vous utilisez nano, vous pouvez surligner ces lignes et cliquer avec le bouton droit de la souris pour les copier :

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

La méthode post pour la route upload correspondra maintenant à ce qui suit :

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

Enregistrez et fermez ce fichier, puis ouvrez le fichier utils.js :

  1. nano utils.js

Dans votre fichier utils.js, collez les lignes que vous venez de copier pour le rappel de la route /upload dans la fonction processUploadedImages :

image_processor/utils.js
...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

Maintenant que vous avez déplacé le code pour traiter une image, vous devez le mettre à jour pour utiliser les données de l’image à partir du paramètre job de la fonction processUploadedImages() que vous avez définie précédemment.

Pour ce faire, ajoutez et mettez à jour les lignes surlignées ci-dessous :

image_processor/utils.js

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

Vous convertissez la version stringifiée des données de l’image en binaire avec la méthode Buffer.from(). Ensuite, vous mettez à jour path.parse() avec une référence au nom de l’image enregistrée dans la file d’attente. Après cela, vous mettez à jour la méthode sharp() pour prendre les données binaires de l’image stockées dans la variable imageFileData.

Le fichier utils.js complet correspondra désormais à ce qui suit :

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

Enregistrez et fermez votre fichier, puis revenez à index.js :

  1. nano index.js

La variable sharp n’est plus nécessaire en tant que dépendance car l’image est désormais traitée dans le fichier utils.js. Supprimez la ligne surlignée du fichier :

image_processor/index.js
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

Enregistrez et fermez votre fichier.

Vous avez maintenant défini la fonctionnalité pour créer une file d’attente dans Redis et ajouter une tâche. Vous avez également défini la fonction processUploadedImages() pour traiter les images téléchargées.

La tâche restante consiste à créer un consommateur (ou worker) qui va extraire une tâche de la file d’attente et appeler la fonction processUploadedImages() avec les données de la tâche.

Créez un fichier worker.js dans votre éditeur :

  1. nano worker.js

Dans votre fichier worker.js, ajoutez le code suivant :

image_processor/worker.js
const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

Dans la première ligne, vous importez la classe Worker de bullmq ; lorsqu’elle est instanciée, cela démarrera un worker qui défile les tâches de la file d’attente dans Redis et les exécute. Ensuite, vous référencez la fonction processUploadedImages() du fichier utils.js afin que le worker puisse appeler la fonction avec les données de la file d’attente.

Vous définissez une fonction workerHandler() qui prend un paramètre job contenant les données de travail dans la file d’attente. Dans la fonction, vous enregistrez que le travail a commencé, puis vous invoquez processUploadedImages() avec les données de travail. Ensuite, vous enregistrez un message de réussite et retournez null.

Pour permettre au travailleur de se connecter à Redis, de retirer un travail de la file d’attente et d’appeler workerHandler() avec les données de travail, ajoutez les lignes suivantes au fichier:

image_processor/worker.js
...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

Ici, vous définissez la variable workerOptions sur un objet contenant les paramètres de connexion de Redis. Vous définissez la variable worker sur une instance de la classe Worker prenant les paramètres suivants:

  • imageJobQueue: le nom de la file d’attente des travaux.
  • workerHandler: la fonction qui s’exécutera après qu’un travail a été retiré de la file d’attente Redis.
  • workerOptions: les paramètres de configuration de Redis que le travailleur utilise pour établir une connexion avec Redis.

Enfin, vous enregistrez un message de réussite.

Après avoir ajouté les lignes, enregistrez et fermez votre fichier.

Vous avez maintenant défini la fonctionnalité du travailleur bullmq pour retirer des travaux de la file d’attente et les exécuter.

Dans votre terminal, supprimez les images du répertoire public/images afin de pouvoir commencer à zéro pour tester votre application:

  1. rm public/images/*

Ensuite, exécutez le fichier index.js:

  1. node index.js

L’application va démarrer:

Output
Server running on port 3000

Vous allez maintenant démarrer le travailleur. Ouvrez une deuxième session de terminal et accédez au répertoire du projet.

  1. cd image_processor/

Démarrez le travailleur avec la commande suivante:

  1. node worker.js

Le travailleur démarrera:

Output
Worker started!

Visitez http://localhost:3000/ dans votre navigateur. Appuyez sur le bouton Choisir un fichier et sélectionnez le fichier underwater.png depuis votre ordinateur, puis appuyez sur le bouton Téléverser l’image.

Vous pouvez recevoir une réponse instantanée vous demandant de rafraîchir la page après quelques secondes:

Alternativement, vous pourriez recevoir une réponse instantanée avec quelques images traitées sur la page tandis que d’autres sont encore en cours de traitement:

Vous pouvez rafraîchir la page plusieurs fois pour charger toutes les images redimensionnées.

Retournez au terminal où votre travailleur s’exécute. Ce terminal affichera un message correspondant au suivant:

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

La sortie confirme que bullmq a exécuté le travail avec succès.

Votre application peut toujours décharger des tâches longues même si le travailleur ne fonctionne pas. Pour le démontrer, arrêtez le travailleur dans le deuxième terminal avec CTRL+C.

Dans votre session de terminal initiale, arrêtez le serveur Express et supprimez les images dans public/images:

  1. rm public/images/*

Ensuite, démarrez à nouveau le serveur:

  1. node index.js

Dans votre navigateur, visitez http://localhost:3000/ et téléversez à nouveau l’image underwater.png. Lorsque vous êtes redirigé vers le chemin /result, les images ne s’afficheront pas sur la page car le travailleur ne fonctionne pas:

Retournez au terminal où vous avez exécuté le travailleur et démarrez à nouveau le travailleur:

  1. node worker.js

La sortie correspondra à ce qui suit, ce qui vous indique que le travail a commencé:

Output
Worker started! Starting job: processUploadedImages

Une fois le travail terminé et la sortie contient une ligne qui indique Travail terminé : processUploadedImages, rafraîchissez le navigateur. Les images se chargeront maintenant :

Arrêtez le serveur et le worker.

Vous pouvez maintenant déléguer une tâche intensive en temps à l’arrière-plan et l’exécuter de manière asynchrone à l’aide de bullmq. À l’étape suivante, vous configurerez un tableau de bord pour surveiller l’état de la file d’attente.

Étape 4 – Ajout d’un tableau de bord pour surveiller les files d’attente bullmq

À cette étape, vous utiliserez le package bull-board pour surveiller les tâches dans la file d’attente Redis depuis un tableau de bord visuel. Ce package créera automatiquement une interface utilisateur (UI) qui affiche et organise les informations sur les tâches bullmq stockées dans la file d’attente Redis. Vous pourrez surveiller les tâches terminées, en attente ou ayant échoué sans ouvrir l’interface de ligne de commande Redis dans le terminal avec votre navigateur.

Ouvrez le fichier index.js dans votre éditeur de texte :

  1. nano index.js

Ajoutez le code surligné pour importer bull-board :

image_processor/index.js
...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

Dans le code précédent, vous importez la méthode createBullBoard() de bull-board. Vous importez également BullMQAdapter, qui permet à bull-board d’accéder aux files d’attente bullmq, et ExpressAdapter, qui fournit des fonctionnalités pour afficher le tableau de bord dans Express.

Ensuite, ajoutez le code surligné pour connecter bull-board à bullmq:

image_processor/index.js
...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

Tout d’abord, vous définissez serverAdapter comme une instance de ExpressAdapter. Ensuite, vous appelez createBullBoard() pour initialiser le tableau de bord avec les données de la file d’attente bullmq. Vous passez à la fonction un objet en argument avec les propriétés queues et serverAdapter. La première propriété, queues, accepte un tableau des files d’attente que vous avez définies avec bullmq, qui est ici imageJobQueue. La deuxième propriété, serverAdapter, contient un objet qui accepte une instance de l’adaptateur de serveur Express. Ensuite, vous définissez le chemin /admin pour accéder au tableau de bord avec la méthode setBasePath().

Ensuite, ajoutez le middleware serverAdapter pour la route /admin:

image_processor/index.js
app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

Le fichier index.js complet correspondra au suivant:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

Une fois que vous avez terminé de faire des modifications, enregistrez et fermez votre fichier.

Exécutez le fichier index.js:

  1. node index.js

Revenez à votre navigateur et visitez http://localhost:3000/admin. Le tableau de bord se chargera.

Dans le tableau de bord, vous pouvez consulter le type de travail, les données qu’il consomme et plus d’informations sur le travail. Vous pouvez également basculer vers d’autres onglets, tels que l’onglet Terminé pour obtenir des informations sur les travaux terminés, l’onglet Échoué pour plus d’informations sur les travaux qui ont échoué, et l’onglet Suspendu pour plus d’informations sur les travaux qui ont été suspendus.

Vous pouvez maintenant utiliser le tableau de bord bull-board pour surveiller les files d’attente.

Conclusion

Dans cet article, vous avez déchargé une tâche intensive en temps sur une file d’attente de travaux à l’aide de bullmq. Tout d’abord, sans utiliser bullmq, vous avez créé une application avec une tâche intensive en temps ayant un cycle de demande/réponse lent. Ensuite, vous avez utilisé bullmq pour décharger la tâche intensive en temps et l’exécuter de manière asynchrone, ce qui accélère le cycle de demande/réponse. Ensuite, vous avez utilisé bull-board pour créer un tableau de bord afin de surveiller les files d’attente bullmq dans Redis.

Vous pouvez consulter la documentation de bullmq pour en savoir plus sur les fonctionnalités de bullmq non abordées dans ce tutoriel, telles que la planification, la priorisation ou la répétition des travaux, et la configuration des paramètres de concurrence pour les travailleurs. Vous pouvez également consulter la documentation de bull-board pour en savoir plus sur les fonctionnalités du tableau de bord.

Source:
https://www.digitalocean.com/community/tutorials/how-to-handle-asynchronous-tasks-with-node-js-and-bullmq