다음은 Node.js 및 BullMQ를 사용하여 비동기 작업을 처리하는 방법입니다.

작가는 기부 프로그램인 Write for Donations의 일환으로 여성 엔지니어 협회에 기부를 선택했습니다.

소개

웹 애플리케이션은 요청/응답 주기를 가지고 있습니다. URL을 방문하면 브라우저가 데이터를 처리하거나 데이터베이스에서 쿼리를 실행하는 앱이 실행 중인 서버에 요청을 보냅니다. 이 과정에서 사용자는 앱이 응답을 반환할 때까지 기다려야 합니다. 어떤 작업의 경우 사용자는 빠르게 응답을 받을 수 있지만, 이미지 처리, 데이터 분석, 보고서 생성, 이메일 전송과 같은 시간이 오래 걸리는 작업의 경우 요청/응답 주기를 늦출 수 있습니다. 예를 들어, 사용자가 이미지를 업로드하는 애플리케이션이 있다고 가정해봅시다. 그렇다면 이미지를 사용자에게 보여주기 전에 서버의 디스크 공간을 보존하기 위해 이미지를 크기 조정, 압축 또는 다른 형식으로 변환해야 할 수도 있습니다. 이미지 처리는 CPU 집약적인 작업으로, 작업이 끝날 때까지 Node.js 스레드를 차단할 수 있습니다. 이 작업은 몇 초 또는 몇 분이 걸릴 수 있습니다. 사용자는 작업이 완료되어 서버로부터 응답을 받기까지 기다려야 합니다.

요청/응답 주기를 늦추지 않으려면, bullmq를 사용하여 시간이 많이 소요되는 작업을 Node.js 앱에서 bullmq로 오프로드하여 요청/응답 주기를 해방할 수 있습니다. 이 도구를 사용하면 앱은 사용자에게 빠르게 응답을 보낼 수 있으며, bullmq는 작업을 백그라운드에서 독립적으로 비동기적으로 실행하여 추적합니다. bullmq는 각 작업의 간단한 설명을 큐에 저장하기 위해 Redis를 사용합니다. 그리고 bullmq 작업자(worker)는 큐에서 각 작업을 디큐하여 실행하고, 작업이 완료되면 완료로 표시합니다.

이 문서에서는 bullmq를 사용하여 시간이 많이 소요되는 작업을 백그라운드로 오프로드하여 응용 프로그램이 사용자에게 빠르게 응답할 수 있도록 할 것입니다. 먼저, bullmq를 사용하지 않고 시간이 많이 소요되는 작업을 포함하는 앱을 만들 것입니다. 그런 다음 bullmq를 사용하여 작업을 비동기적으로 실행할 것입니다. 마지막으로, Redis 큐에서 bullmq 작업을 관리하기 위해 시각적 대시보드를 설치할 것입니다.

사전 준비 사항

이 튜토리얼을 따르려면 다음이 필요합니다:

단계 1 — 프로젝트 디렉토리 설정

이 단계에서는 애플리케이션에 필요한 디렉토리를 생성하고 필요한 종속성을 설치합니다. 이 튜토리얼에서 구축할 애플리케이션은 사용자가 이미지를 업로드하고 sharp 패키지를 사용하여 처리하는 기능을 제공합니다. 이미지 처리는 시간이 많이 소요되며 요청/응답 주기를 늦출 수 있으므로 이 작업은 bullmq를 사용하여 백그라운드로 처리하기에 적합한 작업입니다. 이 작업을 백그라운드로 오프로드하기 위해 사용할 기술은 다른 시간이 많이 소요되는 작업에도 동일하게 적용됩니다.

먼저, image_processor라는 디렉토리를 생성하고 해당 디렉토리로 이동하세요:

  1. mkdir image_processor && cd image_processor

그런 다음, 디렉토리를 npm 패키지로 초기화하세요:

  1. npm init -y

이 명령은 package.json 파일을 생성합니다. -y 옵션은 npm이 모든 기본값을 받아들이도록 지시합니다.

명령을 실행하면 출력 결과가 다음과 일치합니다:

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

출력 결과는 package.json 파일이 생성되었음을 확인합니다. 중요한 속성으로는 앱의 이름(name), 애플리케이션 버전 번호(version), 프로젝트의 시작점(main)이 있습니다. 다른 속성에 대해 자세히 알고 싶다면 npm의 package.json 문서를 확인할 수 있습니다.

