Come gestire compiti asincroni con Node.js e BullMQ

L’autore ha selezionato la Società degli Ingegneri Donne per ricevere una donazione come parte del programma Scrivi per Donazioni.

Introduzione

Le applicazioni web hanno cicli di richiesta/risposta. Quando si visita un URL, il browser invia una richiesta al server in esecuzione di un’app che elabora dati o esegue query nel database. Durante questo processo, l’utente resta in attesa fino a quando l’app restituisce una risposta. Per alcune operazioni, l’utente può ottenere una risposta rapidamente; per operazioni che richiedono molto tempo, come l’elaborazione delle immagini, l’analisi dei dati, la generazione di report o l’invio di email, queste operazioni richiedono molto tempo per essere completate e possono rallentare il ciclo di richiesta/risposta. Ad esempio, supponiamo che tu abbia un’applicazione in cui gli utenti caricano immagini. In questo caso, potresti aver bisogno di ridimensionare, comprimere o convertire l’immagine in un altro formato per conservare lo spazio su disco del tuo server prima di mostrare l’immagine all’utente. L’elaborazione di un’immagine è un’operazione intensiva per la CPU, che può bloccare un thread Node.js fino a quando l’operazione non è completata. Ciò potrebbe richiedere alcuni secondi o minuti. Gli utenti devono attendere che l’operazione sia completata per ricevere una risposta dal server.

Per evitare di rallentare il ciclo di richiesta/risposta, puoi utilizzare bullmq, una coda di attività (job) distribuita che ti consente di scaricare compiti che richiedono tempo dalla tua app Node.js a bullmq, liberando il ciclo di richiesta/risposta. Questo strumento consente alla tua app di inviare risposte agli utenti rapidamente mentre bullmq esegue i compiti in modo asincrono sullo sfondo e indipendentemente dalla tua app. Per tenere traccia dei compiti, bullmq utilizza Redis per memorizzare una breve descrizione di ogni compito in una coda. Un worker di bullmq quindi svuota la coda ed esegue ogni compito, contrassegnandolo come completato una volta finito.

In questo articolo, utilizzerai bullmq per scaricare un compito che richiede tempo sullo sfondo, il che consentirà a un’applicazione di rispondere rapidamente agli utenti. Inizierai creando un’applicazione con un compito che richiede tempo senza utilizzare bullmq. Successivamente, utilizzerai bullmq per eseguire il compito in modo asincrono. Infine, installerai un pannello di controllo visivo per gestire i compiti di bullmq in una coda Redis.

Prerequisiti

Per seguire questo tutorial, avrai bisogno dei seguenti:

Fase 1 – Configurazione della Directory del Progetto

In questa fase, creerai una directory e installerai le dipendenze necessarie per la tua applicazione. L’applicazione che costruirai in questo tutorial consentirà agli utenti di caricare un’immagine, che verrà poi elaborata utilizzando il pacchetto sharp. Il processo di elaborazione delle immagini richiede tempo e può rallentare il ciclo richiesta/risposta, rendendo questa attività un ottimo candidato per bullmq per spostarla in background. La tecnica che userai per spostare l’attività funzionerà anche per altre attività che richiedono molto tempo.

Per iniziare, crea una directory chiamata image_processor e naviga nella directory:

  1. mkdir image_processor && cd image_processor

Successivamente, inizializza la directory come un pacchetto npm:

  1. npm init -y

Il comando crea un file package.json. L’opzione -y indica a npm di accettare tutti i valori predefiniti.

Dopo aver eseguito il comando, l’output corrisponderà al seguente:

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

L’output conferma che il file package.json è stato creato. Le proprietà importanti includono il nome della tua app (name), il numero di versione della tua applicazione (version) e il punto di partenza del tuo progetto (main). Se vuoi saperne di più sulle altre proprietà, puoi consultare la documentazione di package.json di npm.

