Автор выбрал Open Sourcing Mental Illness для получения пожертвования в рамках программы Write for DOnations.
Введение
Node.js выполняет код на JavaScript в одном потоке, что означает, что ваш код может выполнять только одну задачу за раз. Однако сам Node.js многопоточен и предоставляет скрытые потоки через библиотеку libuv
, которая обрабатывает операции ввода-вывода, такие как чтение файлов с диска или сетевые запросы. С использованием скрытых потоков Node.js предоставляет асинхронные методы, которые позволяют вашему коду выполнять ввод-вывод без блокировки основного потока.
Хотя у Node.js есть скрытые потоки, их нельзя использовать для выполнения задач, интенсивных для процессора, таких как сложные вычисления, изменение размера изображений или сжатие видео. Поскольку JavaScript работает в одном потоке, когда выполняется задача, требующая много вычислительных ресурсов, она блокирует основной поток, и другой код не выполняется, пока задача не завершится. Без использования других потоков единственным способом ускорить такую задачу, связанную с процессором, является увеличение скорости процессора.
Однако в последние годы процессоры не стали быстрее. Вместо этого компьютеры поставляются с дополнительными ядрами, и теперь более распространено иметь 8 или более ядер. Несмотря на эту тенденцию, ваш код не будет использовать дополнительные ядра на вашем компьютере для ускорения задач, зависящих от процессора, или избегать нарушения основного потока, потому что JavaScript работает в одном потоке.
Чтобы исправить это, Node.js представил модуль worker-threads
, который позволяет создавать потоки и выполнять несколько задач на JavaScript параллельно. Когда поток завершает задачу, он отправляет сообщение в основной поток, содержащее результат операции, чтобы его можно было использовать с другими частями кода. Преимущество использования рабочих потоков в том, что задачи, зависящие от процессора, не блокируют основной поток, и вы можете разделить и распределить задачу на несколько рабочих для ее оптимизации.
В этом уроке вы создадите приложение Node.js с задачей, требующей много ресурсов процессора, и блокирующей основной поток. Затем вы будете использовать модуль worker-threads
для переноса задачи, требующей много ресурсов процессора, в другой поток, чтобы избежать блокировки основного потока. Наконец, вы разделите задачу, зависящую от процессора, и четыре потока будут работать над ней параллельно для ускорения задачи.
Предварительные требования
Для завершения этого урока вам понадобится:
-
Многоядерная система с четырьмя или более ядрами. Вы все еще можете следовать инструкциям с Шага 1 по 6 на двухъядерной системе. Однако для выполнения Шага 7 требуется четыре ядра для улучшения производительности.
-
Среда разработки Node.js. Если вы используете Ubuntu 22.04, установите последнюю версию Node.js, следуя шагу 3 из Как установить Node.js на Ubuntu 22.04. Если вы используете другую операционную систему, см. Как установить Node.js и создать локальную среду разработки.
-
Хорошее понимание цикла событий, обратных вызовов и обещаний в JavaScript, которое вы можете найти в нашем руководстве, Понимание цикла событий, обратных вызовов, обещаний и Async/Await в JavaScript.
-
Базовое понимание использования веб-фреймворка Express. Ознакомьтесь с нашим руководством, Как начать работу с Node.js и Express.
Настройка проекта и установка зависимостей
На этом этапе вы создадите каталог проекта, инициализируете npm
и установите все необходимые зависимости.
Для начала создайте и перейдите в каталог проекта:
- mkdir multi-threading_demo
- cd multi-threading_demo
Команда mkdir
создает каталог, а команда cd
изменяет рабочий каталог на только что созданный.
Затем инициализируйте каталог проекта с помощью npm, используя команду npm init
:
- npm init -y
Параметр -y
принимает все значения по умолчанию.
При выполнении этой команды вывод будет похож на следующий:
Wrote to /home/sammy/multi-threading_demo/package.json:
{
"name": "multi-threading_demo",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC"
}
Затем установите express
, фреймворк веб-приложений для Node.js:
- npm install express
Вы будете использовать Express для создания серверного приложения с блокирующими и неблокирующими конечными точками.
Node.js поставляется с модулем worker-threads
по умолчанию, поэтому вам не нужно его устанавливать.
Теперь вы установили необходимые пакеты. Затем вы узнаете больше о процессах и потоках, а также о том, как они выполняются на компьютере.
Понимание процессов и потоков
Прежде чем приступить к написанию задач, связанных с использованием процессора, и перенесению их на отдельные потоки, вам сначала нужно понять, что такое процессы и потоки, а также различия между ними. Важно также изучить, как процессы и потоки выполняются в однопроцессорной или многопроцессорной системе.
Процесс
A process is a running program in the operating system. It has its own memory and cannot see nor access the memory of other running programs. It also has an instruction pointer, which indicates the instruction currently being executed in a program. Only one task can be executed at a time.
Для понимания этого вы создадите программу Node.js с бесконечным циклом, чтобы она не завершалась при запуске.
Используя nano
или ваш текстовый редактор по умолчанию, создайте и откройте файл process.js
:
- nano process.js
В вашем файле process.js
введите следующий код:
const process_name = process.argv.slice(2)[0];
count = 0;
while (true) {
count++;
if (count == 2000 || count == 4000) {
console.log(`${process_name}: ${count}`);
}
}
В первой строке свойство process.argv
возвращает массив, содержащий аргументы командной строки программы. Затем вы используете метод slice()
JavaScript с аргументом 2
, чтобы сделать поверхностную копию массива с индекса 2 и далее. Это позволяет пропустить первые два аргумента, которые представляют собой путь Node.js и имя файла программы. Затем вы используете синтаксис квадратных скобок для извлечения первого аргумента из срезанного массива и сохранения его в переменной process_name
.
После этого вы определяете цикл while
и передаете ему условие true
, чтобы цикл работал бесконечно. Внутри цикла переменная count
увеличивается на 1
во время каждой итерации. После этого идет оператор if
, который проверяет, равен ли count
2000
или 4000
. Если условие оценивается как истинное, метод console.log()
регистрирует сообщение в терминале.
Сохраните и закройте файл, используя CTRL+X
, затем нажмите Y
, чтобы сохранить изменения.
Запустите программу с помощью команды node
:
- node process.js A &
A
is a command-line argument that is passed to the program and stored in the process_name
variable. The &
at end the allows the Node program to run in the background, which lets you enter more commands in the shell.
Когда вы запустите программу, вы увидите вывод, аналогичный следующему:
Output[1] 7754
A: 2000
A: 4000
Число 7754
– это идентификатор процесса, который ему назначила операционная система. A: 2000
и A: 4000
– это вывод программы.
Когда вы запускаете программу с помощью команды node
, вы создаете процесс. Операционная система выделяет память для программы, находит исполняемый файл программы на диске вашего компьютера и загружает программу в память. Затем она назначает ей идентификатор процесса и начинает выполнение программы. На этом этапе ваша программа теперь становится процессом.
Когда процесс запущен, его идентификатор процесса добавляется в список процессов операционной системы и может быть виден с помощью инструментов типа htop
, top
или ps
. Эти инструменты предоставляют более подробную информацию о процессах, а также опции для их остановки или приоритезации.
Чтобы быстро получить краткое описание процесса Node, нажмите ENTER
в вашем терминале, чтобы вернуть приглашение обратно. Затем запустите команду ps
, чтобы увидеть процессы Node:
- ps |grep node
Команда ps
перечисляет все процессы, связанные с текущим пользователем в системе. Оператор перенаправления |
передает всё вывод ps
фильтру grep
, чтобы отфильтровать процессы и показать только процессы Node.
При выполнении команды будет получен вывод, аналогичный следующему:
Output7754 pts/0 00:21:49 node
Вы можете создать бесчисленное количество процессов из одной программы. Например, используйте следующую команду, чтобы создать еще три процесса с разными аргументами и поместить их в фоновый режим:
- node process.js B & node process.js C & node process.js D &
В этой команде вы создали еще три экземпляра программы process.js
. Символ &
помещает каждый процесс в фоновый режим.
После выполнения команды вывод будет выглядеть примерно следующим образом (хотя порядок может отличаться):
Output[2] 7821
[3] 7822
[4] 7823
D: 2000
D: 4000
B: 2000
B: 4000
C: 2000
C: 4000
Как видно из вывода, каждый процесс регистрировал имя процесса в терминале, когда счетчик достигал 2000
и 4000
. Каждый процесс не знает о другом выполняющемся процессе: процесс D
не знает о процессе C
, и наоборот. Любые действия в любом из процессов не повлияют на другие процессы Node.js.
Если вы внимательно рассмотрите вывод, то заметите, что порядок вывода не совпадает с порядком, который был у вас при создании трех процессов. При запуске команды аргументы процессов были упорядочены как B
, C
и D
. Но теперь порядок – D
, B
и C
. Причина в том, что в операционной системе есть алгоритмы планирования, которые решают, какой процесс выполнять на ЦП в определенный момент времени.
На одноядерной машине процессы выполняются одновременно. То есть операционная система переключается между процессами через равные интервалы. Например, процесс D
выполняется в течение ограниченного времени, затем его состояние сохраняется где-то, и ОС планирует выполнение процесса B
в течение ограниченного времени, и так далее. Это происходит до завершения всех задач. Из вывода может показаться, что каждый процесс выполняется до завершения, но на самом деле планировщик ОС постоянно переключается между ними.
На многоядерной системе — предполагая, что у вас четыре ядра — ОС планирует выполнение каждого процесса на каждом ядре одновременно. Это известно как параллелизм. Однако, если вы создадите еще четыре процесса (всего восемь), каждое ядро будет выполнять два процесса одновременно, пока они не завершатся.
Потоки
Потоки похожи на процессы: у них есть собственный указатель инструкций и они могут выполнять одну задачу JavaScript за раз. В отличие от процессов, у потоков нет своей собственной памяти. Вместо этого они находятся в памяти процесса. При создании процесса можно создать несколько потоков с помощью модуля worker_threads
, выполняющих код JavaScript параллельно. Кроме того, потоки могут общаться друг с другом через передачу сообщений или обмен данных в памяти процесса. Это делает их легковесными по сравнению с процессами, поскольку создание потока не требует больше памяти от операционной системы.
Что касается выполнения потоков, они ведут себя аналогично процессам. Если у вас работают несколько потоков на одноядерной системе, операционная система будет переключаться между ними через равные промежутки времени, давая каждому потоку возможность выполняться непосредственно на одном процессоре. На многоядерной системе ОС планирует потоки на все ядра и выполняет код JavaScript одновременно. Если вы создаете больше потоков, чем доступно ядер, каждое ядро будет выполнять несколько потоков параллельно.
С этим нажмите ENTER
, затем остановите все текущие процессы Node с помощью команды kill
:
- sudo kill -9 `pgrep node`
pgrep
возвращает идентификаторы процессов всех четырех процессов Node для команды kill
. Опция -9
указывает kill
отправить сигнал SIGKILL.
При выполнении команды вы увидите вывод, аналогичный следующему:
Output[1] Killed node process.js A
[2] Killed node process.js B
[3] Killed node process.js C
[4] Killed node process.js D
Иногда вывод может задерживаться и появляться, когда вы запускаете другую команду позже.
Теперь, когда вы знаете разницу между процессом и потоком, вы будете работать с скрытыми потоками Node.js в следующем разделе.
Понимание скрытых потоков в Node.js
Node.js действительно предоставляет дополнительные потоки, поэтому его считают многопоточным. В этом разделе вы рассмотрите скрытые потоки в Node.js, которые помогают сделать операции ввода-вывода неблокирующими.
Как упоминалось во введении, JavaScript является однопоточным, и весь код JavaScript выполняется в одном потоке. К этому относится и исходный код вашей программы, и сторонние библиотеки, которые вы включаете в свою программу. Когда программа выполняет операцию ввода-вывода для чтения файла или сетевого запроса, это блокирует основной поток.
Однако Node.js реализует библиотеку libuv
, которая предоставляет четыре дополнительных потока для процесса Node.js. С их помощью операции ввода-вывода обрабатываются отдельно, и по завершении они добавляют обратный вызов, связанный с задачей ввода-вывода, в очередь микрозадач. Когда стек вызовов в основном потоке освобождается, обратный вызов помещается в стек вызовов и затем выполняется. Для ясности отмечу, что обратный вызов, связанный с данной задачей ввода-вывода, не выполняется параллельно; однако сама задача, такая как чтение файла или сетевой запрос, выполняется параллельно с использованием потоков. После завершения задачи ввода-вывода обратный вызов выполняется в основном потоке.
Помимо этих четырех потоков, движок V8 также предоставляет два потока для обработки вещей, таких как автоматическое сборка мусора. Это дает общее количество потоков в процессе – семь: один основной поток, четыре потока Node.js и два потока V8.
Чтобы подтвердить, что у каждого процесса Node.js есть семь потоков, снова запустите файл process.js
и поместите его в фоновый режим:
- node process.js A &
Терминал зарегистрирует идентификатор процесса, а также вывод программы:
Output[1] 9933
A: 2000
A: 4000
Запомните идентификатор процесса где-то и нажмите ENTER
, чтобы снова воспользоваться приглашением.
Чтобы увидеть потоки, выполните команду top
и передайте ей идентификатор процесса, отображенный в выводе:
- top -H -p 9933
-H
указывает top
отображать потоки в процессе. Флаг -p
указывает top
мониторить только активность в указанном идентификаторе процесса.
При выполнении этой команды вывод будет похож на следующий:
Outputtop - 09:21:11 up 15:00, 1 user, load average: 0.99, 0.60, 0.26
Threads: 7 total, 1 running, 6 sleeping, 0 stopped, 0 zombie
%Cpu(s): 24.8 us, 0.3 sy, 0.0 ni, 75.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
MiB Mem : 7951.2 total, 6756.1 free, 248.4 used, 946.7 buff/cache
MiB Swap: 0.0 total, 0.0 free, 0.0 used. 7457.4 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
9933 node-us+ 20 0 597936 51864 33956 R 99.9 0.6 4:19.64 node
9934 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
9935 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.84 node
9936 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
9937 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.93 node
9938 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
9939 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
Как видно в выводе, процесс Node.js имеет в общей сложности семь потоков: один основной поток для выполнения JavaScript, четыре потока Node.js и два потока V8.
Как было обсуждено ранее, четыре потока Node.js используются для операций ввода-вывода с целью их сделать неблокирующими. Они хорошо справляются с этой задачей, и создание собственных потоков для операций ввода-вывода может даже ухудшить производительность вашего приложения. То же самое нельзя сказать о задачах, связанных с центральным процессором. Задача, связанная с ЦП, не использует дополнительных доступных потоков в процессе и блокирует основной поток.
Теперь нажмите q
, чтобы выйти из top
и остановить процесс Node с помощью следующей команды:
- kill -9 9933
Теперь, когда вы знаете о потоках в процессе Node.js, вы напишете задачу, связанную с ЦП, в следующем разделе, и наблюдаете, как она влияет на основной поток.
Создание задачи, связанной с ЦП, без использования рабочих потоков
В этом разделе вы создадите Express-приложение с неблокирующим маршрутом и блокирующим маршрутом, выполняющим задачу, связанную с ЦП.
Во-первых, откройте index.js
в вашем предпочтительном редакторе:
- nano index.js
В вашем файле index.js
добавьте следующий код для создания основного сервера:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
В следующем блоке кода вы создаете HTTP-сервер с использованием Express. В первой строке вы импортируете модуль express
. Затем вы устанавливаете переменную app
для хранения экземпляра Express. После этого вы определяете переменную port
, которая содержит номер порта, на котором сервер должен слушать.
Затем вы используете app.get('/non-blocking')
, чтобы определить маршрут, по которому должны отправляться запросы GET
. Наконец, вы вызываете метод app.listen()
, чтобы указать серверу начать прослушивание порта 3000
.
Далее определите еще один маршрут /blocking/
, который будет содержать задачу, требующую больших вычислительных ресурсов:
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
Вы определяете маршрут /blocking
с помощью app.get("/blocking")
, который принимает асинхронный обратный вызов с префиксом async
в качестве второго аргумента, который выполняет задачу, требующую больших вычислительных ресурсов. Внутри обратного вызова вы создаете цикл for
, который выполняется 20 миллиардов раз, и на каждой итерации увеличивает переменную counter
на 1
. Эта задача выполняется на процессоре и займет несколько секунд для завершения.
На этом этапе ваш файл index.js
будет выглядеть следующим образом:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
Сохраните и закройте файл, затем запустите сервер с помощью следующей команды:
- node index.js
При выполнении этой команды вы увидите вывод, аналогичный следующему:
OutputApp listening on port 3000
Это показывает, что сервер работает и готов обслуживать запросы.
Теперь посетите http://localhost:3000/non-blocking
в вашем предпочитаемом браузере. Вы увидите мгновенный ответ со сообщением This page is non-blocking
.
Примечание: Если вы следуете учебнику на удаленном сервере, вы можете использовать переадресацию портов для тестирования приложения в браузере.
Пока сервер Express все еще работает, откройте еще один терминал на вашем локальном компьютере и введите следующую команду:
- ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip
Подключившись к серверу, перейдите по адресу http://localhost:3000/non-blocking
в веб-браузере вашего локального компьютера. Держите второй терминал открытым на протяжении оставшейся части этого учебника.
Затем откройте новую вкладку и посетите http://localhost:3000/blocking
. При загрузке страницы быстро откройте еще две вкладки и снова посетите http://localhost:3000/non-blocking
. Вы увидите, что вы не получите мгновенный ответ, и страницы будут продолжать пытаться загрузиться. Это произойдет только после того, как маршрут /blocking
завершит загрузку и вернет ответ result is 20000000000
, что остальные маршруты также вернут ответ.
Причина, по которой все маршруты /non-blocking
не работают при загрузке маршрута /blocking
, заключается в том, что блокирующий цикл for
, привязанный к ЦП, блокирует основной поток. Когда основной поток заблокирован, Node.js не может обслуживать запросы до завершения задачи, связанной с ЦП. Так что если у вашего приложения тысячи одновременных запросов GET
к маршруту /non-blocking
, достаточно одного посещения маршрута /blocking
, чтобы сделать все маршруты приложения нереагирующими.
Как видите, блокировка основного потока может нанести вред пользовательскому опыту работы с вашим приложением. Чтобы решить эту проблему, вам необходимо перенести задачу, связанную с использованием ЦП, на другой поток, чтобы основной поток мог продолжить обработку других HTTP-запросов.
Для этого остановите сервер, нажав клавишу CTRL+C
. Вы снова запустите сервер в следующем разделе после внесения дополнительных изменений в файл index.js
. Причина остановки сервера заключается в том, что Node.js не обновляется автоматически при внесении изменений в файл.
Теперь, когда вы понимаете негативное влияние задач, интенсивно использующих ЦП, на ваше приложение, вы попробуете избежать блокировки основного потока с помощью обещаний.
Перенос задачи, связанной с использованием ЦП, с помощью обещаний
Часто, когда разработчики узнают о блокирующем эффекте задач, связанных с использованием ЦП, они обращаются к обещаниям, чтобы сделать код неблокирующим. Это инстинкт исходит из знания использования неблокирующих методов ввода-вывода на основе обещаний, таких как readFile()
и writeFile()
. Но, как вы узнали, операции ввода-вывода используют скрытые потоки Node.js, которые не используются задачами, связанными с использованием ЦП. Однако в этом разделе вы обернете задачу, связанную с использованием ЦП, в обещание, чтобы попытаться сделать ее неблокирующей. Это не сработает, но поможет вам увидеть ценность использования рабочих потоков, что вы и сделаете в следующем разделе.
Снова откройте файл index.js
в вашем редакторе.
- nano index.js
В вашем файле index.js
удалите выделенный код, содержащий трудоемкую задачу для процессора:
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
...
Затем добавьте следующий выделенный код, содержащий функцию, возвращающую обещание:
...
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
res.status(200).send(`result is ${counter}`);
}
Функция calculateCount()
теперь содержит вычисления, которые у вас были в обработчике функции /blocking
. Функция возвращает обещание, которое инициализируется синтаксисом new Promise
. Обещание принимает обратный вызов с параметрами resolve
и reject
, которые обрабатывают успех или ошибку. Когда цикл for
завершает свою работу, обещание разрешается значением в переменной counter
.
Затем вызовите функцию calculateCount()
в обработчике функции /blocking/
в файле index.js
:
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
Здесь вы вызываете функцию calculateCount()
с ключевым словом await
, предварительно дождавшись разрешения обещания. Как только обещание разрешается, переменная counter
устанавливается в разрешенное значение.
Ваш полный код теперь будет выглядеть следующим образом:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
Сохраните и закройте ваш файл, затем снова запустите сервер:
- node index.js
В вашем веб-браузере посетите http://localhost:3000/blocking
и при его загрузке быстро перезагрузите вкладки http://localhost:3000/non-blocking
. Как вы заметите, маршруты non-blocking
по-прежнему затронуты, и все они будут ждать завершения загрузки маршрута /blocking
. Поскольку маршруты все еще затронуты, обещания не позволяют выполнять код JavaScript параллельно и не могут использоваться для выполнения трудоемких задач, не блокируя их.
С этим остановите сервер приложений с помощью CTRL+C
.
Теперь, когда вы знаете, что обещания не предоставляют никакого механизма для выполнения CPU-зависимых задач неблокирующими, вы будете использовать модуль Node.js worker-threads
для переноса CPU-зависимой задачи в отдельный поток.
Перенос CPU-зависимой задачи с помощью модуля worker-threads
В этом разделе вы перенесете интенсивную задачу на центральный процессор в другой поток с использованием модуля worker-threads
, чтобы избежать блокировки основного потока. Для этого вы создадите файл worker.js
, который будет содержать интенсивную задачу для центрального процессора. В файле index.js
вы будете использовать модуль worker-threads
для инициализации потока и запуска задачи в файле worker.js
для параллельного выполнения основному потоку. После завершения задачи поток-рабочий отправит сообщение с результатом обратно в основной поток.
Чтобы начать, убедитесь, что у вас есть 2 или более ядра с помощью команды nproc
:
- nproc
Output4
Если отображается два или более ядра, вы можете продолжить с этим шагом.
Затем создайте и откройте файл worker.js
в вашем текстовом редакторе:
- nano worker.js
В вашем файле worker.js
добавьте следующий код для импорта модуля worker-threads
и выполнения интенсивной задачи для центрального процессора:
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
Первая строка загружает модуль worker_threads
и извлекает класс parentPort
. Класс предоставляет методы, которые можно использовать для отправки сообщений в основной поток. Затем у вас есть задача, требующая больших вычислительных ресурсов, которая в настоящее время находится в функции calculateCount()
в файле index.js
. Позже в этом шаге вы удалите эту функцию из файла index.js
.
Далее добавьте следующий выделенный код:
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
parentPort.postMessage(counter);
Здесь вы вызываете метод postMessage()
класса parentPort
, который отправляет сообщение в основной поток, содержащее результат задачи, связанной с центральным процессором, хранящийся в переменной counter
.
Сохраните и закройте файл. Откройте файл index.js
в текстовом редакторе:
- nano index.js
Поскольку у вас уже есть задача, требующая много вычислительных ресурсов, в файле worker.js
, удалите выделенный код из файла index.js
:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
Затем в обратный вызов app.get("/blocking")
добавьте следующий код для инициализации потока:
const express = require("express");
const { Worker } = require("worker_threads");
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
...
Сначала вы импортируете модуль worker_threads
и распаковываете класс Worker
. Внутри обратного вызова app.get("/blocking")
вы создаете экземпляр класса Worker
с использованием ключевого слова new
, за которым следует вызов Worker
с аргументом пути к файлу worker.js
. Это создает новый поток, и код в файле worker.js
начинает выполняться в потоке на другом ядре.
После этого вы присоединяете событие к экземпляру worker
, используя метод on("message")
, чтобы слушать событие сообщения. Когда сообщение содержит результат из файла worker.js
, оно передается в виде параметра обратному вызову метода, который возвращает ответ пользователю с результатом задачи, связанной с ЦП.
Затем вы присоединяете еще одно событие к экземпляру worker, используя метод on("error")
, чтобы слушать событие ошибки. Если произойдет ошибка, обратный вызов возвращает ответ 404
с сообщением об ошибке пользователю.
Теперь ваш файл будет выглядеть следующим образом:
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
Сохраните и закройте файл, затем запустите сервер:
- node index.js
Посетите вкладку http://localhost:3000/blocking
снова в вашем веб-браузере. Прежде чем она загрузится, обновите все вкладки http://localhost:3000/non-blocking
. Теперь вы заметите, что они мгновенно загружаются, не ожидая завершения загрузки маршрута /blocking
. Это происходит потому, что задача, связанная с ЦП, передается на другой поток, и основной поток обрабатывает все входящие запросы.
Теперь остановите сервер, используя CTRL+C
.
Теперь, когда вы можете сделать интенсивную задачу ЦП неблокирующей, используя рабочий поток, вы используете четыре рабочих потока для улучшения производительности интенсивной задачи ЦП.
Оптимизация задачи, требующей много процессорного времени, с использованием четырех рабочих потоков
В этом разделе вы разделите задачу, требующую много процессорного времени, между четырьмя рабочими потоками, чтобы они могли завершить задачу быстрее и сократить время загрузки маршрута /blocking
.
Чтобы больше рабочих потоков работали над той же задачей, вам нужно разделить задачи. Поскольку задача включает в себя циклическое выполнение 20 миллиардов раз, вы разделите 20 миллиардов на количество потоков, которые вы хотите использовать. В данном случае это 4
. Вычисление 20_000_000_000 / 4
приведет к результату 5_000_000_000
. Таким образом, каждый поток будет выполнять цикл от 0
до 5_000_000_000
и увеличивать counter
на 1
. Когда каждый поток заканчивает работу, он отправляет сообщение главному потоку, содержащее результат. После того как главный поток получит сообщения от всех четырех потоков по отдельности, вы объедините результаты и отправите ответ пользователю.
Вы также можете использовать тот же подход, если у вас есть задача, которая перебирает большие массивы. Например, если вы хотите изменить размер 800 изображений в каталоге, вы можете создать массив, содержащий все пути к файлам изображений. Затем разделите 800
на 4
(количество потоков) и позвольте каждому потоку работать в своем диапазоне. Первый поток будет изменять размер изображений с индекса массива 0
до 199
, второй поток от индекса 200
до 399
и так далее.
Сначала убедитесь, что у вас есть четыре или более ядра:
- nproc
Output4
Сделайте копию файла worker.js
, используя команду cp
:
- cp worker.js four_workers.js
Текущие файлы index.js
и worker.js
останутся нетронутыми, чтобы вы могли запустить их снова для сравнения их производительности с изменениями в этом разделе позже.
Затем откройте файл four_workers.js
в вашем текстовом редакторе:
- nano four_workers.js
В вашем файле four_workers.js
добавьте выделенный код для импорта объекта workerData
:
const { workerData, parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000 / workerData.thread_count; i++) {
counter++;
}
parentPort.postMessage(counter);
Сначала вы извлекаете объект WorkerData
, который будет содержать данные, переданные из основного потока при инициализации потока (что вы сделаете скоро в файле index.js
). У объекта есть свойство thread_count
, содержащее количество потоков, которое равно 4
. Затем в цикле for
значение 20_000_000_000
делится на 4
, что приводит к 5_000_000_000
.
Сохраните и закройте ваш файл, затем скопируйте файл index.js
:
- cp index.js index_four_workers.js
Откройте файл index_four_workers.js
в вашем редакторе:
- nano index_four_workers.js
В вашем файле index_four_workers.js
добавьте выделенный код для создания экземпляра потока:
...
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
});
}
app.get("/blocking", async (req, res) => {
...
})
...
Сначала вы определяете константу THREAD_COUNT
, содержащую количество потоков, которые вы хотите создать. Позже, когда у вас будет больше ядер на вашем сервере, масштабирование будет включать изменение значения THREAD_COUNT
на количество потоков, которые вы хотите использовать.
Далее, функция createWorker()
создает и возвращает обещание. Внутри обратного вызова обещания вы инициализируете новый поток, передавая классу Worker
путь к файлу four_workers.js
в качестве первого аргумента. Затем вы передаете объект в качестве второго аргумента. Далее вы присваиваете объекту свойство workerData
, которое имеет в качестве значения другой объект. Наконец, вы присваиваете объекту свойство thread_count
, значение которого – количество потоков в константе THREAD_COUNT
. Объект workerData
– тот, на который вы ссылаетесь в файле workers.js
ранее.
Чтобы убедиться, что обещание разрешается или выдает ошибку, добавьте следующие выделенные строки:
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
...
Когда рабочий поток отправляет сообщение основному потоку, обещание разрешается с возвращенными данными. Однако, если произошла ошибка, обещание возвращает сообщение об ошибке.
Теперь, когда вы определили функцию, которая инициализирует новый поток и возвращает данные из потока, вы будете использовать эту функцию в app.get("/blocking")
, чтобы создавать новые потоки.
Но сначала удалите следующий выделенный код, поскольку вы уже определили эту функциональность в функции createWorker()
:
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error ocurred: ${msg}`);
});
});
...
После удаления кода добавьте следующий код для инициализации четырех рабочих потоков:
...
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
});
...
Сначала вы создаете переменную workerPromises
, которая содержит пустой массив. Затем вы выполняете итерацию столько раз, сколько указано в переменной THREAD_COUNT
, которая равна 4
. Во время каждой итерации вы вызываете функцию createWorker()
, чтобы создать новый поток. Затем вы добавляете объект promise, который возвращает эта функция, в массив workerPromises
с использованием метода push
в JavaScript. По завершении цикла, в массиве workerPromises
будет четыре объекта promise, каждый из которых возвращен вызовом функции createWorker()
четыре раза.
Теперь добавьте следующий выделенный код ниже, чтобы дождаться разрешения promise и вернуть ответ пользователю:
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
Поскольку массив workerPromises
содержит promises, возвращенные вызовом createWorker()
, вы добавляете префикс await
перед методом Promise.all()
и вызываете метод all()
с аргументом workerPromises
. Метод Promise.all()
ожидает разрешения всех promises в массиве. Когда это происходит, переменная thread_results
содержит значения, на которые разрешены promises. Поскольку вычисления были разделены между четырьмя работниками, вы добавляете их все вместе, получая каждое значение из thread_results
с использованием синтаксиса квадратных скобок. После сложения вы возвращаете общее значение на страницу.
Ваш полный файл теперь должен выглядеть так:
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
Сохраните и закройте файл. Прежде чем запустить этот файл, сначала запустите index.js
, чтобы измерить время его отклика.
- node index.js
Далее откройте новый терминал на своем локальном компьютере и введите следующую команду curl
, которая измеряет время получения ответа от маршрута /blocking
:
- time curl --get http://localhost:3000/blocking
Команда time
измеряет, как долго выполняется команда curl
. Команда curl
отправляет HTTP-запрос по указанному URL, а опция --get
указывает curl
сделать запрос типа GET
.
Когда команда выполняется, ваш вывод будет выглядеть примерно так:
Outputreal 0m28.882s
user 0m0.018s
sys 0m0.000s
Выделенный вывод показывает, что для получения ответа требуется примерно 28 секунд, что может варьироваться на вашем компьютере.
Далее остановите сервер с помощью CTRL+C
и запустите файл index_four_workers.js
:
- node index_four_workers.js
Посетите маршрут /blocking
снова во втором терминале:
- time curl --get http://localhost:3000/blocking
Вы увидите вывод, согласующийся примерно с следующим:
Outputreal 0m8.491s
user 0m0.011s
sys 0m0.005s
Вывод показывает, что это занимает примерно 8 секунд, что означает, что вы сократили время загрузки примерно на 70%.
Вы успешно оптимизировали задачу, связанную с использованием процессора, с использованием четырех рабочих потоков. Если у вас есть машина с более чем четырьмя ядрами, обновите THREAD_COUNT
до этого числа, и вы еще более сократите время загрузки.
Заключение
В этой статье вы создали приложение Node с задачей, связанной с ЦП, которая блокирует основной поток. Затем вы попытались сделать эту задачу неблокирующей, используя промисы, что оказалось неудачным. После этого вы использовали модуль worker_threads
, чтобы перенести задачу, связанную с ЦП, в другой поток, чтобы сделать ее неблокирующей. Наконец, вы использовали модуль worker_threads
, чтобы создать четыре потока для ускорения работы с задачами, интенсивными для ЦП.
В качестве следующего шага ознакомьтесь с документацией по потокам Worker в Node.js, чтобы узнать больше о доступных вариантах. Кроме того, вы можете изучить библиотеку piscina
, которая позволяет создавать пул рабочих для ваших задач, интенсивных для ЦП. Если вы хотите продолжить изучение Node.js, посмотрите серию учебных пособий Как программировать на Node.js.
Source:
https://www.digitalocean.com/community/tutorials/how-to-use-multithreading-in-node-js