이 튜토리얼에서 구축할 애플리케이션에는 다음 종속성이 필요합니다:

  • express: 웹 앱을 구축하기 위한 웹 프레임워크입니다.
  • express-fileupload: 폼을 통해 파일을 업로드할 수 있게 해주는 미들웨어입니다.
  • sharp: 이미지 처리 라이브러리입니다.
  • ejs: Node.js로 HTML 마크업을 생성할 수 있는 템플릿 언어입니다.
  • bullmq: 분산 작업 큐입니다.
  • bull-board: bullmq를 기반으로 작업의 상태를 표시하는 멋진 사용자 인터페이스(UI)를 제공하는 대시보드입니다.

이러한 종속성을 설치하려면 다음 명령을 실행하십시오:

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

설치한 종속성 외에도 이 튜토리얼의 나중에 사용할 다음 이미지도 사용합니다:

curl을 사용하여 이미지를 로컬 컴퓨터에서 원하는 위치로 다운로드합니다.

  1. curl -O https://deved-images.nyc3.cdn.digitaloceanspaces.com/CART-68886/underwater.png

다음에는 bullmq가 없는 Node.js 앱을 구축하기 위해 필요한 종속성을 가지고 있습니다.

2단계 – bullmq 없이 시간 소모적인 작업 구현하기

이 단계에서는 Express를 사용하여 사용자가 이미지를 업로드 할 수있는 애플리케이션을 구축합니다. 앱은 sharp를 사용하여 이미지를 여러 크기로 조정하는 시간 소모적인 작업을 시작하고 응답이 보내진 후에 사용자에게 표시됩니다. 이 단계에서는 시간 소모적인 작업이 요청/응답 주기에 어떤 영향을 미치는지 이해하는 데 도움이됩니다.

nano 또는 선호하는 텍스트 편집기를 사용하여 index.js 파일을 만듭니다:

  1. nano index.js

index.js 파일에 다음 코드를 추가하여 종속성을 가져옵니다:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

첫 번째 줄에서는 Node를 사용하여 파일 경로를 계산하기 위해 path 모듈을 가져옵니다. 두 번째 줄에서는 디렉토리와 상호 작용하기 위해 fs 모듈을 가져옵니다. 그런 다음 express 웹 프레임 워크를 가져옵니다. body-parser 모듈을 가져와 HTTP 요청에서 데이터를 구문 분석하는 미들웨어를 추가합니다. 그 뒤로 이미지 처리를위한 sharp 모듈을 가져옵니다. 마지막으로 HTML 양식에서 업로드를 처리하기 위해 express-fileupload를 가져옵니다.

다음으로, 앱에 미들웨어를 구현하기 위해 다음 코드를 추가하세요:

image_processor/index.js
...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

먼저, app 변수를 Express의 인스턴스로 설정합니다. 다음으로, app 변수를 사용하여 ejs 템플릿 언어를 사용하도록 Express를 구성하는 set() 메서드를 추가합니다. 그런 다음, use() 메서드를 사용하여 HTTP 요청의 JSON 데이터를 JavaScript에서 액세스할 수 있는 변수로 변환하기 위해 body-parser 모듈 미들웨어를 추가합니다. 다음 줄에서는 URL 인코딩된 입력과 같은 작업을 수행합니다.

다음으로, 다음 줄을 추가하여 파일 업로드를 처리하고 정적 파일을 제공하는 데 필요한 추가 미들웨어를 추가합니다:

image_processor/index.js
...
app.use(fileUpload());
app.use(express.static("public"));

fileUpload() 메서드를 호출하여 업로드된 파일을 파싱하는 미들웨어를 추가하고, Express가 이미지 및 CSS와 같은 정적 파일을 찾고 제공할 디렉토리를 설정합니다.

미들웨어를 설정한 후, HTML 이미지 업로드 양식을 표시하는 라우트를 생성합니다:

image_processor/index.js
...
app.get("/", function (req, res) {
  res.render("form");
});

여기서, Express 모듈의 get() 메서드를 사용하여 / 라우트와 사용자가 홈페이지 또는 / 라우트를 방문했을 때 실행되어야 할 콜백을 지정합니다. 콜백에서 res.render()를 호출하여 views 디렉토리의 form.ejs 파일을 렌더링합니다. form.ejs 파일이나 views 디렉토리는 아직 생성하지 않았습니다.

생성하려면, 먼저 파일을 저장하고 닫습니다. 터미널에서 다음 명령을 입력하여 프로젝트 루트 디렉토리에 views 디렉토리를 생성합니다:

  1. mkdir views

views 디렉토리로 이동합니다:

  1. cd views

