כיצד לטפל במשימות אסינכרוניות עם Node.js ו־BullMQ

הסופר בחר את חברת המהנדסות של נשים לקבלת תרומה כחלק מתוכנית כתוב כדי לתרום.

הקדמה

יישומי האינטרנט עוברים במחזורי בקשה/תגובה. כאשר אתה מבקר ב-URL, הדפדפן שולח בקשה לשרת הפועל יישום שמעבד נתונים או מפעיל שאילתות במסד הנתונים. במהלך זה, המשתמש ממתין עד שהיישום מחזיר תגובה. לפעמים, המשתמש יכול לקבל תגובה במהירות; למשימות הדורשות זמן רב כמו עיבוד תמונות, ניתוח נתונים, יצירת דוחות או שליחת דואר אלקטרוני, אלו משימות שנדרש זמן רב כדי להשלים ויכולות לאט את מחזור הבקשה/תגובה. לדוגמה, נניח שיש לך אפליקציה שבה משתמשים מעלים תמונות. במקרה כזה, עשוי להיות צורך לשנות את גודל התמונה, לדחוס אותה או להמירה לתבנית אחרת כדי לשמור על מקום הדיסק של השרת שלך לפני שמציגים את התמונה למשתמש. עיבוד תמונה הוא משימה אינטנסיבית מבחינת יחידת המעבד המרכזית (CPU), שעשוי לחסום תהליך חוט של Node.js עד לסיום המשימה. זה עשוי לקחת מספר שניות או דקות. המשתמשים חייבים להמתין עד שהמשימה תסתיים כדי לקבל תגובה מהשרת.

כדי למנוע האטה במחזור בקשה/תגובה, ניתן להשתמש ב־bullmq, שהוא תור משימות (עבודה) מבוזר שמאפשר לך להעביר משימות ארוכות מחזור מיושן מיישום ה־Node.js שלך אל bullmq, משחרר את מחזור הבקשה/תגובה. כלי זה מאפשר לאפליקציה שלך לשלוח תגובות למשתמש מהר בעוד ש־bullmq מפעיל את המשימות אסינכרונית ברקע ובאופן תלוי עצמאית מהאפליקציה שלך. כדי לעקוב אחר העבודות, bullmq משתמש ב־Redis לאחסון תיאור קצר של כל משימה בתור. עובד bullmq מסיר את המשימות מהתור ומבצע כל משימה בתור, סומן אותה כשהיא נגמרת.

במאמר זה, תשתמש ב־bullmq כדי להעביר משימה ארוכת זמן אל הרקע, מה שיאפשר לאפליקציה להגיב במהירות למשתמשים. תתחיל ראשית על ידי יצירת אפליקציה עם משימה ארוכת זמן מבלי להשתמש ב־bullmq. לאחר מכן, תשתמש ב־bullmq כדי לבצע את המשימה אסינכרונית. לבסוף, תתקין לוח מחוונים חזותי כדי לנהל את העבודות של bullmq בתור של Redis.

דרישות מוקדמות

כדי לעקוב אחרי המדריך הזה, תצטרך את הפריטים הבאים:

שלב 1 — הגדרת ספריית הפרוייקט

בשלב זה, תיצור ספרייה ותתקין את התלויות הדרושות ליישום שלך. היישום שתבנה במדריך זה יאפשר למשתמשים להעלות תמונה, שתעבוד באמצעות החבילה sharp. עיבוד התמונות מחייב זמן רב ויכול להאט את מחזור הבקשה/התגובה, ולכן זו משימה טובה לשימוש ב־bullmq כדי להעביר אותה לרקע. הטכניקה שתשתמש בה להעברת המשימה תתאים גם למשימות אחרות שמחייבות זמן רב.

להתחיל, צור ספרייה בשם image_processor ונווט אל הספרייה:

  1. mkdir image_processor && cd image_processor

לאחר מכן, אתחל את הספרייה כחבילת npm:

  1. npm init -y

הפקודה תיצור קובץ package.json. אפשרות ה־-y מודיעה ל־npm לקבל את כל הברירות המחדליות.

לאחר הרצת הפקודה, הפלט שלך יתאים לכך:

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

הפלט מאשר כי הקובץ package.json נוצר. מאפיינים חשובים כוללים את שם האפליקציה (name), מספר גרסת האפליקציה (version), ונקודת ההתחלה של הפרוייקט (main). אם ברצונך ללמוד עוד על המאפיינים האחרים, תוכל לקרוא את מסמך ההסבר של npm על package.json.