L’applicazione che costruirai in questo tutorial richiederà le seguenti dipendenze:

  • express: un framework web per la costruzione di app web.
  • express-fileupload: un middleware che consente ai tuoi moduli di caricare file.
  • sharp: una libreria di elaborazione delle immagini.
  • ejs: un linguaggio di template che consente di generare markup HTML con Node.js.
  • bullmq: una coda di attività distribuita.
  • bull-board: una dashboard che si basa su bullmq e visualizza lo stato dei lavori con un’interfaccia utente (UI) piacevole.

Per installare tutte queste dipendenze, esegui il seguente comando:

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

In aggiunta alle dipendenze che hai installato, utilizzerai anche l’immagine seguente più avanti in questo tutorial:

Usa curl per scaricare l’immagine nella posizione desiderata sul tuo computer locale

  1. curl -O https://deved-images.nyc3.cdn.digitaloceanspaces.com/CART-68886/underwater.png

Hai le dipendenze necessarie per costruire un’app Node.js che non ha bullmq, cosa che farai dopo.

Passaggio 2 — Implementazione di un compito intensivo in termini di tempo senza bullmq

In questo passaggio, costruirai un’applicazione con Express che permette agli utenti di caricare immagini. L’applicazione avvierà un compito intensivo in termini di tempo utilizzando sharp per ridimensionare l’immagine in diverse dimensioni, che verranno poi visualizzate all’utente dopo che viene inviata una risposta. Questo passaggio ti aiuterà a capire come i compiti intensivi in termini di tempo influenzano il ciclo richiesta/risposta.

Utilizzando nano, o il tuo editor di testo preferito, crea il file index.js:

  1. nano index.js

Nel tuo file index.js, aggiungi il seguente codice per importare le dipendenze:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

Nella prima riga, importi il modulo path per calcolare i percorsi dei file con Node. Nella seconda riga, importi il modulo fs per interagire con le directory. Poi importi il framework web express. Importi il modulo body-parser per aggiungere middleware per analizzare i dati nelle richieste HTTP. Successivamente, importi il modulo sharp per il trattamento delle immagini. Infine, importi express-fileupload per gestire gli upload da un modulo HTML.

Successivamente, aggiungi il seguente codice per implementare il middleware nella tua app:

image_processor/index.js
...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

Prima, imposti la variabile app su un’istanza di Express. Successivamente, utilizzando la variabile app, il metodo set() configura Express per utilizzare il linguaggio di template ejs. Aggiungi poi il middleware del modulo body-parser con il metodo use() per trasformare i dati JSON nelle richieste HTTP in variabili accessibili con JavaScript. Nella linea successiva, fai lo stesso con l’input codificato nell’URL.

Successivamente, aggiungi le seguenti linee per aggiungere ulteriori middleware per gestire il caricamento di file e servire file statici:

image_processor/index.js
...
app.use(fileUpload());
app.use(express.static("public"));

Aggiungi un middleware per analizzare i file caricati chiamando il metodo fileUpload(), e imposta una directory in cui Express cercherà e servirà file statici, come immagini e CSS.

Con il middleware impostato, crea un percorso che visualizza un modulo HTML per caricare un’immagine:

image_processor/index.js
...
app.get("/", function (req, res) {
  res.render("form");
});

Qui, si utilizza il metodo get() del modulo Express per specificare il percorso / e la funzione di richiamo che deve essere eseguita quando l’utente visita la homepage o il percorso /. Nel richiamo, si invoca res.render() per renderizzare il file form.ejs nella directory views. Non hai ancora creato il file form.ejs o la directory views.

Per crearlo, prima salva e chiudi il tuo file. Nel terminale, inserisci il seguente comando per creare la directory views nella directory principale del tuo progetto:

  1. mkdir views

Sposta nella directory views:

  1. cd views

Crea il file form.ejs nel tuo editor:

  1. nano form.ejs

Nel tuo file form.ejs, aggiungi il seguente codice per creare il modulo:

image_processor/views/form.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

Prima di tutto, fai riferimento al file head.ejs, che non hai ancora creato. Il file head.ejs conterrà l’elemento HTML head a cui potrai fare riferimento in altre pagine HTML.