편집기에서 form.ejs 파일을 생성합니다:

  1. nano form.ejs

form.ejs 파일에 다음 코드를 추가하여 양식을 생성합니다:

image_processor/views/form.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

먼저, 아직 만들지 않은 head.ejs 파일을 참조합니다. head.ejs 파일은 다른 HTML 페이지에서 참조할 수 있는 HTML head 요소를 포함하게 됩니다.

body 태그 내에서는 다음 속성을 가진 폼을 생성합니다:

  • action은 폼이 제출될 때 폼 데이터가 전송되어야하는 경로를 지정합니다.
  • method는 데이터를 전송하기 위한 HTTP 메서드를 지정합니다. POST 메서드는 데이터를 HTTP 요청에 포함합니다.
  • encype은 폼 데이터의 인코딩 방식을 지정합니다. 값으로 multipart/form-data를 사용하면 HTML input 요소에서 파일 데이터를 업로드할 수 있게 됩니다.

form 요소 내에서 파일을 업로드하기 위한 input 태그를 생성한 다음, type 속성이 submit으로 설정된 button 요소를 정의합니다. 이를 통해 폼을 제출할 수 있습니다.

작업을 완료한 후, 파일을 저장하고 닫습니다.

다음으로, head.ejs 파일을 생성합니다:

  1. nano head.ejs

head.ejs 파일에 다음 코드를 추가하여 앱의 헤드 섹션을 생성합니다:

image_processor/views/head.ejs
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

여기에서는 이후에 이 단계에서 public 디렉토리에 생성할 main.css 파일을 참조합니다. 이 파일은 이 애플리케이션의 스타일을 포함하게 됩니다. 현재는 정적 자산 처리를 위한 프로세스를 계속 설정하게 됩니다.

파일을 저장하고 닫습니다.

폼에서 제출된 데이터를 처리하려면 Express에서 post 메서드를 정의해야 합니다. 이를 위해 프로젝트의 루트 디렉토리로 돌아갑니다:

  1. cd ..

다시 index.js 파일을 열어보세요:

  1. nano index.js

index.js 파일에서 다음 코드를 추가하여 /upload 라우트의 폼 제출을 처리하는 메소드를 정의하세요:

image_processor/index.js
app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

app 변수를 사용하여 post() 메소드를 호출하여 /upload 라우트에서 제출된 폼을 처리합니다. 그 다음, HTTP 요청에서 업로드된 이미지 데이터를 image 변수에 추출합니다. 그 후에는 사용자가 이미지를 업로드하지 않은 경우 400 상태 코드를 반환하도록 응답을 설정합니다.

업로드된 이미지의 처리를 위해 다음과 같이 코드를 추가하세요:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

이 코드는 앱이 이미지를 처리하는 방식을 나타냅니다. 먼저, 업로드된 이미지에서 이미지 확장자를 제거하고 그 이름을 imageName 변수에 저장합니다. 그 다음, processImage() 함수를 정의합니다. 이 함수는 size 매개변수를 사용하며, 이 값은 이미지 크기 조정 중 이미지의 크기를 결정하는 데 사용됩니다. 함수 내에서 sharp()를 호출하여 업로드된 이미지의 이진 데이터가 포함된 버퍼image.data를 전달합니다. sharp는 크기 매개변수의 값에 따라 이미지 크기를 조정합니다. sharpwebp() 메소드를 사용하여 이미지를 webp 이미지 형식으로 변환합니다. 그런 다음 이미지를 public/images/ 디렉토리에 저장합니다.

업로드된 이미지의 크기를 조정하는 데 사용될 숫자 목록입니다. 그런 다음 JavaScript의 map() 메서드를 사용하여 sizes 배열의 각 요소에 processImage()를 호출한 다음 새 배열을 반환합니다. map() 메서드가 processImage() 함수를 호출할 때마다 새 배열에 대한 프로미스를 반환합니다. 이를 해결하기 위해 Promise.all() 메서드를 사용합니다.

컴퓨터 처리 속도와 사용자가 업로드할 수 있는 이미지의 크기는 다양할 수 있으며, 이는 이미지 처리 속도에 영향을 줄 수 있습니다. 데모 목적으로이 코드를 지연시키기 위해 하이라이트 된 줄을 삽입하여 CPU 집약적인 증가 루프와 이미지 크기가 조정된 이미지를 표시 할 페이지로 리디렉션합니다:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