היישום שתצור במדריך זה יחייב את התלויות הבאות:

  • express: סביבת עבודה לבניית אפליקציות אינטרנט.
  • express-fileupload: תוסף שמאפשר לטפסים שלך להעלות קבצים.
  • sharp: ספריית עיבוד תמונות.
  • ejs: שפת תבניות שמאפשרת לך ליצור סימוני HTML עם Node.js.
  • bullmq: תור משימות מבוזר.
  • bull-board: לוח מחוונים המבוסס על bullmq ומציג את מצב המשימות עם ממשק משתמש ידידותי.

כדי להתקין את כל התלויות הללו, הריצו את הפקודה הבאה:

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

בנוסף לתלויות שהתקנת, תשתמש גם בתמונה הבאה במדריך:

השתמש ב־curl כדי להוריד את התמונה למיקום שבחרת על מחשבך המקומי

  1. curl -O https://deved-images.nyc3.cdn.digitaloceanspaces.com/CART-68886/underwater.png

יש לך את התלויות הדרושות לבניית אפליקציית Node.js שאין בה bullmq, שתבנה בשלב הבא.

שלב 2 — מממש משימה זמן-מאמץ ללא bullmq

בשלב זה, תבנה אפליקציה עם Express שמאפשרת למשתמשים להעלות תמונות. האפליקציה תתחיל משימה זמן-מאמץ באמצעות sharp לשנות את גודל התמונה למספר גדלים שונים, שיתצוגו למשתמש לאחר שנשלח תגובה. שלב זה יעזור לך להבין איך משימות זמן-מאמץ משפיעות על מחזור הבקשות/תגובות.

באמצעות nano, או העורך הטקסט המועדף עליך, צור את קובץ ה-index.js:

  1. nano index.js

בקובץ ה-index.js שלך, הוסף את הקוד הבא כדי לייבא תלות:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

בשורה הראשונה, אתה מייבא את מודול ה-path לחישוב נתיבי קבצים עם Node. בשורה השנייה, אתה מייבא את מודול ה-fs לפעולות בספריות. לאחר מכן, אתה מייבא את הגישה הרשתית Express. אתה מייבא את מודול ה-body-parser כדי להוסיף Middleware לפענוח נתונים בבקשות HTTP. לאחר מכן, אתה מייבא את המודול sharp לעיבוד תמונות. לבסוף, אתה מייבא את express-fileupload לטיפול בהעלאות מטופס HTML.

לבסוף, הוסף את הקוד הבא כדי לממש Middleware באפליקציה שלך:

image_processor/index.js
...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

ראשית, אתה מגדיר את המשתנה app למופע של Express. שנית, באמצעות המשתנה app, שיטת ה-set() מגדירה את Express להשתמש בשפת התבניות ejs. לאחר מכן אתה מוסיף את middleware של מודול body-parser באמצעות השיטה use() כדי להמיר נתוני JSON בבקשות HTTP למשתנים שניתן לגשת אליהם בעזרת JavaScript. בשורה הבאה, אתה עושה את אותו הדבר עם קלט מקודד ב-URL.

לבסוף, הוסף את השורות הבאות כדי להוסיף middleware נוספת לטיפול בהעלאת קבצים ושירות קבצים סטטיים:

image_processor/index.js
...
app.use(fileUpload());
app.use(express.static("public"));

אתה מוסיף middleware כדי לנתח קבצים שהועלו על ידי קריאה לשיטת fileUpload(), ואתה מגדיר ספרייה שבה Express יחפש וישרת קבצים סטטיים, כגון תמונות ו-CSS.

עם ה-middleware מוגדר, צור מסלול שמציג טופס HTML להעלאת תמונה:

image_processor/index.js
...
app.get("/", function (req, res) {
  res.render("form");
});

כאן, אתה משתמש בשיטת get() של מודול Express כדי לציין את המסלול / והקולבק שצריך להריץ כאשר המשתמש מבקר בדף הבית או במסלול /. בקולבק, אתה קורא ל-res.render() כדי לעשות תבנית של קובץ form.ejs בתיקיית views. עדיין לא יצרת את קובץ form.ejs או את תיקיית views.

כדי ליצור אותם, ראשית, שמור וסגור את הקובץ שלך. בטרמינל שלך, הזן את הפקודה הבאה כדי ליצור את תיקיית views בתיקיית השורש של הפרויקט שלך:

  1. mkdir views

העבור לתיקיית views:

  1. cd views

צור את קובץ ה-form.ejs בעורך שלך:

  1. nano form.ejs

בקובץ ה-form.ejs שלך, הוסף את הקוד הבא כדי ליצור את הטופס:

image_processor/views/form.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