Nel tag body, crea un modulo con gli attributi seguenti:

  • action specifica il percorso a cui devono essere inviati i dati del modulo quando viene inviato.
  • method specifica il metodo HTTP per l’invio dei dati. Il metodo POST incorpora i dati in una richiesta HTTP.
  • encytype specifica come devono essere codificati i dati del modulo. Il valore multipart/form-data abilita gli elementi HTML input per caricare dati di file.

Nell’elemento form, crea un tag input per caricare i file. Quindi definisci l’elemento button con l’attributo type impostato su submit, che ti consente di inviare i moduli.

Una volta terminato, salva e chiudi il tuo file.

Successivamente, crea un file head.ejs:

  1. nano head.ejs

Nel tuo file head.ejs, aggiungi il seguente codice per creare la sezione dell’intestazione dell’app:

image_processor/views/head.ejs
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

Qui fai riferimento al file main.css, che creerai nella directory public più avanti in questo passaggio. Quel file conterrà gli stili per questa applicazione. Per ora, continuerai a impostare i processi per gli asset statici.

Salva e chiudi il file.

Per gestire i dati inviati dal modulo, è necessario definire un metodo post in Express. Per farlo, torna alla directory principale del tuo progetto:

  1. cd ..

Apri nuovamente il file `index.js`:

  1. nano index.js

Nel tuo file `index.js`, aggiungi le righe evidenziate per definire un metodo per gestire le presentazioni dei moduli sulla rotta `/upload`:

image_processor/index.js
app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

Utilizzi la variabile `app` per chiamare il metodo `post()`, che gestirà il modulo inviato sulla rotta `/upload`. Successivamente, estrai i dati dell’immagine caricata dalla richiesta HTTP nella variabile `image`. Dopo di ciò, imposti una risposta per restituire un codice di stato `400` se l’utente non carica un’immagine.

Per impostare il processo per l’immagine caricata, aggiungi il seguente codice evidenziato:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

Queste righe rappresentano come la tua app elaborerà l’immagine. Prima, rimuovi l’estensione dell’immagine dall’immagine caricata e salva il nome nella variabile `imageName`. Successivamente, definisci la funzione `processImage()`. Questa funzione prende il parametro `size`, il cui valore verrà utilizzato per determinare le dimensioni dell’immagine durante il ridimensionamento. Nella funzione, invochi `sharp()` con `image.data`, che è un buffer contenente i dati binari dell’immagine caricata. `Sharp` ridimensiona l’immagine in base al valore nel parametro `size`. Utilizzi il metodo `webp()` di `sharp` per convertire l’immagine nel formato di immagine `webp`. Poi, salvi l’immagine nella directory `public/images/`.

La successiva lista di numeri definisce le dimensioni che verranno utilizzate per ridimensionare l’immagine caricata. Quindi si utilizza il metodo map() di JavaScript per invocare processImage() per ogni elemento nell’array sizes, dopodiché restituirà un nuovo array. Ogni volta che il metodo map() chiama la funzione processImage(), restituisce una promise al nuovo array. Si utilizza il metodo Promise.all() per risolverli.

Le velocità di elaborazione del computer variano, così come le dimensioni delle immagini che un utente può caricare, il che potrebbe influenzare la velocità di elaborazione delle immagini. Per ritardare questo codice a scopo dimostrativo, inserire le linee evidenziate per aggiungere un loop di incremento intensivo della CPU e un reindirizzamento a una pagina che visualizzerà le immagini ridimensionate con le linee evidenziate:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

Il loop verrà eseguito 10 miliardi di volte per incrementare la variabile counter. Si invoca la funzione res.redirect() per reindirizzare l’app al percorso /result. Il percorso renderà una pagina HTML che visualizzerà le immagini nella directory public/images.

Il percorso /result non esiste ancora. Per crearlo, aggiungere il codice evidenziato nel file index.js:

image_processor/index.js
...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