루프는 100억 번 실행되어 counter 변수를 증가시킵니다. res.redirect() 함수를 호출하여 앱을 /result 경로로 리디렉션합니다. 이 경로는 public/images 디렉토리에있는 이미지를 표시하는 HTML 페이지를 렌더링합니다.

/result 경로는 아직 존재하지 않습니다. 이를 생성하려면 index.js 파일에 하이라이트 된 코드를 추가하십시오:

image_processor/index.js
...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

/result 경로를 app.get() 메소드로 정의합니다. 함수 내에서 imgDirPath 변수를 public/images 디렉토리의 전체 경로로 정의합니다. 주어진 디렉토리의 모든 파일을 읽기 위해 fs 모듈의 readdirSync() 메소드를 사용합니다. 그런 다음, map() 메소드를 연결하여 이미지 경로가 images/로 접두어가 있는 새로운 배열을 반환합니다.

마지막으로, res.render()를 호출하여 아직 존재하지 않는 result.ejs 파일을 렌더링합니다. imgFiles 변수는 모든 이미지의 상대 경로 배열을 포함하고 result.ejs 파일에 전달됩니다.

파일을 저장하고 닫습니다.

result.ejs 파일을 생성하기 위해 views 디렉토리로 돌아갑니다:

  1. cd views

result.ejs 파일을 생성하고 편집기에서 엽니다:

  1. nano result.ejs

result.ejs 파일에 다음 라인을 추가하여 이미지를 표시합니다:

image_processor/views/result.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

먼저, head.ejs 파일을 참조합니다. body 태그에서 imgFiles 변수가 비어 있는지 확인합니다. 데이터가 있는 경우 각 파일을 반복하고 각 배열 요소에 대해 이미지를 생성합니다. imgFiles가 비어 있으면 Refresh after a few seconds to view the resized images.라는 메시지를 출력합니다.

파일을 저장하고 닫습니다.

그다음, 루트 디렉토리로 돌아가 정적 에셋을 포함할 public 디렉토리를 생성합니다.

  1. cd .. && mkdir public

public 디렉토리로 이동하십시오:

  1. cd public

업로드된 이미지를 보관할 images 디렉토리를 생성하십시오:

  1. mkdir images

다음으로 css 디렉토리를 생성하고 해당 디렉토리로 이동하십시오:

  1. mkdir css && cd css

에디터에서 앞서 head.ejs 파일에서 참조한 main.css 파일을 생성하고 열어봅니다:

  1. nano main.css

main.css 파일에 다음 스타일을 추가하십시오:

image_processor/public/css/main.css
body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** "Choose File" 버튼에 대한 스타일 **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** "Upload Image" 버튼에 대한 스타일 **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

이러한 줄들은 앱의 요소들에 스타일을 적용합니다. HTML 속성을 사용하여 Choose File 버튼의 배경을 #2196f3 (파란색의 한 가지)으로, 그리고 Upload Image 버튼의 테두리를 orange로 스타일링합니다. 또한 /result 경로의 요소들을 더욱 보기 좋게 스타일링합니다.

작업을 완료하면 파일을 저장하고 닫으십시오.

프로젝트 루트 디렉토리로 돌아가십시오:

  1. cd ../..

index.js 파일을 에디터에서 엽니다:

  1. nano index.js

index.js 파일에 다음 코드를 추가하십시오. 이 코드는 서버를 시작합니다:

image_processor/index.js
...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

이제 완성된 index.js 파일은 아래와 같습니다:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

변경 사항을 완료하면 파일을 저장하고 닫으십시오.

node 명령을 사용하여 앱을 실행하십시오:

  1. node index.js

다음과 같은 출력이 표시됩니다:

Output
Server running on port 3000

이 출력은 서버가 문제 없이 실행 중임을 확인합니다.

선호하는 브라우저를 열고 http://localhost:3000/을 방문하세요.

참고: 원격 서버에서 튜토리얼을 따르고 있다면 포트 포워딩을 사용하여 로컬 브라우저에서 앱에 접속할 수 있습니다.

Node.js 서버가 실행 중인 동안 다른 터미널을 열고 다음 명령을 입력하세요:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

서버에 연결한 후 node index.js를 실행한 다음 로컬 컴퓨터의 웹 브라우저에서 http://localhost:3000/로 이동하세요.

페이지가 로드되면 다음과 일치해야 합니다:

다음으로 파일 선택 버튼을 누르고 로컬 컴퓨터에서 underwater.png 이미지를 선택하세요. 디스플레이는 선택된 파일 없음에서 underwater.png으로 변경됩니다. 그 후 이미지 업로드 버튼을 누르세요. 앱은 이미지를 처리하고 증가하는 루프를 실행하는 동안 잠시 로드됩니다.

