Queue
The queue delivery method lets you consume a topic as an unordered set of messages.
Messages are delivered in parallel across all consumers of a consumer group, and any given message is delivered to only one consumer.
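To make the "only one consumer" guarantee concrete, here is a toy in-memory model of a single consumer group pulling from a shared backlog. It is a sketch for illustration, not how Diom is implemented: `ToyGroup` and its `receive` method are hypothetical names, and the model leaves out leases, retries, and parallelism. It also hands messages out in order, which the real queue does not promise.

```typescript
// Toy model of queue delivery within one consumer group.
// Hypothetical illustration only; not part of the Diom SDK.
class ToyGroup {
  private backlog: string[];
  private next = 0; // index of the first message not yet handed out

  constructor(backlog: string[]) {
    this.backlog = backlog;
  }

  // Hand out up to batchSize messages that no other consumer has received.
  receive(batchSize: number): string[] {
    const batch = this.backlog.slice(this.next, this.next + batchSize);
    this.next += batch.length;
    return batch;
  }
}

const group = new ToyGroup(["msg1", "msg2", "msg3", "msg4", "msg5"]);

// Two consumers of the same group pull from the same backlog,
// so their batches never overlap.
const consumerA = group.receive(2);
const consumerB = group.receive(2);
const consumerAAgain = group.receive(2);

console.log({ consumerA, consumerB, consumerAAgain });
```

Every message ends up in exactly one of the three batches, which is the property the queue delivery method provides across the consumers of a group.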
Publishing messages
In the msgs component, publishing is done to topics that are shared across all delivery methods, with the different delivery methods only applying at consumption time. Please refer to the topic publishing API for information on how to publish messages to a topic.
Receiving messages
Receiving messages is done using the diom.msgs.queue.receive API as follows:
import { Diom } from "diom";
import { Temporal } from "temporal-polyfill-lite";
const client = new Diom("AUTH_TOKEN");
const out = await client.msgs.queue.receive("topic-123", "group1", {
  batchSize: 100,
  batchWait: Temporal.Duration.from({ seconds: 10 }),
  leaseDuration: Temporal.Duration.from({ seconds: 30 }),
});
Which returns:
{
  msgs: [
    {
      msg_id: "msg1",
      topic: "topic-123",
      value: Uint8Array(...),
      timestamp: Temporal.Instant(...),
    },
    {
      msg_id: "msg2",
      topic: "topic-123",
      value: Uint8Array(...),
      headers: {},
      timestamp: Temporal.Instant(...),
    },
    {
      msg_id: "msg3",
      topic: "topic-123",
      value: Uint8Array(...),
      headers: { h1: "v1" },
      timestamp: Temporal.Instant(...),
    },
  ]
}
Processing messages
After a receive call, the consumer owns each returned message for the lease duration. Within that time it must finish processing the message and either acknowledge it (ack) or report an error (nack). If the lease expires before either happens, the message is treated as if it had been explicitly nacked.
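The receive, process, then ack-or-nack cycle can be sketched as a small helper. This is a minimal sketch under stated assumptions: `QueueApi`, `Msg`, and `processBatch` are hypothetical names introduced here, the interface only mirrors the shape of the calls shown on this page with the receive options trimmed to `batchSize`, and the real SDK types may differ.

```typescript
// Minimal shapes mirroring the calls shown on this page.
// QueueApi and Msg are hypothetical types for this sketch, not SDK exports.
interface Msg {
  msg_id: string;
  value: Uint8Array;
}

interface QueueApi {
  receive(topic: string, group: string, opts: { batchSize: number }): Promise<{ msgs: Msg[] }>;
  ack(topic: string, group: string, opts: { msgIds: string[] }): Promise<unknown>;
  nack(topic: string, group: string, opts: { msgIds: string[] }): Promise<unknown>;
}

// Receive one batch, run the handler on each message, then ack the
// successes and nack the failures.
async function processBatch(
  queue: QueueApi,
  topic: string,
  group: string,
  handler: (msg: Msg) => Promise<void>,
): Promise<{ acked: string[]; nacked: string[] }> {
  const { msgs } = await queue.receive(topic, group, { batchSize: 100 });
  const acked: string[] = [];
  const nacked: string[] = [];

  for (const msg of msgs) {
    try {
      await handler(msg);
      acked.push(msg.msg_id);
    } catch {
      // Any handler error is reported as a nack so the message can be retried.
      nacked.push(msg.msg_id);
    }
  }

  if (acked.length > 0) await queue.ack(topic, group, { msgIds: acked });
  if (nacked.length > 0) await queue.nack(topic, group, { msgIds: nacked });
  return { acked, nacked };
}
```

The helper processes messages one at a time and reports results only after the whole batch, so the batch as a whole has to finish inside the lease duration. A message whose lease runs out first is treated as nacked regardless of what the handler eventually returns.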
Acknowledging messages (ack)
Acknowledging a message marks its processing as successful, which means it won’t be returned again by the receive API.
await client.msgs.queue.ack("topic-123", "group1", { msgIds: ["msg2"] });
Rejecting messages (nack)
Rejecting a message reports an error in its processing. It immediately releases the lease, and the message becomes available for consumption again according to the configured retry schedule.
await client.msgs.queue.nack("topic-123", "group1", { msgIds: ["msg2"] });
Extending message lease
If you need more time when processing a message, you can extend the message lease like so:
await client.msgs.queue.extendLease("topic-123", "group1", {
  msgIds: ["msg3"],
  // New lease duration starting from this moment
  leaseDuration: Temporal.Duration.from({ seconds: 10 }),
});
Dead letter queue (DLQ)
A dead letter queue (DLQ) is a holding queue where messages that fail to be processed successfully (after retries) are redirected for later inspection and debugging.
Message retry schedule
You can configure a retry schedule for a specific topic + consumer group combination, letting you control how many times a message is retried and whether to add a delay between retries.
Once a retry schedule is exhausted, messages are sent to the DLQ if one is configured; otherwise they are marked as processed and skipped. If no retry schedule is configured, messages are sent directly to the DLQ on failure.
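The rules above can be expressed as a small decision function. This is a sketch of the described behavior, not Diom's implementation: `afterFailure` and `Outcome` are hypothetical names, delays are taken to be in milliseconds as in the configuration example on this page, and it assumes that entry `i` of the schedule is the delay before retry number `i + 1` and that an empty schedule with no DLQ leads to the message being skipped.

```typescript
type Outcome =
  | { kind: "retry"; delayMs: number }
  | { kind: "dlq"; topic: string }
  | { kind: "skip" };

// Decide what happens after a message has failed `failures` times
// (1 for the first failure, 2 for the second, and so on).
function afterFailure(
  failures: number,
  retrySchedule: number[],
  dlqTopic?: string,
): Outcome {
  // Assumption: entry i of the schedule is the delay before retry i + 1.
  const delayMs = retrySchedule[failures - 1];
  if (delayMs !== undefined) {
    return { kind: "retry", delayMs };
  }
  // Schedule exhausted (or empty): dead-letter if possible, otherwise skip.
  return dlqTopic !== undefined ? { kind: "dlq", topic: dlqTopic } : { kind: "skip" };
}

const schedule = [0, 5_000, 10_000, 10_000];
console.log(afterFailure(1, schedule, "dlq-for-topic-123")); // retry with delayMs 0
console.log(afterFailure(4, schedule, "dlq-for-topic-123")); // retry with delayMs 10000
console.log(afterFailure(5, schedule, "dlq-for-topic-123")); // sent to the DLQ topic
console.log(afterFailure(5, schedule)); // no DLQ configured, so skipped
console.log(afterFailure(1, [], "dlq-for-topic-123")); // no schedule, straight to the DLQ
```

With the four-entry schedule a message is attempted five times in total: the original delivery plus four retries.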
Redriving the DLQ
You can consume the DLQ like any other topic, or alternatively you can automatically redrive all the messages back to the original queue:
await client.msgs.queue.redriveDlq("topic-123", "group1", {});
Configuration
You can configure the behavior of consumer groups for a specific topic using the diom.msgs.queue.configure API as follows:
await client.msgs.queue.configure("topic-123", "group1", {
  // Retry delays in milliseconds: immediately, after 5s, after 10s, after 10s
  retrySchedule: [0, 5_000, 10_000, 10_000],
  dlqTopic: "dlq-for-topic-123",
});
Advanced functionality
Time travel (reset)
Thanks to how topics and consumer groups work, Diom supports resetting a consumer group so that it behaves as if it started consuming a topic at a previous point in time.
await client.msgs.queue.reset("topic-123", "group1", {
  // Set either timestamp or position, not both.
  timestamp: Date(...),
  // position: "earliest" | "latest",
});
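Since the reset options accept either a timestamp or a position but not both, callers that build the options dynamically may want to check them before making the call. The sketch below is an assumption-laden illustration: `ResetTarget`, `Position`, and `checkResetTarget` are hypothetical names that are not part of the SDK, and the timestamp is modeled as a `Date` only because the snippet above uses one.

```typescript
type Position = "earliest" | "latest";

// Hypothetical option shape: exactly one of the two fields must be present.
type ResetTarget =
  | { timestamp: Date; position?: never }
  | { position: Position; timestamp?: never };

// Runtime guard for options assembled from user input or configuration.
function checkResetTarget(opts: { timestamp?: Date; position?: Position }): ResetTarget {
  const hasTimestamp = opts.timestamp !== undefined;
  const hasPosition = opts.position !== undefined;
  // Both set or neither set are equally invalid.
  if (hasTimestamp === hasPosition) {
    throw new Error("Set exactly one of timestamp or position");
  }
  return opts as ResetTarget;
}

// Rewind the group to the start of the topic.
const fromStart = checkResetTarget({ position: "earliest" });
console.log(fromStart);
```

The union type catches the mistake at compile time for literal options, while the guard covers values that are only known at runtime.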