ראשית, אתה מתייחס לקובץ head.ejs, שעדיין לא יצרת. בקובץ head.ejs יהיה רכיב HTML של head שתוכל להתייחס אליו בדפים HTML אחרים.

בתוך תגית body, אתה צריך ליצור טופס עם המאפיינים הבאים:

  • action מציין את הנתיב אליו יש לשלוח נתוני הטופס כאשר הטופס מוגש.
  • method מציין את שיטת ה-HTTP לשליחת הנתונים. שיטת POST משבצת את הנתונים בבקשת HTTP.
  • encytype מציין כיצד יש לקודד את נתוני הטופס. הערך multipart/form-data מאפשר לרכיבי ה-HTML input להעלות נתוני קובץ.

בתוך הרכיב form, אתה צריך ליצור רכיב input להעלאת קבצים. לאחר מכן, עליך להגדיר את הרכיב button עם המאפיין type המוגדר ל-submit, שמאפשר לך להגיש טפסים.

כאשר סיימת, שמור וסגור את הקובץ שלך.

לבסוף, צור קובץ head.ejs:

  1. nano head.ejs

בקובץ ה-head.ejs שלך, הוסף את הקוד הבא כדי ליצור את חלק ה-head של האפליקציה:

image_processor/views/head.ejs
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

כאן, אתה מתייחס לקובץ main.css, שתיצור בתיקיית public מאוחר יותר בשלב זה. הקובץ הזה יכיל את הסגנונות עבור האפליקציה. לעת עתה, תמשיך להגדיר את התהליכים לנכסים סטטיים.

שמור וסגור את הקובץ.

כדי לטפל בנתונים שנשלחים מהטופס, עליך להגדיר את השיטה post ב-Express. כדי לעשות זאת, חזור לתיקיית השורש של הפרויקט שלך:

  1. cd ..

פתחו שוב את קובץ ה- index.js:

  1. nano index.js

בקובץ ה- index.js שלך, הוסיפו את השורות המודגשות הבאות כדי להגדיר שיטה לטיפול בהגשות טופס בנתיב /upload:

image_processor/index.js
app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

אתה משתמש במשתנה app כדי לקרוא לשיטה post(), שתטפל בטופס שהוגש בנתיב /upload. לאחר מכן, אתה מחלץ את נתוני התמונה שהועלתה מבקשת ה-HTTP לתוך המשתנה image. לאחר מכן, אתה מגדיר תגובה כדי להחזיר קוד מצב 400 אם המשתמש לא מעלה תמונה.

כדי להגדיר את התהליך עבור התמונה שהועלתה, הוסף את הקוד המודגש הבא:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

השורות הללו מייצגות איך האפליקציה שלך תעבד את התמונה. ראשית, אתה מסיר את סיומת התמונה מהתמונה שהועלתה ושומר את השם במשתנה imageName. לאחר מכן, אתה מגדיר את הפונקציה processImage(). הפונקציה הזו מקבלת את הפרמטר size, שערכו יועשה שימוש לקביעת ממדי התמונה במהלך שינוי הגודל. בפונקציה, אתה קורא לפונקציה sharp() עם image.data, שהוא מטמון המכיל את הנתונים הבינאריים של התמונה שהועלתה. sharp משנה את גודל התמונה לפי הערך בפרמטר ה- size. אתה משתמש בשיטת webp() מתוך sharp כדי להמיר את התמונה לפורמט תמונת webp. לאחר מכן, אתה שומר את התמונה בתיקיית public/images/.

הרשימה לאחר מכן מגדירה את הגדלים שישמשו לשינוי גודל התמונה שהועלתה. אז אתה משתמש ב map() של JavaScript כדי להפעיל את processImage() עבור כל איבר במערך sizes, אחריו יחזיר מערך חדש. בכל פעם ששיטת map() קוראת לפונקציה processImage(), היא מחזירה promise למערך החדש. אתה משתמש בשיטה Promise.all() כדי לפתור אותם.

מהירויות עיבוד המחשב שונות, כמו גם גודל התמונות שמשתמש יכול להעלות, שעשוי לשפר את מהירות עיבוד התמונה. כדי להשהות קוד זה לצורך הדגמה, הוסף את השורות המודגשות כדי להוסיף לולאה כוללת עם לולאת גדילה שצורפה והפניה לדף שיציג את התמונות שעברו שינוי עם השורות המודגשות:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

הלולאה תרוץ 10 מיליארד פעמים לגדול את המשתנה counter. אתה קורא לפונקציה res.redirect() כדי להפנות את האפליקציה לנתיב /result. הנתיב ידמה דף HTML שיציג את התמונות בתיקייה public/images.