작업이 완료되면 크기가 조정된 이미지가 있는 /result 경로가 로드됩니다:

이제 CTRL+C로 서버를 중지할 수 있습니다. Node.js는 파일이 변경될 때 서버를 자동으로 다시 로드하지 않으므로 파일을 업데이트할 때마다 서버를 중지하고 다시 시작해야 합니다.

이제 시간이 오래 걸리는 작업이 어떻게 애플리케이션의 요청/응답 주기에 영향을 미칠 수 있는지 알게 되었습니다. 다음에는 작업을 비동기적으로 실행할 것입니다.

단계 3 — bullmq를 사용하여 시간이 많이 소요되는 작업 비동기적으로 실행하기

이 단계에서는 bullmq를 사용하여 시간이 많이 소요되는 작업을 백그라운드로 옮길 것입니다. 이 조정은 요청/응답 주기를 해제하고 이미지가 처리되는 동안 앱이 즉시 사용자에게 응답할 수 있도록 합니다.

이를 위해 작업에 대한 간결한 설명을 작성하고 bullmq와 함께 대기열에 추가해야 합니다. 대기열은 현실에서 대기열이 작동하는 방식과 유사한 데이터 구조입니다. 사람들이 공간에 들어가기 위해 줄을 서면, 줄에서 가장 먼저 온 사람이 먼저 공간에 들어갑니다. 이후에 온 사람들은 줄의 끝에 줄을 서고, 줄 앞에 있는 모든 사람들이 공간에 들어간 다음에 공간에 진입하게 됩니다. 대기열 데이터 구조의 선입선출(FIFO) 프로세스로 인해, 대기열에 추가된 첫 번째 항목이 먼저 제거(dequeue)됩니다. bullmq에서, 생산자는 대기열에 작업을 추가하고, 소비자(또는 작업자)는 대기열에서 작업을 제거하고 실행합니다.

bullmq의 대기열은 Redis에 있습니다. 작업을 설명하고 대기열에 추가할 때, Redis 대기열에 작업에 대한 항목이 생성됩니다. 작업 설명은 문자열 또는 최소한의 데이터 또는 나중에 작업을 실행할 수 있는 데이터에 대한 참조를 포함하는 속성을 가진 객체일 수 있습니다. 작업을 대기열에 추가하는 기능을 정의한 후, 시간이 오래 걸리는 코드를 별도의 함수로 이동합니다. 나중에 bullmq는 작업이 대기열에서 제거될 때 대기열에 저장한 데이터와 함께 이 함수를 호출합니다. 작업이 완료되면 bullmq는 해당 작업을 완료로 표시하고 대기열에서 다른 작업을 가져와 실행합니다.

index.js 파일을 에디터에서 열어보세요:

  1. nano index.js

index.js 파일에서 하이라이트된 줄을 추가하여 Redis에 bullmq 대기열을 생성합니다:

image_processor/index.js
...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

bullmq에서 Queue 클래스를 추출하여 Redis에 대기열을 생성하는 데 사용합니다. 그런 다음 redisOptions 변수를 생성하여 Queue 클래스 인스턴스가 Redis와의 연결을 설정하는 데 사용할 속성이 포함된 객체로 설정합니다. Redis가 로컬 컴퓨터에서 실행 중이므로 host 속성 값을 localhost로 설정합니다.

참고: Redis가 앱과 별도의 원격 서버에서 실행 중인 경우, host 속성 값을 원격 서버의 IP 주소로 업데이트해야 합니다. 또한, port 속성 값을 Redis가 연결을 수신하는 기본 포트인 6379로 설정합니다.

Redis 및 앱을 함께 실행하는 원격 서버에 포트 포워딩을 설정한 경우 host 속성을 업데이트할 필요는 없지만, 앱을 실행하기 위해 서버에 로그인할 때마다 포트 포워딩 연결을 사용해야 합니다.

다음으로, imageJobQueue 변수를 Queue 클래스의 인스턴스로 설정하고, 첫 번째 인수로 큐의 이름을, 두 번째 인수로 객체를 전달합니다. 객체는 redisOptions 변수 내에 있는 객체의 connection 속성의 값을 가지고 있습니다. Queue 클래스를 인스턴스화한 후, Redis에 imageJobQueue라는 큐가 생성됩니다.