Si definisce il percorso /risultato con il metodo app.get(). Nella funzione, si definisce la variabile imgDirPath con il percorso completo alla cartella public/images. Si utilizza il metodo readdirSync() del modulo fs per leggere tutti i file nella directory specificata. Da qui, si concatena il metodo map() per restituire un nuovo array con i percorsi delle immagini preceduti da images/.

Infine, si chiama res.render() per visualizzare il file result.ejs, che ancora non esiste. Si passa la variabile imgFiles, che contiene un array con tutti i percorsi relativi delle immagini, al file result.ejs.

Salva e chiudi il file.

Per creare il file result.ejs, torna alla cartella views:

  1. cd views

Crea e apri il file result.ejs nel tuo editor:

  1. nano result.ejs

Nel file result.ejs, aggiungi le seguenti righe per visualizzare le immagini:

image_processor/views/result.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

Prima di tutto, si fa riferimento al file head.ejs. Nel tag body, si controlla se la variabile imgFiles è vuota. Se contiene dati, si itera su ogni file e si crea un’immagine per ogni elemento dell’array. Se imgFiles è vuota, viene visualizzato un messaggio che invita l’utente a Aggiorna dopo qualche secondo per visualizzare le immagini ridimensionate..

Salva e chiudi il file.

Successivamente, torna alla directory principale e crea la directory public che conterrà le tue risorse statiche.

  1. cd .. && mkdir public

Sposta nella directory public:

  1. cd public

Crea una directory images che conterrà le immagini caricate:

  1. mkdir images

Successivamente, crea la directory css e navigaci all’interno:

  1. mkdir css && cd css

Nel tuo editor, crea e apri il file main.css, che hai precedentemente richiamato nel file head.ejs:

  1. nano main.css

Nel tuo file main.css, aggiungi i seguenti stili:

image_processor/public/css/main.css
body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** Stili per il pulsante "Scegli file" **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** Stili per il pulsante "Carica immagine" **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

Queste righe stilizzeranno gli elementi nell’app. Utilizzando gli attributi HTML, stili il background del pulsante Scegli File con il codice esadecimale #2196f3 (una tonalità di blu) e il bordo del pulsante Carica Immagine con arancione. Stilizzi anche gli elementi sul percorso /result per renderli più presentabili.

Una volta finito, salva e chiudi il tuo file.

Torna alla directory radice del progetto:

  1. cd ../..

Apri index.js nel tuo editor:

  1. nano index.js

Nel tuo index.js, aggiungi il seguente codice, che avvierà il server:

image_processor/index.js
...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

Il file index.js completo ora corrisponderà al seguente:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

Una volta completate le modifiche, salva e chiudi il tuo file.

Esegui l’applicazione utilizzando il comando node:

  1. node index.js

Riceverai un output simile a questo:

Output
Server running on port 3000

Questo output conferma che il server è in esecuzione senza problemi.

Apri il tuo browser preferito e visita http://localhost:3000/.

Nota: Se stai seguendo il tutorial su un server remoto, puoi accedere all’app nel tuo browser locale utilizzando il port forwarding.

Mentre il server Node.js è in esecuzione, apri un’altra terminale e inserisci il seguente comando:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

Una volta connesso al server, esegui node index.js e quindi vai su http://localhost:3000/ sul browser web della tua macchina locale.

Quando la pagina si carica, verrà visualizzato quanto segue:

Successivamente, premi il pulsante Scegli File e seleziona l’immagine underwater.png sulla tua macchina locale. La visualizzazione passerà da Nessun file selezionato a underwater.png. Dopo di ciò, premi il pulsante Carica Immagine. L’applicazione si caricherà per un po’ mentre elabora l’immagine ed esegue il ciclo di incremento.

Una volta completato il compito, verrà caricato il percorso /result con le immagini ridimensionate:

Ora puoi fermare il server con CTRL+C. Node.js non ricarica automaticamente il server quando i file vengono modificati, quindi dovrai fermare e riavviare il server ogni volta che aggiorni i file.