הנתיב /result עדיין לא קיים. כדי ליצור אותו, הוסף את הקוד המודגש בקובץ index.js שלך:

image_processor/index.js
...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

אתה מגדיר את הנתיב /result עם השיטה app.get(). בתוך הפונקציה, אתה מגדיר את המשתנה imgDirPath עם הנתיב המלא אל תיקיית public/images. אתה משתמש בשיטת readdirSync() של מודול ה־fs כדי לקרוא את כל הקבצים בתיקייה הנתונה. משם, אתה מקשר את השיטה map() כדי להחזיר מערך חדש עם נתיבי התמונות שמוקדמים ב־images/.

לבסוף, אתה קורא ל־res.render() כדי להציג את קובץ ה־result.ejs, שעדיין לא קיים. אתה מעביר את המשתנה imgFiles, שמכיל מערך של נתיבים יחסיים של כל התמונות, אל קובץ ה־result.ejs.

שמור וסגור את הקובץ שלך.

כדי ליצור את קובץ ה־result.ejs, חזור אל תיקיית ה־views:

  1. cd views

צור ופתח את קובץ ה־result.ejs בעורך שלך:

  1. nano result.ejs

בקובץ ה־result.ejs שלך, הוסף את השורות הבאות כדי להציג תמונות:

image_processor/views/result.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

בתחילה, הפנה לקובץ ה־head.ejs. בתגית body, בדוק אם המשתנה imgFiles ריק. אם יש בו נתונים, עבור על כל קובץ וצור תמונה עבור כל איבר במערך. אם imgFiles ריק, הדפס הודעה שאומרת למשתמש ל־רענן את הדף אחרי מספר שניות כדי לצפות בתמונות ששונו את הגודל שלהן.

שמור וסגור את הקובץ שלך.

באשף השורש, חזור וצור את התיקייה public שתכיל את הנכסים הסטטיים שלך.

  1. cd .. && mkdir public

העבר אל תיקיית public:

  1. cd public

צור תיקייה בשם images שתשמש לאחסון התמונות שיתעלו:

  1. mkdir images

לבא מכך, צור תיקייה בשם css ונווט אליה:

  1. mkdir css && cd css

בעורך שלך, צור ופתח את הקובץ main.css שהוזכר בעבר בקובץ head.ejs:

  1. nano main.css

בקובץ main.css שלך, הוסף את הסגנונות הבאים:

image_processor/public/css/main.css
body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** סגנונות עבור כפתור "בחר קובץ" **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** סגנונות עבור כפתור "העלאת תמונה" **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

שורות אלו יעצבו אלמנטים באפליקציה. באמצעות מאפייני HTML, תעצב את רקע כפתור בחר קובץ בקוד הספרי #2196f3 (גוון של כחול) וגובה הגבול של כפתור העלאת תמונה לקוד צבע orange. תעצב גם את האלמנטים בנתיב /result כדי להציג אותם בצורה נעימה יותר.

כשתסיים, שמור וסגור את הקובץ שלך.

חזור לתיקיית שורש הפרויקט:

  1. cd ../..

פתח את index.js בעורך שלך:

  1. nano index.js

בקובץ index.js שלך, הוסף את הקוד הבא שיתחיל את השרת:

image_processor/index.js
...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

הקובץ השלם של index.js יתאים כעת לקובץ הבא:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

כשתסיים לבצע את השינויים, שמור וסגור את הקובץ שלך.

הרץ את האפליקציה באמצעות פקודת node:

  1. node index.js

תקבל פלט כזה:

Output
Server running on port 3000

הפלט הזה מאשר שהשרת פועל בלעדי בעיה כלשהי.

פתח את הדפדפן המועדף עליך וגש ל־http://localhost:3000/.

הערה: אם אתה עוקב אחר המדריך על שרת מרוחק, תוכל לגשת לאפליקציה בדפדפן המקומי שלך באמצעות פניית פורט.

כל עוד שרת ה־Node.js רץ, פתח טרמינל נוסף והזן את הפקודה הבאה:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

כשתתחבר לשרת, הרץ את node index.js ואז נווט אל http://localhost:3000/ בדפדפן האינטרנט המקומי של המחשב שלך.

כאשר העמוד יטען, הוא יתאים את הסדר הבא:

באשך, לחץ על כפתור בחר קובץ ובחר את התמונה underwater.png במחשב המקומי שלך. התצוגה תחליף מ־לא נבחר קובץ ל־underwater.png. לאחר מכן, לחץ על כפתור העלה תמונה. האפליקציה תטען לזמן מה בעוד שהיא מעבדת את התמונה ומריצה את לולאת ההגדלה.