마지막으로, imageJobQueue에 작업을 추가하기 위해 사용할 addJob() 함수를 정의합니다. 이 함수는 작업에 대한 정보를 포함하는 job 매개변수를 사용합니다 (큐에 저장할 데이터로 addJob() 함수를 호출할 것입니다). 함수 내에서는 imageJobQueueadd() 메소드를 호출하여 작업 이름을 첫 번째 인수로, 작업 데이터를 두 번째 인수로 전달합니다.

강조 표시된 코드를 추가하여 addJob() 함수를 호출하여 큐에 작업을 추가하세요:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

여기에서는 작업을 설명하는 속성 값을 가진 객체로 addJob() 함수를 호출합니다. 객체에는 작업의 이름을 나타내는 type 속성이 있습니다. 두 번째 속성인 image은 사용자가 업로드 한 이미지 데이터가 포함된 객체로 설정됩니다. image.data에서 이미지 데이터가 버퍼(이진 형식)로 저장되어 있기 때문에 JavaScript의 toString() 메서드를 사용하여 Redis에 저장할 수 있는 문자열로 변환합니다. 이는 data 속성을 설정합니다. image 속성은 업로드 된 이미지의 이름(확장자 포함)으로 설정됩니다.

이제 bullmq가 나중에 이 작업을 실행하기 위해 필요한 정보를 정의했습니다. 작업에 따라 더 많은 작업 정보 또는 더 적은 작업 정보를 추가할 수 있습니다.

경고: Redis는 메모리에 데이터베이스를 저장하기 때문에 큐에 대량의 데이터를 저장하는 것을 피하십시오. 작업이 처리해야하는 큰 파일이 있는 경우, 파일을 디스크나 클라우드에 저장한 다음 파일의 링크를 문자열로 대기열에 저장하십시오. bullmq가 작업을 실행할 때, Redis에 저장된 링크에서 파일을 가져올 것입니다.

파일을 저장하고 닫으십시오.

다음으로, 이미지 처리 코드를 포함 할 utils.js 파일을 만들고 엽니다:

  1. nano utils.js

utils.js 파일에 다음 코드를 추가하여 이미지 처리 함수를 정의하세요.

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

첫 두 줄에서 이미지 처리 및 경로 계산에 필요한 모듈을 가져옵니다. 그런 다음, 시간이 오래 걸리는 이미지 처리 작업을 포함할 processUploadedImages() 함수를 정의합니다. 이 함수는 워커가 작업 데이터를 큐에서 가져온 다음 큐 데이터로 processUploadedImages() 함수를 호출할 때 채워질 job 매개변수를 취합니다. 또한 다른 파일에서 이 processUploadedImages() 함수를 참조할 수 있도록 내보냅니다.

파일을 저장하고 닫습니다.

index.js 파일로 돌아갑니다:

  1. nano index.js

index.js 파일에서 강조 표시된 줄을 복사한 다음 이 파일에서 삭제합니다. 복사한 코드가 잠시 후에 필요하므로 클립 보드에 저장합니다. nano를 사용하는 경우 이러한 줄을 강조 표시하고 마우스 오른쪽 버튼을 클릭하여 줄을 복사할 수 있습니다:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

upload 경로에 대한 post 메서드는 이제 다음과 일치합니다:

image_processor/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

이 파일을 저장하고 닫은 다음 utils.js 파일을 엽니다:

  1. nano utils.js

utils.js 파일에서 방금 /upload 경로 콜백에 대한 복사한 줄을 processUploadedImages 함수에 붙여넣습니다:

image_processor/utils.js
...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

이제 이미지 처리 코드를 이동했으므로 이전에 정의한 processUploadedImages() 함수의 job 매개변수에서 이미지 데이터를 사용하도록 업데이트해야 합니다.

아래 강조 표시된 줄을 추가하고 업데이트하십시오.

image_processor/utils.js

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

이미지 데이터의 문자열 버전을 Buffer.from() 메서드를 사용하여 다시 이진 형식으로 변환합니다. 그런 다음, 대기열에 저장된 이미지 이름에 대한 참조를 사용하여 path.parse()를 업데이트합니다. 그 다음, sharp() 메서드를 업데이트하여 imageFileData 변수에 저장된 이미지 이진 데이터를 사용합니다.

다음은 완성된 utils.js 파일입니다:

image_processor/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

파일을 저장하고 닫은 다음, index.js로 돌아갑니다:

  1. nano index.js

이제 이미지가 utils.js 파일에서 처리되므로, sharp 변수는 더 이상 필요하지 않습니다. 파일에서 강조된 줄을 삭제합니다:

image_processor/index.js
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

파일을 저장하고 닫습니다.