Ora sai come un compito intensivo in termini di tempo può influenzare il ciclo di richiesta/risposta di un’applicazione. Eseguirai il compito in modo asincrono dopo.

Passaggio 3 — Esecuzione delle attività intensive in termini di tempo in modo asincrono con bullmq

In questo passaggio, scaricherai un’attività intensiva in termini di tempo in background utilizzando bullmq. Questo adeguamento libererà il ciclo richiesta/risposta e consentirà alla tua app di rispondere immediatamente agli utenti mentre l’immagine viene elaborata.

Per fare ciò, è necessario creare una descrizione sintetica del lavoro e aggiungerla a una coda con bullmq. Una coda è una struttura dati che funziona in modo simile a come funziona una coda nella vita reale. Quando le persone si mettono in fila per entrare in uno spazio, la prima persona in fila sarà la prima a entrare nello spazio. Chiunque arrivi successivamente si metterà in fila alla fine della fila e entrerà nello spazio dopo tutti coloro che li precedono nella fila fino a quando l’ultimo entrerà nello spazio. Con il processo First-In, First-Out (FIFO) della struttura dati della coda, il primo elemento aggiunto alla coda è il primo elemento da rimuovere (dequeue). Con bullmq, un produttore aggiungerà un lavoro in una coda, e un consumatore (o lavoratore) rimuoverà un lavoro dalla coda ed eseguirà.

La coda in bullmq è in Redis. Quando si descrive un lavoro e lo si aggiunge alla coda, viene creata un’entrata per il lavoro in una coda Redis. La descrizione di un lavoro può essere una stringa o un oggetto con proprietà che contengono dati minimi o riferimenti ai dati che consentiranno a bullmq di eseguire il lavoro in seguito. Una volta definita la funzionalità per aggiungere lavori alla coda, si sposta il codice intensivo in termini di tempo in una funzione separata. Successivamente, bullmq chiamerà questa funzione con i dati memorizzati nella coda quando il lavoro viene rimosso dalla coda. Una volta che il compito è terminato, bullmq lo contrassegnerà come completato, estrarrà un altro lavoro dalla coda e lo eseguirà.

Apri index.js nel tuo editor:

  1. nano index.js

Nel tuo file index.js, aggiungi le linee evidenziate per creare una coda in Redis con bullmq:

image_processor/index.js
...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

Inizi estraendo la classe Queue da bullmq, che viene utilizzata per creare una coda in Redis. Imposti quindi la variabile redisOptions su un oggetto con proprietà che l’istanza della classe Queue utilizzerà per stabilire una connessione con Redis. Imposti il valore della proprietà host su localhost perché Redis è in esecuzione sul tuo computer locale.

Nota: Se Redis fosse in esecuzione su un server remoto separato dalla tua app, aggiornerebbe il valore della proprietà host con l’indirizzo IP del server remoto. Imposti anche il valore della proprietà port su 6379, la porta predefinita che Redis utilizza per ascoltare le connessioni.

Se hai configurato il forwarding della porta per un server remoto in esecuzione Redis e l’applicazione insieme, non è necessario aggiornare la proprietà host, ma dovrai utilizzare la connessione del forwarding della porta ogni volta che accedi al tuo server per eseguire l’applicazione.

Successivamente, imposti la variabile imageJobQueue su un’istanza della classe Queue, prendendo il nome della coda come primo argomento e un oggetto come secondo argomento. L’oggetto ha una proprietà connection con il valore impostato su un oggetto nella variabile redisOptions. Dopo aver istanziato la classe Queue, verrà creata una coda chiamata imageJobQueue in Redis.

Infine, definisci la funzione addJob() che utilizzerai per aggiungere un lavoro nella coda imageJobQueue. La funzione prende un parametro job contenente le informazioni sul lavoro (chiamerai la funzione addJob() con i dati che desideri salvare in una coda). Nella funzione, invochi il metodo add() della coda imageJobQueue, prendendo il nome del lavoro come primo argomento e i dati del lavoro come secondo argomento.