לאחר שהמשימה מסתיימת, יטען נתיב ה־/result עם התמונות המוקטנות:

כעת תוכל לעצור את השרת בעזרת CTRL+C. Node.js לא מטעה את השרת באופן אוטומטי כאשר קבצים משתנים, לכן תצטרך לעצור ולהתחיל מחדש את השרת בכל פעם שתעדכן את הקבצים.

עכשיו אתה יודע כיצד משימה ארוכת זמן יכולה להשפיע על מחזור הבקשות/התגובות של אפליקציה. בשלב הבא תבצע את המשימה באופן אסינכרוני.

שלב 3 — ביצוע משימות זמן-מוגבל באופן אסינכרוני עם bullmq

בשלב זה, תעביר משימה שתומכת בזמן לרקע באמצעות bullmq. התאמה זו תשחרר את מעגל הבקשות/תגובות ותאפשר ליישומך להגיב למשתמשים מיד כשהתמונה מעובדת.

כדי לעשות זאת, עליך ליצור תיאור תקצירי של העבודה ולהוסיף אותה לתור עם bullmq. תור הוא מבנה נתונים שפועל בדיוק באופן שתור פועל בחיים האמיתיים. כאשר אנשים עומדים בתור להיכנס למקום, האדם הראשון בתור יהיה הראשון להיכנס למקום. כל אחד שמגיע אחר יתקבל בסוף התור ויכנס למקום לאחר כל מי שלפניו בתור עד שהאדם האחרון נכנס למקום. במבנה הנתונים של תור עם תהליך ראשון-בא-ראשון-לצאת (FIFO), הפריט הראשון שנוסף לתור הוא הפריט הראשון שיוסר (dequeue). עם bullmq, מפיק יוסיף עבודה לתוך תור, ו־צרף (או פועל) יסיר עבודה מהתור ויבצע אותה.

התור ב־bullmq נמצא ב־Redis. כאשר אתה מתאר משימה ומוסיף אותה לתור, יתקבל רשומה עבור המשימה בתוך תור Redis. תיאור משימה יכול להיות מחרוזת או אובייקט עם מאפיינים שמכילים מינימום נתונים או הפניות לנתונים שיאפשרו ל־bullmq לבצע את המשימה מאוחר יותר. לאחר שאתה מגדיר את הפונקציונליות להוספת משימות לתור, אתה מעביר את קטע הקוד הזמני לפונקציה נפרדת. מאוחר יותר, bullmq יקרא לפונקציה זו עם הנתונים שאתה שמרת בתור כאשר המשימה מוצפנת. לאחר שהמשימה הסתיימה, bullmq יסמן אותה כסיומת, יוציא משימה נוספת מהתור ויבצע אותה.

פתח את index.js בעורך שלך:

  1. nano index.js

בקובץ שלך index.js, הוסף את השורות המודגשות ליצירת תור ב־Redis עם bullmq:

image_processor/index.js
...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

אתה מתחיל על ידי שימוש במחלקת Queue מתוך bullmq, שמשמשת ליצירת תור ב־Redis. לאחר מכן אתה מגדיר את המשתנה redisOptions לאובייקט עם מאפיינים שהמופע של מחלקת Queue ישתמש בהם כדי להקיש חיבור עם Redis. אתה מגדיר את ערך המאפיין host ל־localhost משום ש־Redis פועלת על המחשב המקומי שלך.

הערה: אם Redis הייתה פועלת על שרת רחוק שונה מהאפליקציה שלך, הייתה מעדכן את ערך המאפיין host לכתובת ה־IP של השרת המרוחק. אתה גם מגדיר את ערך המאפיין port ל־6379, הפורט הברירת מחדל שבו Redis קשובה לחיבורים.

אם הגדרת קידום דרך (Port Forwarding) לשרת רחוק הפועל Redis והאפליקציה ביחד, אין צורך לעדכן את המאפיין host, אך יהיה עליך להשתמש בחיבור קידום הדרך בכל פעם שאתה נכנס לשרת כדי להפעיל את האפליקציה.

באופן נוסף, אתה צריך להגדיר את המשתנה imageJobQueue למופע של מחלקת Queue, כשאתה מזין את שם התור כארגומנט ראשון ואובייקט כארגומנט שני. האובייקט מכיל מאפיין של connection והערך שלו הוא אובייקט במשתנה redisOptions. לאחר הפעלת מחלקת Queue, יופעל תור בשם imageJobQueue ב-Redis.