이제 Redis에서 대기열을 생성하고 작업을 추가하는 기능을 정의했습니다. 또한, 업로드된 이미지를 처리하는 processUploadedImages() 함수를 정의했습니다.

남은 작업은 대기열에서 작업을 가져와서 작업 데이터와 함께 processUploadedImages() 함수를 호출하는 소비자(또는 워커)를 생성하는 것입니다.

에디터에서 worker.js 파일을 생성하세요:

  1. nano worker.js

worker.js 파일에 다음 코드를 추가하세요:

image_processor/worker.js
const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

첫 번째 줄에서는 bullmq에서 Worker 클래스를 가져옵니다. 인스턴스화하면 이 클래스는 대기열에서 작업을 가져와 실행하는 워커를 시작합니다. 그 다음으로, 워커가 대기열의 데이터와 함께 함수를 호출할 수 있도록 utils.js 파일에서 processUploadedImages() 함수를 참조합니다.

큐에있는 작업 데이터를 포함하는 job 매개 변수를 사용하는 workerHandler() 함수를 정의합니다. 함수에서 작업이 시작되었다고 로그를 기록한 다음 job 데이터로 processUploadedImages()를 호출합니다. 그 후에 성공 메시지를 기록하고 null을 반환합니다.

작업자가 Redis에 연결하고 큐에서 작업을 디큐하고 작업 데이터로 workerHandler()를 호출할 수 있도록 다음 줄을 파일에 추가합니다.

image_processor/worker.js
...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

여기에서 workerOptions 변수를 Redis 연결 설정을 포함하는 객체로 설정합니다. worker 변수를 다음 매개 변수를 사용하여 Worker 클래스의 인스턴스로 설정합니다:

  • imageJobQueue: 작업 큐의 이름.
  • workerHandler: Redis 큐에서 작업이 디큐 된 후 실행될 함수.
  • workerOptions: 작업자가 Redis와의 연결을 설정하는 데 사용하는 Redis 구성 설정.

마지막으로 성공 메시지를 기록합니다.

라인을 추가 한 후 파일을 저장하고 닫습니다.

이제 큐에서 작업을 디큐하고 실행하기 위해 bullmq 작업자 기능을 정의했습니다.

