Concurrency With grammY runner (runner)
This package can be used if you run your bot using long polling, and you want messages to be processed concurrently.
Make sure to understand Scaling Up II before you use grammY runner.
Why We Need a Bot Runner
If you are hosting your bot using long polling and you want to make it scale up, there is no way around processing updates concurrently, as sequential update processing is way too slow. As a result, bots face a number of challenges.
- Are there race conditions?
- Can we still await the middleware stack? We must have this for error handling!
- What if middleware never resolves for some reason, does this block the bot?
- Can we process some selected updates in sequence?
- Can we constrain the server load?
- Can we process updates on multiple cores?
As you can see, we need a solution that can solve all of the above problems to achieve proper long polling for a bot. This is a problem that is very distinct from composing middleware or sending messages to Telegram. Consequently, it is not solved by the grammY core package. Instead, you can use grammY runner. It has its own API Reference, too.
Usage
Here is a simple example.
import { Bot } from "grammy";
import { run } from "@grammyjs/runner";
// Create a bot.
const bot = new Bot("");
// Add the usual middleware, yada yada
bot.on("message", (ctx) => ctx.reply("Got your message."));
// Run it concurrently!
run(bot);
const { Bot } = require("grammy");
const { run } = require("@grammyjs/runner");
// Create a bot.
const bot = new Bot("");
// Add the usual middleware, yada yada
bot.on("message", (ctx) => ctx.reply("Got your message."));
// Run it concurrently!
run(bot);
import { Bot } from "https://deno.land/x/grammy@v1.33.0/mod.ts";
import { run } from "https://deno.land/x/grammy_runner@v2.0.3/mod.ts";
// Create a bot.
const bot = new Bot("");
// Add the usual middleware, yada yada
bot.on("message", (ctx) => ctx.reply("Got your message."));
// Run it concurrently!
run(bot);
Sequential Processing Where Necessary
Most likely, you want to be guaranteed that messages from the same chat are processed in order. This is useful when installing session middleware, but it also makes sure that your bot does not confuse the order of messages in the same chat.
grammY runner exports the sequentialize middleware that takes care of this. You can check out this section to learn how to use it.
We are now going to look at more advanced usage of the plugin.
The supplied constraint function is not limited to returning the chat identifier or the user identifier. Instead, you can return a list of constraint identifier strings that determine, for every update individually, which other computations it must wait for before processing can begin.
As an example, you could return both the chat identifier, and the user identifier of the message author.
bot.use(sequentialize((ctx) => {
const chat = ctx.chat?.id.toString();
const user = ctx.from?.id.toString();
return [chat, user].filter((con) => con !== undefined);
}));
This would make sure that messages in the same chat are ordered correctly. In addition, if Alice sends a message in a group, and then sends a message to your bot in a private chat, these two messages are ordered correctly.
In a sense, you can therefore specify a graph of dependencies between updates. grammY runner will resolve all necessary constraints on the fly and block those updates as long as necessary to ensure correct message ordering.
The implementation of this is very efficient. It needs constant memory (unless you specify infinite concurrency), and it needs (amortized) constant processing time per update.
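The constraint-graph idea can be illustrated with a small, self-contained sketch. Note that this is a hypothetical simplification and not the actual implementation of sequentialize (the real plugin also cleans up finished chains so that memory use stays constant); the Sequencer class and its names are invented for illustration.

```typescript
// Hypothetical sketch of constraint-based sequencing, NOT the actual
// implementation of sequentialize.
type Task = () => Promise<void>;

class Sequencer {
  // One promise chain per constraint string.
  private chains = new Map<string, Promise<void>>();

  // Runs `task` only after all previous tasks sharing a constraint settled.
  run(constraints: string[], task: Task): Promise<void> {
    const deps = constraints.map((c) => this.chains.get(c) ?? Promise.resolve());
    const done = Promise.all(deps).then(task);
    // Swallow errors so one failing task does not poison the chains.
    const settled = done.catch(() => {});
    for (const c of constraints) this.chains.set(c, settled);
    return done;
  }
}
```

Two tasks that share any constraint (say, the same chat identifier) are chained one after the other, while tasks with disjoint constraints run concurrently.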
Graceful Shutdown
In order for the bot to complete its work correctly, you should signal it to stop when the process is about to be destroyed.
Note that you can wait for the runner to terminate by awaiting the task in the RunnerHandle returned from run.
const handle = run(bot);
// This will get called when the bot stops.
handle.task().then(() => {
console.log("Bot done processing!");
});
// Later, stop the bot via the handle of the runner.
await handle.stop();
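In practice, you will typically want to wire OS signals to the runner so that Ctrl+C or a supervisor's SIGTERM stops the bot gracefully. The sketch below hides the handle behind a minimal interface that mirrors its isRunning and stop methods, so the wiring itself is visible; installShutdown is a hypothetical helper name, not part of grammY runner.

```typescript
// Minimal interface mirroring the runner handle's stop-related methods.
interface StoppableHandle {
  isRunning(): boolean;
  stop(): Promise<void>;
}

// Stop the runner once when the process receives SIGINT or SIGTERM.
// Returns the listener so it can also be invoked (or removed) manually.
function installShutdown(handle: StoppableHandle): () => void {
  const stopRunner = () => {
    if (handle.isRunning()) void handle.stop();
  };
  process.once("SIGINT", stopRunner);
  process.once("SIGTERM", stopRunner);
  return stopRunner;
}
```

With grammY runner, you would pass the handle returned from run, e.g. installShutdown(run(bot)).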
Advanced Options
grammY runner consists of three things: a source, a sink, and a runner. The source pulls in updates, the sink consumes updates, and the runner configures and connects the two.
An in-depth description of how the runner works internally can be found below.
Each of these three parts can be configured through various options. This can reduce the network traffic, let you specify allowed updates, and more.
Each part of the runner accepts its configuration through a dedicated options object.
run(bot, {
source: {},
runner: {},
sink: {},
});
You should check out the RunOptions in the API reference to see which options are available.
For example, you will find there that allowed_updates can be enabled using the following code snippet.
run(bot, { runner: { fetch: { allowed_updates: [] } } });
Multithreading
There is no point in multithreading if your bot does not process at least 50 million updates per day (>500 updates per second). Skip this section if your bot handles less traffic than that.
JavaScript is single-threaded. This is amazing because concurrency is hard, meaning that if there is only a single thread, a lot of headache is naturally removed.
However, if your bot has an extremely high load (we are talking about 1000 updates per second and up), then doing everything on a single core might not be enough anymore. Basically, a single core will start struggling with the JSON processing of all the messages your bot has to handle.
Bot Workers for Update Handling
There is a simple way out: bot workers! grammY runner lets you create several workers which can process your updates in parallel on actually different cores (using different event loops and with separate memory).
On Node.js, grammY runner uses Worker Threads. On Deno, grammY runner uses Web Workers.
Conceptually, grammY runner provides you with a class called BotWorker which can handle updates. It is equivalent to the regular class Bot (in fact, it even extends Bot). The main difference between BotWorker and Bot is that BotWorker cannot fetch updates. Instead, it has to receive them from a regular Bot that controls its workers.
1. fetch updates                        Bot
                                     __// \\__
                                  __/ /     \ \__
2. send updates to workers     __/ /         \ \__
                            __/ /             \ \__
                           / /                 \ \
3. process updates    BotWorker BotWorker BotWorker BotWorker
grammY runner provides you with middleware that can send updates to bot workers. The bot workers can then receive this update and handle it. This way, the central bot only has to concern itself with pulling in and distributing updates among the bot workers it orchestrates. The actual update handling (filtering messages, sending replies, etc.) is performed by the bot workers.
Let’s now see how this can be used.
Using Bot Workers
Examples of this can be found in the grammY runner repository.
We will start out by creating the central bot instance that fetches updates and distributes them among the workers. Create a file called bot.ts with the following content.
// bot.ts
import { Bot } from "grammy";
import { distribute, run } from "@grammyjs/runner";
// Create the bot.
const bot = new Bot(""); // <-- put your bot token between the ""
// Optionally, sequentialize updates here.
// bot.use(sequentialize(...))
// Distribute the updates among bot workers.
bot.use(distribute(__dirname + "/worker"));
// Run the bot concurrently with multi-threading.
run(bot);
// bot.js
const { Bot } = require("grammy");
const { distribute, run } = require("@grammyjs/runner");
// Create the bot.
const bot = new Bot(""); // <-- put your bot token between the ""
// Optionally, sequentialize updates here.
// bot.use(sequentialize(...))
// Distribute the updates among bot workers.
bot.use(distribute(__dirname + "/worker"));
// Run the bot concurrently with multi-threading.
run(bot);
// bot.ts
import { Bot } from "https://deno.land/x/grammy@v1.33.0/mod.ts";
import { distribute, run } from "https://deno.land/x/grammy_runner@v2.0.3/mod.ts";
// Create the bot.
const bot = new Bot(""); // <-- put your bot token between the ""
// Optionally, sequentialize updates here.
// bot.use(sequentialize(...))
// Distribute the updates among bot workers.
bot.use(distribute(new URL("./worker.ts", import.meta.url)));
// Run the bot concurrently with multi-threading.
run(bot);
Next to bot.ts, we create a second file called worker.ts (the path passed to distribute in the code above). This will contain the actual bot logic.
// worker.ts
import { BotWorker } from "@grammyjs/runner";
// Create a new bot worker.
const bot = new BotWorker(""); // <-- pass your bot token here again
// Add message handling logic.
bot.on("message", (ctx) => ctx.reply("yay!"));
// worker.js
const { BotWorker } = require("@grammyjs/runner");
// Create a new bot worker.
const bot = new BotWorker(""); // <-- pass your bot token here again
// Add message handling logic.
bot.on("message", (ctx) => ctx.reply("yay!"));
// worker.ts
import { BotWorker } from "https://deno.land/x/grammy_runner@v2.0.3/mod.ts";
// Create a new bot worker.
const bot = new BotWorker(""); // <-- pass your bot token here again
// Add message handling logic.
bot.on("message", (ctx) => ctx.reply("yay!"));
Note that each worker is able to send messages back to Telegram. This is why you must give your bot token to each worker, too.
You do not have to start the bot workers, or export anything from the file. It is enough to create an instance of BotWorker. It will listen for updates automatically.
It is important to understand that only the raw updates are sent to bot workers. In other words, the context objects are created twice for each update: once in bot.ts so the update can be distributed to a bot worker, and once in worker.ts so it can actually be handled. What's more: the properties that are installed on the context object in bot.ts are not sent to the bot workers. This means that all plugins must be installed in the bot workers.
Distribute Only Some Updates
As a performance optimization, you can drop updates that you do not want to handle. That way, your bot does not have to send the update to a worker, only for it to be ignored there.
// Our bot only handles messages, edits, and callback queries,
// so we can ignore all other updates and not distribute them.
bot.on(
["message", "edited_message", "callback_query"],
distribute(__dirname + "/worker"),
);
// Our bot only handles messages, edits, and callback queries,
// so we can ignore all other updates and not distribute them.
bot.on(
["message", "edited_message", "callback_query"],
distribute(new URL("./worker.ts", import.meta.url)),
);
By default, distribute creates 4 bot workers. You can easily adjust this number.
// Distribute updates among 8 bot workers.
bot.use(distribute(workerFile, { count: 8 }));
Note that your application should never spawn more threads than there are physical cores on your CPU. This will not improve performance, but rather degrade it.
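To follow this advice, you can derive the worker count from the machine instead of hard-coding it. A hedged sketch (workerCount is a made-up helper; os.availableParallelism requires Node.js 18.14+, with os.cpus as a fallback):

```typescript
import os from "node:os";

// Number of workers to spawn: the available cores minus a reserve for
// the main thread (which fetches and distributes updates) and the OS.
function workerCount(reserve = 1): number {
  const cores = os.availableParallelism?.() ?? os.cpus().length;
  return Math.max(1, cores - reserve);
}

// Usage with distribute:
// bot.use(distribute(workerFile, { count: workerCount() }));
```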
How It Works Behind the Scenes
Of course, while using grammY runner looks very simple, a lot is going on under the hood.
Every runner consists of three different parts.
- The source pulls in updates from Telegram.
- The sink supplies the bot instance with updates.
- The runner component connects source and sink, and allows you to start and stop your bot.
api.telegram.org <—> source <—> runner <—> sink <—> bot
Source
grammY runner ships with one default source that can operate on any UpdateSupplier (API reference). Such an update supplier is straightforward to create from a bot instance. If you want to make one yourself, be sure to check out createUpdateFetcher (API reference).
The source is an async iterator of update batches. It can be active or inactive, and you can close it in order to disconnect from the Telegram servers.
Sink
grammY runner ships with three possible sink implementations: a sequential one (same behavior as bot.start()), a batched one (mainly useful for backwards compatibility with other frameworks), and a fully concurrent one (used by run). All of them operate on UpdateConsumer objects (API reference) which are straightforward to create from a bot instance. If you want to make one yourself, be sure to check out handleUpdate on the Bot instance of grammY (API reference).
The sink contains a queue (API reference) of individual updates that are currently being processed. Adding new updates to the queue will immediately make the update consumer handle them, and return a promise that resolves as soon as there is capacity in the queue again. The integer it resolves with indicates the remaining free space. A concurrency limit set for grammY runner is therefore enforced through the underlying queue instance.
The queue also throws out updates that take too long to process, and you can specify a timeout when creating the respective sink. Of course, you should also provide an error handler when creating a sink. If you are using run(bot), the error handler installed via bot.catch will be used.
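The backpressure contract described above (start handling immediately, resolve once there is room) can be sketched in a few lines. This is a toy stand-in, not grammY runner's actual queue (which also handles timeouts and error callbacks); all names here are invented.

```typescript
// Toy sketch of a backpressure-signaling queue.
class BoundedQueue<T> {
  private active = 0;
  private waiters: Array<(free: number) => void> = [];

  constructor(
    private readonly limit: number,
    private readonly worker: (item: T) => Promise<void>,
  ) {}

  // Begin processing `item` immediately. The returned promise resolves
  // with the number of free slots as soon as there is capacity again.
  add(item: T): Promise<number> {
    this.active++;
    // Errors would go to an error handler; swallowed in this sketch.
    void this.worker(item).catch(() => {}).finally(() => {
      this.active--;
      if (this.active < this.limit) {
        const free = this.limit - this.active;
        this.waiters.splice(0).forEach((w) => w(free));
      }
    });
    if (this.active < this.limit) {
      return Promise.resolve(this.limit - this.active);
    }
    return new Promise((resolve) => this.waiters.push(resolve));
  }
}
```

The caller (the runner) awaits the returned promise before adding more updates, which is exactly how the concurrency limit turns into backpressure on fetching.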
Runner
The runner is a plain loop that pulls in updates from the source and supplies them to the sink. Once the sink has space again, the runner will fetch the next batch of updates from the source.
When you create a runner with createRunner (API reference), you obtain a handle that you can use to control the runner. For instance, it allows you to start and stop it, or to obtain a promise that resolves when the runner stops. (This handle is also returned by run.) Check out the API reference of the RunnerHandle.
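The loop described in this section can be sketched generically. The interfaces below are simplified stand-ins for the runner's source and sink, not grammY runner's actual types:

```typescript
interface UpdateSource<U> {
  // Resolves with the next batch, or undefined once the source is closed.
  next(): Promise<U[] | undefined>;
}

interface UpdateSink<U> {
  // Starts handling the batch; resolves once there is capacity again.
  handle(batch: U[]): Promise<void>;
}

// The runner: pull a batch, hand it to the sink, wait for room, repeat.
async function runnerLoop<U>(
  source: UpdateSource<U>,
  sink: UpdateSink<U>,
): Promise<void> {
  for (;;) {
    const batch = await source.next();
    if (batch === undefined) break; // source was closed, stop the loop
    await sink.handle(batch);
  }
}
```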
The run Function
The run function does a few things to help you use the above structure with ease.
- It creates an update supplier from your bot.
- It creates a source from the update supplier.
- It creates an update consumer from your bot.
- It creates a sink from the update consumer.
- It creates a runner from the source and the sink.
- It starts the runner.
The handle of the created runner is returned, which lets you control the runner.