לבסוף, עליך להגדיר את הפונקציה addJob() שתשמש להוספת עבודה לתוך התור imageJobQueue. הפונקציה מקבלת פרמטר של job המכיל את המידע על העבודה (תקשורת עם addJob() תתבצע עם המידע שתרצה לשמור בתוך התור). בתוך הפונקציה, אתה קורא לשיטת add() של imageJobQueue, כשאתה מזין את שם העבודה כארגומנט ראשון ומידע העבודה כארגומנט שני.

הוסף את הקוד המסומן לקרוא לפונקציה addJob() כדי להוסיף עבודה לתוך התור:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

הנה, אתה קורא לפונקציית addJob() עם אובייקט שתיאר את העבודה. האובייקט מכיל את המאפיין type עם ערך שהוא שם העבודה. המאפיין השני, image, מוגדר כאובייקט המכיל את נתוני התמונה שהמשתמש העלה. מכיוון שנתוני התמונה ב- image.data הם בפורמט ביינרי, אתה קורא לשיטת toString() של JavaScript כדי להמיר אותם למחרוזת שניתן לאחסן ב-Redis, שתקבע את המאפיין data כתוצאה. המאפיין image מוגדר כשם התמונה שהועלתה (כולל הסיומת של התמונה).

עכשיו הגדרת את המידע הנדרש עבור bullmq כדי לבצע את העבודה מאוחר יותר. בהתאם לעבודה שלך, תוכל להוסיף עוד מידע או פחות.

אזהרה: מאחר ש-Redis הוא מסד נתונים בזיכרון, יש להימנע מאחסון כמויות גדולות של נתונים עבור עבודות בתור. אם יש לך קובץ גדול שעבודה צריכה לעבד, שמור את הקובץ על הדיסק או בענן, ולאחר מכן שמור את הקישור לקובץ כמחרוזת בתור. כאשר bullmq מבצעת את העבודה, היא תאחסן את הקובץ מהקישור שנשמר ב-Redis.

שמור וסגור את הקובץ שלך.

בשלב הבא, צור ופתח את קובץ utils.js שיכיל את קוד עיבוד התמונות:

  1. nano utils.js

בקובץ utils.js שלך, הוסף את הקוד הבא כדי להגדיר את הפונקציה לעיבוד תמונות.

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

אתה מייבא את המודולים הדרושים לעיבוד תמונות וחישוב נתיבים בשורות הראשונות שנייה. אז אתה מגדיר את הפונקציה processUploadedImages(), שתכיל את המשימה לעיבוד התמונות המורכבת מזמן רב. פונקציה זו מקבלת פרמטר job שיתווסף כאשר העובד מביא את נתוני המשימה מתוך התור ואז מפעיל את הפונקציה processUploadedImages() עם נתוני התור. גם אתה מייצא את הפונקציה processUploadedImages() כך שתוכל להתייחס אליה בקבצים אחרים.

שמור וסגור את הקובץ שלך.

חזור לקובץ index.js:

  1. nano index.js

העתק את השורות שמודגשות מתוך קובץ index.js, ואז מחק אותן מתוך הקובץ הזה. תצטרך את הקוד שהועתק לרגע, אז שמור אותו ללוח הגזירים. אם אתה משתמש ב־nano, תבחר בשורות אלו ותקליק ימינה עם העכבר כדי להעתיק אותן:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

שיטת ה־post עבור הנתיב upload תואמת כעת את הבאה:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

שמור וסגור את הקובץ הזה, ואז פתח את הקובץ utils.js:

  1. nano utils.js

בקובץ utils.js שלך, הדבק את השורות שהעתקת כעת עבור קולבק הנתיב /upload לתוך הפונקציה processUploadedImages:

image_processor/utils.js
...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

עכשיו שהעברת את הקוד לעיבוד תמונה, עליך לעדכן אותו כך שישמוש בנתוני התמונה מתוך הפרמטר job של הפונקציה processUploadedImages() שהגדרת מראש.

כדי לעשות זאת, הוסף ועדכן את השורות שמודגשות בהמשך:

image_processor/utils.js

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

אתה ממיר את הגרסה המחרוזתית של נתוני התמונה בחזרה לבינארי עם השיטה Buffer.from(). לאחר מכן, אתה מעדכן את path.parse() עם הפניה לשם התמונה שנשמר בתור. לאחר מכן, אתה מעדכן את השיטה sharp() כך שתקבל את נתוני התמונה הבינאריים שמאוחסנים במשתנה imageFileData.

הקובץ המלא utils.js יתאים כעת למה שבאמצעות הקוד הבא:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

שמור וסגור את הקובץ שלך, ואז חזור לקובץ index.js:

  1. nano index.js

המשתנה sharp כבר אינו נחוץ כתלות משנית מאחר והתמונה מעובדת כעת בקובץ utils.js. מחק את השורה שמודגשת מהקובץ:

image_processor/index.js
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

שמור וסגור את הקובץ שלך.

כעת הגדרת את הפונקציונליות ליצירת תור ב־Redis והוספת משימה. גם הגדרת את הפונקציה processUploadedImages() לעיבוד תמונות שהועלו.

המשימה הנותרת היא ליצור צרכן (או עובד) שימשוך משימה מהתור ויקרא לפונקציה processUploadedImages() עם נתוני המשימה.

צור קובץ worker.js בעורך שלך:

  1. nano worker.js

בקובץ worker.js שלך, הוסף את הקוד הבא:

image_processor/worker.js
const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

בשורה הראשונה, אתה מייבא את המחלקה Worker מ־bullmq; כאשר מאתחלים, זה יתחיל עובד שמנתק משימות מהתור ב־Redis ומבצע אותן. לאחר מכן, אתה מפנה לפונקציה processUploadedImages() מהקובץ utils.js כך שהעובד יכול לקרוא לפונקציה עם הנתונים בתור.

אתה מגדיר פונקציה בשם workerHandler() שמקבלת פרמטר job המכיל את נתוני העבודה בתור. בפונקציה, אתה מתעד שהעבודה התחילה, לאחר מכן קורא לprocessUploadedImages() עם נתוני העבודה. אחרי זה, אתה מתעד הודעת הצלחה ומחזיר null.

כדי לאפשר לעובד להתחבר לRedis, להוציא עבודה מהתור, ולקרוא לworkerHandler() עם נתוני העבודה, הוסף את השורות הבאות לקובץ:

image_processor/worker.js
...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

כאן, אתה מגדיר את משתנה workerOptions לאובייקט המכיל את הגדרות החיבור לRedis. אתה מגדיר את המשתנה worker למופע של הכיתה Worker שמקבלת את הפרמטרים הבאים:

  • imageJobQueue: שם תור העבודה.
  • workerHandler: הפונקציה שתרוץ לאחר שעבודה נלקחה מתור הRedis.
  • workerOptions: הגדרות התצורה של Redis שהעובד משתמש בהן כדי ליצור חיבור עם Redis.

לבסוף, אתה מתעד הודעת הצלחה.

לאחר הוספת השורות, שמור וסגור את הקובץ שלך.

כעת הגדרת את פונקציונליות העובד של bullmq להוציא עבודות מהתור ולבצע אותן.