터미널에서 앱을 테스트하기 위해 public/images 디렉토리의 이미지를 제거합니다:

  1. rm public/images/*

다음으로 index.js 파일을 실행합니다:

  1. node index.js

앱이 시작됩니다:

Output
Server running on port 3000

이제 작업자를 시작합니다. 두 번째 터미널 세션을 열고 프로젝트 디렉토리로 이동합니다.

  1. cd image_processor/

다음 명령어로 worker를 시작하세요:

  1. node worker.js

Worker가 시작됩니다:

Output
Worker started!

브라우저에서 http://localhost:3000/을 방문하세요. 파일 선택 버튼을 누르고 컴퓨터에서 underwater.png을 선택한 다음, 이미지 업로드 버튼을 누르세요.

몇 초 후 페이지를 새로고침하라는 즉시 응답을 받을 수도 있습니다:

또는 일부 이미지가 처리되는 동안 페이지에 일부 처리된 이미지가 나타날 수도 있습니다:

모든 크기 조정된 이미지를 로드하기 위해 페이지를 몇 번 새로고침할 수 있습니다.

Worker가 실행 중인 터미널로 돌아가세요. 해당 터미널에 다음과 같은 메시지가 표시됩니다:

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

출력 결과는 bullmq가 작업을 성공적으로 실행했음을 확인합니다.

Worker가 실행 중이지 않더라도 앱은 여전히 시간이 많이 소요되는 작업을 할당할 수 있습니다. 이를 증명하기 위해 두 번째 터미널에서 worker를 중지하세요(CTRL+C).

처음 터미널 세션에서 Express 서버를 중지하고 public/images에 있는 이미지를 제거하세요:

  1. rm public/images/*

그 후, 서버를 다시 시작하세요:

  1. node index.js

브라우저에서 http://localhost:3000/을 방문하고 다시 underwater.png 이미지를 업로드하세요. /result 경로로 리디렉션되었을 때, worker가 실행되지 않기 때문에 이미지가 페이지에 표시되지 않습니다:

worker를 실행한 터미널로 돌아가서 worker를 다시 시작하세요:

  1. node worker.js

작업이 시작되었음을 알려주는 다음과 같은 출력이 표시됩니다:

Output
Worker started! Starting job: processUploadedImages

작업이 완료되고 출력에는 Finished job: processUploadedImages라는 줄이 포함되어 있으며, 브라우저를 새로 고침하십시오. 이미지가 이제로드됩니다 :

서버와 워커를 중지하십시오.

이제 시간이 많이 걸리는 작업을 백그라운드로 오프로드하고 bullmq를 사용하여 비동기적으로 실행할 수 있습니다. 다음 단계에서는 대기열 상태를 모니터링하기 위해 대시보드를 설정합니다.

단계 4 – bullmq 대기열 모니터링 대시보드 추가

이 단계에서는 bull-board 패키지를 사용하여 시각적 대시보드에서 Redis 대기열의 작업을 모니터링합니다. 이 패키지는 Redis 대기열에 저장된 bullmq 작업에 대한 정보를 표시하고 조직화하는 사용자 인터페이스 (UI) 대시 보드를 자동으로 생성합니다. 브라우저를 사용하여 Redis CLI를 터미널에서 열지 않고도 완료된 작업, 대기 중인 작업 또는 실패한 작업을 모니터링 할 수 있습니다.

텍스트 편집기에서 index.js 파일을 엽니다 :

  1. nano index.js

하이라이트 된 코드를 추가하여 bull-board를 가져옵니다.

image_processor/index.js
...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

이전 코드에서는 bull-board에서 createBullBoard() 메서드를 가져오고 있습니다. 또한 bull-boardbullmq 큐에 액세스할 수 있도록 BullMQAdapter를 가져오고, Express에서 대시보드를 표시하는 기능을 제공하는 ExpressAdapter도 가져오고 있습니다.

다음으로, bull-boardbullmq와 연결하기 위해 하이라이트된 코드를 추가합니다:

image_processor/index.js
...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

먼저, serverAdapterExpressAdapter의 인스턴스로 설정합니다. 그다음, createBullBoard()를 호출하여 bullmq 큐 데이터로 대시보드를 초기화합니다. 이 함수에는 queuesserverAdapter 속성을 가진 객체 인수를 전달합니다. 첫 번째 속성인 queuesbullmq로 정의한 큐 배열을 받습니다. 여기에서는 imageJobQueue입니다. 두 번째 속성인 serverAdapter는 Express 서버 어댑터의 인스턴스를 받는 객체를 포함합니다. 그 후에는 setBasePath() 메서드를 사용하여 /admin 경로를 대시보드에 액세스할 수 있도록 설정합니다.

다음으로, /admin 경로에 대한 serverAdapter 미들웨어를 추가합니다:

image_processor/index.js
app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

완성된 index.js 파일은 다음과 같아야 합니다:

image_processor/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

변경 사항을 완료한 후 파일을 저장하고 닫습니다.

index.js 파일을 실행합니다:

  1. node index.js

브라우저로 돌아가서 http://localhost:3000/admin을 방문합니다. 대시보드가 로드됩니다.

대시보드에서는 작업 유형, 사용하는 데이터 및 작업에 대한 자세한 정보를 검토할 수 있습니다. 또한 완료된 작업에 대한 정보를 제공하는 완료됨 탭, 실패한 작업에 대한 자세한 정보를 제공하는 실패함 탭, 일시 정지된 작업에 대한 자세한 정보를 제공하는 일시 정지됨 탭 등 다른 탭으로 전환할 수도 있습니다.

이제 bull-board 대시보드를 사용하여 큐를 모니터링할 수 있습니다.

결론

이 글에서는 bullmq를 사용하여 시간이 많이 소요되는 작업을 작업 큐로 오프로드했습니다. 먼저, bullmq를 사용하지 않고, 요청/응답 주기가 느린 시간이 많이 소요되는 작업을 가진 앱을 만들었습니다. 그런 다음, bullmq를 사용하여 시간이 많이 소요되는 작업을 오프로드하고 비동기적으로 실행하여 요청/응답 주기를 향상시켰습니다. 그 후에는 Redis에서 bullmq 큐를 모니터링하기 위해 bull-board를 사용하여 대시보드를 생성했습니다.

이 튜토리얼에서 다루지 않은 bullmq의 스케줄링, 작업 우선 순위 설정, 재시도 작업 및 워커의 동시성 설정과 같은 기능에 대해 더 알아보려면 bullmq 문서를 참조할 수 있습니다. 또한 대시보드의 기능에 대해 더 알아보려면 bull-board 문서를 방문할 수 있습니다.

Source:
https://www.digitalocean.com/community/tutorials/how-to-handle-asynchronous-tasks-with-node-js-and-bullmq