Aggiungi il codice evidenziato per chiamare la funzione addJob() per aggiungere un lavoro nella coda:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

Qui, chiami la funzione addJob() con un oggetto che descrive il lavoro. L’oggetto ha l’attributo type con un valore pari al nome del lavoro. La seconda proprietà, image, è impostata su un oggetto che contiene i dati dell’immagine caricati dall’utente. Poiché i dati dell’immagine in image.data sono in un buffer (forma binaria), viene invocato il metodo toString() di JavaScript per convertirli in una stringa che può essere memorizzata in Redis, che imposterà la proprietà data come risultato. La proprietà image viene impostata sul nome dell’immagine caricata (compresa l’estensione dell’immagine).

Ora hai definito le informazioni necessarie affinché bullmq esegua questo lavoro in seguito. A seconda del lavoro, potresti aggiungere o meno ulteriori informazioni sul lavoro.

Avviso: Poiché Redis è un database in memoria, evita di memorizzare grandi quantità di dati per i lavori nella coda. Se hai un file di grandi dimensioni che un lavoro deve elaborare, salva il file sul disco o nel cloud, quindi salva il collegamento al file come stringa nella coda. Quando bullmq esegue il lavoro, recupererà il file dal collegamento salvato in Redis.

Salva e chiudi il file.

Successivamente, crea e apri il file utils.js che conterrà il codice di elaborazione dell’immagine:

  1. nano utils.js

Nel tuo file utils.js, aggiungi il seguente codice per definire la funzione di elaborazione di un’immagine:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

Importi i moduli necessari per elaborare le immagini e calcolare i percorsi nelle prime due righe. Quindi definisci la funzione processUploadedImages(), che conterrà il compito di elaborazione delle immagini che richiede molto tempo. Questa funzione prende un parametro job che verrà popolato quando il lavoratore recupera i dati del lavoro dalla coda e poi invoca la funzione processUploadedImages() con i dati della coda. Esporti anche la funzione processUploadedImages() in modo che tu possa fare riferimento ad essa in altri file.

Salva e chiudi il tuo file.

Torna al file index.js:

  1. nano index.js

Copia le righe evidenziate dal file index.js, quindi cancellale da questo file. Avrai bisogno del codice copiato tra poco, quindi salvatelo negli appunti. Se stai usando nano, puoi evidenziare queste righe e fare clic destro con il mouse per copiarle:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

Il metodo post per il percorso upload ora corrisponderà al seguente:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

Salva e chiudi questo file, quindi apri il file utils.js:

  1. nano utils.js

Nel tuo file utils.js, incolla le righe che hai appena copiato per il callback del percorso /upload nella funzione processUploadedImages:

image_processor/utils.js
...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

Ora che hai spostato il codice per elaborare un’immagine, devi aggiornarlo per utilizzare i dati dell’immagine dal parametro job della funzione processUploadedImages() che hai definito in precedenza.

Per fare ciò, aggiungi e aggiorna le righe evidenziate di seguito:

image_processor/utils.js

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

Converti la versione stringa dei dati dell’immagine di nuovo in binario con il metodo Buffer.from(). Poi aggiorna path.parse() con un riferimento al nome dell’immagine salvato nella coda. Dopo di ciò, aggiorna il metodo sharp() per prendere i dati binari dell’immagine memorizzati nella variabile imageFileData.

Il file utils.js completo ora corrisponderà al seguente:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

Salva e chiudi il tuo file, quindi torna al file index.js:

  1. nano index.js

La variabile sharp non è più necessaria come dipendenza poiché l’immagine viene ora elaborata nel file utils.js. Elimina la riga evidenziata dal file:

image_processor/index.js
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

Salva e chiudi il tuo file.

Hai ora definito la funzionalità per creare una coda in Redis e aggiungere un lavoro. Hai anche definito la funzione processUploadedImages() per elaborare le immagini caricate.