בטרמינל שלך, הסר את התמונות בתיקיית public/images כך שתוכל להתחיל מחדש לבדיקת האפליקציה שלך:

  1. rm public/images/*

לאחר מכן, הרץ את הקובץ index.js:

  1. node index.js

האפליקציה תתחיל:

Output
Server running on port 3000

עכשיו תתחיל את העובד. פתח מושב טרמינל שני ונווט ישירות לפרויקט:

  1. cd image_processor/

התחל את העובד עם הפקודה הבאה:

  1. node worker.js

העובד יתחיל:

Output
Worker started!

בקר ב־http://localhost:3000/ בדפדפן שלך. לחץ על הכפתור בחר קובץ ובחר את הקובץ underwater.png מהמחשב שלך, ולאחר מכן לחץ על הכפתור העלה תמונה.

יתכן שתקבל תגובה מיידית שתספר לך לרענן את העמוד אחרי מספר שניות:

באפשרותך גם לקבל תגובה מיידית עם כמה תמונות שעוברות עיבוד על העמוד בזמן שאחרים עדיין בתהליך העיבוד:

אפשר לרענן את העמוד מספר פעמים כדי לטעון את כל התמונות שעברו שינוי גודל.

חזור לטרמינל בה רץ העובד שלך. בטרמינל זה יהיה הודעה שתתאים לטקסט הבא:

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

הפלט מאשר ש־bullmq הפעיל את המשימה בהצלחה.

האפליקציה שלך יכולה עדיין להעביר משימות שדורשות הרבה זמן גם אם העובד לא פעיל. כדי להדגיש זאת, עצור את העובד בטרמינל השני עם CTRL+C.

בטרמינל הראשוני שלך, עצור את שרת Express ומחק את התמונות בתיקיית public/images:

  1. rm public/images/*

אחרי כן, התחל מחדש את השרת:

  1. node index.js

בדפדפן שלך, בקר ב־http://localhost:3000/ והעלה את התמונה underwater.png שוב. כשתופנה לנתיב /result, התמונות לא יוצגו בעמוד מכיוון שהעובד לא פעיל:

חזור לטרמינל בו הפעלת את העובד והתחל את העובד מחדש:

  1. node worker.js

הפלט יתאים לטקסט הבא, מה שמציין שהמשימה החלה:

Output
Worker started! Starting job: processUploadedImages

לאחר שהעבודה הושלמה והפלט כולל שורה שאומרת סיים עבודה: processUploadedImages, רענן את הדפדפן. התמונות יטענו כעת:

עצור את השרת והעובד.

כעת תוכל להעביר משימה זמן-מכריע לרקע ולבצע אותה באופן אסינכרוני באמצעות bullmq. בשלב הבא, תגדיר לוח בקרה כדי לפקח על מצב התור.

שלב 4 — הוספת לוח בקרה למעקב אחר תורי bullmq

בשלב זה, תשתמש בחבילת bull-board כדי לפקח על העבודות בתור ה-Redis מלוח בקרה חזותי. חבילה זו תיצור באופן אוטומטי ממשק משתמש (UI) שמציג ומארגן את המידע על העבודות של bullmq המאוחסנות בתור ה-Redis. באמצעות הדפדפן שלך, תוכל לפקח על העבודות שהושלמו, מחכות או נכשלו מבלי לפתוח את ה-Redis CLI בטרמינל.

פתח את קובץ ה-index.js בעורך הטקסט שלך:

  1. nano index.js

הוסף את הקוד המודגש לייבוא של bull-board:

image_processor/index.js
...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

בקוד הקודם, אתה מייבא את השיטה createBullBoard() מתוך bull-board. גם אתה מייבא BullMQAdapter, שמאפשר ל- bull-board לגשת לתורים של bullmq, ו- ExpressAdapter, שמספק פונקציונליות ל- Express כדי להציג את לוח המחוונים.

לאחר מכן, הוסף את הקוד המודגש הבא כדי להתחבר את bull-board עם bullmq:

image_processor/index.js
...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

ראשית, הגדר את serverAdapter למופע של ExpressAdapter. לאחר מכן, הפעל את createBullBoard() כדי לאתחל את לוח המחוונים עם נתוני התור של bullmq. עבור את הפונקציה בארגומנט אובייקט עם מאפייני queues ו- serverAdapter. המאפיין הראשון, queues, מקבל מערך של התורים שהגדרת עם bullmq, שהוא התור imageJobQueue כאן. המאפיין השני, serverAdapter, מכיל אובייקט שמקבל מופע של מתאם שרת Express. לאחר מכן, הגדר את הנתיב /admin כדי לגשת ללוח המחוונים עם השיטה setBasePath().

לאחר מכן, הוסף את middleware של serverAdapter עבור נתיב /admin:

image_processor/index.js
app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

הקובץ index.js השלם יתאים לכך:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

לאחר שתסיים לבצע שינויים, שמור וסגור את הקובץ שלך.

הריץ את הקובץ index.js:

  1. node index.js

חזור לדפדפן שלך ובקר בכתובת http://localhost:3000/admin. הלוח יטען:

בלוח המחוונים, ניתן לצפות בסוג העבודה, הנתונים שהיא מצריכה, ועוד מידע על העבודה. ניתן גם להחליף לכרטיסיות אחרות, כמו למשל לכרטיסיית הושלם למידע על העבודות שהושלמו, לכרטיסיית נכשל למידע נוסף על העבודות שנכשלו, ולכרטיסיית נעצר למידע נוסף על העבודות שהושהו.

כעת ניתן להשתמש בלוח המחוונים bull-board למעקב אחר התורים.

מסקנה

במאמר זה, העברת משימה שנלקחת זמן רב לתוך תור עבודה באמצעות bullmq. בתחילה, בלי להשתמש ב־bullmq, יצרת אפליקציה עם משימה שנלקחת זמן רב עם מחזור בקשה/תגובה איטי. לאחר מכן, השתמשת ב־bullmq כדי להעביר את המשימה שנלקחת זמן רב ולהפעילה באופן אסינכרוני, מה שמגביר את מחזור הבקשה/תגובה. לאחר מכן, השתמשת ב־bull-board כדי ליצור לוח בקרה למעקב אחר תורי bullmq ב־Redis.

ניתן לבקר במסמך המסמכים של bullmq כדי ללמוד עוד על תכונות של bullmq שלא נכללו במדריך זה, כגון קביעת זמנים, עדיפות או ניסיונות של עבודות, והגדרת הגדרות לקונקורנציה עבור עובדים. ניתן גם לבקר במסמך המסמכים של bull-board כדי ללמוד עוד על תכונות הלוח.

Source:
https://www.digitalocean.com/community/tutorials/how-to-handle-asynchronous-tasks-with-node-js-and-bullmq