Параллельность c grammY runner (runner
)
Этот пакет можно использовать, если вы запускаете бота с использованием long polling, и хотите, чтобы сообщения обрабатывались параллельно.
Обязательно изучите Масштабирование II, прежде чем использовать grammY runner.
Почему нам нужен runner
Если вы размещаете своего бота на хостинге с long polling и хотите увеличить его масштабы, вам не обойтись без одновременной обработки обновлений, поскольку последовательная обработка обновлений слишком медленная. В результате боты сталкиваются с рядом проблем.
- Существуют ли условия гонки?
- Можем ли мы по-прежнему “ожидать” стек middleware? Это необходимо для обработки ошибок!
- Что, если middleware по какой-то причине не проходит дальше, блокирует ли это работу бота?
- Можем ли мы обрабатывать некоторые выбранные обновления последовательно?
- Можем ли мы ограничить нагрузку на сервер?
- Можем ли мы обрабатывать обновления на нескольких ядрах?
Как видите, нам нужно решение, способное решить все вышеперечисленные проблемы, чтобы добиться правильного long polling бота. Эта проблема совершенно отлична от создания middleware или отправки сообщений в Telegram. Следовательно, она не решается в основном пакете grammY. Вместо этого вы можете использовать grammY runner. У него также есть своя API документация.
Использование
Вот простой пример.
import { Bot } from "grammy";
import { run } from "@grammyjs/runner";
// Создайте бота
const bot = new Bot("");
// Добавьте обычный middleware и бла-бла-бла
bot.on("message", (ctx) => ctx.reply("Получил твое сообщение."));
// Правильно запустите это!
run(bot);
2
3
4
5
6
7
8
9
10
11
const { Bot } = require("grammy");
const { run } = require("@grammyjs/runner");
// Создайте бота
const bot = new Bot("");
// Добавьте обычный middleware и бла-бла-бла
bot.on("message", (ctx) => ctx.reply("Получил твое сообщение."));
// Правильно запустите это!
run(bot);
2
3
4
5
6
7
8
9
10
11
import { Bot } from "https://deno.land/x/grammy@v1.34.0/mod.ts";
import { run } from "https://deno.land/x/grammy_runner@v2.0.3/mod.ts";
// Создайте бота
const bot = new Bot("");
// Добавьте обычный middleware и бла-бла-бла
bot.on("message", (ctx) => ctx.reply("Получил твое сообщение."));
// Правильно запустите это!
run(bot);
2
3
4
5
6
7
8
9
10
11
Последовательная обработка при необходимости
Скорее всего, вам нужна гарантия того, что сообщения из одного и того же чата будут обрабатываться по порядку. Это полезно при установке middleware сессии, а также для того, чтобы ваш бот не перепутал порядок сообщений в одном и том же чате.
grammY runner экспортирует middleware sequentialize
, который заботится об этом. Вы можете посмотреть этот раздел, чтобы узнать, как его использовать.
Теперь мы рассмотрим более продвинутые возможности использования плагина.
Функцию ограничитель можно использовать не только для указания идентификатора чата или пользователя. Вместо этого вы можете возвращать список строк идентификаторов ограничений, которые определяют для каждого обновления в отдельности, каких еще вычислений оно должно дождаться, прежде чем начнется обработка.
Например, можно вернуть идентификатор чата и идентификатор пользователя автора сообщения.
bot.use(sequentialize((ctx) => {
const chat = ctx.chat?.id.toString();
const user = ctx.from?.id.toString();
return [chat, user].filter((con) => con !== undefined);
}));
2
3
4
5
Это гарантирует, что сообщения в одном и том же чате будут упорядочены правильно. Кроме того, если Алиса отправляет сообщение в группе, а затем посылает сообщение вашему боту в личном чате, то эти два сообщения будут упорядочены правильно.
В некотором смысле, вы можете задать граф зависимостей между обновлениями. grammY runner будет решать все необходимые ограничения на лету и блокировать обновления столько, сколько необходимо для обеспечения правильного упорядочивания сообщений.
Реализация этого очень эффективна. Ей требуется постоянная память (если вы не зададите бесконечную параллельность), и (амортизированное) постоянное время обработки одного обновления.
Правильно выключение
Для того чтобы бот корректно завершил свою работу, вы должны подать ему сигнал на остановку, когда процесс будет уничтожен.
Заметьте, что вы можете дождаться завершения работы runner, ожидая
задачи в Runner
, возвращаемой из run
.
const handle = run(bot);
// Эта функция будет вызвана, когда бот остановится.
handle.task().then(() => {
console.log("Бот закончил обработку!");
});
// Позже остановите бота с помощью обработчика runner.
await handle.stop();
2
3
4
5
6
7
8
9
Расширенные настройки
grammY runner состоит из трех частей: источника, поглотителя и runner’а. Источник получает обновления, поглотитель потребляет обновления, а runner настраивает и соединяет их.
Подробное описание внутренней работы runner’а можно найти здесь.
Каждая из этих трех частей может быть настроена с помощью различных параметров. Это может уменьшить сетевой трафик, позволить вам указать разрешенные обновления и многое другое.
Каждая часть runner’а получает свои настройки через специальный объект options.
run(bot, {
source: {},
runner: {},
sink: {},
});
2
3
4
5
Вы должны посмотреть Run
в документации API, чтобы узнать, какие параметры доступны.
Например, вы узнаете, что allowed
могут быть включены с помощью следующего фрагмента кода.
run(bot, { runner: { fetch: { allowed_updates: [] } } });
Многопоточность
Нет смысла в многопоточности, если ваш бот не обрабатывает хотя бы 50 миллионов обновлений в день (>500 в секунду). Пропустите этот раздел, если ваш бот обрабатывает меньше трафика, чем это.
JavaScript является однопоточным. Это удивительно, потому что параллелизм — это сложно, а значит, если есть только один поток, то много головной боли, естественно, снимается.
Однако если нагрузка на бота очень высока (речь идет о 1000 обновлений в секунду и выше), то одного ядра может оказаться недостаточно. В принципе, одно ядро начнет справляться с обработкой JSON всех сообщений, которые должен обработать ваш бот.
Workers бота для обработки обновлений
Есть простой выход: workers бота! grammY runner позволяет вам создать несколько worker’ов, которые могут обрабатывать ваши обновления параллельно на фактически разных ядрах (используя разные циклы событий и отдельную память).
На Node.js grammY runner использует Worker Threads. На Deno grammY runner использует Web Workers.
Концептуально, grammY runner предоставляет вам класс Bot
, который может обрабатывать обновления. Он равносилен обычному классу Bot
(фактически, он даже расширяет Bot
). Основное различие между Bot
и Bot
заключается в том, что Bot
не может получать обновления. Вместо этого он должен получать их от обычного Bot
, который управляет своими worker’ами.
1. получение обновлений Bot
__// \\__
__/ / \ \__
2. отправка worker'ам __/ / \ \__
__/ / \ \__
/ / \ \
3. обработка обновлений BotWorker BotWorker BotWorker BotWorker
grammY runner предоставляет вам middleware, который может отправлять обновления worker’ам бота. Бот workers могут получать эти обновления и обрабатывать их. Таким образом, центральный бот должен заниматься только получением и распределением обновлений между worker’ами, которыми он управляет. Фактическая обработка обновлений (фильтрация сообщений, отправка ответов и т.д.) выполняется worker’ами бота.
Давайте посмотрим, как это можно использовать.
Использование бот workers
Примеры этого можно найти в репозитории grammY runner.
Мы начнем с создания центрального экземпляра бота, который будет получать обновления и распределять их между worker’ами. Начнем с создания файла bot
со следующим содержанием.
// bot.ts
import { Bot } from "grammy";
import { distribute, run } from "@grammyjs/runner";
// Создайте бота.
const bot = new Bot(""); // <-- поместите токен бота между ""
// По желанию, здесь можно выполнить последовательную обработку обновлений.
// bot.use(sequentialize(...))
// Распределите обновления между worker'ами бота
bot.use(distribute(__dirname + "/worker"));
// Запускайте бота с многопоточностью.
run(bot);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// bot.js
const { Bot } = require("grammy");
const { distribute, run } = require("@grammyjs/runner");
// Создайте бота.
const bot = new Bot(""); // <-- поместите токен бота между ""
// По желанию, здесь можно выполнить последовательную обработку обновлений.
// bot.use(sequentialize(...))
// Распределите обновления между worker'ами бота
bot.use(distribute(__dirname + "/worker"));
// Запускайте бота с многопоточностью.
run(bot);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// bot.ts
import { Bot } from "https://deno.land/x/grammy@v1.34.0/mod.ts";
import { distribute, run } from "https://deno.land/x/grammy_runner@v2.0.3/mod.ts";
// Создайте бота.
const bot = new Bot(""); // <-- поместите токен бота между ""
// По желанию, здесь можно выполнить последовательную обработку обновлений.
// bot.use(sequentialize(...))
// Распределите обновления между worker'ами бота
bot.use(distribute(new URL("./worker.ts", import.meta.url)));
// Запускайте бота с многопоточностью.
run(bot);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Рядом с bot
мы создаем второй файл под названием worker
(как указано в строке 12 в коде выше). Он будет содержать фактическую логику бота.
// worker.ts
import { BotWorker } from "@grammyjs/runner";
// Создайте нового BotWorker
const bot = new BotWorker(""); // <-- Снова поместите токен вашего бота между ""
// Добавьте логику обработки
bot.on("message", (ctx) => ctx.reply("Ура!"));
2
3
4
5
6
7
8
// worker.js
const { BotWorker } = require("@grammyjs/runner");
// Создайте нового BotWorker
const bot = new BotWorker(""); // <-- Снова поместите токен вашего бота между ""
// Добавьте логику обработки
bot.on("message", (ctx) => ctx.reply("Ура!"));
2
3
4
5
6
7
8
// worker.ts
import { BotWorker } from "https://deno.land/x/grammy_runner@v2.0.3/mod.ts";
// Создайте нового BotWorker
const bot = new BotWorker(""); // <-- Снова поместите токен вашего бота между ""
// Добавьте логику обработки
bot.on("message", (ctx) => ctx.reply("Ура!"));
2
3
4
5
6
7
8
Обратите внимание, что каждый worker может отправлять сообщения обратно в Telegram. Поэтому вы должны передать токен бота каждому worker’у.
Вам не нужно запускать рабочих ботов или экспортировать что-либо из файла. Достаточно создать экземпляр Bot
. Он будет автоматически слушать обновления.
Важно понимать, что только необработанные обновления отправляются worker’ами бота. Другими словами, объекты контекста создаются дважды для каждого обновления: один раз в bot
, чтобы оно могло быть передано worker’у, и один раз в worker
, чтобы оно могло быть обработано. Более того: свойства, которые устанавливаются для объекта контекста в bot
, не отправляются worker’ам. Это означает, что все плагины должны быть установлены в worker’ах бота.
Распространяйте только некоторые обновления
В качестве оптимизации производительности вы можете отбрасывать обновления, которые не хотите обрабатывать. Таким образом, вашему боту не придется отправлять обновление на worker’у, чтобы оно было проигнорировано.
// Наш бот обрабатывает только сообщения, изменения и callback запросы,
// поэтому мы можем игнорировать все остальные обновления и не распространять их.
bot.on(
["message", "edited_message", "callback_query"],
distribute(__dirname + "/worker"),
);
2
3
4
5
6
// Наш бот обрабатывает только сообщения, изменения и callback запросы,
// поэтому мы можем игнорировать все остальные обновления и не распространять их.
bot.on(
["message", "edited_message", "callback_query"],
distribute(new URL("./worker.ts", import.meta.url)),
);
2
3
4
5
6
По умолчанию distribute
создает 4 worker’а бота. Вы можете легко изменить это число.
// Распространите между 8 worker'ами бота
bot.use(distribute(workerFile, { count: 8 }));
2
Обратите внимание, что ваше приложение никогда не должно порождать больше потоков, чем имеется физических ядер на вашем процессоре. Это не улучшит производительность, а скорее ухудшит ее.
Как это работает под капотом
Конечно, хотя использование grammY runner выглядит очень просто, под капотом происходит очень многое.
Каждый runner состоит из трех различных частей.
- Источник получает обновления из Telegram.
- Поглотитель поставляет обновления экземпляру бота.
- Компонент runner соединяет источник и поглотитель и позволяет запускать и останавливать бота.
api.telegram.org <—> источник <—> runner <—> поглотитель <—> бот
Исходник
grammY runner поставляется с одним источником по умолчанию, который может работать с любым Update
(ссылка на API). Такой поставщик обновлений легко создать из экземпляра бота. Если вы хотите создать его самостоятельно, обязательно ознакомьтесь с create
(ссылка на API).
Источник представляет собой асинхронный итератор обновлений пакетов, но он может быть активным или неактивным, и вы можете закрыть
его, чтобы отключиться от серверов Telegram.
Поглотитель
grammY runner поставляется с тремя возможными реализациями поглотителя: последовательной (то же поведение, что и у bot
), пакетной (в основном полезной для обратной совместимости с другими фреймворками) и полностью параллельной (используемой run
). Все они работают с объектами Update
(ссылка на API), которые легко создать из экземпляра бота. Если вы хотите создать такой объект самостоятельно, обязательно проверьте функцию handle
на экземпляре Bot
в grammY (ссылка на API).
В поглотителе содержится очередь (ссылка на API) отдельных обновлений, которые в данный момент обрабатываются. Добавление новых обновлений в очередь немедленно заставит Update
обработать их и вернет Promise
, который будет решён, как только в очереди снова появится свободное место. Разрешенное число определяет свободное место. Таким образом, установка ограничения параллельности для grammY runner выполняется через базовый экземпляр очереди.
Очередь также отбрасывает обновления, обработка которых занимает слишком много времени, и вы можете указать timeout
при создании соответствующего поглотителя. Разумеется, при создании поглотителя вы также должны указать обработчик ошибок.
Если вы используете run(bot)
, будет использован обработчик ошибок из bot
.
Runner
Runner — это обычный цикл, который получает обновления из источника и передает их в поглотитель. Как только в поглотителе снова появится свободное место, runner получит следующую порцию обновлений из источника.
Когда вы создаете runner с помощью create
(ссылка на API), вы получаете обработчик, который можно использовать для управления runner’ом. Например, с его помощью можно запускать и останавливать его или получать Promise
, который разрешится, если runner остановится. (Этот обработчик также возвращается функцией run
). Посмотрите документацию API на Runner
.
Функция run
Функция run
делает несколько вещей, которые помогут вам легко использовать описанную выше структуру.
- Она создает поставщика обновлений для вашего бота.
- Создает исходник из поставщика обновлений.
- Создается
Update
от вашего бота.Consumer - Она создает поглотитель от
Update
.Consumer - Создает runner из источника и поглотителя.
- Запускает runner.
Возвращается обработчик созданного runner’а, который позволяет управлять runner’ом.