Il compito rimanente è creare un consumer (o worker) che estrarrà un lavoro dalla coda e chiamerà la funzione processUploadedImages() con i dati del lavoro.

Crea un file worker.js nel tuo editor:

  1. nano worker.js

Nel tuo file worker.js, aggiungi il seguente codice:

image_processor/worker.js
const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

Nella prima riga, importi la classe Worker da bullmq; quando istanziata, questa avvierà un worker che estrae i lavori dalla coda in Redis ed esegue l’elaborazione. Successivamente, fai riferimento alla funzione processUploadedImages() dal file utils.js in modo che il worker possa chiamare la funzione con i dati nella coda.

Defini una funzione workerHandler() che accetta un parametro job contenente i dati del lavoro nella coda. All’interno della funzione, registri che il lavoro è iniziato, quindi invoca processUploadedImages() con i dati del lavoro. Successivamente, registri un messaggio di successo e restituisci null.

Per consentire al lavoratore di connettersi a Redis, rimuovi un lavoro dalla coda e chiama la funzione workerHandler() con i dati del lavoro, aggiungi le seguenti righe al file:

image_processor/worker.js
...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

Qui, imposti la variabile workerOptions su un oggetto contenente le impostazioni di connessione di Redis. Imposti la variabile worker su un’istanza della classe Worker che prende i seguenti parametri:

  • imageJobQueue: il nome della coda di lavoro.
  • workerHandler: la funzione che verrà eseguita dopo che un lavoro è stato rimosso dalla coda Redis.
  • workerOptions: le impostazioni di configurazione di Redis che il lavoratore utilizza per stabilire una connessione con Redis.

Infine, registri un messaggio di successo.

Dopo aver aggiunto le righe, salva e chiudi il tuo file.

Hai ora definito la funzionalità del lavoratore bullmq per rimuovere lavori dalla coda ed eseguirli.

