Features
This guide describes the features of VibeMQ.
Publish/Subscribe (Pub/Sub)
VibeMQ implements the publish/subscribe pattern through message queues.
Basic concepts:
Publisher — client sending messages
Subscriber — client receiving messages
Queue — buffer for storing messages
Broker — server managing queues
Basic example:
// Publisher
await client.PublishAsync("notifications", new {
Title = "Hello",
Body = "World"
});
// Subscriber
await using var subscription = await client.SubscribeAsync<dynamic>(
"notifications",
async msg => {
Console.WriteLine($"{msg.Title}: {msg.Body}");
}
);
Delivery Modes
VibeMQ supports four message delivery modes:
Round-robin
Description: Each message is delivered to exactly one subscriber, and subscribers take turns in cyclic order.
Publisher → [Queue] → Subscriber 1 (message 1)
→ Subscriber 2 (message 2)
→ Subscriber 1 (message 3)
Configuration:
options.DefaultDeliveryMode = DeliveryMode.RoundRobin;
Use cases:
Task processing by multiple workers
Load balancing
Task queues
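The cyclic assignment shown in the round-robin diagram can be sketched as follows. This is only an illustration of the idea, not VibeMQ's internal dispatcher; the subscriber names and the `PickNext` helper are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical subscriber list; the names are illustrative only.
var subscribers = new List<string> { "Subscriber 1", "Subscriber 2" };
var cursor = 0; // index of the subscriber that receives the next message

// Picks the next subscriber and advances the cursor, wrapping at the end.
string PickNext()
{
    var chosen = subscribers[cursor];
    cursor = (cursor + 1) % subscribers.Count;
    return chosen;
}

var assignments = new List<string>();
for (var messageNumber = 1; messageNumber <= 3; messageNumber++)
{
    var target = PickNext();
    assignments.Add(target);
    Console.WriteLine($"message {messageNumber} -> {target}");
}
```

With two subscribers, message 3 wraps back to Subscriber 1, matching the diagram.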
Fan-out with Acknowledgment
Description: Each message is delivered to all subscribers, and each subscriber must acknowledge receipt.
Publisher → [Queue] → Subscriber 1 (copy + ACK)
→ Subscriber 2 (copy + ACK)
→ Subscriber 3 (copy + ACK)
Configuration:
options.DefaultDeliveryMode = DeliveryMode.FanOutWithAck;
options.MaxRetryAttempts = 3;
Use cases:
Notification broadcasting
Data replication
Audit and logging
Fan-out without Acknowledgment
Description: Each message is delivered to all subscribers without waiting for confirmation.
Publisher → [Queue] → Subscriber 1 (copy)
→ Subscriber 2 (copy)
→ Subscriber 3 (copy)
Configuration:
options.DefaultDeliveryMode = DeliveryMode.FanOutWithoutAck;
Use cases:
Broadcast messages
Real-time updates
Data streaming
Priority-based
Description: Messages are delivered in priority order, highest priority first.
[Critical] → [High] → [Normal] → [Low]
Priorities:
| Priority | Value | Description |
|---|---|---|
| Critical | 3 | Critical, delivered first |
| High | 2 | High priority |
| Normal | 1 | Normal (default) |
| Low | 0 | Low, delivered last |
Configuration:
options.DefaultDeliveryMode = DeliveryMode.PriorityBased;
Publish a message with the default priority:
await client.PublishAsync("alerts", message);
Publish with priority:
using VibeMQ.Core.Enums;
await client.PublishAsync("alerts", message, new Dictionary<string, string> {
["priority"] = MessagePriority.Critical.ToString()
});
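As a rough illustration of priority-based ordering, the sketch below drains a .NET `PriorityQueue` so that higher levels come out first. It assumes that messages with equal priority keep their arrival order, which the text above does not state. The `Enqueue` helper and the message bodies are hypothetical, and the numeric levels mirror the priority table (Low = 0 through Critical = 3).

```csharp
using System;
using System.Collections.Generic;

// PriorityQueue dequeues the smallest priority first, so the numeric level is
// negated; the sequence number keeps equal-priority messages in arrival order
// (an assumption made for this sketch).
var pending = new PriorityQueue<string, (int NegatedLevel, long Sequence)>();
long sequence = 0;

void Enqueue(string text, int level) =>
    pending.Enqueue(text, (-level, sequence++));

Enqueue("nightly report", 0);   // Low
Enqueue("user signup", 1);      // Normal
Enqueue("disk almost full", 3); // Critical
Enqueue("payment failed", 2);   // High
Enqueue("password reset", 1);   // Normal, arrives after "user signup"

var delivered = new List<string>();
while (pending.TryDequeue(out var nextMessage, out _))
{
    delivered.Add(nextMessage);
}

Console.WriteLine(string.Join(" | ", delivered));
```

The critical message is drained first and the low-priority one last, regardless of the order in which they were enqueued.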
Delivery Guarantees
Acknowledgments (ACK)
VibeMQ uses an acknowledgment mechanism for delivery guarantees:
Broker → Deliver message → Client
│
│ Processing...
│
◀────── ACK ─────────┘
How it works:
Broker sends message to client
ACK wait timer starts
Client processes message and sends ACK
Broker receives ACK and marks message as delivered
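The four steps above amount to waiting for a signal with a deadline. The following sketch shows one way to express that wait in .NET. It is not VibeMQ's broker code: `WaitForAckAsync` is a hypothetical helper, and the timeouts are arbitrary demo values.

```csharp
using System;
using System.Threading.Tasks;

// Waits for an ACK signal and reports whether it arrived before the timeout.
// Task.WaitAsync(TimeSpan) is available from .NET 6 onward.
static async Task<bool> WaitForAckAsync(Task ackSignal, TimeSpan timeout)
{
    try
    {
        await ackSignal.WaitAsync(timeout);
        return true;  // ACK received: the message can be marked as delivered
    }
    catch (TimeoutException)
    {
        return false; // no ACK in time: the message is a candidate for redelivery
    }
}

// Simulated client that acknowledges after 50 ms of processing.
var ack = new TaskCompletionSource();
_ = Task.Run(async () =>
{
    await Task.Delay(50);
    ack.SetResult();
});
var acknowledged = await WaitForAckAsync(ack.Task, TimeSpan.FromSeconds(2));

// Simulated client that never acknowledges.
var silent = new TaskCompletionSource();
var timedOut = !await WaitForAckAsync(silent.Task, TimeSpan.FromMilliseconds(100));

Console.WriteLine($"acknowledged={acknowledged}, timedOut={timedOut}");
```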
Automatic ACKs:
By default, the client automatically sends an ACK after the handler completes successfully:
await using var subscription = await client.SubscribeAsync<dynamic>(
"notifications",
async msg => {
await ProcessMessageAsync(msg);
// ACK is sent automatically
}
);
Retry Attempts
If no ACK is received within the timeout, the message is resent:
Attempt 1 → Timeout → Attempt 2 → Timeout → Attempt 3 → DLQ
Configuration:
options.MaxRetryAttempts = 3;
Exponential backoff:
The delay between attempts doubles each time:
Attempt 1: immediately
Attempt 2: after 1s
Attempt 3: after 2s
Attempt 4: after 4s
…
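The schedule listed above can be reproduced with a small function. This is a sketch that matches only the listed values (immediate, 1s, 2s, 4s); whether VibeMQ also applies jitter or an upper bound is not stated here, and `DelayBeforeAttempt` is a hypothetical name.

```csharp
using System;

// Delay before a given delivery attempt (1-based), following the listed
// schedule: attempt 1 is immediate, then 1s, 2s, 4s, and so on.
static TimeSpan DelayBeforeAttempt(int attempt)
{
    if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
    if (attempt == 1) return TimeSpan.Zero;
    return TimeSpan.FromSeconds(Math.Pow(2, attempt - 2));
}

for (var attempt = 1; attempt <= 4; attempt++)
{
    Console.WriteLine($"Attempt {attempt}: wait {DelayBeforeAttempt(attempt).TotalSeconds}s");
}
```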
Dead Letter Queue (DLQ)
Messages that cannot be delivered after all retry attempts are moved to the Dead Letter Queue:
options.EnableDeadLetterQueue = true;
options.DeadLetterQueueName = "dead-letters";
options.MaxRetryAttempts = 3;
Reasons for DLQ:
Maximum delivery attempts exceeded
Message TTL expired
Deserialization error
Exception in handler
DLQ processing:
var dlqMessages = await queueManager.GetDeadLetterMessagesAsync(100);
foreach (var message in dlqMessages) {
// Retry or log
await RetryOrLogAsync(message);
}
Queue Management
Creating a Queue
Automatic creation:
When auto-creation is enabled, publishing to a non-existent queue creates it automatically:
options.EnableAutoCreate = true;
Manual creation:
await queueManager.CreateQueueAsync("my-queue", new QueueOptions {
DeliveryMode = DeliveryMode.RoundRobin,
MaxQueueSize = 10_000,
MessageTtl = TimeSpan.FromHours(1),
});
Deleting a Queue
await queueManager.DeleteQueueAsync("my-queue");
Getting Information
var info = await queueManager.GetQueueInfoAsync("my-queue");
Console.WriteLine($"Queue: {info.Name}");
Console.WriteLine($"Messages: {info.MessageCount}");
Console.WriteLine($"Subscribers: {info.SubscriberCount}");
Console.WriteLine($"Mode: {info.DeliveryMode}");
List Queues
var queues = await queueManager.ListQueuesAsync();
foreach (var queueName in queues) {
Console.WriteLine(queueName);
}
Message Priorities
VibeMQ supports message priorities so that important messages are delivered ahead of less important ones.
Priority levels:
public enum MessagePriority {
    Low = 0,      // Delivered last
    Normal = 1,   // Default
    High = 2,     // High priority
    Critical = 3  // Delivered first
}
Publish messages:
using VibeMQ.Core.Enums;
// Critical-priority alert
await client.PublishAsync("alerts", alertData, new Dictionary<string, string> {
["priority"] = MessagePriority.Critical.ToString()
});
// High priority notification
await client.PublishAsync("notifications", data, new Dictionary<string, string> {
["priority"] = MessagePriority.High.ToString()
});
// Normal priority (default) - no headers needed
await client.PublishAsync("logs", logData);
Keep-alive (PING/PONG)
Active connections are maintained using the keep-alive mechanism:
Client Server
│ │
│─── PING (30s) ─────────▶│
│ │
│◄── PONG (immediate) ────│
Client configuration:
var client = await VibeMQClient.ConnectAsync(
"localhost",
8080,
new ClientOptions {
KeepAliveInterval = TimeSpan.FromSeconds(30)
}
);
Automatic Reconnections
The client automatically reconnects when the connection is lost.
Reconnect policy configuration:
var client = await VibeMQClient.ConnectAsync(
"localhost",
8080,
new ClientOptions {
ReconnectPolicy = new ReconnectPolicy {
MaxAttempts = 10,
InitialDelay = TimeSpan.FromSeconds(1),
MaxDelay = TimeSpan.FromMinutes(5),
UseExponentialBackoff = true
}
}
);
Parameters:
| Parameter | Default | Description |
|---|---|---|
| MaxAttempts | int.MaxValue | Maximum number of attempts |
| InitialDelay | 1s | Initial delay |
| MaxDelay | 5min | Maximum delay |
| UseExponentialBackoff | | Exponential increase |
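To show how an initial delay, a maximum delay, and exponential increase typically combine, here is a minimal sketch. The doubling formula is an assumption; the client's actual calculation may differ, for example by adding jitter. `ReconnectDelay` is a hypothetical helper, not part of the VibeMQ API.

```csharp
using System;

// Delay before reconnect attempt N (1-based): doubles each time, capped at maxDelay.
static TimeSpan ReconnectDelay(int attempt, TimeSpan initialDelay, TimeSpan maxDelay)
{
    if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
    // Cap the exponent so the result stays finite for very large attempt counts.
    var exponent = Math.Min(attempt - 1, 30);
    var ticks = initialDelay.Ticks * Math.Pow(2, exponent);
    return ticks >= maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks((long)ticks);
}

var initial = TimeSpan.FromSeconds(1);
var max = TimeSpan.FromMinutes(5);
for (var attempt = 1; attempt <= 10; attempt++)
{
    Console.WriteLine($"Attempt {attempt}: {ReconnectDelay(attempt, initial, max).TotalSeconds}s");
}
```

With a 1s initial delay and a 5min maximum, the delay reaches the cap on the tenth attempt (512s would exceed 300s).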
Authentication
Token-based authentication:
On server:
var broker = BrokerBuilder.Create()
.UseAuthentication("my-secret-token")
.Build();
On client:
var client = await VibeMQClient.ConnectAsync(
"localhost",
8080,
new ClientOptions {
AuthToken = "my-secret-token"
}
);
Warning
Use complex tokens (32+ characters) and store them in a secure location.
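One way to follow this advice is to generate the token from a cryptographic random source and read it from the environment instead of hard-coding it. The sketch below uses standard .NET APIs only; the environment variable name `VIBEMQ_AUTH_TOKEN` is a made-up example, not something VibeMQ defines.

```csharp
using System;
using System.Security.Cryptography;

// 32 random bytes rendered as hex give a 64-character token.
var generatedToken = Convert.ToHexString(RandomNumberGenerator.GetBytes(32));
Console.WriteLine($"Generated token length: {generatedToken.Length}");

// Prefer a token supplied by the environment (or a secret store) over a literal
// in source code. The variable name here is hypothetical.
var authToken = Environment.GetEnvironmentVariable("VIBEMQ_AUTH_TOKEN") ?? generatedToken;
```

The resulting `authToken` value would then be passed to the server and client configuration shown above.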
TLS/SSL Encryption
Transport encryption support:
On server:
.UseTls(options => {
options.Enabled = true;
options.CertificatePath = "/path/to/cert.pfx";
options.CertificatePassword = "cert-password";
})
On client:
var client = await VibeMQClient.ConnectAsync(
"localhost",
8080,
new ClientOptions {
UseTls = true,
SkipCertificateValidation = false // Keep false; set to true only in tests!
}
);
Rate Limiting
Overload protection:
Configuration:
.ConfigureRateLimiting(options => {
options.Enabled = true;
options.MaxConnectionsPerIpPerWindow = 100;
options.ConnectionWindowSeconds = 60;
options.MaxMessagesPerClientPerSecond = 1000;
});
Parameters:
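| Parameter | Default | Description |
|---|---|---|
| Enabled | false | Enable rate limiting |
| MaxConnectionsPerIpPerWindow | | Max connections per IP per window |
| ConnectionWindowSeconds | | Time window (seconds) |
| MaxMessagesPerClientPerSecond | | Max messages per second per client |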
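To make the "connections per IP per window" idea concrete, the sketch below implements a simple fixed-window counter. This is an assumption about the general technique, not a description of VibeMQ's limiter, which may use a different algorithm. `TryAcceptConnection` is a hypothetical helper and the limit of 3 is chosen only to keep the demo short.

```csharp
using System;
using System.Collections.Generic;

const int maxConnectionsPerIpPerWindow = 3; // small value to keep the demo short
const int connectionWindowSeconds = 60;

// Per-IP state: start of the current window and connections counted in it.
var windows = new Dictionary<string, (DateTime WindowStart, int Count)>();

// Returns true if a new connection from the IP is allowed at the given time.
bool TryAcceptConnection(string ip, DateTime now)
{
    if (!windows.TryGetValue(ip, out var state) ||
        (now - state.WindowStart).TotalSeconds >= connectionWindowSeconds)
    {
        state = (now, 0); // first connection, or the previous window has expired
    }

    if (state.Count >= maxConnectionsPerIpPerWindow)
    {
        return false; // limit reached for this window
    }

    windows[ip] = (state.WindowStart, state.Count + 1);
    return true;
}

var start = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var results = new List<bool>();
for (var i = 0; i < 4; i++)
{
    results.Add(TryAcceptConnection("203.0.113.7", start.AddSeconds(i)));
}
var afterWindow = TryAcceptConnection("203.0.113.7", start.AddSeconds(61));

Console.WriteLine($"{string.Join(", ", results)}; after window: {afterWindow}");
```

The fourth connection inside the window is rejected, and a connection after the window has elapsed is accepted again.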
Graceful Shutdown
The broker can be shut down gracefully, without losing in-flight messages:
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
await broker.RunAsync(cts.Token);
// StopAsync() is called automatically
Shutdown stages:
Stop accepting new connections
Notify clients about shutdown
Wait for in-flight message processing (up to 30s)
Close all connections
Clean up resources
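The "wait for in-flight message processing" stage is essentially a bounded wait. A minimal sketch of that pattern with standard .NET tasks is shown below. `DrainAsync` is a hypothetical helper, and the handlers are simulated with delays; the broker's real shutdown logic is not shown in this guide.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Waits for all in-flight work to finish, but no longer than the drain timeout.
// Returns true if everything completed in time.
static async Task<bool> DrainAsync(Task[] inFlight, TimeSpan timeout)
{
    var allDone = Task.WhenAll(inFlight);
    var finished = await Task.WhenAny(allDone, Task.Delay(timeout));
    return finished == allDone;
}

// Three simulated handlers that take 20, 40, and 60 ms.
var handlers = new[] { 20, 40, 60 }.Select(ms => Task.Delay(ms)).ToArray();
var drained = await DrainAsync(handlers, TimeSpan.FromSeconds(30));

// A handler that never finishes forces the timeout path.
var stuck = new[] { new TaskCompletionSource().Task };
var drainedStuck = await DrainAsync(stuck, TimeSpan.FromMilliseconds(100));

Console.WriteLine($"drained={drained}, drainedStuck={drainedStuck}");
```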
Health Checks
HTTP endpoints for monitoring:
Enable:
.ConfigureHealthChecks(options => {
options.Enabled = true;
options.Port = 8081;
})
Endpoints:
GET /health/: health status (200 OK or 503)
GET /metrics/: broker metrics (JSON)
Example /health/ response:
{
"status": "healthy",
"active_connections": 15,
"queue_count": 5,
"memory_usage_mb": 256
}
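A monitoring script could read this payload with `System.Text.Json`. The sketch below parses a copy of the sample response rather than calling a live broker, so it runs without a server. Treating any status other than "healthy" as unhealthy is an assumption made for the example.

```csharp
using System;
using System.Text.Json;

// Sample payload copied from the /health/ example response.
var payload =
    "{\"status\":\"healthy\",\"active_connections\":15,\"queue_count\":5,\"memory_usage_mb\":256}";

using var document = JsonDocument.Parse(payload);
var root = document.RootElement;

var status = root.GetProperty("status").GetString();
var activeConnections = root.GetProperty("active_connections").GetInt32();
var queueCount = root.GetProperty("queue_count").GetInt32();
var memoryUsageMb = root.GetProperty("memory_usage_mb").GetInt32();

// Assumption for this sketch: anything other than "healthy" counts as unhealthy.
var isHealthy = status == "healthy";

Console.WriteLine(
    $"healthy={isHealthy}, connections={activeConnections}, queues={queueCount}, memory={memoryUsageMb} MB");
```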
Example /metrics/ response:
{
"total_messages_published": 125000,
"total_messages_delivered": 124850,
"total_acknowledged": 124800,
"active_connections": 15,
"active_queues": 5,
"memory_usage_bytes": 268435456,
"average_delivery_latency_ms": 2.5
}
Metrics
Counters:
TotalMessagesPublished: total published
TotalMessagesDelivered: total delivered
TotalMessagesAcknowledged: total acknowledged
TotalRetries: retry attempts
TotalDeadLettered: in DLQ
TotalErrors: errors
TotalConnectionsAccepted: connections accepted
TotalConnectionsRejected: connections rejected
Gauge metrics:
ActiveConnections: active connections
ActiveQueues: active queues
InFlightMessages: in flight
MemoryUsageBytes: memory usage
Latency:
AverageDeliveryLatencyMs: average delivery latency
Persistence & Storage
VibeMQ supports pluggable storage providers for message durability across server restarts.
Default behavior (InMemory):
All state is kept in memory — zero configuration, maximum performance. All data is lost on restart.
With SQLite storage:
dotnet add package VibeMQ.Server.Storage.Sqlite
var broker = BrokerBuilder.Create()
.UsePort(8080)
.UseSqliteStorage(options => {
options.DatabasePath = "/data/vibemq.db";
})
.Build();
How it works:
When a message is published it is saved to storage before entering the in-memory queue (write-ahead).
When the subscriber acknowledges the message it is removed from storage.
On server restart, all queues and unacknowledged messages are recovered automatically.
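The three steps above can be illustrated with a toy model. In this sketch a dictionary stands in for durable storage and a `Queue` for the in-memory queue. The helper names (`Publish`, `Acknowledge`, `Recover`) are hypothetical and do not correspond to the VibeMQ storage provider interface.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for durable storage; a real provider would write to disk.
var storage = new Dictionary<Guid, string>();
var queue = new Queue<(Guid Id, string Body)>();

// Step 1: persist first, then enqueue (write-ahead).
Guid Publish(string text)
{
    var id = Guid.NewGuid();
    storage[id] = text;
    queue.Enqueue((id, text));
    return id;
}

// Step 2: an acknowledged message no longer needs to be kept.
void Acknowledge(Guid id) => storage.Remove(id);

// Step 3: after a restart, rebuild the in-memory queue from storage.
// (A dictionary does not preserve order; a real store would keep a sequence.)
void Recover()
{
    foreach (var (id, text) in storage)
    {
        queue.Enqueue((id, text));
    }
}

var firstId = Publish("order created");
Publish("order paid");

queue.Dequeue();      // "order created" is delivered...
Acknowledge(firstId); // ...and acknowledged, so it leaves storage

queue.Clear();        // simulate a restart: in-memory state is lost
Recover();

Console.WriteLine($"recovered: {string.Join(", ", queue.Select(m => m.Body))}");
```

Only the unacknowledged message is restored after the simulated restart.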
DLQ persistence:
Failed messages are saved to the dead_letters table before being added to the in-memory DLQ so they survive restarts.
Storage providers:
| Provider | Description |
|---|---|
| InMemory | Default. Fast, no durability. |
| SQLite | SQLite-based, single-file DB, zero-config. |
See Persistence & Storage for the full configuration reference, the database schema, and the custom provider guide.
Next Steps
Server Setup — server configuration
Client Usage — client usage
Persistence & Storage — persistence & storage
Monitoring — monitoring