Nel tuo terminale, rimuovi le immagini dalla directory public/images per iniziare da zero per testare la tua app:

  1. rm public/images/*

Successivamente, esegui il file index.js:

  1. node index.js

L’applicazione partirà:

Output
Server running on port 3000

Ora avvierai il lavoratore. Apri una seconda sessione del terminale e vai alla directory del progetto:

  1. cd image_processor/

Avvia il lavoratore con il seguente comando:

  1. node worker.js

Il lavoratore si avvierà:

Output
Worker started!

Visita http://localhost:3000/ nel tuo browser. Premi il pulsante Scegli File e seleziona il underwater.png dal tuo computer, poi premi il pulsante Carica Immagine.

Potresti ricevere una risposta istantanea che ti chiede di aggiornare la pagina dopo alcuni secondi:

In alternativa, potresti ricevere una risposta istantanea con alcune immagini elaborate sulla pagina mentre altre sono ancora in fase di elaborazione:

Puoi aggiornare la pagina diverse volte per caricare tutte le immagini ridimensionate.

Torna al terminale dove il tuo lavoratore è in esecuzione. Quel terminale avrà un messaggio che corrisponde al seguente:

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

L’output conferma che bullmq ha eseguito il lavoro con successo.

La tua app può ancora delegare compiti che richiedono molto tempo anche se il lavoratore non è in esecuzione. Per dimostrarlo, interrompi il lavoratore nel secondo terminale con CTRL+C.

Nella tua sessione iniziale del terminale, interrompi il server Express e rimuovi le immagini in public/images:

  1. rm public/images/*

Dopo di che, avvia nuovamente il server:

  1. node index.js

Nel tuo browser, visita http://localhost:3000/ e carica di nuovo l’immagine underwater.png. Quando sarai reindirizzato al percorso /risultato, le immagini non verranno visualizzate sulla pagina perché il lavoratore non è in esecuzione:

Torna al terminale dove hai eseguito il lavoratore e avvia nuovamente il lavoratore:

  1. node worker.js

L’output corrisponderà al seguente, che ti avvisa che il lavoro è stato avviato:

Output
Worker started! Starting job: processUploadedImages

Dopo che il lavoro è stato completato e l’output include una riga che dice Finito lavoro: processUploadedImages, aggiorna il browser. Le immagini verranno ora caricate:

Ferma il server e il worker.

Ora puoi delegare un compito intensivo in termini di tempo allo sfondo ed eseguirlo in modo asincrono utilizzando bullmq. Nel prossimo passaggio, configurerai una dashboard per monitorare lo stato della coda.

Passaggio 4 — Aggiunta di una Dashboard per Monitorare le Code bullmq

In questo passaggio, utilizzerai il pacchetto bull-board per monitorare i lavori nella coda Redis da una dashboard visiva. Questo pacchetto creerà automaticamente un’interfaccia utente (UI) che visualizza e organizza le informazioni sui lavori bullmq memorizzati nella coda Redis. Utilizzando il tuo browser, puoi monitorare i lavori completati, in attesa o falliti senza aprire la CLI Redis nel terminale.

Apri il file index.js nel tuo editor di testo:

  1. nano index.js

Aggiungi il codice evidenziato per importare bull-board:

image_processor/index.js
...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

Nel codice precedente, importi il metodo createBullBoard() da bull-board. Importi anche BullMQAdapter, che consente l’accesso di bull-board alle code di bullmq, e ExpressAdapter, che fornisce funzionalità per Express per visualizzare il pannello.

Successivamente, aggiungi il codice evidenziato per collegare bull-board con bullmq:

image_processor/index.js
...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

Prima, imposti il serverAdapter su un’istanza di ExpressAdapter. Successivamente, invochi createBullBoard() per inizializzare il pannello con i dati della coda di bullmq. Passi alla funzione un argomento di tipo oggetto con le proprietà queues e serverAdapter. La prima proprietà, queues, accetta un array delle code che hai definito con bullmq, che è la imageJobQueue qui. La seconda proprietà, serverAdapter, contiene un oggetto che accetta un’istanza dell’adattatore del server Express. Dopo di ciò, imposti il percorso /admin per accedere al pannello con il metodo setBasePath().

Successivamente, aggiungi il middleware serverAdapter per il percorso /admin:

image_processor/index.js
app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

Il file index.js completo sarà simile al seguente:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

Dopo aver terminato di apportare le modifiche, salva e chiudi il file.

Esegui il file index.js:

  1. node index.js

Torna al tuo browser e visita http://localhost:3000/admin. Il pannello si caricherà:

Nel cruscotto, puoi rivedere il tipo di lavoro, i dati che utilizza e ulteriori informazioni sul lavoro. Puoi anche passare ad altre schede, come la scheda Completati per informazioni sui lavori completati, la scheda Falliti per ulteriori dettagli sui lavori falliti e la scheda In pausa per ulteriori informazioni sui lavori in pausa.

Ora puoi utilizzare il cruscotto bull-board per monitorare le code.

Conclusione

In questo articolo, hai spostato un compito intensivo in termini di tempo in una coda di lavoro usando bullmq. Inizialmente, senza utilizzare bullmq, hai creato un’app con un compito intensivo in termini di tempo che ha un ciclo di richiesta/risposta lento. Successivamente, hai utilizzato bullmq per spostare il compito intensivo in termini di tempo ed eseguirlo in modo asincrono, migliorando il ciclo di richiesta/risposta. Dopo di che, hai utilizzato bull-board per creare un cruscotto per monitorare le code bullmq in Redis.

Puoi visitare la documentazione di bullmq per apprendere ulteriori dettagli sulle funzionalità di bullmq non trattate in questo tutorial, come la pianificazione, la prioritizzazione o il ripetere dei lavori, e la configurazione delle impostazioni di concorrenza per i lavoratori. Puoi anche visitare la documentazione di bull-board per conoscere ulteriori dettagli sulle funzionalità del cruscotto.

Source:
https://www.digitalocean.com/community/tutorials/how-to-handle-asynchronous-tasks-with-node-js-and-bullmq